Building an ELT Pipeline with CocoIndex, Snowflake, and LLMs
Sep 30, 2025
I recently worked with a clothing manufacturer who wanted to simplify their invoice process. Every day, they receive around 20–22 supplier invoices in PDF format. All these invoices are stored in Azure Blob Storage.
The finance team used to open each PDF manually and copy the details into their system. This took a lot of time and effort. On top of that, they already had a backlog of 8,000 old invoices waiting to be processed.
At first, I built a flow using n8n. This solution read the invoices from Azure Blob Storage, used Mistral AI to pull out the fields from each PDF, and then loaded the results into Snowflake.
The setup worked fine for a while. But as the number of invoices grew, the workflow started to break. Debugging errors inside a no-code tool like n8n became harder and harder. That’s when I decided to switch to a coding solution.
I came across CocoIndex, an open-source ETL framework designed to transform data for AI, with support for real-time incremental processing. It allowed me to build a pipeline that was both reliable and scalable for this use case.
In this blog, I will walk you through how I built the pipeline using the CocoIndex framework.
What is CocoIndex?
CocoIndex is an open-source tool that helps move data from one place to another in a smart and structured way. In technical terms, this is called ETL (Extract, Transform, Load). But in simple words, you can think of it as a smart conveyor belt in a factory.
On one end of the belt, you place the raw material (PDF invoices).
As the belt moves, the items pass through different stations (transformations). Some stations clean the data, others format it, and some even use AI as inspectors to read and label information.
At the end of the belt, the finished product is neatly packed into boxes (your Snowflake database).
And here’s the best part: CocoIndex is “smart.” It remembers what it has already processed. That means if you add a new invoice tomorrow, it won’t redo all the old ones. It will just pick up the new files and process only those. This is called incremental processing, and it saves a huge amount of time and cost.
How CocoIndex Works with AI
CocoIndex isn’t just about moving data; it also works with AI models during the transformation step.
Here’s how:
It reads the text from your PDF or file.
It sends the text to an AI model with clear instructions: “Pull out the invoice number, date, totals, customer name, and items.”
The AI acts like a smart inspector, turning unstructured text into neat rows and columns.
CocoIndex then loads that structured output into Snowflake.
Thanks to incremental updates, only new files trigger AI calls, saving cost and avoiding duplicate work.
Why choose CocoIndex?
Open-source: Free to use and supported by a growing community.
AI-friendly: Works smoothly with large language models (LLMs).
Incremental: Only processes new or changed data, not everything every time.
Transparent: Lets you trace how each piece of data was transformed.
Fast & Reliable: Built with Rust under the hood for high performance.
In my client project, suppliers uploaded about 20 to 22 invoices every day into Blob Storage. We first did a full load of around 8,000 old invoices, and after that, the pipeline ran in incremental mode, only picking up the new daily invoices. The setup worked efficiently, and since CocoIndex is open source, we deployed the pipeline with Azure Functions to automate the process.
For this blog, I found a GitHub repository that contains sample invoice PDFs, and I used those files to build and test this use case. You can get the data here. I ran everything on my local machine.
Building the Pipeline
For this use case, I followed the ELT approach (Extract, Load, Transform). This means:
Extract invoices from Azure Blob Storage,
Load them into Snowflake, and
Later perform the Transformations inside Snowflake.
In this blog, I focus only on the Extract and Load steps: getting the invoice data out of Blob Storage and into Snowflake.
All the PDF invoices are loaded into Azure Blob Storage, then CocoIndex uses an LLM to extract the data from the PDFs and load it into Snowflake.

Before writing any code, we need to set up a Postgres database. CocoIndex needs a small database in the background to keep track of what has already been processed (this is what makes incremental processing possible). This database acts like a logbook: it remembers which invoices were read, what data was extracted, and whether something has changed. In other words, it stores the pipeline's metadata.
You can install Postgres in different ways: directly on your computer, with Docker, or through VS Code extensions. For this use case, I set it up directly inside VS Code.
Connecting to Azure Blob Storage
The next step is to integrate Azure Blob Storage, since our invoices are uploaded there by suppliers. To connect, we use the Azure CLI (Command Line Interface).
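First, check if Azure CLI is installed, for example with az --version.
Then log in to your Azure account with az login.
Now you can list all the invoices in your container by providing the account name and container name, for example with az storage blob list --account-name <account-name> --container-name <container-name> --auth-mode login.
This will prompt you to log in and then show all the blobs (files) inside the invoice container.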

Setting up the .env File
We first create a .env file to store the credentials for OpenAI, Postgres, Snowflake, and Azure Blob Storage. This keeps all secrets in one place and out of the main code.
Creating main.py
We start by importing the necessary libraries.
dataclasses is used to create structured “blueprints” for invoices and their line items, so all data follows the same format. MarkItDown converts PDF files into Markdown (plain text with structure). This makes it easier for AI to read invoices, since tables and headings are kept in order.
We now load all the secrets (API keys, database credentials, etc.) from the .env file.
The line load_dotenv() reads all values from the .env file. os.getenv("...") fetches each secret (like account names, passwords, API keys).
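As a minimal sketch of this step, the helper below reads a set of required settings and fails fast if any are missing. The variable names are hypothetical placeholders, not the ones used in the original project, and the sketch assumes load_dotenv() from python-dotenv has already been called so the values are present in the environment.

```python
import os
from typing import Mapping

# Hypothetical variable names; use whatever keys your .env file defines.
REQUIRED_VARS = (
    "OPENAI_API_KEY",
    "AZURE_ACCOUNT_NAME",
    "AZURE_CONTAINER",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
)


def read_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the required settings, failing fast if any are missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Checking everything up front means a missing secret shows up as one clear error at startup instead of a confusing failure halfway through a run.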
We are extracting the following fields from each invoice. Below is a sample invoice (from the GitHub repo) showing the data mapping. Each highlighted area represents a field we want to capture and store in Snowflake: Invoice Number, Date, Customer Name, Bill To, Ship To, Subtotal, Discount, Shipping, Total, Balance Due, Order ID, Ship Mode, Notes, Terms, Line Items (Product Name, Quantity, Rate, Amount, SKU, Category)
This mapping ensures that every important piece of information from the invoice PDF is extracted in a structured format.

Next, we create two dataclasses (simple templates for storing information in a structured way).
LineItem holds product details such as description, quantity, rate, amount, SKU, and category.
Invoice holds the overall invoice details (number, date, customer, totals, etc.) and also includes a list of line items.
Inside the Invoice dataclass, we also include clear instructions (a “prompt”) that guide the AI on how to extract data consistently. For example:
Always return numbers without currency symbols (e.g., 58.11, not $58.11).
If a field is missing, return an empty string ("").
Never swap values between fields.
Keep line_items as a structured list of objects.
This built-in prompt acts like documentation for the schema. When the AI processes each invoice, it uses these rules to reliably map the PDF data into the right fields for Snowflake.
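The two dataclasses could look roughly like the sketch below. The field names follow the mapping listed earlier, but the exact names, the all-string types, and the docstring wording are my assumptions here rather than a copy of the original code.

```python
from dataclasses import dataclass, field


@dataclass
class LineItem:
    """One product row on an invoice."""

    description: str = ""
    quantity: str = ""
    rate: str = ""
    amount: str = ""
    sku: str = ""
    category: str = ""


@dataclass
class Invoice:
    """
    Structured representation of one supplier invoice.

    Extraction rules:
    - Return numbers without currency symbols (e.g., 58.11, not $58.11).
    - If a field is missing, return an empty string ("").
    - Never swap values between fields.
    - Keep line_items as a structured list of objects.
    """

    invoice_number: str = ""
    date: str = ""
    customer_name: str = ""
    bill_to: str = ""
    ship_to: str = ""
    subtotal: str = ""
    discount: str = ""
    shipping: str = ""
    total: str = ""
    balance_due: str = ""
    order_id: str = ""
    ship_mode: str = ""
    notes: str = ""
    terms: str = ""
    # default_factory gives every invoice its own list instead of a shared one.
    line_items: list[LineItem] = field(default_factory=list)
```

Keeping every field as a string matches the raw-load idea used later: values land in Snowflake as extracted, and typing happens in the transformation step.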
Next comes the PDF-to-Markdown step. This part of the code converts invoice PDFs into plain text using the MarkItDown library. Since AI models can’t easily read raw PDFs, we first turn them into Markdown, a lightweight text format that keeps structure like headings and tables. The code briefly saves the PDF content into a temporary file, passes it to MarkItDown for conversion, and then deletes the file. The result is clean, structured text that’s ready for AI to extract the invoice fields.
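The temporary-file pattern described above can be sketched as follows. To keep the example runnable without extra packages, the converter is passed in as a function; the name pdf_bytes_to_markdown and this injection style are my own choices for illustration, not the original ToMarkdown step.

```python
import os
import tempfile
from typing import Callable


def pdf_bytes_to_markdown(content: bytes, convert: Callable[[str], str]) -> str:
    """Write PDF bytes to a temporary file, convert it, then delete the file."""
    # delete=False so the converter can reopen the file by path on any platform.
    tmp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
    try:
        tmp.write(content)
        tmp.close()  # flush and release the handle before the converter opens it
        return convert(tmp.name)
    finally:
        tmp.close()  # harmless if already closed
        os.unlink(tmp.name)  # always clean up, even if conversion fails
```

With MarkItDown installed, the converter could be something like lambda path: MarkItDown().convert(path).text_content, which returns the Markdown text for the file at that path.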
After converting the invoice into structured data, we need to get the invoice number. This small function does exactly that:
It looks inside the Invoice object.
If an invoice_number exists, it returns it.
If it’s missing or something goes wrong, it safely returns an empty string instead of breaking the pipeline.
This code block is important because the invoice number acts like a unique ID, so we later use it as the primary key in Snowflake to avoid duplicates.
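A plain-Python version of that helper might look like the sketch below. It is an illustration of the behavior described above, not the original function, and it accepts any object with an invoice_number attribute.

```python
def get_invoice_number(invoice: object) -> str:
    """Return the invoice number, or "" if it is missing or unreadable."""
    try:
        number = getattr(invoice, "invoice_number", "")
        return str(number).strip() if number else ""
    except Exception:
        # Never let one malformed extraction break the whole pipeline.
        return ""
```

One thing to keep in mind with this design: every invoice without a number ends up with the same empty key, so it is worth checking for empty values before relying on the key for uniqueness.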
The next step is to load the extracted invoice data into Snowflake. For this, I created a target table called INVOICES. Since this is just the raw load step (no transformations yet), all columns are stored as VARCHAR. This way, the data goes in exactly as it is extracted, and we can do the transformations inside Snowflake.
The code block below uses the credentials (user, password, account, warehouse, database, schema, table) from the .env file.
It then uses a MERGE command. This is important because:
- If an invoice already exists (same invoice number), the row is updated with the latest data.
- If it’s a new invoice, the row is inserted as a new record.
Line items are stored as a JSON array, so each invoice can have multiple products inside it.
A CREATED_AT timestamp is also added, so we know when the data was last loaded.
This keeps the Snowflake table clean, free of duplicates, and up to date with the latest invoices.
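To make the upsert logic concrete, here is a small sketch that builds such a MERGE statement and its parameters. The helper names and column names are hypothetical, the %s placeholders assume the Snowflake Python connector's default parameter style, and line items are serialized to a JSON string so they fit a VARCHAR column.

```python
import json


def _ordered(key: str, columns: list[str]) -> list[str]:
    """Put the key column first, followed by the remaining columns."""
    return [key] + [c for c in columns if c != key]


def build_merge_sql(table: str, key: str, columns: list[str]) -> str:
    """Build a MERGE that updates the row matching `key` or inserts a new one.

    Table and column names are interpolated directly, so they must come from
    trusted configuration, never from user input.
    """
    cols = _ordered(key, columns)
    source = ", ".join(f"%s AS {c}" for c in cols)
    updates = [f"t.{c} = s.{c}" for c in cols if c != key]
    updates.append("t.CREATED_AT = CURRENT_TIMESTAMP()")
    return (
        f"MERGE INTO {table} t "
        f"USING (SELECT {source}) s "
        f"ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET {', '.join(updates)} "
        f"WHEN NOT MATCHED THEN INSERT ({', '.join(cols)}, CREATED_AT) "
        f"VALUES ({', '.join('s.' + c for c in cols)}, CURRENT_TIMESTAMP())"
    )


def row_params(row: dict, key: str, columns: list[str]) -> tuple:
    """Return values in the same order as the placeholders in the SQL."""
    values = []
    for col in _ordered(key, columns):
        value = row.get(col, "")
        # Lists and dicts (such as line items) are stored as JSON text.
        values.append(json.dumps(value) if isinstance(value, (list, dict)) else str(value))
    return tuple(values)
```

With an open connector cursor, the two pieces would be used together as cursor.execute(sql, params), once per invoice.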
Building the Flow
Now we build the flow. This defines how invoices move from Azure Blob Storage all the way into Snowflake.
1. Source (Input)
The flow starts by connecting to Azure Blob Storage and reading all the invoice PDFs from the container.
2. Convert PDFs to Text
Each PDF is run through the ToMarkdown step, which converts it into structured text that the AI can understand.
3. AI Extraction (with schema prompt)
The extracted text is then processed by OpenAI through CocoIndex. Instead of hardcoding all the rules here, the AI now follows the Invoice dataclass docstring, which contains clear instructions for how to map fields (like invoice number, date, totals, and line items). This built-in schema acts like a guide, ensuring the AI always:
Extracts fields in the correct format.
Returns numbers without symbols.
Leaves fields empty ("") if data is missing.
Keeps line items structured.
This way, the AI outputs consistent, clean data that fits directly into Snowflake.
4. Invoice Number Check
A helper function (GetInvoiceNumber) ensures each invoice has a proper ID. This ID is used as the primary key to keep data unique.
5. Collector
All invoices are grouped together and prepared for export.
6. Export to Snowflake
Finally, the invoices are loaded into the Snowflake table (INVOICES). If an invoice already exists, it’s updated; if it’s new, it’s inserted.
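The order of the six stages can be sketched in plain Python as shown below. This is not the CocoIndex API; it is only a stand-in that shows how the stages hand data to each other, with each stage passed in as a function. All names here are hypothetical.

```python
from typing import Callable, Iterable


def run_flow(
    blobs: Iterable[tuple[str, bytes]],      # 1. source: (filename, PDF bytes)
    to_markdown: Callable[[bytes], str],     # 2. PDF bytes -> Markdown text
    extract: Callable[[str], dict],          # 3. AI extraction -> field dict
    export: Callable[[list[dict]], None],    # 6. load collected rows into the target
) -> int:
    """Run every blob through the stages and return the number of rows exported."""
    collected: list[dict] = []               # 5. collector
    for filename, content in blobs:
        invoice = extract(to_markdown(content))
        # 4. invoice number check: always a string, "" when missing
        number = str(invoice.get("invoice_number") or "")
        collected.append({"filename": filename, **invoice, "invoice_number": number})
    export(collected)
    return len(collected)
```

In the real flow, CocoIndex wires these stages together declaratively and adds the incremental tracking on top, so unchanged files never reach the extraction stage again.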
Running the Pipeline
The last code block runs the flow:
It prints which Azure Blob container is being used.
invoice_flow.setup() prepares the pipeline, checking the source (Azure) and the target (Snowflake) and making sure everything is ready.
invoice_flow.update() runs the flow. It reads new invoices from Azure, extracts the fields with AI, and loads them into Snowflake.
Finally, it prints a summary (Update stats) showing how many records were inserted or updated.
The best thing about CocoIndex is that it supports real-time incremental processing: it only processes and exports files that are new or have changed since the last run. The reexport_targets option controls this:
If reexport_targets = False (the default), only new or updated invoices are exported to Snowflake. Old ones are skipped.
If reexport_targets = True, all invoices (new and old) are re-exported into Snowflake, even if they haven’t changed.
So use True when you want a full refresh, and False when you just want the incremental updates.
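To illustrate the idea, here is a toy model of incremental export with a reexport flag. It is not how CocoIndex is implemented internally (CocoIndex keeps its tracking state in Postgres); the function name and the use of content hashes as fingerprints are assumptions made for this sketch.

```python
import hashlib


def plan_export(
    files: dict[str, bytes],
    seen: dict[str, str],
    reexport_targets: bool = False,
) -> list[str]:
    """Return the filenames to export and record their fingerprints in `seen`."""
    to_export = []
    for name, content in sorted(files.items()):
        fingerprint = hashlib.sha256(content).hexdigest()
        changed = seen.get(name) != fingerprint  # new file or modified content
        if changed or reexport_targets:
            to_export.append(name)
        seen[name] = fingerprint
    return to_export
```

Here seen plays the role of the logbook: on the first run everything is exported, on later runs only new or modified files are, and reexport_targets=True overrides the check for a full refresh.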

Now the pipeline loads all invoices from Azure Blob Storage into the INVOICES table in Snowflake without doing any transformations. This blog focuses only on the Extract and Load steps. Transformations can be done inside Snowflake.
Challenges
Since this pipeline uses AI to extract data from invoices, it’s not always perfect. Evaluation is a real challenge. When I tested with original client data, many fields were mismatched at first. By refining the prompt, I was able to improve accuracy. For this blog’s demo data the results were more reliable, but with messy or inconsistent invoices, AI can still make mistakes or “hallucinate.” Structured PDFs are usually easier to handle.
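One simple way to measure this kind of mismatch is to compare the extracted fields against a few hand-checked invoices. The sketch below is a hypothetical helper for that, not part of the original pipeline; it ignores differences in letter case and whitespace and reports every field that still differs.

```python
def normalize(value: object) -> str:
    """Lowercase and collapse whitespace so cosmetic differences do not count."""
    return " ".join(str(value if value is not None else "").split()).lower()


def field_mismatches(expected: dict, actual: dict) -> dict[str, tuple[str, str]]:
    """Return {field: (expected, actual)} for every field that does not match."""
    return {
        name: (str(want), str(actual.get(name, "")))
        for name, want in expected.items()
        if normalize(want) != normalize(actual.get(name, ""))
    }


def field_accuracy(expected: dict, actual: dict) -> float:
    """Share of expected fields extracted correctly (1.0 if nothing is expected)."""
    if not expected:
        return 1.0
    return 1 - len(field_mismatches(expected, actual)) / len(expected)
```

Running this over a small labeled sample after each prompt change gives a quick signal on whether the change actually improved extraction.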
It’s important to note that CocoIndex itself is not a parser. It’s a framework and incremental engine that allows you to plug in any AI model or custom logic. For parsing invoices, you can choose the tool that works best for your use case. For example, I used OpenAI here, but tools like Google Document AI or other specialized invoice parsers could also be a good fit. In some cases, fine-tuning a model for your own domain may give the best results.
For more information on CocoIndex, please check the documentation.
Thanks for reading