diff --git a/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md b/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md
index cf3aa8553b9..cd1543b74da 100644
--- a/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md
+++ b/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md
@@ -38,11 +38,15 @@ steps, which will be discussed separately in this IP:
   entire contents of the upstream media table are copied into a new temp table
   in the API database. This temp table will later replace the main media table
   in the API.
-- **Clean Data**: Data clean up which includes removing denylisted tags and
-  cleaning URLs. This step will become unnecessary when the
+- **Clean Data**: Data cleanup, which includes cleaning URLs. This step will
+  become unnecessary when the
   [Catalog Data Cleaning project](https://github.com/WordPress/openverse/issues/430)
   is completed, which moves all data cleaning steps upstream into the initial
   provider ingestion process.
+- **Filter Data**: Separate from the cleanup step (but currently included as
+  part of it), this step filters out denylisted tags and machine-generated
+  tags that are below our current confidence threshold. This will _not_ be a
+  part of the data cleaning project and must be carried forward.
 - **Create Index**: Create a new Elasticsearch index, matching the
   configuration of the existing media index.
 - **Distributed Reindex**: Convert each record from the new temp table to the
@@ -267,8 +271,8 @@ developed as separate DAGs alongside the current ones.
 1. Create a new data refresh factory to generate new data refresh DAGs for
    each media type for staging and production, with the `__data_refresh` DAG
    id. For simplicity of code review,
-   these initial DAGs should only perform the `Copy Data`, and `Create Index`
-   steps, which we will perform entirely in Airflow.
+   these initial DAGs should only perform the `Copy Data`, `Filter Data`, and
+   `Create Index` steps, which we will perform entirely in Airflow.
 1. Create a new `catalog-indexer-worker` in the Catalog, and build the new
    indexer worker image locally.
 1. Add the distributed reindexing step to the DAG (excludes infrastructure
@@ -333,14 +337,35 @@ already implemented in the current data refresh and can simply be copied:
 We will include new tasks to perform the initial few steps of the ingestion
 server's work:
-- Copy Data: this should be a TaskGroup that will have multiple tasks for
+- `Copy Data`: this should be a TaskGroup that will have multiple tasks for
   creating the FDW from the upstream DB to the downstream DB, running the
   copy_data query, and so on. It should fully replace the implementation of
   [`refresh_api_table`](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/ingestion_server/ingestion_server/ingest.py#L248)
   in the ingestion server. All steps in this section are SQL queries that can
   be implemented using the existing
   [PostgresHook and PGExecuteQueryOperator](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/sql.py).
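+
+  As a rough illustration (not a final implementation), this TaskGroup might
+  look something like the sketch below. The task names, connection id, schema,
+  and SQL are assumptions; the real queries would be ported directly from
+  `refresh_api_table` in the ingestion server.
+
+  ```python
+  from airflow.decorators import task, task_group
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+  API_CONN_ID = "postgres_api"  # assumed connection id
+
+
+  @task
+  def create_fdw_extension():
+      # The FDW lets the API database query the upstream catalog DB directly.
+      PostgresHook(postgres_conn_id=API_CONN_ID).run(
+          "CREATE EXTENSION IF NOT EXISTS postgres_fdw;"
+      )
+
+
+  @task
+  def copy_data(media_type: str, temp_table: str):
+      # Copy the upstream table into the new temp table in the API database,
+      # mirroring the queries from `refresh_api_table`.
+      PostgresHook(postgres_conn_id=API_CONN_ID).run(
+          f"INSERT INTO {temp_table} SELECT * FROM upstream_schema.{media_type};"
+      )
+
+
+  @task_group(group_id="copy_data")
+  def copy_data_group(media_type: str, temp_table: str):
+      create_fdw_extension() >> copy_data(media_type, temp_table)
+  ```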
-- Create Index: we can use our existing
+- `Filter Data`: initially, this should be a single Python task that exactly
+  mirrors the behavior of the
+  [`clean_image_data` function of `cleanup.py`](https://github.com/WordPress/openverse/blob/47fe5df0e9b8ad3dba06021f4cd4af9139977644/ingestion_server/ingestion_server/cleanup.py#L295)
+  (only applying the tag-specific steps) on the ingestion server[^3]. The
+  easiest way to do this is to port the ingestion server's functionality for
+  this step directly into a single Airflow task. The steps for this task are
+  as follows (see [Alternatives](#filtering-approach) for possible future
+  directions and a sketch of this approach):
+  1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE`
+  2. Divide the batch into `multiprocessing.cpu_count()` subbatches
+  3. Split the filtering across separate workers using multiprocessing
+  4. On each process:
+     1. Create a new DB connection & cursor per worker
+     2. Iterate through each record:
+        1. Remove tags below the confidence level
+        2. Remove tags that need to be filtered (denylisted, machine-generated
+           filter list, provider, etc.)
+        3. Only surface the record if it needs to be changed
+        4. Update each changed record with its own `UPDATE` query
+     3. Commit the transaction and close the connection
+  5. Repeat steps 1-4 until all batches are consumed
+- `Create Index`: we can use our existing
   [Elasticsearch tasks](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/elasticsearch.py#L82)
   to create the new elasticsearch index with the index suffix generated in the
   previous task.
@@ -557,6 +582,8 @@ No new tools or packages are required.
 
+### ECS approach
+
 The alternative options of using an ECS approach or performing the reindex
 entirely in Airflow are discussed at length in the
 [Approach to the Distributed Reindex](#approach-to-the-distributed-reindex)
@@ -567,6 +594,37 @@ using EC2 operators to start and stop the instances as needed. However, more
 infrastructure work is required in this approach, and we would require
 deployments whenever there are code changes in the indexer workers.
 
+### Filtering approach
+
+There are a number of ways to accomplish the data filtering, including several
+ways to improve the approach described above.
+
+The Airflow scheduler container has access to 4 cores, the same as the
+ingestion server where this step was originally running. At present, all of
+the cleanup steps together take about 8 hours, but that includes the URL
+cleaning, which is considerably more time-intensive than the tag filtering
+since it makes outbound requests. Running the tag filtering on Airflow should
+not impact any of the other running tasks or saturate the instance.
+
+There are a few ways this process could be improved, but none of them are
+required _at this moment_. We can follow up after this project is complete to
+assess what further optimizations might be necessary at this step. Some
+potential suggestions for that time:
+
+- Instead of single `UPDATE` queries for each affected record, we could insert
+  records from each subbatch into a temporary table, and then update the base
+  table in bulk with an `UPDATE ... FROM`. Since the indices haven't been
+  applied to the base table yet, this should be fairly performant.
+- Instead of using multiprocessing, we could pre-define the batches and run
+  the filtering chunks on a set of mapped tasks (see the second sketch below).
+  The multiprocessing approach has the benefit of iterating over a cursor on
+  the database rather than having to manage the record ranges explicitly, but
+  mapped tasks would allow further parallelization and easier task management.
+- The indexer workers themselves could be expanded to run on larger chunks of
+  the database for this filtering. This would likely require the most work, as
+  it would involve expanding the indexer workers' API to handle this
+  additional task.
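+
+To make the batched approach concrete, below is a minimal sketch of the single
+filtering task described in the `Filter Data` step above. The connection
+string, table name, confidence threshold, and `filter_tags` logic are
+illustrative placeholders only; the real filtering logic would be ported from
+`cleanup.py`.
+
+```python
+import multiprocessing
+
+import psycopg2
+import psycopg2.extras
+
+CLEANUP_BUFFER_SIZE = 10_000  # batch size, mirroring the ingestion server
+DB_DSN = "dbname=openledger"  # assumed connection string
+TAG_DENYLIST = {"example_denylisted_tag"}  # placeholder denylist
+
+
+def filter_tags(tags: list[dict]) -> list[dict]:
+    # Placeholder for the logic ported from `cleanup.py`: drop tags below
+    # the confidence threshold and tags on the denylist.
+    return [
+        tag
+        for tag in tags
+        if tag.get("accuracy", 1.0) >= 0.9 and tag.get("name") not in TAG_DENYLIST
+    ]
+
+
+def process_subbatch(rows: list[tuple]) -> None:
+    # Step 4.1: each worker opens its own DB connection and cursor.
+    conn = psycopg2.connect(DB_DSN)
+    with conn, conn.cursor() as cur:
+        # Step 4.2: iterate through each record in the subbatch.
+        for identifier, tags in rows:
+            filtered = filter_tags(tags or [])
+            # Step 4.2.3: only touch records that actually changed.
+            if filtered != (tags or []):
+                # Step 4.2.4: update each changed record with its own UPDATE.
+                cur.execute(
+                    "UPDATE image SET tags = %s WHERE identifier = %s",
+                    (psycopg2.extras.Json(filtered), identifier),
+                )
+    # Step 4.3: the `with conn` block commits; now close the connection.
+    conn.close()
+
+
+def filter_data() -> None:
+    conn = psycopg2.connect(DB_DSN)
+    # A server-side cursor streams batches without loading the whole table.
+    with conn.cursor(name="filter_data") as cur:
+        cur.execute("SELECT identifier, tags FROM image")
+        # Step 1: fetch a batch of records at a time.
+        while batch := cur.fetchmany(CLEANUP_BUFFER_SIZE):
+            # Step 2: divide the batch into one subbatch per core.
+            n = multiprocessing.cpu_count()
+            subbatches = [batch[i::n] for i in range(n)]
+            # Step 3: fan the subbatches out across worker processes.
+            with multiprocessing.Pool(n) as pool:
+                pool.map(process_subbatch, subbatches)
+    conn.close()
+```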
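+
+A similarly rough sketch of the mapped-task alternative from the list above,
+using Airflow dynamic task mapping over pre-defined record ranges (the DAG id,
+range computation, and batch sizes are again assumptions for illustration):
+
+```python
+from airflow.decorators import dag, task
+
+
+@dag(schedule=None, catchup=False)
+def filter_data_mapped():
+    @task
+    def get_batches(total_rows: int = 1_000_000, batch_size: int = 100_000):
+        # Pre-define explicit (start, end) ranges rather than iterating a
+        # database cursor; in reality total_rows would come from a count query.
+        return [
+            (start, min(start + batch_size, total_rows))
+            for start in range(0, total_rows, batch_size)
+        ]
+
+    @task
+    def filter_batch(batch: tuple[int, int]) -> None:
+        start, end = batch
+        # Apply the same tag filtering as in the sketch above, restricted to
+        # records whose row number falls within [start, end).
+        ...
+
+    # One mapped task instance per pre-defined batch.
+    filter_batch.expand(batch=get_batches())
+
+
+filter_data_mapped()
+```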
+
 ## Blockers
@@ -609,3 +667,8 @@ monitor the first production data refreshes closely.
 
 [^2]:
   https://towardsdatascience.com/using-apache-airflow-dockeroperator-with-docker-compose-57d0217c8219
+
+[^3]:
+  See #4456 for further context on this. The filtering is a _necessary_ step
+  of the data refresh that we need to carry forward even after removing the
+  other cleanup steps.