
Add filter data step to the data refresh DAG #4541

Closed
AetherUnbound opened this issue Jun 21, 2024 · 1 comment · Fixed by #4684
Labels
💻 aspect: code (Concerns the software code in the repository)
✨ goal: improvement (Improvement to an existing user-facing feature)
🟨 priority: medium (Not blocking but should be addressed soon)
🧱 stack: catalog (Related to the catalog and Airflow DAGs)
🧱 stack: ingestion server (Related to the ingestion/data refresh server)

Comments

@AetherUnbound
Collaborator

AetherUnbound commented Jun 21, 2024

Description

In addition to the steps outlined in #4149 and #4146, we also need to add the Filter Data step to the data refresh DAG. Initially, this should be a single Python task which exactly mirrors the behavior of the clean_image_data function of cleanup.py on the ingestion server (only applying the tag-specific steps). The easiest way to do this would be to directly map the ingestion server's functionality for this step onto a single Airflow task. The steps for this task are as follows (a sketch follows the list):

  1. Get a batch of records from the database using CLEANUP_BUFFER_SIZE
  2. Divide the batch into multiprocessing.cpu_count() subbatches
  3. Split the filtering across separate workers using multiprocessing
  4. On each process
    1. Create a new DB connection & cursor per worker
    2. Iterate through each record
      1. Remove tags below the confidence level
      2. Remove tags that need to be filtered (denylisted tags, the machine-generated filter list, provider-specific filters, etc.)
      3. Only surface the record if it needs to be changed
      4. Update each record one by one with a single UPDATE
    3. Commit the cursor and close the connection
  5. Repeat steps 1-4 until all batches are consumed
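
For illustration, here is a minimal sketch of those steps using psycopg2 and multiprocessing. The table and column names (`image`, `identifier`, `tags`), the connection string, the buffer size value, and `filter_record_tags` are all placeholders rather than the actual cleanup.py code, and as the comment below notes, multiprocessing may not actually be usable inside an Airflow task:

```python
import multiprocessing

import psycopg2
from psycopg2.extras import Json

CLEANUP_BUFFER_SIZE = 10_000  # assumed value for the constant named above
DB_DSN = "dbname=openledger"  # placeholder connection string


def filter_record_tags(tags):
    """Hypothetical stand-in for the tag-specific filters in cleanup.py.

    Returns the filtered tag list, or None if the record is unchanged.
    """
    raise NotImplementedError


def process_subbatch(subbatch):
    # Step 4.1: each worker opens its own connection and cursor.
    conn = psycopg2.connect(DB_DSN)
    cur = conn.cursor()
    # Step 4.2: iterate through each record in the subbatch.
    for identifier, tags in subbatch:
        # Steps 4.2.1-4.2.2 happen inside filter_record_tags;
        # step 4.2.3: only touch records that actually changed.
        filtered = filter_record_tags(tags)
        if filtered is None:
            continue
        # Step 4.2.4: one UPDATE per affected record.
        cur.execute(
            "UPDATE image SET tags = %s WHERE identifier = %s",
            (Json(filtered), identifier),
        )
    # Step 4.3: commit and close the per-worker connection.
    conn.commit()
    cur.close()
    conn.close()


def filter_data():
    workers = multiprocessing.cpu_count()
    conn = psycopg2.connect(DB_DSN)
    # A named (server-side) cursor streams rows instead of loading them all.
    with conn.cursor(name="filter_data") as cur:
        cur.execute("SELECT identifier, tags FROM image")
        while True:
            # Step 1: fetch one buffer-sized batch.
            batch = cur.fetchmany(CLEANUP_BUFFER_SIZE)
            if not batch:
                break  # Step 5: all batches consumed.
            # Step 2: one subbatch per available core.
            subbatches = [batch[i::workers] for i in range(workers)]
            # Step 3: fan the subbatches out across processes.
            with multiprocessing.Pool(workers) as pool:
                pool.map(process_subbatch, subbatches)
    conn.close()
```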

Alternatives

See Alternatives in the IP for possible future directions.

There are a number of ways to accomplish the data filtering, including several improvements on the approach described above.

The Airflow scheduler container has access to 4 cores, the same as the ingestion server where this step originally ran. At present, all of the cleanup steps together take about 8 hours, but that includes the URL cleaning, which is certainly more time intensive than the tag filtering since it makes outbound requests. Running the tag filtering on Airflow should not impact any of the other running tasks or saturate the instance.

There are a few ways this process could be improved, but none of them are required at this moment. We can follow up after this project is complete to assess what further optimizations might be necessary at this step. Some potential suggestions for that time:

  • Instead of a single UPDATE query for each affected record, we could insert the records from each subbatch into a temporary table. The base table could then be updated in bulk with an UPDATE ... FROM (see the sketch after this list). Since the indices haven't been applied to the base table yet, this should be fairly performant.

  • Instead of using multiprocessing, we could pre-define the batches and run the filtering chunks on a set of mapped tasks. Multiprocessing has the benefit of iterating over a database cursor rather than having to manage record ranges explicitly, but mapped tasks would allow further parallelization and task management.

  • The indexer workers themselves could be expanded to run on larger chunks of the database for this filtering. This would likely require the most work as it would involve expanding the indexer workers' API to handle this additional task.
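
A hedged sketch of the first suggestion above, again with placeholder table and column names, using psycopg2's execute_values to stage each subbatch before a single bulk statement:

```python
import psycopg2
from psycopg2.extras import execute_values


def bulk_update_subbatch(conn, updates):
    """Stage changed rows in a temp table, then apply one UPDATE ... FROM.

    `updates` is a list of (identifier, filtered_tags_json) tuples; all
    names here are illustrative placeholders, not the catalog schema.
    """
    with conn.cursor() as cur:
        cur.execute(
            "CREATE TEMP TABLE IF NOT EXISTS tag_updates "
            "(identifier uuid PRIMARY KEY, tags jsonb)"
        )
        # Insert the whole subbatch in one round trip.
        execute_values(
            cur, "INSERT INTO tag_updates (identifier, tags) VALUES %s", updates
        )
        # One bulk statement replaces thousands of per-record UPDATEs.
        cur.execute(
            """
            UPDATE image
            SET tags = tag_updates.tags
            FROM tag_updates
            WHERE image.identifier = tag_updates.identifier
            """
        )
        # Clear the staging table for the next subbatch.
        cur.execute("TRUNCATE tag_updates")
    conn.commit()
```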

Additional context

See this section of the IP.

@AetherUnbound AetherUnbound added ✨ goal: improvement Improvement to an existing user-facing feature 💻 aspect: code Concerns the software code in the repository 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs 🧱 stack: ingestion server Related to the ingestion/data refresh server labels Jun 21, 2024
@AetherUnbound
Collaborator Author

Unfortunately, based on apache/airflow#14896, it looks like we won't be able to use multiprocessing here 😓 I might have to take an approach similar to what @stacimc has done in #4572 - we can work on these steps separately, then combine like functionality after-the-fact once each of the individual pieces are in. Shame this can't be as easy as copying the old code we were using!
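
For reference, a rough sketch of what that mapped-task approach could look like using Airflow's dynamic task mapping (available in Airflow 2.3+); `get_batches` and `filter_batch` are hypothetical helpers for illustration, not code from #4572:

```python
from airflow.decorators import dag, task


@task
def get_batches(batch_size: int) -> list[tuple[int, int]]:
    """Hypothetical helper: compute (start, end) record ranges covering the table."""
    raise NotImplementedError


@task
def filter_batch(id_range: tuple[int, int]) -> None:
    """Hypothetical helper: apply the tag filters to one pre-defined record range."""
    raise NotImplementedError


@dag(schedule=None)
def filter_data_dag():
    batches = get_batches(batch_size=10_000)
    # Dynamic task mapping fans out one task instance per batch,
    # letting Airflow handle parallelism instead of multiprocessing.
    filter_batch.expand(id_range=batches)


filter_data_dag()
```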
