Connect Unstructured Data
This tutorial shows you how to connect your own vector data and retrieval functions to PromptQL.
Prerequisites
In this example, we use a simple ecommerce Postgres dataset and OpenAI to create embeddings.
- To follow along, make sure you have a Postgres database with this schema and seed data loaded. You'll also need an OpenAI API key.
- Alternatively, if you already have a retrieval function set up for your vector database, follow the steps below and replace the env vars and code snippets with your own as appropriate.
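The exact schema comes with the linked dataset, but the functions in this tutorial only rely on a few columns of a Reviews table. As a hedged sketch (the column types other than the pgvector embedding are assumptions, not the dataset's actual definitions), the minimum shape looks like:

```sql
-- Sketch of the columns this tutorial's code relies on; the real
-- dataset schema may define more columns and different types.
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE Reviews (
  review_id SERIAL PRIMARY KEY,
  comment   TEXT,
  embedding vector(3072)  -- text-embedding-3-large returns 3072 dimensions
);
```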
Create a new PromptQL project
Follow the instructions to set up your environment and start a new project from scratch.
Initialize a new connector
ddn connector init mypython -i
- Select hasura/python
- Skip the remaining prompts (port, etc.) by pressing return.
Add your API key
We’ll be using the OpenAI API key to create embeddings for our unstructured data.
In your project directory, add the key to your .env
file.
echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env
Add your database connection string
We’ll be using the database connection string to connect to our vector database (Postgres in this example).
In your project directory, add the connection string to your .env
file.
# Replace postgres://... with your own database connection string
echo APP_DATABASE_URL=postgres://username:password@host:port/database >> .env
Make these environment variables available to your connector
In mysupergraph/app/connector/mypython/connector.yaml, under definition.envMapping, add:
DATABASE_URL:
  fromEnv: APP_DATABASE_URL
OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
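Once mapped, the connector process sees the unprefixed names. As a hedged sketch (this helper is hypothetical, not part of the generated connector), you could fail fast at startup if either variable is missing:

```python
import os

# Hypothetical startup check: report which of the env vars mapped in
# connector.yaml did not reach the connector process.
REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "DATABASE_URL"]

def check_required_env(names=REQUIRED_ENV_VARS):
    """Return the names of any required environment variables that are unset."""
    return [name for name in names if not os.environ.get(name)]
```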
Write custom functions
In mysupergraph/app/connector/mypython/functions.py
, replace the boilerplate code with your custom functions to
vectorize product reviews and perform semantic search.
from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio

connector = FunctionConnector()


class ReviewRow(BaseModel):
    reviewId: int


@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")
    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                embeddingData = await response.json()
                embedding = embeddingData['data'][0]['embedding']

        # pgvector expects a vector literal like '[0.1,0.2,...]'
        formattedEmbedding = '[' + ','.join(map(str, embedding)) + ']'

        # Connect to the database
        conn = await asyncpg.connect(pg_connection_uri)

        # Base query to find reviews with similar embeddings
        searchQuery = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
        """
        # Apply the caller's limit, or a sensible default of 20
        if limit is not None:
            searchQuery += f" LIMIT {limit}"
        else:
            searchQuery += " LIMIT 20"
        if offset is not None:
            searchQuery += f" OFFSET {offset}"

        queryParams = [formattedEmbedding]
        results = await conn.fetch(searchQuery, *queryParams)

        # Map the results to the expected ReviewRow interface
        reviewRows = [ReviewRow(reviewId=row['review_id']) for row in results]

        await conn.close()
        return reviewRows
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []


@connector.register_mutation
async def vectorize() -> str:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")
    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        conn = await asyncpg.connect(pg_connection_uri)

        # Get all reviews that don't have embeddings yet
        getReviewsQuery = """
            SELECT review_id, comment
            FROM Reviews
            WHERE embedding IS NULL AND comment IS NOT NULL
        """
        reviews = await conn.fetch(getReviewsQuery)

        # Process reviews in batches to avoid rate limits
        batchSize = 100
        for i in range(0, len(reviews), batchSize):
            batch = reviews[i:i + batchSize]

            async def get_embedding_for_review(review):
                payload = {
                    "input": review['comment'],
                    "model": "text-embedding-3-large",
                }
                async with aiohttp.ClientSession() as session:
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        embeddingData = await response.json()
                        embedding = embeddingData['data'][0]['embedding']
                        return {
                            'review_id': review['review_id'],
                            'embedding': embedding
                        }

            tasks = [get_embedding_for_review(review) for review in batch]
            embeddings = await asyncio.gather(*tasks)

            # Update reviews with their embeddings
            updateQuery = """
                UPDATE Reviews
                SET embedding = $1::vector
                WHERE review_id = $2
            """
            for item in embeddings:
                formattedEmbedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                await conn.execute(updateQuery, formattedEmbedding, item['review_id'])

            # Log progress
            print(f"Processed {min(i + batchSize, len(reviews))} out of {len(reviews)} reviews")

        await conn.close()
        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        raise e


if __name__ == "__main__":
    start(connector)
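Two details in the query above are worth unpacking: pgvector's `<=>` operator computes cosine distance, so `1 - (embedding <=> $1::vector)` is cosine similarity, and asyncpg has no native vector type, so embeddings are passed as text literals. A minimal sketch of both ideas in plain Python (these helpers are illustrative, not part of the connector):

```python
import math

def to_vector_literal(embedding):
    """Format a list of floats as a pgvector text literal, e.g. '[0.1,0.2]'."""
    return '[' + ','.join(map(str, embedding)) + ']'

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors: dot(a,b) / (|a||b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)
```

Identical vectors score 1.0 and orthogonal vectors score 0.0, which is why the SQL orders by ascending distance to surface the closest reviews first.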
Add required libraries
In mysupergraph/app/connector/mypython/requirements.txt
, add:
aiohttp==3.10.10
asyncpg==0.30.0
Introspect your connector
Make sure Docker is running, then execute:
ddn connector introspect mypython
Add your resources
Create metadata for the commands in your supergraph:
ddn command add mypython '*'
Build your supergraph
Create your supergraph build locally:
ddn supergraph build local
Start your supergraph locally
ddn run docker-start
Head to your local DDN console
Run the following from your project’s directory:
ddn console --local
Vectorize your data
In this example, we also vectorize the data as part of the setup. But if your vectors are already created, skip to the next step!
You can now use your vectorize command as a mutation and vectorize your review data:
mutation VectorizeReviews {
vectorize
}
You should see a response like this:
{
"data": {
"vectorize": "SUCCESS"
}
}
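The vectorize mutation processes reviews in batches of 100 to stay under OpenAI rate limits. The slicing pattern it uses can be sketched in isolation (this helper is illustrative, not part of the connector):

```python
# Illustrative version of the batch loop in `vectorize`: slice a list
# into consecutive fixed-size chunks with range(start, stop, step).
def make_batches(items, batch_size=100):
    """Split items into consecutive batches of at most batch_size elements."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
```

For 250 reviews this yields batches of 100, 100, and 50; each batch's embedding requests are then issued concurrently with asyncio.gather.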
Talk to your data
Now, you can ask semantic questions on your data:
> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?