Connect Unstructured Data

This tutorial shows you how to connect your own vector data and retrieval functions to PromptQL.

Prerequisites

In this example, we use a simple ecommerce Postgres dataset and OpenAI to create embeddings.

  1. To follow along, make sure you have a Postgres database with this schema and seed data loaded.
    You’ll also need an OpenAI API key.
  2. Alternatively, if you already have a retrieval function for your vector database set up, follow the steps below and replace the env vars and code snippets with your own as appropriate. (A minimal sketch of the table the example functions rely on appears after this list.)
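
The custom functions later in this tutorial assume a Reviews table with a review_id, a comment, and a pgvector embedding column. If you're bringing your own data rather than the sample schema, the sketch below shows the minimal shape those functions rely on; the table and column names come from functions.py, and the vector dimension matches OpenAI's text-embedding-3-large (3072). This is an illustrative sketch, not a replacement for the full sample schema.

import os
import asyncio
import asyncpg

async def main():
    # Connect with your own Postgres connection string
    # (the same one you'll later store as APP_DATABASE_URL).
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    # pgvector provides the vector column type and the <=> distance operator.
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
    # Minimal shape of the Reviews table used by the example functions.
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS Reviews (
            review_id serial PRIMARY KEY,
            comment   text,
            embedding vector(3072)  -- dimension of text-embedding-3-large
        )
    """)
    await conn.close()

asyncio.run(main())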

Create a new PromptQL project

Follow the instructions to set up your environment and start a new project from scratch.

Initialize a new connector

ddn connector init mypython -i
  • Select hasura/python
  • Skip the port and other optional prompts by pressing return.

Add your API key

We’ll be using the OpenAI API key to create embeddings for our unstructured data. In your project directory, add the key to your .env file.

echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env
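
If you want to confirm the key works before wiring it into the connector, you can call the same OpenAI embeddings endpoint the functions below use. A minimal sketch; it reads the key from an OPENAI_API_KEY environment variable, so adjust the name if you export it differently:

import os
import asyncio
import aiohttp

async def main():
    # Same endpoint, headers, and model used by the connector functions below.
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
    }
    payload = {"input": "hello world", "model": "text-embedding-3-large"}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
            data = await response.json()
    # text-embedding-3-large returns 3072-dimensional vectors.
    print(len(data["data"][0]["embedding"]))

asyncio.run(main())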

Add your database connection string

We’ll be using the database connection string to connect to our vector database (Postgres in this example).

In your project directory, add the connection string to your .env file.

# Replace the example value below with your own database connection string
echo APP_DATABASE_URL=postgres://username:password@host:port/database >> .env

Make these environment variables available to your connector

  • In mysupergraph/app/connector/mypython/connector.yaml under definition: envMapping:, add:
DATABASE_URL:
  fromEnv: APP_DATABASE_URL
OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
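
With this mapping in place, the connector process sees the values under the names on the left (DATABASE_URL and OPENAI_API_KEY). If you'd like the connector to fail fast when either is missing, you could add a small check near the top of functions.py; a minimal, optional sketch:

import os

# Optional sanity check: these are the names defined under envMapping above.
for required_var in ("DATABASE_URL", "OPENAI_API_KEY"):
    if not os.environ.get(required_var):
        raise RuntimeError(f"Missing required environment variable: {required_var}")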

Write custom functions

In mysupergraph/app/connector/mypython/functions.py, replace the boilerplate code with your custom functions to vectorize product reviews and perform semantic search.

from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio
 
connector = FunctionConnector()
 
class ReviewRow(BaseModel):
    reviewId: int
 
@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")
 
    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
 
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                embeddingData = await response.json()
 
        embedding = embeddingData['data'][0]['embedding']
        formattedEmbedding = '[' + ','.join(map(str, embedding)) + ']'
 
        # Connect to the database
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Base query to find reviews with similar embeddings
        searchQuery = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
        """
 
        if limit is not None:
            searchQuery += f" LIMIT {limit}"
            if offset is not None:
                searchQuery += f" OFFSET {offset}"
        else:
            searchQuery += " LIMIT 20"
 
        queryParams = [formattedEmbedding]
 
        results = await conn.fetch(searchQuery, *queryParams)
 
        # Map the results to the expected ReviewRow interface
        reviewRows = [ReviewRow(reviewId=row['review_id']) for row in results]
 
        await conn.close()
 
        return reviewRows
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []
 
@connector.register_mutation
async def vectorize() -> str:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")
 
    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
 
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Get all reviews that don't have embeddings yet
        getReviewsQuery = """
            SELECT review_id, comment
            FROM Reviews
            WHERE embedding IS NULL AND comment IS NOT NULL
        """
        reviews = await conn.fetch(getReviewsQuery)
 
        # Process reviews in batches to avoid rate limits
        batchSize = 100
        for i in range(0, len(reviews), batchSize):
            batch = reviews[i:i+batchSize]
 
            async def get_embedding_for_review(review):
                payload = {
                    "input": review['comment'],
                    "model": "text-embedding-3-large",
                }
                async with aiohttp.ClientSession() as session:
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        embeddingData = await response.json()
                embedding = embeddingData['data'][0]['embedding']
                return {
                    'review_id': review['review_id'],
                    'embedding': embedding
                }
 
            tasks = [get_embedding_for_review(review) for review in batch]
            embeddings = await asyncio.gather(*tasks)
 
            # Update reviews with their embeddings
            updateQuery = """
                UPDATE Reviews
                SET embedding = $1::vector
                WHERE review_id = $2
            """
            for item in embeddings:
                formattedEmbedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                await conn.execute(updateQuery, formattedEmbedding, item['review_id'])
 
            # Log progress
            print(f"Processed {min(i + batchSize, len(reviews))} out of {len(reviews)} reviews")
 
        await conn.close()
 
        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        raise e
 
if __name__ == "__main__":
    start(connector)

Add required libraries

In mysupergraph/app/connector/mypython/requirements.txt, add:

aiohttp==3.10.10
asyncpg==0.30.0

Introspect your connector

Make sure Docker is running, then execute:

ddn connector introspect mypython

Add your resources

Create metadata for the commands in your supergraph:

ddn command add mypython '*'

Build your supergraph

Create your supergraph build locally:

ddn supergraph build local

Start your supergraph locally

ddn run docker-start

Head to your local DDN console

Run the following from your project’s directory:

ddn console --local

Vectorize your data

In this example, we vectorize the data as part of the setup. If your vectors are already created, skip to the next step.

You can now use your vectorize command as a mutation and vectorize your review data:

mutation VectorizeReviews {
  vectorize
}

You should see a response like this:

{
  "data": {
    "vectorize": "SUCCESS"
  }
}
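
Before moving on, you can confirm the embeddings were actually written by querying the Reviews table directly. A minimal sketch, assuming the same DATABASE_URL connection string and columns used by the connector functions:

import os
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    # Reviews with embeddings vs. reviews the vectorize mutation still needs to process.
    embedded = await conn.fetchval("SELECT count(*) FROM Reviews WHERE embedding IS NOT NULL")
    pending = await conn.fetchval(
        "SELECT count(*) FROM Reviews WHERE embedding IS NULL AND comment IS NOT NULL"
    )
    print(f"{embedded} reviews embedded, {pending} still pending")
    await conn.close()

asyncio.run(main())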

Talk to your data

Now, you can ask semantic questions about your data:

> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?