Add Vector Search to PostgreSQL

This tutorial shows you how to connect your own vector data and retrieval functions to PromptQL.

We will use OpenAI models to create embeddings, so you’ll need an OpenAI API key to follow along.

For Vector Search functionality, we need two things:

  • Logic to vectorize unstructured data.
    • Fetch data from the database and create embeddings using OpenAI’s embedding model.
    • Store each embedding in the database in a vector column, managed by the pgvector extension.
  • A function to perform semantic search retrieval.
    • Create an embedding for the input search query.
    • Run a pgvector-backed similarity search with that embedding and return the matched results.
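To make the retrieval step concrete, here is a minimal pure-Python sketch of what a cosine similarity search computes. pgvector's <=> operator returns cosine distance, so 1 minus that distance is the cosine similarity. The function names and the toy three-dimensional vectors are illustrative assumptions, not part of the connector; real embeddings have far more dimensions.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine similarity, i.e. 1 minus the cosine distance."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def rank_by_similarity(
    query: List[float], rows: Dict[int, List[float]], limit: int = 20
) -> List[Tuple[int, float]]:
    """Score every stored embedding against the query, most similar first."""
    scored = [(row_id, cosine_similarity(query, emb)) for row_id, emb in rows.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]


# Toy data standing in for (review_id -> embedding) rows.
toy_rows = {1: [1.0, 0.0, 0.0], 2: [0.0, 1.0, 0.0], 3: [0.9, 0.1, 0.0]}
ranked = rank_by_similarity([1.0, 0.0, 0.0], toy_rows, limit=2)
print(ranked)  # review 1 first, then review 3
```

In the database, the same ordering comes from sorting by the distance operator, which lets PostgreSQL do the scoring instead of application code.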

Note: Depending on your use case, you can use any text embedding model from OpenAI or from another provider such as Cohere. This example uses OpenAI’s text-embedding-3-large model.

Alternatively, if you already have a retrieval function for your vector database set up, follow the steps below and replace the env vars and code snippets with your own as appropriate.

We will use a Python connector to provide the Vector Search functionality.

Initialize a new connector

ddn connector init mypython -i
  • Select hasura/python
  • Skip port, etc. Press “return”.

Add your OpenAI API key

We’ll be using the OpenAI API key to create embeddings for our unstructured data.

In your project root directory, add the key to your .env file.

echo APP_MYPYTHON_OPENAI_API_KEY='sk-proj...' >> .env

Add your Postgres connection string

We’ll be using the database connection string in the Python code to connect.

In your project directory, add the PG connection string to your .env file.

# Replace postgres://... with your database connection string
echo APP_DATABASE_URL=postgres://username:password@host:port/database >> .env
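URL-style connection strings generally need special characters in the username or password (such as @ or /) to be percent-encoded, otherwise the host portion can be misparsed. The sketch below shows one way to assemble such a string with the standard library. The helper name build_postgres_url and the sample credentials are hypothetical, and you should confirm that your driver decodes percent-encoded credentials as expected.

```python
from urllib.parse import quote, urlsplit


def build_postgres_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a postgres:// URL, percent-encoding the user-supplied parts."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Hypothetical credentials containing characters that must be encoded.
url = build_postgres_url("app_user", "p@ss/word", "localhost", 5432, "shop")
print(url)

# Parsing the result back confirms the host and port were not disturbed.
parts = urlsplit(url)
print(parts.hostname, parts.port)
```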

Make these environment variables available to your connector

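  • In mysupergraph/app/connector/mypython/connector.yaml under definition: envMapping:, add the following, which maps the names the Python code reads to the variables in your .env file:

OPENAI_API_KEY:
  fromEnv: APP_MYPYTHON_OPENAI_API_KEY
DATABASE_URL:
  fromEnv: APP_DATABASE_URL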
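Inside the connector, the functions in this tutorial read OPENAI_API_KEY and DATABASE_URL with os.environ.get, which quietly returns None when a mapping is missing. A small fail-fast check can make a misconfigured mapping easier to spot. This is only a sketch: the helper name require_env is hypothetical and is not part of the connector SDK.

```python
import os
from typing import Dict, List, Mapping, Optional


def require_env(names: List[str], env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return the requested variables, raising if any is unset or empty."""
    source = os.environ if env is None else env
    missing = [name for name in names if not source.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: source[name] for name in names}


# Demonstration with a stand-in environment instead of the real os.environ.
fake_env = {"OPENAI_API_KEY": "sk-proj-example", "DATABASE_URL": ""}
try:
    require_env(["OPENAI_API_KEY", "DATABASE_URL"], fake_env)
except RuntimeError as error:
    message = str(error)
    print(message)
```

Calling require_env(["OPENAI_API_KEY", "DATABASE_URL"]) at the top of a function would then surface a clear error instead of a failed request or connection later on.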

Write custom functions

In mysupergraph/app/connector/mypython/, replace the boilerplate code with your custom functions to vectorize product reviews and perform semantic search.

from hasura_ndc import start
from hasura_ndc.function_connector import FunctionConnector
from pydantic import BaseModel
from typing import List, Optional
import os
import aiohttp
import asyncpg
import asyncio

connector = FunctionConnector()


class ReviewRow(BaseModel):
    reviewId: int


@connector.register_query
async def semanticSearchReviews(text: str, limit: Optional[int] = None, offset: Optional[int] = None) -> List[ReviewRow]:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")

    try:
        # Generate embedding for the search text
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }
        payload = {
            "input": text,
            "model": "text-embedding-3-large",
        }
        async with aiohttp.ClientSession() as session:
            # POST to OpenAI's embeddings endpoint
            async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                embeddingData = await response.json()

        embedding = embeddingData['data'][0]['embedding']
        # pgvector accepts a vector as a text literal of the form [0.1,0.2,...]
        formattedEmbedding = '[' + ','.join(map(str, embedding)) + ']'

        # Connect to the database
        conn = await asyncpg.connect(pg_connection_uri)

        # Base query to find reviews with similar embeddings.
        # <=> is cosine distance, so 1 - distance is the similarity.
        searchQuery = """
            SELECT
                review_id,
                1 - (embedding <=> $1::vector) as similarity
            FROM Reviews
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> $1::vector
        """

        # Apply the caller's paging, or fall back to the first 20 matches.
        # int() ensures only integers are interpolated into the SQL text.
        if limit is not None:
            searchQuery += f" LIMIT {int(limit)}"
            if offset is not None:
                searchQuery += f" OFFSET {int(offset)}"
        else:
            searchQuery += " LIMIT 20"

        queryParams = [formattedEmbedding]
        results = await conn.fetch(searchQuery, *queryParams)

        # Map the results to the expected ReviewRow interface
        reviewRows = [ReviewRow(reviewId=row['review_id']) for row in results]

        await conn.close()
        return reviewRows
    except Exception as e:
        print(f"Error performing semantic search: {e}")
        return []


@connector.register_mutation
async def vectorize() -> str:
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    pg_connection_uri = os.environ.get("DATABASE_URL")

    try:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {openai_api_key}",
        }

        conn = await asyncpg.connect(pg_connection_uri)

        # Get all reviews that don't have embeddings yet
        getReviewsQuery = """
            SELECT review_id, comment
            FROM Reviews
            WHERE embedding IS NULL AND comment IS NOT NULL
        """
        reviews = await conn.fetch(getReviewsQuery)

        # Process reviews in batches to avoid rate limits
        batchSize = 100
        for i in range(0, len(reviews), batchSize):
            batch = reviews[i:i+batchSize]

            # Request the embedding for a single review's comment
            async def get_embedding_for_review(review):
                payload = {
                    "input": review['comment'],
                    "model": "text-embedding-3-large",
                }
                async with aiohttp.ClientSession() as session:
                    # POST to OpenAI's embeddings endpoint
                    async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=payload) as response:
                        embeddingData = await response.json()
                embedding = embeddingData['data'][0]['embedding']
                return {
                    'review_id': review['review_id'],
                    'embedding': embedding
                }

            # Fetch the embeddings for the whole batch concurrently
            tasks = [get_embedding_for_review(review) for review in batch]
            embeddings = await asyncio.gather(*tasks)

            # Update reviews with their embeddings
            updateQuery = """
                UPDATE Reviews
                SET embedding = $1::vector
                WHERE review_id = $2
            """
            for item in embeddings:
                formattedEmbedding = '[' + ','.join(map(str, item['embedding'])) + ']'
                await conn.execute(updateQuery, formattedEmbedding, item['review_id'])

            # Log progress
            print(f"Processed {min(i + batchSize, len(reviews))} out of {len(reviews)} reviews")

        await conn.close()
        return "SUCCESS"
    except Exception as e:
        print(f"Error vectorizing reviews: {e}")
        raise e


if __name__ == "__main__":
    start(connector)
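
The queries in these functions assume that the Reviews table already has an embedding column of pgvector's vector type, a step this tutorial does not show. Below is a minimal sketch of that one-time setup, assuming the table and column names used in the queries and the default 3072-dimension output of text-embedding-3-large. The helper names setup_statements and run_setup are hypothetical; adjust the dimension if you use a different model, and note that creating an extension usually requires elevated database privileges.

```python
from typing import List

EMBEDDING_DIMENSIONS = 3072  # default output size of text-embedding-3-large


def setup_statements(table: str = "Reviews", dimensions: int = EMBEDDING_DIMENSIONS) -> List[str]:
    """Build the SQL that enables pgvector and adds the embedding column."""
    return [
        "CREATE EXTENSION IF NOT EXISTS vector",
        f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS embedding vector({dimensions})",
    ]


async def run_setup(dsn: str) -> None:
    """Run the setup statements against the database identified by dsn."""
    import asyncpg  # imported lazily so the statement builder works without it

    conn = await asyncpg.connect(dsn)
    try:
        for statement in setup_statements():
            await conn.execute(statement)
    finally:
        await conn.close()


# Print the statements so they can be reviewed before running them.
for statement in setup_statements():
    print(statement)

# To apply them (assumption: DATABASE_URL holds your connection string):
#   import asyncio, os
#   asyncio.run(run_setup(os.environ["DATABASE_URL"]))
```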

Add required libraries

In mysupergraph/app/connector/mypython/requirements.txt, add the two third-party libraries the functions import:

aiohttp
asyncpg

Introspect your connector

Make sure Docker is running, then execute:

ddn connector introspect mypython

Add your resources

Create metadata for the commands in your supergraph:

ddn command add mypython '*'

Build your supergraph

Create your supergraph build locally:

ddn supergraph build local

Start your supergraph locally

ddn run docker-start

Head to your local DDN console

Run the following from your project’s directory:

ddn console --local

Vectorize your data

In this example, we also vectorize the data as part of the setup. If your vectors already exist, skip to the next step.

You can now use your vectorize command as a mutation and vectorize your review data:

mutation VectorizeReviews {
  vectorize
}

You should see a response like this:

{
  "data": {
    "vectorize": "SUCCESS"
  }
}

Talk to your data

Now you can ask semantic questions about your data:

> Which review would be best for a customer testimonial?
> Which customer feedback best highlights our product’s unique value?
> Which client comment would be most persuasive for a case study?