
Add distributed reindex steps #4572

Merged
merged 20 commits into from
Aug 23, 2024

Conversation

stacimc
Contributor

@stacimc stacimc commented Jun 29, 2024

Fixes

Fixes #4148 by @stacimc

Description

Adds steps to the new data refresh DAGs to create the temporary ES index (which will later be promoted) and populate it via the distributed reindex:

[Screenshot, 2024-08-19: the new data refresh DAG steps in the Airflow graph view]

Testing Instructions

Run ./ov just api/init && ./ov just up, and then run a few provider DAGs in order to ingest some new data (I used Jamendo for audio and Cleveland for image, and manually marked them as successful as soon as some records were recorded). Then run the new data refresh DAGs (e.g. staging_audio_data_refresh). They should run successfully, but skip the tasks that create/terminate EC2 instances. Go to Elasticvue and you should see a new index whose name matches the output of the generate_index_name task.

It should have no aliases. If you refresh the index in Elasticvue, you'll also see that it has the expected number of records. For example, if you started from sample data, the main audio index will have 5000 records; if you ingested 100 records from Jamendo before running the data refresh, your new temp index will have 5100 records.

Run ./ov just api/pgcli and select count(*) from audio (or image, depending on which data refresh you ran). Then verify that the count matches the number of documents in the new index in Elasticvue.
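The two checks above can be captured in a small, purely illustrative helper (these names are mine, not part of the PR; in a live stack the inputs would come from the pgcli count and the document count Elasticvue shows after "Refresh index"):

```python
# Hypothetical helpers mirroring the manual verification steps above.
# Inputs in practice:
#   SELECT COUNT(*) FROM audio;  -- via ./ov just api/pgcli
#   the document count shown in Elasticvue for the new temp index

def expected_temp_index_size(sample_records: int, newly_ingested: int) -> int:
    # e.g. 5000 sample-data records + 100 new Jamendo records -> 5100 docs
    return sample_records + newly_ingested

def index_matches_db(db_row_count: int, es_doc_count: int) -> bool:
    # The temp index should hold exactly one document per table row.
    return db_row_count == es_doc_count
```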

Also note that you'll see somewhat unexpected behavior if you run the DAGs multiple times in a row, because we don't yet delete the temp table. That's expected! If you need to run a DAG a second time, just drop the temp import tables manually in the API DB first.

It's not simple to test this against the staging environment when running locally, because of the requirement that we skip these tasks locally. I did so by configuring AIRFLOW_CONN_AWS_DEFAULT locally and commenting out the skip logic. This should technically be safe even in production because no data is destroyed or promoted, but if you do want to test this, you should ONLY run the staging DAGs to be safe, and we should clean up the dangling tables/indices afterward. There are additional issues in the milestone for testing this formally in those environments, so I don't think it's a requirement for testers here.

Checklist

  • My pull request has a descriptive title (not a vague title like Update index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (./ov just catalog/generate-docs for catalog
    PRs) or the media properties generator (./ov just catalog/generate-docs media-props
    for the catalog or ./ov just api/generate-docs for the API) where applicable.

Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Jun 29, 2024
@stacimc stacimc self-assigned this Jun 29, 2024
@stacimc stacimc marked this pull request as ready for review July 22, 2024 20:15
@stacimc stacimc requested a review from a team as a code owner July 22, 2024 20:15
Contributor

@obulat obulat left a comment


I tried running this DAG locally. The DAG created the index, but it was empty. After some investigation I saw that the ids in my temp_import_audio table start at 20001, not 1. Could that be the reason?
Another question: the indexer logs say that the data was copied to ES from the audio table. Shouldn't we copy from the temporary table?

@AetherUnbound
Collaborator

I tried running this DAG locally. The DAG created the index, but it was empty.

I also tried and noticed this as well - I think it was because the index was not refreshed after the documents were added. I hit "Refresh index" in the Elasticvue UI and the 5000 documents showed up!

Collaborator

@AetherUnbound AetherUnbound left a comment


This is a huge bolus of code, great work on getting this off the ground Staci! I've got a number of comments here, the most blocking one being what @obulat pointed out - we should probably be referencing the temp table on the API rather than the base one for creating the indices. Other than that, I have a few code structure comments, but nothing else major!

catalog/dags/common/elasticsearch.py
catalog/dags/data_refresh/create_and_promote_index.py
return f"{media_type}-{uuid.uuid4().hex}"


@task_group(group_id="create_temp_index")

Yay for being able to do this with decorators!
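The snippet in the thread above suffixes the media type with a random UUID; as a self-contained sketch (my reconstruction from the one visible line, not the full module):

```python
import uuid

def generate_index_name(media_type: str) -> str:
    # A random hex UUID suffix guarantees that repeated data refreshes
    # never produce colliding temp index names, e.g. "audio-3f2c…".
    return f"{media_type}-{uuid.uuid4().hex}"
```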

catalog/dags/data_refresh/distributed_reindex.py
@AetherUnbound
Collaborator

AetherUnbound commented Jul 31, 2024

One other thought here - given our recent strategy around PR review requirements, it would be great to have some new tests for all of the relevant pieces here alongside this PR. I know much of this is using operators, but for any of the pieces that you think make the most sense 😄

Collaborator

@sarayourfriend sarayourfriend left a comment


Had some time to review this, so I've left some requested changes. Namely, the blockers for me are:

  1. Use the launch template version number to reference what is retrieved in the get_launch_template_version task, so that it is disambiguated from the launch template version resource: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_launch_template_versions.html
  2. Move retrieval of the launch template version number out of the mapped task group, to avoid the race condition that is possible if each task group retrieves the version number independently of the others.
  3. Add a default value to the next call to avoid an unhandled StopIteration, or, if there are always values, replace the get calls with indexing syntax and add a comment explaining why it is safe to proceed without default values.

Otherwise, I agree with Madison's comments, but the rest of mine are not blockers, just suggestions or extra context/information to help you make choices if you want.
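Blocker 3 refers to Python's next() builtin, which raises StopIteration on an exhausted iterator; a minimal illustration (the instance shape here is hypothetical, not the PR's actual data):

```python
def first_running(instances):
    # Unsafe variant:
    #   next(i for i in instances if i["state"] == "running")
    # raises StopIteration when no instance matches, which inside an
    # Airflow task surfaces as an opaque failure.
    # Safe variant: supply a default so the "nothing found" case is
    # explicit and can be handled by the caller.
    return next((i for i in instances if i["state"] == "running"), None)
```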

catalog/dags/data_refresh/distributed_reindex.py
@openverse-bot
Collaborator

Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR:

@AetherUnbound
@sarayourfriend
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend days [1], this PR was ready for review 20 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s) [2].

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  [1] Specifically, Saturday and Sunday.

  [2] For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

@stacimc stacimc requested a review from obulat August 20, 2024 16:52
Collaborator

@AetherUnbound AetherUnbound left a comment


This is fantastic, we're getting closer to having this whole thing complete!! 🤩 Just a few small things but nothing blocking, local testing worked as expected 🚀

catalog/dags/data_refresh/distributed_reindex.py


class TempConnectionHTTPOperator(HttpOperator):
def setup_ec2_hook(func: callable) -> callable:

Love this!
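A hedged sketch of the decorator pattern shown in that snippet. The real setup_ec2_hook presumably constructs an Airflow EC2 hook; here the hook factory is a parameter so the sketch runs without AWS credentials, and all names besides the pattern itself are assumptions:

```python
from functools import wraps

def setup_hook(hook_factory):
    """Decorator factory: inject a freshly-built hook into the wrapped task."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the hook and inject it unless the caller supplied one,
            # so every decorated task shares the same setup logic.
            kwargs.setdefault("hook", hook_factory())
            return func(*args, **kwargs)
        return wrapper
    return decorator
```

Usage would look like decorating each EC2-touching task callable, e.g. @setup_hook(lambda: EC2Hook()) (hypothetical factory), keeping hook construction out of every task body.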

Collaborator

@sarayourfriend sarayourfriend left a comment


LGTM! I've only re-reviewed this from a process perspective rather than testing it locally (didn't want to duplicate what Madison has done) and it makes good sense! Nice work, really excited to see this come together 😀

@obulat
Contributor

obulat commented Aug 21, 2024

I managed to run the audio refresh locally.
When running the image refresh, I ran into a couple of problems:

  • get_alter_batches returns weird batches. I only have 15000 items in the temp table, but the batches returned are (5000, 105000).
  • The wait for reindex task is always yellow, and I don't know how to debug this. Here's what I see in the Airflow UI logs:
[2024-08-21, 16:23:10 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/http/sensors/http.py", line 164, in execute
    super().execute(context=context)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sensors/base.py", line 264, in execute
    raise e
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sensors/base.py", line 246, in execute
    poke_return = self.poke(context)
                  ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/http/sensors/http.py", line 149, in poke
    return self.response_check(response, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/catalog/dags/data_refresh/distributed_reindex.py", line 79, in response_check_wait_for_completion
    raise ValueError("An error was encountered during reindexing.")
ValueError: An error was encountered during reindexing.
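For context on that last frame: HttpSensor calls a user-supplied response_check callable on every poke, and raising inside it fails the task. A hedged sketch of that pattern (the response field names are assumptions, not the actual implementation in distributed_reindex.py):

```python
def response_check_wait_for_completion(response) -> bool:
    """Poke callback for the sensor watching the indexer worker's status endpoint."""
    data = response.json()
    if data.get("error"):
        # Fail the sensor loudly instead of polling forever.
        raise ValueError("An error was encountered during reindexing.")
    # False keeps the sensor poking; True marks the reindex complete.
    return bool(data.get("finished"))
```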

Copy link
Contributor

@obulat obulat left a comment


I think something is wrong with my setup: the audio refresh ran fine, but running the image refresh locally didn't work as expected. I don't want to block on it, since you and Madison have successfully run the process locally.
I went through the code in PyCharm and marked all the places where it complained about unused function parameters. Are they going to be used in the future, or can they be removed?

@stacimc
Contributor Author

stacimc commented Aug 21, 2024

@obulat is there any chance you already had a temp_import_image table in your API DB, perhaps from running the DAG previously (the DAG currently does not drop the import tables)? The errors you're seeing are what I would expect when the import table already exists and are a quirk of the DAG being half-completed.

@stacimc
Contributor Author

stacimc commented Aug 23, 2024

I've retested this several times and cannot reproduce @obulat's errors. Given that multiple other people are also unable to reproduce them, and that I'm fairly sure the issue can be explained by the temp table already existing (which will be fixed as the remaining steps are added to the DAG), I'm going to go ahead and merge. This will be tested many more times before it goes live :)

@stacimc stacimc merged commit 2d364ed into main Aug 23, 2024
45 checks passed
@stacimc stacimc deleted the add/distributed-reindex branch August 23, 2024 16:29
@stacimc stacimc mentioned this pull request Aug 28, 2024
13 tasks
Successfully merging this pull request may close these issues.

Implement local distributed reindexing
5 participants