Change Data Capture (CDC) from PostgreSQL into Upstash Vector using Kafka, Python and Quix
Change Data Capture (CDC) is a database management technique that detects and captures changes to data. The most common use case is database replication but it also unlocks other event-driven use cases such as monitoring. Its main advantage is efficiency; it processes, transmits, and stores only the updated information so you don't have to reprocess the entire dataset.
This approach is particularly useful for vector databases, which are vital for AI-chatbots that require current, domain-specific knowledge. Traditional batch updates often result in data being outdated by almost a day, which is impractical in fast-moving fields like e-commerce, where an AI chatbot needs the latest inventory updates to make accurate product recommendations. CDC supports an event-driven process, eliminating the delays inherent in batch processing and keeping your vector database instantly up-to-date. This tutorial will demonstrate the power of CDC in maintaining real-time accuracy in vector databases.
Using Continuous, Event-based Vector Ingestion
Here's the basic idea: as soon as new product entries are added to a database, an event is emitted to Kafka with the details of the change as a payload. A consumer process immediately passes the event to the embedding model as it arrives. Embeddings are created and the payload is enriched with the relevant vectors. This data is streamed to another Kafka topic where an ingestion process consumes the data and upserts the vectors at a pace that the vector database can handle.
To show you how this works, we've created a prototype application, which you can replicate yourself. It uses Upstash's serverless Kafka along with a complementary tool called Quix.
What is Quix?
Quix provides a pure-Python stream processing framework through its open source library and cloud platform. It's designed for teams looking to build real-time data pipelines using Python and DataFrames (a tabular representation of streaming data). In a sense, Quix Streams is very similar to Faust. It has been described as “...more or less Faust 2.0 - pure Python with the annoying bits handled”, and it comes with its own serverless cloud platform.
The following diagram illustrates how these two cloud platforms (Quix and Upstash) work together to fulfill this use case:
Essentially, you'll be using the Upstash Kafka and Vector database services for data storage and Quix for data processing and Change Data Capture (CDC).
You can find the full code for this prototype in the accompanying GitHub repository.
Prerequisites
To try out the pipeline, you'll need free accounts with both Quix and Upstash. Use the following links if you don't have these accounts already.
Both services let you use an existing Google or GitHub account so you can sign up with a few clicks.
You'll also need an Upstash Kafka cluster. If you don't have one already, go to the Upstash console and create a Kafka cluster under the **Kafka** tab.
You can create the Kafka cluster by following the Upstash Kafka docs.
Quix Setup
You'll be able to try out this whole process yourself using a Quix template—this basically uses the previously linked GitHub repo to spin up an environment in Quix Cloud.
To get started, click this Quix template creation link.
You'll be prompted to sign up with Quix (if you haven't already), and guided through a wizard to create a project. If you're prompted to “Choose your configuration”, select the “advanced” option.
When you're asked to configure a message broker, select Upstash as your broker provider.
Configure the connection settings shown in the following screenshot:
After you've completed this wizard, click Sync environment.
Another wizard will appear to guide you through the process of importing the code from GitHub into the environment.
During the wizard, you'll be prompted to add any missing credentials—in this case you'll need to add your Upstash token.
- In the prompt that appears, click Add secrets, paste your token into both fields, click Save Changes, and proceed with the wizard.
Next, you'll need to add the endpoint URL for your vector database as an environment variable:
- Open the “edit” menu for the “Ingest to Upstash VectorDB” deployment.
- Update the `upstash_vectordb_endpoint` environment variable with the URL of your own vector database and click Save.
Now you're ready to add some data to a regular database and see that data upserted into your Upstash Vector database.
This project pipeline includes a fully functioning PostgreSQL database service and pgAdmin service so you can test the implementation without setting up your own database (of course, in production, you should use your own database server; in Quix, the database is only for testing and the data is dropped every time you restart the service).
To get started, start all the services so that they all have the “Running” status (click the status button to change the status).
Database Setup
First, we'll create a table in the default test database and add some data.
Since we're focused on e-commerce, let's say we're running an online bookstore and we're constantly updating the books catalog. We need to make sure our vector store has the embeddings for the newest book descriptions whenever new books are added.
Here, we'll simulate catalog updates by writing to the database directly:
Log into pgAdmin

- Open the pgAdmin UI by clicking the launch icon next to the service name.
- Log into pgAdmin with the user name “admin@admin.com” and the password “root”.

Configure a connection to the PostgreSQL database

- Click **Servers** > **Register** > **Server**.
- In the dialog that appears, enter any name on the “General” tab, then configure the connection with host “postgresdb” and “root” as both the Username and Password, and “80” as the port, then click Save.
Adding Data
Create a “books” table

- Navigate to **Servers** > **postgresdb** > Databases > test_db > Schemas, right-click on Tables, and select Query Tool.
- In the Query Tool that appears, paste and run the following query:
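The exact query ships with the project's GitHub repository; as a rough sketch (the column names here are assumptions based on the fields used later in the pipeline), it creates a table along these lines:

```sql
-- Illustrative sketch only: the real schema lives in the project repo.
-- Column names are assumptions based on the fields referenced later
-- (an id, a book name, and a description to embed).
CREATE TABLE IF NOT EXISTS books (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    description TEXT NOT NULL
);
```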
Now add some books by executing the following SQL query:
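The exact rows aren't critical; what matters for the similarity search below is that the Dune description contains words like “planet” and “struggles”. A hypothetical insert might look like this:

```sql
-- Illustrative data only; any descriptions will do, but note the wording
-- of the Dune entry, which the similarity search below relies on.
INSERT INTO books (name, description) VALUES
  ('Dune', 'The heir of a noble family struggles to control a desert planet that is the only source of a priceless spice.'),
  ('The Hobbit', 'A reluctant hobbit sets out on a quest to help a band of dwarves reclaim their mountain home from a dragon.');
```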
The change should be automatically picked up and the data sent through the pipeline.
You can confirm this in Quix Cloud by navigating to **Topics** > postgres-cdc-source.
In the “OFFSET” dropdown, select “Newest” and you should see messages start to come through:
Now let's make sure the vectors have been ingested properly.
In the Upstash vector console, open the data browser.
Search for “books like star wars” — the top result should be “Dune”.
We can assume it matched because the words in the description are semantically similar to the query: “planet” is semantically close to “star” and “struggles” is semantically close to “wars”.
Time to add some more books. Go back to pgAdmin and execute the following SQL query.
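Again, the exact wording is up to you, but for the comparison below to work the new description should mention an interstellar war, for example:

```sql
-- Illustrative data only; the description deliberately mentions an
-- "interstellar war" so it ranks highly for the query "books like star wars".
INSERT INTO books (name, description) VALUES
  ('Old Man''s War', 'A retiree joins an army fighting an interstellar war to defend humanity''s distant colonies.');
```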
Let's see how this update has affected the results of our similarity search.
In the Upstash data browser, search for “books like star wars” again. The top result should now be “Old Man's War”, and the second result should be “Dune”.
We can assume that Dune has been knocked off the top spot because the new addition has a more semantically relevant description: the term “war” is almost a direct hit, and “interstellar” is probably semantically closer to the search term “star” than “planet”.
How it Works Under the Hood
Now that you understand what this prototype does, let's explore how it works.
First, let's zoom in on the processing part of our architecture diagram. You can see that there are three services, each a small, continuously running Python application: “CDC”, “Create embeddings”, and “Upsert into Upstash VectorDB”.
Each application uses the Quix Streams Python library to receive data, process it in some way, then send it to a downstream Kafka topic or write it to a sink of some sort.
Let's walk through the source code for each application.
Configuring CDC
Before I get into the code behind the CDC process, it's worthwhile to note some extra prerequisites for your PostgreSQL database:
- The Write Ahead Log needs to be set to “logical”.
  - You can check the current setting by running the SQL query `SHOW wal_level;`
  - If it's not already set to “logical”, update the `postgresql.conf` file and set `wal_level = logical`.
- The wal2json plugin needs to be installed on the server or container where PostgreSQL is running.
Once those prerequisites are in place you can start experimenting with the CDC code. Instead of using the Debezium Source PostgreSQL Connector, we use the Quix Python CDC connector. You can find the full CDC code files in this GitHub folder.
Connecting to a PostgreSQL database
The connection to PostgreSQL is defined in the file `postgres_helper.py` through environment variables, so if you want to connect to your own database you just need to change the relevant variables.
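As a rough sketch of what that helper does (the environment variable names below are assumptions; check `postgres_helper.py` in the repo for the real ones), the connection looks something like this:

```python
import os

import psycopg2  # assumed driver; the repo's helper may use a different one


def connect_postgres():
    """Open a PostgreSQL connection using settings supplied as environment variables."""
    return psycopg2.connect(
        host=os.environ["PG_HOST"],
        port=os.environ.get("PG_PORT", "5432"),
        dbname=os.environ["PG_DATABASE"],
        user=os.environ["PG_USER"],
        password=os.environ["PG_PASSWORD"],
    )
```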
Producing the change data to Kafka
For the sake of brevity, I won't get into how the change data is actually captured in this article, but you can inspect the `postgres_helper.py` file for more details on how the write ahead log is used to capture changes.
Here, let's focus on the structure of the data and how it's produced to Kafka.
First, we initialize a Kafka producer. If you've used the Python examples from our “Connect to your cluster” page, this part should be familiar. But in this case, we didn't want to use `kafka-python` or `confluent-kafka`. Instead, we use the Quix Streams Python library. Why? For one, it comes with several convenience functions.
For example:

- When you run it in a Quix Cloud Docker container, it automatically detects the Kafka broker address and seamlessly authenticates with Kafka without you having to define any config parameters (like `bootstrap_servers` or `security_protocol`, which you have to define in other libraries).
- It uses the concept of Streaming DataFrames, which makes it a lot easier to process data and reuse Pandas code written for static datasets.
With Quix Streams, we define an application and initial output topic in `main.py`, like so:
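A minimal sketch of that setup (the consumer group name is an assumption; the topic name matches the postgres-cdc-source topic you browsed earlier):

```python
from quixstreams import Application

# Inside Quix Cloud, the broker address and credentials are detected
# automatically from the environment, so no connection config is needed here.
app = Application(consumer_group="postgres-cdc")  # consumer group name is an assumption

# The topic that the change events are produced to
output_topic = app.topic("postgres-cdc-source")
```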
We then add a function to get the latest database changes, add them to a buffer, then iterate through the buffer and send results to our output Kafka topic.
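Continuing `main.py` from the sketch above, the loop looks roughly like this (`get_changes()` is a hypothetical stand-in for the helper logic in `postgres_helper.py`):

```python
import json
import time


def main():
    buffer = []
    with app.get_producer() as producer:
        while True:
            # get_changes() is a hypothetical stand-in for the helper in
            # postgres_helper.py that reads new entries from the write ahead log
            buffer.extend(get_changes())
            for change in buffer:
                producer.produce(
                    topic=output_topic.name,
                    key="books",             # key choice is an assumption
                    value=json.dumps(change),
                )
            buffer.clear()
            time.sleep(0.5)                  # polling interval is an assumption


if __name__ == "__main__":
    main()
```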
The resulting payload has the following structure:
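We don't reproduce the real payload here, but assuming the wal2json plugin's change format, an insert event looks roughly like this (the values are hypothetical):

```python
# Roughly the shape of a wal2json change record (values are hypothetical)
cdc_payload = {
    "kind": "insert",
    "schema": "public",
    "table": "books",
    "columnnames": ["id", "name", "description"],
    "columntypes": ["integer", "text", "text"],
    "columnvalues": [1, "Dune", "The heir of a noble family struggles to..."],
}
```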
Later, we simplify this structure so that it is easier to process.
Creating the Embeddings
This is what we call a “transformation” process; in other words, it sits between two Kafka topics, reading from one and writing to another. The full source code files are in this GitHub folder.
The Quix Streams library provides a simplified process for implementing transformations. Instead of defining a producer and consumer as you would with other libraries, you put the relevant settings in the `Application` constructor and define your input and output topics via the `app` instance.
Example:
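A sketch of that setup (the consumer group and the downstream topic name are assumptions):

```python
from quixstreams import Application

app = Application(consumer_group="create-embeddings")  # name is an assumption

input_topic = app.topic("postgres-cdc-source")          # produced by the CDC service
output_topic = app.topic("embeddings")                  # downstream topic name is an assumption
```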
Further down, you'll see how we produce and consume data using the `app.dataframe` method, but first we define the functions that we want to apply to the data.
The first function compresses the structure of the change data capture payload:
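A minimal sketch of such a function, assuming the wal2json-style payload shown earlier:

```python
def simplify_data(row: dict) -> dict:
    """Flatten the wal2json-style column arrays into a plain field-to-value dict."""
    return dict(zip(row["columnnames"], row["columnvalues"]))
```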
Which results in the following payload structure:
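With the hypothetical books schema used above, the simplified record would look something like this:

```python
{
    "id": 1,
    "name": "Dune",
    "description": "The heir of a noble family struggles to...",
}
```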
The second function uses the Sentence Transformers library to create an embedding for the “description” field in the simplified payload:
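A sketch of that function (the model name here is an assumption; any Sentence Transformers model will do):

```python
from sentence_transformers import SentenceTransformer

# Model choice is an assumption; swap in whichever model the project uses.
model = SentenceTransformer("all-MiniLM-L6-v2")


def create_embeddings(row: dict) -> dict:
    """Encode the book description and attach the vector to a new record."""
    new_row = dict(row)
    new_row["embeddings"] = model.encode(row["description"]).tolist()
    return new_row
```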
Finally, we consume the data, apply the functions, and produce the data to a downstream Kafka topic.
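Wired together, the transformation looks roughly like this (see the notes on `apply()` and `update()` below):

```python
sdf = app.dataframe(input_topic)

sdf = sdf.apply(simplify_data)       # flatten the CDC payload
sdf = sdf.apply(create_embeddings)   # add the "embeddings" field
sdf = sdf.to_topic(output_topic)     # produce to the downstream topic

app.run(sdf)
```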
Note the difference between `sdf.apply()` and `sdf.update()`:

- `.apply()` passes the result of the callback downstream. It takes the original data and processes it to produce new data. This method doesn't alter the original data itself; instead, it generates a new version based on the original.
  - For example, if you use `.apply()` to add a new key to a dictionary, it actually creates a new dictionary with that addition.
  - In our case, we use `sdf.apply(simplify_data)` to transform the CDC payload into a simple dictionary and `sdf.apply(create_embeddings)` to compute the vector and write it to a new “embeddings” field *within* that dictionary.
- `.update()` passes the actual callback argument downstream. It allows you to modify or use the original data directly. However, it's most commonly used to log the data to the console or write it to an external database (it's similar to the `peek()` method in Kafka Streams).
And last but not least, we simply use `sdf.to_topic` to produce the transformed data to the downstream topic.
Upserting to Upstash
This process uses the `sdf.update()` method again, but first we need to define the function that we'll pass to `sdf.update()`: namely, a function to ingest the incoming vector and metadata to Upstash. As usual, you'll find the full code in this GitHub folder.
Here, we use environment variables to define the connection to Upstash, extract the relevant data from the Streaming DataFrame row, and use the `upsert()` method to add an entry to the Upstash Vector store.
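A sketch of that ingestion function, assuming the upstash-vector Python client (the token variable name is an assumption; the endpoint variable matches the one you configured earlier):

```python
import os

from upstash_vector import Index

index = Index(
    url=os.environ["upstash_vectordb_endpoint"],  # the variable you set in Quix Cloud
    token=os.environ["upstash_vectordb_token"],   # secret name is an assumption
)


def ingest_vectors(row: dict):
    """Upsert one vector plus its metadata into the Upstash Vector index."""
    index.upsert(
        vectors=[
            (
                str(row["id"]),                                             # vector id
                row["embeddings"],                                          # the vector itself
                {"name": row["name"], "description": row["description"]},  # metadata
            )
        ]
    )
```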
Finally, we read from the input topic and pass the `ingest_vectors` function to `sdf.update()`. As a reminder, we're using `sdf.update()` because this is the terminus of our pipeline. There's no downstream topic to pass data to; we're simply updating the data “in place” (i.e. sending it to Upstash).
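The wiring mirrors the previous service; a sketch, with topic and consumer group names that are assumptions matching the earlier sketches:

```python
from quixstreams import Application

app = Application(consumer_group="upstash-ingest")  # name is an assumption
input_topic = app.topic("embeddings")               # matches the upstream output topic sketch

sdf = app.dataframe(input_topic)
sdf = sdf.update(ingest_vectors)                    # terminal step: upsert into Upstash

app.run(sdf)
```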
Lessons Learned
Keeping the underlying data fresh is a crucial component of search result quality. We saw how we were able to give the user more semantically accurate search results by updating the vector store.
We could have just updated the vector store manually, by exporting data from the database and writing to the vector store in batches. Yet this introduces several questions such as:
- How does this work in a production e-commerce scenario where your product catalog is constantly changing?
- How do you organize your batches and what is an acceptable delay between the product arriving in the catalog and it being included in user search queries?
If you set up an event-based system where embeddings are created and ingested as soon as data is entered (via CDC), you don't have to deal with these questions, which is why Kafka-based architectures are so popular.
Many large enterprises are already using event-based solutions such as Apache Kafka for traditional search indexing. For example, the DoorDash engineering team produced the article “Building Faster Indexing with Apache Kafka and Elasticsearch”, which provides a spot-on description of the problem-solution fit. The challenges involved in keeping text embeddings up to date are similar, so it makes sense to apply the same approach.
Further Resources
For more information on how to set up this architecture with Quix and Upstash, join the Quix Community Slack and start a conversation with the Quix team.
- To learn more about the Quix Streams Python library, check out the introduction in the Quix documentation.
- To see more Quix project templates, check out the Quix template gallery.
And if you'd like to learn more about Upstash and Gen AI, check out the latest Upstash blog posts. As a reminder, you can find all the relevant code in this project's GitHub repository.