Connect Unstructured data
We will use OpenAI to create embeddings for some unstructured data.
Initialize a new connector
ddn connector init mypython -i
- Select hasura/python
- Skip port, etc. Press “return”.
Add your API key
echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env
- In `mysupergraph/app/connector/mypython/compose.yaml`, under `services: app_mypython: environment:`, add:
PG_CONNECTION_URI: $APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY: $APP_MYPYTHON_OPENAI_API_KEY
- In `mysupergraph/app/connector/mypython/connector.yaml`, under `definition: envMapping:`, add:
PG_CONNECTION_URI:
  fromEnv: APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
Write custom functions
In `mysupergraph/app/connector/mypython/functions.py`, replace the boilerplate code with your custom functions to vectorize product reviews and perform semantic search.
Click to show Python code
from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio
# Connector instance on which the query and mutation functions in this file
# are registered (via its register_query / register_mutation decorators) and
# which is passed to start() when the file is run as a script.
connector = FunctionConnector()
class ReviewRow(BaseModel):
    """One semantic-search hit, as returned by ``semanticSearchReviews``."""

    # Value of the matching row's ``review_id`` column in the Reviews table.
    reviewId: int
@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
    """Return the reviews whose stored embeddings are closest to ``text``.

    The search text is embedded with OpenAI's ``text-embedding-3-large``
    model and compared against the ``embedding`` column of ``Reviews`` using
    the ``<=>`` distance operator, closest first.

    Args:
        text: Free-form search text to embed.
        limit: Maximum number of rows to return; 20 when omitted.
        offset: Number of leading rows to skip; 0 when omitted. It is applied
            whether or not ``limit`` is given.

    Returns:
        Matching rows as ``ReviewRow`` objects. Any failure (HTTP, database,
        malformed response) is printed and an empty list is returned instead
        of raising.
    """
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")

    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                # Fail on a non-2xx status here rather than with an opaque
                # KeyError on 'data' when the API returns an error document.
                response.raise_for_status()
                embedding_data = await response.json()

        embedding = embedding_data['data'][0]['embedding']
        formatted_embedding = '[' + ','.join(map(str, embedding)) + ']'

        # LIMIT and OFFSET are bound as parameters instead of being
        # interpolated into the SQL text.
        search_query = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
            LIMIT $2::bigint OFFSET $3::bigint
        """
        effective_limit = 20 if limit is None else limit
        effective_offset = 0 if offset is None else offset

        conn = await asyncpg.connect(pg_connection_uri)
        try:
            results = await conn.fetch(
                search_query, formatted_embedding, effective_limit, effective_offset
            )
        finally:
            # Release the connection even when the query fails.
            await conn.close()

        # Map the results to the expected ReviewRow interface
        return [ReviewRow(reviewId=row['review_id']) for row in results]
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []
@connector.register_mutation
async def vectorize() -> str:
    """Compute and store embeddings for every review that lacks one.

    Selects rows of ``Reviews`` with a non-null ``comment`` and a null
    ``embedding``, requests an embedding for each comment from OpenAI's
    ``text-embedding-3-large`` model, and writes it back to the row.

    Returns:
        The string ``"SUCCESS"`` once all selected reviews are processed.

    Raises:
        Exception: Any HTTP, database or response-parsing error is printed
            and then re-raised. Batches written before the failure remain
            stored.
    """
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {openai_api_key}",
    }
    # Get all reviews that don't have embeddings yet
    get_reviews_query = """
        SELECT review_id, comment
        FROM Reviews
        WHERE embedding IS NULL AND comment IS NOT NULL
    """
    update_query = """
        UPDATE Reviews
        SET embedding = $1::vector
        WHERE review_id = $2
    """
    # At most this many embedding requests are in flight at once.
    batch_size = 100

    try:
        conn = await asyncpg.connect(pg_connection_uri)
        try:
            reviews = await conn.fetch(get_reviews_query)

            # One HTTP session is shared by all requests so connections are
            # pooled instead of being re-established for every review.
            async with aiohttp.ClientSession() as session:

                async def get_embedding_for_review(review):
                    payload = {
                        "input": review['comment'],
                        "model": "text-embedding-3-large",
                    }
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        # Fail on a non-2xx status rather than with an opaque
                        # KeyError on 'data'.
                        response.raise_for_status()
                        embedding_data = await response.json()
                    return {
                        'review_id': review['review_id'],
                        'embedding': embedding_data['data'][0]['embedding'],
                    }

                # Process reviews in batches to avoid rate limits
                for i in range(0, len(reviews), batch_size):
                    batch = reviews[i:i + batch_size]

                    # return_exceptions=True lets every request in the batch
                    # finish before the shared session is closed; the first
                    # failure is then re-raised.
                    outcomes = await asyncio.gather(
                        *(get_embedding_for_review(review) for review in batch),
                        return_exceptions=True,
                    )
                    for outcome in outcomes:
                        if isinstance(outcome, BaseException):
                            raise outcome

                    # Update reviews with their embeddings
                    for item in outcomes:
                        formatted_embedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                        await conn.execute(update_query, formatted_embedding, item['review_id'])

                    # Log progress
                    print(f"Processed {min(i + batch_size, len(reviews))} out of {len(reviews)} reviews")
        finally:
            # Release the connection even when a request or update fails.
            await conn.close()

        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        # Bare raise keeps the original traceback intact.
        raise
# When this file is executed as a script, hand the connector (with the
# functions registered on it above) to hasura_ndc's start().
if __name__ == "__main__":
    start(connector)
Add required libraries
In `mysupergraph/app/connector/mypython/requirements.txt`, add:
aiohttp==3.10.10
asyncpg==0.30.0
Introspect your connector
Make sure Docker is running, then execute:
ddn connector introspect mypython
Add your resources
Create metadata for the commands in your supergraph:
ddn command add mypython '*'
Build your supergraph
Create your supergraph build locally:
ddn supergraph build local
Start your supergraph locally
ddn run docker-start
Head to your local DDN console
Run the following from your project’s directory:
ddn console --local
Vectorize your data
You can now use your command as a mutation and vectorize your review data:
mutation VectorizeReviews {
vectorize
}
You should see a response like this:
{
"data": {
"vectorize": "SUCCESS"
}
}
Talk to your data
Now, you can ask semantic questions on your data:
> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?