Connect unstructured data

We'll use OpenAI to create embeddings for unstructured data (product reviews stored in PostgreSQL) and then query them with semantic search.

This example requires an alpha version of the DDN CLI. Update yours by running:
ddn update-cli --version v2.12.0-alpha.2

Initialize a new connector

ddn connector init mypython -i
  • Select hasura/python
  • Accept the defaults for the port and the remaining prompts by pressing return.

Add your API key

echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env
  • In mysupergraph/app/connector/mypython/compose.yaml under services: app_mypython: environment:, add:
PG_CONNECTION_URI: $APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY: $APP_MYPYTHON_OPENAI_API_KEY
  • In mysupergraph/app/connector/mypython/connector.yaml under definition: envMapping:, add:
PG_CONNECTION_URI:
  fromEnv: APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
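
For reference, this is roughly how those two files look after the additions. It is only a sketch showing the new keys; any entries the CLI already generated stay as they are.

# mysupergraph/app/connector/mypython/compose.yaml
services:
  app_mypython:
    environment:
      PG_CONNECTION_URI: $APP_MYPOSTGRES_CONNECTION_URI
      OPENAI_API_KEY: $APP_MYPYTHON_OPENAI_API_KEY

# mysupergraph/app/connector/mypython/connector.yaml
definition:
  envMapping:
    PG_CONNECTION_URI:
      fromEnv: APP_MYPOSTGRES_CONNECTION_URI
    OPENAI_API_KEY:
      fromEnv: APP_MYPYTHON_OPENAI_API_KEY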

Write custom functions

In mysupergraph/app/connector/mypython/functions.py, replace the boilerplate code with the following functions, which vectorize product reviews and perform semantic search over them.

from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio
 
connector = FunctionConnector()
 
class ReviewRow(BaseModel):
    reviewId: int
 
@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
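    """Return review IDs ordered by semantic similarity to the given search text."""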
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")
 
    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
 
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                embeddingData = await response.json()
 
        embedding = embeddingData['data'][0]['embedding']
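        # pgvector accepts a vector literal written as a bracketed, comma-separated string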
        formattedEmbedding = '[' + ','.join(map(str, embedding)) + ']'
 
        # Connect to the database
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Base query to find reviews with similar embeddings
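        # <=> is pgvector's cosine distance operator, so 1 - distance gives the cosine similarity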
        searchQuery = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
        """
 
        if limit is not None:
            searchQuery += f" LIMIT {limit}"
            if offset is not None:
                searchQuery += f" OFFSET {offset}"
        else:
            searchQuery += " LIMIT 20"
 
        queryParams = [formattedEmbedding]
 
        results = await conn.fetch(searchQuery, *queryParams)
 
        # Map the results to the expected ReviewRow interface
        reviewRows = [ReviewRow(reviewId=row['review_id']) for row in results]
 
        await conn.close()
 
        return reviewRows
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []
 
@connector.register_mutation
async def vectorize() -> str:
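    """Generate and store OpenAI embeddings for all reviews that don't have one yet."""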
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")
 
    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
 
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Get all reviews that don't have embeddings yet
        getReviewsQuery = """
            SELECT review_id, comment
            FROM Reviews
            WHERE embedding IS NULL AND comment IS NOT NULL
        """
        reviews = await conn.fetch(getReviewsQuery)
 
        # Process reviews in batches to avoid rate limits
        batchSize = 100
        for i in range(0, len(reviews), batchSize):
            batch = reviews[i:i+batchSize]
 
            async def get_embedding_for_review(review):
                payload = {
                    "input": review['comment'],
                    "model": "text-embedding-3-large",
                }
                async with aiohttp.ClientSession() as session:
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        embeddingData = await response.json()
                embedding = embeddingData['data'][0]['embedding']
                return {
                    'review_id': review['review_id'],
                    'embedding': embedding
                }
 
            tasks = [get_embedding_for_review(review) for review in batch]
            embeddings = await asyncio.gather(*tasks)
 
            # Update reviews with their embeddings
            updateQuery = """
                UPDATE Reviews
                SET embedding = $1::vector
                WHERE review_id = $2
            """
            for item in embeddings:
                formattedEmbedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                await conn.execute(updateQuery, formattedEmbedding, item['review_id'])
 
            # Log progress
            print(f"Processed {min(i + batchSize, len(reviews))} out of {len(reviews)} reviews")
 
        await conn.close()
 
        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        raise e
 
if __name__ == "__main__":
    start(connector)
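
The functions above assume the Reviews table already has an embedding column of the pgvector vector type. If yours doesn't, a one-off script along these lines can add it. This is a minimal sketch, not part of the connector: the table and column names match the queries above, the 3072 dimension matches text-embedding-3-large, and it assumes your database user is allowed to create extensions.

import asyncio
import os

import asyncpg

async def add_embedding_column():
    # Use the same connection string the connector is configured with
    conn = await asyncpg.connect(os.environ["PG_CONNECTION_URI"])
    # pgvector must be installed on the database server for the vector type to exist
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
    # text-embedding-3-large returns 3072-dimensional embeddings
    await conn.execute(
        "ALTER TABLE Reviews ADD COLUMN IF NOT EXISTS embedding vector(3072)"
    )
    await conn.close()

if __name__ == "__main__":
    asyncio.run(add_embedding_column())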

Add required libraries

In mysupergraph/app/connector/mypython/requirements.txt, add:

aiohttp==3.10.10
asyncpg==0.30.0

Introspect your connector

Make sure Docker is running, then execute:

ddn connector introspect mypython

Add your resources

Create metadata for the commands in your supergraph:

ddn command add mypython '*'

Build your supergraph

Create your supergraph build locally:

ddn supergraph build local

Start your supergraph locally

ddn run docker-start

Head to your local DDN console

Run the following from your project’s directory:

ddn console --local

Vectorize your data

You can now run your command as a mutation to vectorize your review data:

mutation VectorizeReviews {
  vectorize
}

You should see a response like this:

{
  "data": {
    "vectorize": "SUCCESS"
  }
}
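
Once the reviews are vectorized, the semanticSearchReviews function is also available as a query. A sketch of what that might look like (the field and argument names follow the Python function above; check your console's schema for the exact shape):

query SearchReviews {
  semanticSearchReviews(text: "durable and well made", limit: 5) {
    reviewId
  }
}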

Talk to your data

Now, you can ask semantic questions about your data:

> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?