
Connect unstructured data

We will use OpenAI to create embeddings for unstructured data, in this case product reviews stored in PostgreSQL, and expose a semantic search over them.

Initialize a new connector

ddn connector init mypython -i
  • Select hasura/python
  • Accept the defaults for the port and other prompts by pressing return.

Add your API key

echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env
  • In mysupergraph/app/connector/mypython/compose.yaml, under services.app_mypython.environment, add:
PG_CONNECTION_URI: $APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY: $APP_MYPYTHON_OPENAI_API_KEY
  • In mysupergraph/app/connector/mypython/connector.yaml, under definition.envMapping, add:
PG_CONNECTION_URI:
  fromEnv: APP_MYPOSTGRES_CONNECTION_URI
OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
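
Optionally, sanity-check the key before wiring everything together. This is a minimal sketch, not part of the connector, that calls the same OpenAI embeddings endpoint and model the functions below use; export APP_MYPYTHON_OPENAI_API_KEY in your shell (or source .env) before running it:

import asyncio
import os
import aiohttp

async def check_key():
    # Reads the key you just added to .env (must be exported in this shell)
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['APP_MYPYTHON_OPENAI_API_KEY']}",
    }
    payload = {"input": "hello world", "model": "text-embedding-3-large"}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
            data = await response.json()
    # A valid key returns embedding data; an invalid key returns an "error" object
    print(len(data["data"][0]["embedding"]) if "data" in data else data)

if __name__ == "__main__":
    asyncio.run(check_key())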

Write custom functions

In mysupergraph/app/connector/mypython/functions.py, replace the boilerplate code with your custom functions to vectorize product reviews and perform semantic search.

from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio
 
connector = FunctionConnector()
 
class ReviewRow(BaseModel):
    reviewId: int
 
@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")
 
    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
 
        async with aiohttp.ClientSession() as session:
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                embeddingData = await response.json()
 
        embedding = embeddingData['data'][0]['embedding']
        formattedEmbedding = '[' + ','.join(map(str, embedding)) + ']'
 
        # Connect to the database
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Base query to find reviews with similar embeddings
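        # pgvector's <=> operator computes cosine distance; 1 - distance gives cosine similarity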
        searchQuery = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
        """
 
        if limit is not None:
            searchQuery += f" LIMIT {limit}"
            if offset is not None:
                searchQuery += f" OFFSET {offset}"
        else:
            searchQuery += " LIMIT 20"
 
        queryParams = [formattedEmbedding]
 
        results = await conn.fetch(searchQuery, *queryParams)
 
        # Map the results to the expected ReviewRow interface
        reviewRows = [ReviewRow(reviewId=row['review_id']) for row in results]
 
        await conn.close()
 
        return reviewRows
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []
 
@connector.register_mutation
async def vectorize() -> str:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("PG_CONNECTION_URI")
 
    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
 
        conn = await asyncpg.connect(pg_connection_uri)
 
        # Get all reviews that don't have embeddings yet
        getReviewsQuery = """
            SELECT review_id, comment
            FROM Reviews
            WHERE embedding IS NULL AND comment IS NOT NULL
        """
        reviews = await conn.fetch(getReviewsQuery)
 
        # Process reviews in batches to avoid rate limits
        batchSize = 100
        for i in range(0, len(reviews), batchSize):
            batch = reviews[i:i+batchSize]
 
            async def get_embedding_for_review(review):
                payload = {
                    "input": review['comment'],
                    "model": "text-embedding-3-large",
                }
                async with aiohttp.ClientSession() as session:
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        embeddingData = await response.json()
                embedding = embeddingData['data'][0]['embedding']
                return {
                    'review_id': review['review_id'],
                    'embedding': embedding
                }
 
            tasks = [get_embedding_for_review(review) for review in batch]
            embeddings = await asyncio.gather(*tasks)
 
            # Update reviews with their embeddings
            updateQuery = """
                UPDATE Reviews
                SET embedding = $1::vector
                WHERE review_id = $2
            """
            for item in embeddings:
                formattedEmbedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                await conn.execute(updateQuery, formattedEmbedding, item['review_id'])
 
            # Log progress
            print(f"Processed {min(i + batchSize, len(reviews))} out of {len(reviews)} reviews")
 
        await conn.close()
 
        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        raise e
 
if __name__ == "__main__":
    start(connector)
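
These functions assume the Reviews table already has an embedding column of pgvector's vector type (text-embedding-3-large returns 3072-dimensional vectors). If your table does not have that column yet, here is a minimal, hypothetical setup sketch using asyncpg; the table and column names are assumptions, so adjust them to your schema:

import asyncio
import os
import asyncpg

async def prepare_embedding_column():
    # Uses the same connection string the connector reads at runtime
    conn = await asyncpg.connect(os.environ["PG_CONNECTION_URI"])
    # Requires the pgvector extension to be available on the server
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    # text-embedding-3-large produces 3072-dimensional embeddings
    await conn.execute(
        "ALTER TABLE Reviews ADD COLUMN IF NOT EXISTS embedding vector(3072);"
    )
    await conn.close()

if __name__ == "__main__":
    asyncio.run(prepare_embedding_column())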

Add required libraries

In mysupergraph/app/connector/mypython/requirements.txt, add:

aiohttp==3.10.10
asyncpg==0.30.0

Introspect your connector

Make sure Docker is running, then execute:

ddn connector introspect mypython

Add your resources

Create metadata for the commands in your supergraph:

ddn command add mypython '*'

Build your supergraph

Create your supergraph build locally:

ddn supergraph build local

Start your supergraph locally

ddn run docker-start

Head to your local DDN console

Run the following from your project’s directory:

ddn console --local

Vectorize your data

You can now use your command as a mutation and vectorize your review data:

mutation VectorizeReviews {
  vectorize
}

You should see a response like this:

{
  "data": {
    "vectorize": "SUCCESS"
  }
}
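
You can also call the semanticSearchReviews query from any GraphQL client. Below is a minimal Python sketch; the endpoint URL (the local engine's default, http://localhost:3280/graphql) and the exact field and argument names are assumptions based on the function signature above, so adjust them to match what you see in the console:

import asyncio
import aiohttp

SEARCH_QUERY = """
query SemanticSearch($text: String!, $limit: Int) {
  semanticSearchReviews(text: $text, limit: $limit) {
    reviewId
  }
}
"""

async def run_search():
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:3280/graphql",  # assumed local DDN engine endpoint
            json={"query": SEARCH_QUERY, "variables": {"text": "durable and great value", "limit": 5}},
        ) as response:
            result = await response.json()
    print(result)

if __name__ == "__main__":
    asyncio.run(run_search())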

Talk to your data

Now, you can ask semantic questions about your data:

> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?