Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use openai's batch processing to create large volumes of embeddings #280

Open
wants to merge 24 commits into
base: main
Choose a base branch
from

Conversation

kolaente
Copy link
Contributor

@kolaente kolaente commented Dec 5, 2024

This PR adds support for OpenAI's batch api. The implemented functionality creates batches for async processing and then checks their status. This allows processing huge amounts of data a lot more quickly, because the batch api has significantly higher rate limits.

The PR is very much a draft at this point, please see the TODOs I've added in various places. I've run this locally, and it seems to work, but the way I implemented this is probably not the best way to do this - I attribute this to my lack of knowledge about the codebase and my python skills.
I have implemented a similar thing in the past (which is currently running in production), but in a Laravel application.

Resolves #23

Open Questions and To-Dos:

These are things I need help with (because I don't know the codebase good enough). Some of them also have TODOs in code.

  • Use the configured embeddings model instead of hardcoding text-embedding-3-small
  • Use the existing openai client instead of recreating one each time
  • Run the batch embedding only when the backlog of items to be processed is very large (ideally, this would be configurable) Changed so that the switch needs to happen manually
  • Run at the correct place in the code, and only when Openai is configured and supported
  • Properly create the new tables with a migration or similar mechanism
  • Delete items that were submitted to batch processing from the queue
  • Increase the number of items put in a batch - as the batch api can handle significantly higher volume than what fits in typical rate limits, we should use that
  • Fix the concurrency issue (see comment)
  • Fix out of memory error (see comment)
  • Make sure the batch fetch query is save for concurrent use
  • Delete batch files once the batch is processed or failed

@kolaente kolaente requested a review from a team as a code owner December 5, 2024 19:35

await self._delete_embeddings(conn, all_items)
for records in all_records:
await self._copy_embeddings(conn, records)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails with postgres running out of memory:

ERROR:  out of memory
DETAIL:  Cannot enlarge string buffer containing 0 bytes by 2130706440 more bytes.
CONTEXT:  COPY wiki_orig_embeddings_store, line 1, column wiki_id
STATEMENT:  COPY "public"."wiki_orig_embeddings_store" ("wiki_id" ,"paragraph_id", chunk_seq, chunk, embedding) FROM STDIN WITH (FORMAT BINARY)

It first failed when I tried inserting all records at once, but inserting them one by one does not seem to work either.

@kolaente
Copy link
Contributor Author

kolaente commented Dec 5, 2024

Running this with more than one worker fails with this error:

2024-12-05 18:16:23.584 UTC [1518] ERROR:  deadlock detected at character 30
2024-12-05 18:16:23.584 UTC [1518] DETAIL:  Process 1518 waits for RowExclusiveLock on relation 24075 of database 16384; blocked by process 1517.
	Process 1517 waits for RowExclusiveLock on relation 24075 of database 16384; blocked by process 1518.
	Process 1518:
	                INSERT INTO ai.embedding_batches (
	                    openai_batch_id,
	                    input_file_id,
	                    output_file_id,
	                    status,
	                    errors,
	                    expires_at
	                ) VALUES (
	                    $1,
	                    $2,
	                    $3,
	                    $4,
	                    $5,
	                    $6
	                )

	Process 1517:
	                INSERT INTO ai.embedding_batches (
	                    openai_batch_id,
	                    input_file_id,
	                    output_file_id,
	                    status,
	                    errors,
	                    expires_at
	                ) VALUES (
	                    $1,
	                    $2,
	                    $3,
	                    $4,
	                    $5,
	                    $6
	                )

2024-12-05 18:16:23.584 UTC [1518] HINT:  See server log for query details.
2024-12-05 18:16:23.584 UTC [1518] STATEMENT:
	                INSERT INTO ai.embedding_batches (
	                    openai_batch_id,
	                    input_file_id,
	                    output_file_id,
	                    status,
	                    errors,
	                    expires_at
	                ) VALUES (
	                    $1,
	                    $2,
	                    $3,
	                    $4,
	                    $5,
	                    $6
	                )

Setting the worker count to 1 fixes this, but that's obviously not a solution.

Can someone point me in the right direction on how to fix this?

Copy link
Contributor

@Askir Askir left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this is super cool. Thank you so much for providing a PR already in an early stage so we can give some input.

I tried to answer some of your questions, but didn't look at everything in detail yet.

projects/pgai/pgai/vectorizer/embeddings.py Outdated Show resolved Hide resolved

temp_file.close()

client = openai.OpenAI() # TODO there has to be a client already which I could use instead?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._embedder is the property that's currently used but I don't think this exposes the batch embedding api.
Probably makes sense to unify this under a simple self._client property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'd just add that as a new property? AFAIK just initializing will look for credentials from the environment, is that what we want?

@@ -464,13 +467,177 @@ async def run(self) -> int:
await register_vector_async(conn)
while True:
if not self._continue_processing(loops, res):
await self._check_and_process_openai_batches(conn)
# TODO how can we run this only after hitting the rate limit of the normal openai batch embedding api?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not entirely sure this is how it should work.
My gut idea would be to have a flag on the create_vectorizer call that allows enabling and disabling the batch_api. I'd then also put a configurable threshhold there, which would trigger the batch calls when the queue grows e.g. larger than 1000 items?

Not entirely sure about this interface. Maybe @alejandrodnm has another idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think making this configurable makes sense. It will be easier to implement at least 😁

How would I create a new config variable for this? I know how to add a new env variable for this, but maybe that's not what we want?

return res
items_processed = await self._do_batch(conn)
if items_processed == 0:
return res
res += items_processed
loops += 1

async def _create_batch_table(self, conn: AsyncConnection):
# TODO this does not feel like the way to go, is there a way to do these kind of migrations properly?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a look at the extension code and the create_vectorizer function in particular. We create e.g. the queue table there so this code should probably also live in there.
I also think we need one of these for each vectorizer and not a shared one for all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine I'll need to create a migration for this? Do I need to redefine everything or only the create_vectorizer function?

Copy link
Contributor Author

@kolaente kolaente Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added it to a new migration, please have a look.

projects/pgai/pgai/vectorizer/vectorizer.py Outdated Show resolved Hide resolved
@alejandrodnm alejandrodnm self-assigned this Dec 10, 2024
@alejandrodnm
Copy link
Contributor

Hey @kolaente, awesome work. I'm going to start reviewing this. Since this is a non-trivial change, we are going to consider this a Strawman PR

https://github.com/timescale/pgai/blob/main/CONTRIBUTING.md#rfc-stages

Do you think you could explain more the solution to the feature? Considerations you've made? Something like:

  • Are you creating new tables? (I saw the batches table briefly)
  • How are you changing the vectorizer to deal with batches? For example, on every loop of the vectorizer a new batch is created, the batch is stored in the new table.
  • When are the batches retrieved?
  • What happens on errors?

I'm already reviewing and leaving some feedback.

Copy link
Contributor

@alejandrodnm alejandrodnm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an initial pass on the sql code.

I'm not a SQL or extensions experts. This will have to be re-reviewed by my other colleagues.

I haven't reviewed the python code. But, if you add a new openai batch implementation, you can create a new OpenAIBatch embedder in projects/pgai/pgai/vectorizer/embeddings.py, and let that drive the batching logic.

I haven't review that part, I'm guessing we'll need something else to trigger the fetch of the batches.

Can you explain how do you plan to handle that part? In the same loop? Another worker?

projects/extension/sql/ai--0.6.0.sql Outdated Show resolved Hide resolved
-- create the batches table
select pg_catalog.format
( $sql$create table %I.%I(
openai_batch_id VARCHAR(255) PRIMARY KEY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
openai_batch_id VARCHAR(255) PRIMARY KEY,
id VARCHAR(255) PRIMARY KEY,

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think these could be generalize to other non-openai batch APIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ID I'm using here is the one that's returned by OpenAI, that's why I called it that way. Renaming won't be a problem, but how can I make it clear then this ID is "owned" by OpenAI?

I'll have to take another look, but I think OpenAI is the only provider with a batch api for embeddings.

Copy link
Contributor Author

@kolaente kolaente Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took another look and it seems like cohere is the only other currently supported provider which supports batch embedding jobs like OpenAI's. I would assume there will be others in the future at some point. I could abstract my implementation so that it will be possible to add other providers in the future, but then the "only do this once you hit a rate limit" approach will be more complicated.

Unsure how to proceed here?

projects/extension/sql/ai--0.6.0.sql Outdated Show resolved Hide resolved

-- create batch embedding tables
select (embedding operator (pg_catalog.->> 'implementation'))::text into _implementation;
if _implementation = 'openai' then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be created only if the embedding implementation the user set is for openai batch. We have, I think 2 options. Add a new boolean attribute to the current ai.embedding_openai like batch_api=true, or use_batch_api=true. The other is to create a new embedding configuration for openai batch api.

The benefit I see of creating a new configuration, is that it'll be easier to handle the different logics. Also, both APIs don't support the exact same options, for example, the batch API doesn't support the user option and the non-batch doesn't require a completion_window.

You can create a new implementation and can call it ai.embedding_openai_batch. You can use the ai.embedding_openai one for reference:

https://github.com/timescale/pgai/blob/59c32947972bea851193a6a382006c169eed1969/projects/extension/sql/idempotent/008-embedding.sql?5C#L3-L22

You can create it in the same file.

@kolaente
Copy link
Contributor Author

kolaente commented Dec 10, 2024

@alejandrodnm Thanks for the review! Let me outline the solution more generally:

I've discovered the problem I'm solving here in an application we're building, which creates embeddings from a lot of documents. The documents are basically everything the company has ever done, around 3m text documents. We were ingesting so much data into the database that we'd hit OpenAI's rate limit. It would have taken multiple months just with the limits of OpenAI's regular embeddings api. To solve this, I've implemented a system like the one in this PR, which batches the changes and then uses OpenAI's batch api to create embeddings. Because that has significantly higher rate limits, this would be a lot faster, even though a batch might take 24h to complete.
I've built this into my Laravel application and it seems to work. Now, I want to migrate everything over to pgai as I can then retire the chunking service that I've built, which is another part of the application.

The implementation here is a (almost) 1:1 translation of the one I implemented in my Laravel application. The main difference is the trigger - here it should automatically start to use the batch api instead of requiring a manual trigger (in my Laravel app, this was a cli command which would submit everything that has no embeddings yet to the batch api).

I've tried my best in creating a diagram to illustrate how I think it should work:

image

@kolaente
Copy link
Contributor Author

I haven't reviewed the python code. But, if you add a new openai batch implementation, you can create a new OpenAIBatch embedder in projects/pgai/pgai/vectorizer/embeddings.py, and let that drive the batching logic.

AFAIK there can (or should) be only one vectorizer per table? Because then if I implement the batch processing as another vectorizer, the "fallback after hitting OpenAI's rate limit" will not work.

I haven't review that part, I'm guessing we'll need something else to trigger the fetch of the batches.

Can you explain how do you plan to handle that part? In the same loop? Another worker?

This should be a seperate loop, but could also be included in the openai worker main loop.

@alejandrodnm
Copy link
Contributor

alejandrodnm commented Dec 12, 2024

Here’s a proofread version with simple, clear language:

Comment: Implementing Batch API Support for Embedding Generation

Ok, I'm going to do a brain dump of how I understand the problem, the proposed approach, some considerations, alternatives, questions, etc. I'll share this with others on the team for their input.

The Batch API introduces extra steps to the workflow. Instead of directly making an embedding request, multiple request payloads are written into a file that is uploaded to OpenAI. Then, a batch job request is made using the file ID, which returns the batch ID. This batch ID needs to be stored. It’s used to check if OpenAI has finished processing the batch. Once a batch is completed, the embeddings are downloaded and saved to the database.

A batch file can contain up to 50,000 embedding inputs across all requests and has a size limit of 200 MB.

About files: I read that OpenAI keeps the files until they are manually deleted by the user. I’m not entirely sure about the implications of this since it’s not well-documented. If we decide to delete them manually, we need to store file IDs to make the extra requests for deletion.

Key Changes

  1. Batch Table as a Queue
    A new table will store batch IDs, acting as a queue similar to the one used for source object IDs needing vectorization. @kolaente, this is already part of your changes. Do we need to store all the batch metadata? I think storing just the batch ID and the last time it was checked is enough.

    Small note: @kolaente, the query you added to fetch batches isn’t safe for concurrent usage. We’ll need to tweak it.

  2. Workflow Adjustments
    The flow for sending chunks, generating batches, and saving batch IDs might replace the current process, though I might be missing something. The key change is switching from storing embeddings in the store table to saving the batch ID. Existing logic, like deleting items from the vectorizer queue or fetching items to embed, should still work. My only concern is that this might result in many conditional (if) branches across the codebase.

  3. Mapping Batches to Source Table
    Right now, we take the primary key of the source, generate chunks, and use that when inserting into the store table. Since batches are asynchronous, we lose the PK information during batch creation.

    How do we match batch content back to the source table? @kolaente's implementation sends each chunk in a different request object, using a chunk ID (a combination of the source PK and chunk sequence) as the custom_id.

    Alternatively, if parsing all responses becomes too resource-intensive, we could reduce the number of requests written to the file by grouping all chunks from the same source item into a single request's input fiels. In this case:

    • The source PK could be the custom_id.
    • The array index could represent the chunk sequence.

    However, this would change the current behavior. Filtering out oversized chunks for retries, as we do now, might become too complicated.

Batch Processing

  1. Proposed Worker Logic
    @kolaente suggests handling batch processing in Worker.run() after the regular queue is emptied. I thought about splitting the process into two workers—one for creating batch objects and another for processing them—but that feels over-complicated. Given the 50k input or 200 MB limit per batch, I expect batch creation to be quick, with most time spent processing batches.

    The only potential issue I see is in systems with continuous data ingestion, where the regular queue might never fully empty, delaying batch processing. I’d like input from others on this.

  2. Cloud Functions and Time Limits
    On time-limited platforms like our cloud functions, how do we minimize wasted effort during restarts? It would be useful to measure processing time for a near-limit batch file to decide if we need to adjust worker exit conditions.

Batch Queue Handling

We can use a similar mechanism as the regular queue by adding a last_processed_at column and filtering for batches not processed in the last X minutes or hours. For completed batches:

  • Download the file.
  • Save the embeddings.
  • Remove the batch from the queue.

For incomplete batches, update last_processed_at to avoid redundant retrievals in the same worker run.

Alternatives and OpenAI API Considerations

  1. Deleting Files
    Should we delete processed files? If yes, we’ll need to store file IDs to make the extra requests for deletion.

  2. Listing Completed Jobs
    OpenAI’s API lets us list up to 100 batch jobs at a time with pagination. An alternative is fetching all completed jobs and matching them to our queue by batch ID. However, this could result in very large lists, and OpenAI’s batch retention policy isn’t clearly defined.

  3. New Embedding Implementation
    We could either:

    • Extend the existing OpenAI embedding class with a new attribute for API type. Batch API has attributes like completion_window, which is currently fixed at 24h. We could hardcode it for now, but future changes might complicate this.
    • Create a new embedding implementation, e.g., embedding_openai_batch.

    Should we add a Batch interface to reduce branching logic with isinstance?

Use Case Questions

  1. Rate Limit Transition
    @kolaente mentioned dynamically switching between non-batch and batch APIs based on rate limits. Is this something we want to support? It might add significant complexity.

  2. Switching Between APIs
    Batch API works well for backfilling large datasets, but for new data, switching back to the non-batch API might make sense. If this is implemented as an attribute on the existing openAI embedding implementation, then the switch would be just updating the configuration, we'd have to make sure we update the vectorizer information after a loop run.

    If it's implement as a separate embedding implementation, then user's would have to:

    • Drop the vectorizer.
    • Create a new non-batch vectorizer pointing to the same store, ensuring consistent configurations (e.g., same table name, model).
    • Prevent existing source items from re-entering the queue. New changes to the source table would need to be added.

This is a lot, and I’m sure I missed some things. Let’s get input from others to refine this approach.

@kolaente kolaente force-pushed the feature/openai-batch-processing branch 2 times, most recently from eb5dc7b to 03fd259 Compare December 16, 2024 12:03
@kolaente kolaente force-pushed the feature/openai-batch-processing branch from 03fd259 to cdad4bc Compare December 16, 2024 12:13
@kolaente
Copy link
Contributor Author

About files: I read that OpenAI keeps the files until they are manually deleted by the user. I’m not entirely sure about the implications of this since it’s not well-documented. If we decide to delete them manually, we need to store file IDs to make the extra requests for deletion.

Looks like it. Since there's a limit on the total size of files in an org, we should delete the files once the batch is processed.

Do we need to store all the batch metadata? I think storing just the batch ID and the last time it was checked is enough.

I think we should at least store the status, that way it is possible to check the status of in progress batches with one sql query. I've used that to get a quick overview in my application about this.

The flow for sending chunks, generating batches, and saving batch IDs might replace the current process, though I might be missing something.

I would think of it more like extending the current process, but it's also possible to model it separately.

The array index could represent the chunk sequence.

The problem with this approach is the output order is not guaranteed. We should not rely on the order and only use the custom_id.

Filtering out oversized chunks for retries, as we do now, might become too complicated.

I think we can circumvent this by leaving well enough margin to the batch limit and send more batches instead.

The only potential issue I see is in systems with continuous data ingestion, where the regular queue might never fully empty, delaying batch processing.

If the regular queue is never fully empty, doesn't that indicate some kind of backstop?

@kolaente mentioned dynamically switching between non-batch and batch APIs based on rate limits. Is this something we want to support? It might add significant complexity.

This is my original use-case, that's why I proposed this. I'm unsure about moving the implementation to a separate vectorizer because then it's an either or decision. Ideally, I'd like to use this for backfilling large amounts of data and use the current implementation for everything else.

@smoya
Copy link
Contributor

smoya commented Dec 17, 2024

I think we should at least store the status, that way it is possible to check the status of in progress batches with one sql query. I've used that to get a quick overview in my application about this.

What would all the status be? Correct me if I'm wrong but from how I see it, that column will report the last status but not the real up-to-date status. Wondering if we really need to have such info. WDYT?

@alejandrodnm
Copy link
Contributor

alejandrodnm commented Dec 17, 2024

I think we should at least store the status, that way it is possible to check the status of in progress batches with one sql query. I've used that to get a quick overview in my application about this.

What would all the status be? Correct me if I'm wrong but from how I see it, that column will report the last status but not the real up-to-date status. Wondering if we really need to have such info. WDYT?

I see a use for this. When going over the queue of batches to import we can have a flow like this:

  • Lock the row with select for update skip locked, and update a last_checked, or delay column in the row. This will make it concurrently safe, and by updating the field, we increase the delay so if the import fails, or the batch is not ready we don't check it again until significant time has passed.
  • Create a save point. This save point allows the import to fail, but maintain the delay of the transaction.
  • Import the file and update or not the status to done.
  • Create a save point. This save point allows to keep the done status, so that if deleting the file fails, or we exit at this point, we can just retry the file deletion, and not re-importing.
  • Delete the file.
  • Delete the item from the queue.

@kolaente Do you need the list of files/batches you already imported? Once a batch is imported and the file deleted, having them still in the DB might not give you any kind of value. The important thing is the embeddings, and you can find which items in your source queue have not been embedded.

You'd still have 2 queue, the regular embedding queue, which tells you the items that need to be embedded. The other is the batches queue, which will list, pending batches, or batches that have been imported but the files have not been deleted.

Ideally, if a batch doesn't have a problem, you won't see it as done in the queue.


To support the use case of switching seamlessly between batch and non-batch API lets go with extending the existing embeddings configuration.

I'll leave out some pointers in the PR diff, because there's something else I want you to move there.

With this approach, you'd just have to run an UPDATE statement to update the vectorizer config to switch between the 2 APIs. Is that good enough for your use case? I'm a little hesitant about auto fallback to batch API.


The problem with this approach is the output order is not guaranteed. We should not rely on the order and only use the custom_id.

That docs says that the order of the requests is not guaranteed. Each request will have a custom_id, that's correct. Your implementation matches one request to a chunk, and that's also correct. My suggestion, is to reduce the number of requests we create in the file, instead of one request per chunk, we can do one request per document, making the chunk_id equal to the pk.

In an embed request (not the batch file) you can send a string (or list of tokens), or a list of strings (or list of list of tokens). When you use the list of strings (or list of list of tokens) the response returns a list of the same size with the embeddings, the positions of the items in the response (embeddings) match the position of the items send in the input. See this forum discussion. This is kind of minor, we can start with what you did, the limits are the same either way, maybe less requests can be processed faster by openAI, but I don't know.


If the regular queue is never fully empty, doesn't that indicate some kind of backstop?

I don't know if it would be a real case scenario, just wanted to pointed out. Your implementation waits for all the batches to be created (no more items in the regular queue), before start polling for finished batches. If you have a system that keeps adding items in the queue (continuous insert/updates), you'll be forever creating batches and never processing them. Again, this might not be a real scenario, and we expect the batch API to be used for backfilling. We can leave as it is, and come back to it later if there's any issues.


I'm going to do another pass, now that we've settled on some things, and understand the problem better. It'll mostly be:

  • Extending the openAI embeddings configuration with the batch properties.
  • Handling the batch queue in a concurrently safe manner. Adding save points that allow us to recover in case of failures.

@kolaente
Copy link
Contributor Author

we increase the delay so if the import fails, or the batch is not ready we don't check it again until significant time has passed.

What do you think is significant? From my experience, most batches complete faster than the 24h window, so it would make sense to check a lot more often.

@kolaente Do you need the list of files/batches you already imported? Once a batch is imported and the file deleted, having them still in the DB might not give you any kind of value. The important thing is the embeddings, and you can find which items in your source queue have not been embedded.

I don't think there's a need for having old batch data once the embeddings are stored.

With this approach, you'd just have to run an UPDATE statement to update the vectorizer config to switch between the 2 APIs. Is that good enough for your use case? I'm a little hesitant about auto fallback to batch API.

It will require a little more plumbing on my side, but probably work for the use-case yeah.

What makes you hesitant about auto fallback?

instead of one request per chunk, we can do one request per document, making the chunk_id equal to the pk.

Gotcha. I didn't know the embeddings api could handle more than one input string in a single request.

This is kind of minor, we can start with what you did, the limits are the same either way, maybe less requests can be processed faster by openAI, but I don't know.

Both is fine by me. Which of the two would increase the chances of this getting merged?

Your implementation waits for all the batches to be created (no more items in the regular queue), before start polling for finished batches. If you have a system that keeps adding items in the queue (continuous insert/updates), you'll be forever creating batches and never processing them.

Ahhhh yes, that should be changed. My intention was to check batches in every run of the worker, but only after the current embedding run was done and processed. How would I add this? Seems like I didn't understand the code good enough.

Copy link
Contributor

@alejandrodnm alejandrodnm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the previous discussion. We are going to add an use_batch_api attribute to ai.embedding_openai, these changes should allow us to easily change between the 2 APIs with something like:

UPDATE ai.vectorizer
 SET config = jsonb_set(
     config,
     '{embedding,use_batch_api}',
     'false',
     false
 )
 WHERE id = 1;

This review only covers the extension part. I'll do another for the python code.

There's a part missing, which is updating existing vectorizers. I think we can tackle that in a different PR.

projects/extension/sql/idempotent/013-vectorizer-api.sql Outdated Show resolved Hide resolved
projects/extension/sql/idempotent/013-vectorizer-api.sql Outdated Show resolved Hide resolved
Comment on lines 237 to 252
-- make sure embedding batch table name is available
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', embedding_batch_schema, embedding_batch_table)) is not null then
raise exception 'an object named %.% already exists. specify an alternate embedding_batch_table explicitly', queue_schema, queue_table;
end if;

-- make sure embedding batch chunks table name is available
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', embedding_batch_schema, embedding_batch_chunks_table)) is not null then
raise exception 'an object named %.% already exists. specify an alternate embedding_batch_chunks_table explicitly', queue_schema, queue_table;
end if;

perform ai._vectorizer_create_embedding_batches_table
(embedding_batch_schema
, embedding_batch_table
, embedding_batch_chunks_table
, grant_to
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this logic to its own function. It'll receive the embedding variable as argument, and it'll retrieve the schema and name from it.

Copy link
Contributor Author

@kolaente kolaente Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it make sense to move the check if the tables exist to ai._validate_embedding?

Then all that's left here would be calling ai._vectorizer_create_embedding_batches_table.

projects/extension/sql/idempotent/013-vectorizer-api.sql Outdated Show resolved Hide resolved
projects/extension/sql/idempotent/016-openai-batch-api.sql Outdated Show resolved Hide resolved
Copy link
Contributor

@alejandrodnm alejandrodnm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second review, I need to stop a little for now. I'll try to continue later today.

projects/pgai/pgai/vectorizer/vectorizer.py Outdated Show resolved Hide resolved
projects/extension/sql/idempotent/016-openai-batch-api.sql Outdated Show resolved Hide resolved
Comment on lines 334 to 335
SELECT openai_batch_id, output_file_id FROM {}.{}
WHERE status not in('failed', 'processed', 'prepared')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SELECT openai_batch_id, output_file_id FROM {}.{}
WHERE status not in('failed', 'processed', 'prepared')
@cached_property
def fetch_batches_to_process_query(self) -> sql.Composed:
if not isinstance(self.vectorizer.config.embedding, OpenAI):
raise Exception("batch support is only available for openai")
batch_schema = self.vectorizer.config.embedding.batch_schema
batch_table = self.vectorizer.config.embedding.batch_table
return sql.SQL(
"""
WITH locked_rows AS (
SELECT openai_batch_id
FROM {batch_table}
WHERE next_attempt_after is null or next_attempt_after < NOW()
ORDER BY created_at DESC
LIMIT 1
FOR UPDATE SKIP LOCKED
),
UPDATE
{batch_table} batches
SET
total_attempts = batches.total_attempts + 1,
next_attempt_after = %s
FROM
locked_rows l
WHERE
l.openai_batch_id = cfw.openai_batch_id
RETURNING l.openai_batch_id
"""
).format(batch_table=sql.Identifier(batch_schema, batch_table))

We'd need to double check this query. The gist is that we lock one batch to make the query concurrently safe. Only batches that match next_attempt_after is null or next_attempt_after < NOW() are returned, that means that we can set a backoff for checking batches that we recently checked and are not ready. It also updates a counter that could help with determining if the backoff is too aggressive.

We could also make a Batcher abstract class, and check against that, but let's keep it simple for now.

Copy link
Contributor Author

@kolaente kolaente Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense.

Does that also mean changing the implementation so that only in-progress batches are stored in the database, getting rid of storing and evaluating the status? I noticed your query does not contain the WHERE status not in('failed', 'processed', 'prepared') anymore.
Because then we should also remove these from the table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now implemented the changes to the query as per your suggestion.

@kolaente
Copy link
Contributor Author

There's a part missing, which is updating existing vectorizers. I think we can tackle that in a different PR.

If we add an attribute to use the batch embedding api, do we really need to update existing vectorizers? Can't we just evaluate if the attribute exists and fall back to false if it does not exist?

@@ -466,13 +552,117 @@ async def run(self) -> int:
await self.vectorizer.config.embedding.setup()
while True:
if not self._continue_processing(loops, res):
await self._check_and_process_openai_batches(conn)
Copy link
Contributor

@alejandrodnm alejandrodnm Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's extend the Embedder abstract class

Add a new method to signal if the API we are using is async or not. I was thinking of:

    @abstractmethod
    def is_api_async(self) -> bool:
        return false 

Then for our openAI implementation we can override this with return self.use_batch_api. I choose is_api_async instead of is_batch_api, because other providers call it differently. For example, cohere calls them embedding jobs https://docs.cohere.com/v2/reference/create-embed-job . I think calling them async conveys better the message for multiple providers.

We can then wrap this code like:

if self.vectorizer.config.embedding.is_api_async():
    res = self.process_async_embeddings()
return res

The res variable returns the number of embeddings created. When using async embeddings, then we need to update the value with the actual amount of embeddings that were generated.

That process_async_embeddings function should encapsulate the work you've done in:

  • _check_and_process_openai_batches
  • _do_openai_batch

The way I see it it should be something like:

  • Open a transaction.
  • Pull an item from the queue. This will update the total_attempts, and next_attempt_after.
  • Create a savepoint. We want the changes we made so far to always be committed, you can wrap what comes next in a try/catch and commit in finally. To handle errors we are going to be creating more savepoints. Any errors should be re-raised after committing the changes up to this savepoint.
  • Check the batch status. This should be done by the OpenAI embedder. We could make this part of the Embedder class, a fetch_async_embedding_status. This should return if the embedding is ready or not.
  • If the batch is not ready. We commit and try another batch.
  • If the batch is imported, then we skip to cleaning up the file (this is explained below).
  • If the batch is ready we process it. Add to the Embedder class a new function called process_async_embedding. The implementation will go into the OpenAI embedder. That'll solve the comment you have here about how to reuse the openAI client. Let's not get fancy here yet, let's do the most simple thing possible, if there are performance issues we can find better ways to handle them later. This function will return the embeddings.
  • Insert the embeddings into the database.
  • Mark the batch job as imported, or something like that. The point of this status is to signal that we imported the embeddings, but we haven't cleaned up the file. Maybe there was an error the last time we tried.
  • Create a savepoint. At this point we already imported the embeddings. So we want this committed if the next steps fails (cleaning the file).
  • Clean up the async embedding. Add to the Embedder class a new function called finalize_async_embedding, the openAI implementation will delete the file. You can make it accept the whole row if you want
  • Delete the batch item from the queue. We don't need them anymore, and keeping the queue smaller will make things smoother.

@kolaente is this clear?

Copy link
Contributor Author

@kolaente kolaente Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! (and a lot better than what I originally cooked up)

Mark the batch job as imported, or something like that. The point of this status is to signal that we imported the embeddings, but we haven't cleaned up the file. Maybe there was an error the last time we tried.

You're talking about cleaning up the file stored in openai here?

I'll take a stab at implementing this tomorrow or Monday.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now implemented this, please take another look.

I also added a new type AsyncBatch to make passing the data from the database around a little clearer wrt what to expect.
The implementation seems to have cleared a few items from my checklist (first comment here) as well.

projects/pgai/pgai/vectorizer/vectorizer.py Outdated Show resolved Hide resolved
@alejandrodnm
Copy link
Contributor

There's a part missing, which is updating existing vectorizers. I think we can tackle that in a different PR.

If we add an attribute to use the batch embedding api, do we really need to update existing vectorizers? Can't we just evaluate if the attribute exists and fall back to false if it does not exist?

My point here is what would happen for an existing vectorizer that would like to change to the async API once the feature is release. But let's not worry about that for now.

@alejandrodnm
Copy link
Contributor

@kolaente if you'd like to reach out to me in the pgai discord

https://discord.com/channels/1246241636019605616/1250186765214548019

I'm there as adn

@kolaente
Copy link
Contributor Author

With that being done, I'll be pretty much offline for the next two weeks (Jan 6th) - happy holidays etc :) so please don't expect me to respond quickly.

@alejandrodnm
Copy link
Contributor

With that being done, I'll be pretty much offline for the next two weeks (Jan 6th) - happy holidays etc :) so please don't expect me to respond quickly.

I'm also out for the holidays. I'll try reviewing on the 7th when I get back.

Thanks for your effort. We really appreciate it.

@alejandrodnm
Copy link
Contributor

alejandrodnm commented Jan 7, 2025

Hey @kolaente , happy new year. I did a brief pass. Things are looking better. Good job.

We require some tests for the new feature. I was thinking.

  • Correct embedding and creation of the jobs.
  • Correct processing of the job.
  • Failures in each of the savepoints, commit the data we want and can be restarted.
    • Fetch a new batch job to process, but there's an error processing it commits the update to next_attempt_after.
    • A batch that's processed but there's an error deleting the file, commits the embeddings, and the batch status.
    • A batch that has been processed, when being fetched from the queue again, tries to delete the file, not update the embeddings.

There are also some lint issues.

You can run the tests with just ci.

WDYT?

PS: I know we've been going long with this, but it's not a trivial change. If at some point you feel like you don't want to work on it anymore, we can take it over. Just wanted to let you know.

Cheers.

@kolaente
Copy link
Contributor Author

kolaente commented Jan 9, 2025

Hey @alejandrodnm, I'll probably do another round on this next week. I'll add some tests as you outlined.

How would I trigger the errors, using mocks in the openai client?

@alejandrodnm
Copy link
Contributor

Hey @kolaente That's an option. Another could be to set up the vcr cassette to return a 500 http error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support bulk operations
4 participants