Explicitly include Filter Data step in ingestion server removal IP #4524

Merged 2 commits on Jun 21, 2024
@@ -38,11 +38,15 @@ steps, which will be discussed separately in this IP:
entire contents of the upstream media table are copied into a new temp table
in the API database. This temp table will later replace the main media table
in the API.
- **Clean Data**: Data cleanup, which includes cleaning URLs. This step will
become unnecessary when the
[Catalog Data Cleaning project](https://github.com/WordPress/openverse/issues/430)
is completed, which moves all data cleaning steps upstream into the initial
provider ingestion process.
- **Filter Data**: Separate from the cleanup step (but currently included as
part of it). Filter out denylisted tags and machine-generated tags that are
below our current confidence threshold. This will _not_ be a part of the data
cleaning project and must be carried forward.
- **Create Index**: Create a new Elasticsearch index, matching the configuration
of the existing media index.
- **Distributed Reindex**: Convert each record from the new temp table to the
@@ -267,8 +271,8 @@ developed as separate DAGs alongside the current ones.
1. Create a new data refresh factory to generate new data refresh DAGs for each
media type for staging and production, with the
`<environment>_<media>_data_refresh` DAG id. For simplicity of code review,
these initial DAGs should only perform the `Copy Data`, `Filter Data`, and
`Create Index` steps, which we will perform entirely in Airflow.
1. Create a new `catalog-indexer-worker` in the Catalog, and build the new
indexer worker image locally.
1. Add the distributed reindexing step to the DAG (excludes infrastructure
@@ -333,14 +337,18 @@ already implemented in the current data refresh and can simply be copied:
We will include new tasks to perform the initial few steps of the ingestion
server's work (a rough sketch of the overall task layout follows the list below):

- `Copy Data`: this should be a TaskGroup that will have multiple tasks for
creating the FDW from the upstream DB to the downstream DB, running the
copy_data query, and so on. It should fully replace the implementation of
[`refresh_api_table`](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/ingestion_server/ingestion_server/ingest.py#L248)
in the ingestion server. All steps in this section are SQL queries that can be
implemented using the existing
[PostgresHook and PGExecuteQueryOperator](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/sql.py).
- `Filter Data`: initially, this should be a single Python task which exactly
mirrors the behavior of the
[`clean_image_data` function of `cleanup.py`](https://github.com/WordPress/openverse/blob/47fe5df0e9b8ad3dba06021f4cd4af9139977644/ingestion_server/ingestion_server/cleanup.py#L295)
(only applying the tag-specific steps) on the ingestion server[^3].
- `Create Index`: we can use our existing
[Elasticsearch tasks](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/elasticsearch.py#L82)
to create the new Elasticsearch index with the index suffix generated in the
previous task.
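
To make the intended task layout concrete, here is a minimal TaskFlow-style
sketch of how these pieces could hang together. The DAG id, connection ID,
table names, and SQL are illustrative assumptions only; the real implementation
will reuse the existing SQL and Elasticsearch helpers linked above.

```python
# Illustrative sketch only: DAG/task names, connection IDs, and SQL here are
# assumptions, not the actual Openverse implementation (which reuses the
# existing PGExecuteQueryOperator and Elasticsearch task helpers).
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

API_CONN_ID = "postgres_openverse_api"  # hypothetical Airflow connection ID


@task
def copy_data(temp_table: str) -> str:
    # In the real DAG this is a TaskGroup of SQL steps (FDW setup, the
    # copy_data query, and so on), not a single task.
    hook = PostgresHook(postgres_conn_id=API_CONN_ID)
    hook.run(f"CREATE TABLE {temp_table} (LIKE image INCLUDING DEFAULTS)")
    return temp_table


@task
def filter_data(temp_table: str) -> str:
    # Tag filtering on the temp table; see footnote 3 for the batching details.
    return temp_table


@task
def create_index(index_suffix: str) -> str:
    # The real DAG uses the shared common.elasticsearch tasks to create the
    # new index with the suffix generated earlier in the DAG.
    return f"image-{index_suffix}"


@dag(schedule=None, start_date=datetime(2024, 6, 1), catchup=False)
def staging_image_data_refresh():
    temp_table = copy_data("temp_import_image")
    filter_data(temp_table)
    create_index("example-suffix")


staging_image_data_refresh()
```
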
@@ -609,3 +617,52 @@ monitor the first production data refreshes closely.

[^2]:
https://towardsdatascience.com/using-apache-airflow-dockeroperator-with-docker-compose-57d0217c8219

[^3]:
See #4456 for further context on this. The filtering is a _necessary_ step
of the data refresh we need to carry forward even after removing the other
cleanup steps. There are a number of ways we can accomplish this, but by far
the easiest would be to directly map the functionality of the ingestion
server on this step within a single Airflow task. The steps for this task
are as follows (a rough code sketch follows the list):

1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE`
2. Divide the batch up into `multiprocessing.cpu_count()` subbatches
3. Split the filtering up into separate workers using multiprocessing
4. On each process:
   1. Create a new DB connection & cursor per worker
   2. Iterate through each record:
      1. Remove tags below the confidence level
      2. Remove tags that need to be filtered (denylisted, machine-generated
         filter list, provider, etc.)
      3. Only surface the record if it needs to be changed
      4. Update each record one by one with a single `UPDATE`
   3. Commit the cursor and close the connection
5. Repeat steps 1-4 until all batches are consumed
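
A minimal sketch of that flow, assuming `psycopg2` and placeholder table,
column, threshold, and denylist values; the real filtering rules live in
`cleanup.py` and would be ported as-is:

```python
# Rough sketch of the batching flow above; helper names, the connection
# string, and the filtering rules are placeholders, not the actual
# cleanup.py implementation.
import multiprocessing

import psycopg2
from psycopg2.extras import Json

CLEANUP_BUFFER_SIZE = 50_000  # batch size; the real value is a server setting
CONFIDENCE_THRESHOLD = 0.9  # assumed cutoff for machine-generated tags
DENYLISTED_TAGS = {"watermark"}  # illustrative denylist only
DSN = "postgresql://deploy@localhost/openledger"  # placeholder connection string


def keep_tag(tag: dict) -> bool:
    # Drop denylisted tags and machine-generated tags below the threshold.
    if tag.get("name", "").lower() in DENYLISTED_TAGS:
        return False
    return tag.get("accuracy", 1.0) >= CONFIDENCE_THRESHOLD


def process_subbatch(rows: list[tuple]) -> None:
    # Each worker opens its own connection and cursor (step 4.1).
    conn = psycopg2.connect(DSN)
    with conn, conn.cursor() as cur:
        for identifier, tags in rows:
            filtered = [t for t in (tags or []) if keep_tag(t)]
            if filtered != (tags or []):  # only touch changed records (4.2.3)
                cur.execute(
                    "UPDATE temp_import_image SET tags = %s WHERE identifier = %s",
                    (Json(filtered), identifier),
                )
    conn.close()


def filter_data() -> None:
    workers = multiprocessing.cpu_count()
    conn = psycopg2.connect(DSN)
    # Server-side cursor so the full table is never pulled into memory.
    with conn, conn.cursor(name="filter_data") as cur:
        cur.execute("SELECT identifier, tags FROM temp_import_image")
        while True:
            batch = cur.fetchmany(CLEANUP_BUFFER_SIZE)  # step 1
            if not batch:
                break
            subbatches = [batch[i::workers] for i in range(workers)]  # step 2
            with multiprocessing.Pool(workers) as pool:  # steps 3-4
                pool.map(process_subbatch, subbatches)
    conn.close()
```
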

The Airflow scheduler container has access to 4 cores, which is the same as
the ingestion server where this step was originally running. At present it
takes about 8 hours for all cleanup steps, but that includes the URL
cleaning, which is certainly more time-intensive than the tag filtering
since it makes outbound requests. Running the tag filtering on Airflow
should not impact any of the other running tasks or saturate the instance.

There are a few ways this process could be improved, but none of them are
required _at this moment_. We can follow up after this project is complete
to assess what further optimizations might be necessary at this step. Some
potential suggestions for that time:

- Instead of single `UPDATE` queries for each affected record, we could
insert records from each subbatch into a temporary table. Then the base
table could be updated with an `UPDATE ... FROM` in bulk (see the sketch
after this list). Since the indices haven't been applied to the base table
yet, this should be fairly performant.
- Instead of using multiprocessing, we could pre-define the batches and run
the filtering chunks on a set of mapped tasks. The multiprocessing has the
benefit of iterating over a cursor on the database rather than having to
manage the record ranges explicitly, but this would allow further
parallelization and task management.
- The indexer workers themselves could be expanded to run on larger chunks
of the database for this filtering. This would likely require the most
work as it would involve expanding the indexer workers' API to handle this
additional task.
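
To illustrate the first suggestion above, a rough sketch of the bulk
approach, with placeholder table and column names:

```python
# Sketch of the suggested bulk UPDATE ... FROM approach; table and column
# names are placeholders, not the actual Openverse schema.
from airflow.providers.postgres.hooks.postgres import PostgresHook

BULK_APPLY_SQL = """
    -- Workers would first COPY/INSERT their filtered rows into this staging table.
    CREATE TEMP TABLE filtered_tags (identifier uuid PRIMARY KEY, tags jsonb);

    -- Then the temp media table is updated once, in bulk.
    UPDATE temp_import_image AS img
    SET tags = f.tags
    FROM filtered_tags AS f
    WHERE img.identifier = f.identifier;
"""


def bulk_apply_filtered_tags() -> None:
    hook = PostgresHook(postgres_conn_id="postgres_openverse_api")  # assumed conn ID
    hook.run(BULK_APPLY_SQL)
```

Because the temp media table has no indexes applied at this point in the
refresh, the single bulk update avoids per-row index maintenance that
individual `UPDATE` statements would otherwise incur.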