Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filtered index and promotion steps to data refresh #4833

Merged
merged 28 commits into from
Sep 16, 2024

Conversation

stacimc
Copy link
Contributor

@stacimc stacimc commented Aug 29, 2024

Fixes

Fixes #4149 by @stacimc

Description

This PR adds the final steps to the data refresh DAG:

  • create and promote the filtered index
  • remap indices from the media table to the temp table
  • remap constraints from the media table to the temp table
  • promote the temp table & delete the original
  • promote the 'main' and filtered indices
Screenshot 2024-08-30 at 4 46 40 PM

This was much less straightforward than I imagined and this PR is fairly large as a consequence (although there are tests, and an es_mapping.py configuration file that's just copied from the ingestion server adding to the line count). I will attempt to break it into smaller pieces if reviewers like; however I'll point out that it's much easier to manually test in its current state, because you can actually run the entire data refresh (even multiple times in a row).

While working on this PR I also noticed several small issues that I fixed at the same time. I have left comments at the relevant code changes to make these easier to spot:

  • I changed the create_index steps to create the new index according to a hard-coded index config, rather than basing it off the config of the current index (see comment inline for reasoning)
  • I fixed a bug where index calculations were off when multiple data refreshes were run concurrently
  • I fixed several bugs with error handling and dependency chains, where the failure of a task would not cause presumed downstream tasks to skip. See commit messages
  • I fixed a bug in the ingestion server with the way constraints are mapped. See commit message

I've tried to make commit messages and comments in the code as descriptive as possible, but please ping me if clarification is needed anywhere.

Testing Instructions

Add the following to your catalog/.env:

AIRFLOW_CONN_SENSITIVE_TERMS="http://catalog_indexer_worker:8003/static/mock_sensitive_terms.txt"

Run ov just recreate to start with fresh sample data.

Run an image and an audio DAG for a few minutes to get some new records. Wait until you see that a couple 100 lines have been written in the pull_data logs, then mark the task as a success. I used Cleveland and Jamendo.

Establish a baseline

Go to https://app.elasticvue.com/cluster/0/indices and verify you see the four initial indices (image-init, image-init-filtered, audio-init, audio-init-filtered), and note the number of records in each. Now run ov just api/pgcli and run describe image; and describe audio;. Save the results somewhere.

Run the new data refreshes and compare

Now enable the staging_audio_data_refresh and staging_image_data_refresh DAGs. They should both complete successfully. Once done, check out elasticvue again. You should see that the -init indices have all been dropped, and replace with new indices with a uuid (example audio-f61ab0e757d244ab9abeca0a0613d9e6, audio-f61ab0e757d244ab9abeca0a0613d9e6-filtered, and so on). You should have more records in each of these new indices than in the initial ones. To be extra safe, you can go check the logs of the provider DAGs you ran to see how many records were ingested; that's the number of extra records you should see in the non-filtered indices.

In pgcli, again run describe image; and describe audio;. Compare the results to what you saw earlier; pay particular attention to the indices and constraints. They should be identical, including the index names.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (ov just catalog/generate-docs for catalog
    PRs) or the media properties generator (ov just catalog/generate-docs media-props
    for the catalog or ov just api/generate-docs for the API) where applicable.

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: ingestion server Related to the ingestion/data refresh server 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Aug 29, 2024
@stacimc stacimc self-assigned this Aug 29, 2024
as part of the Data Refresh.

TODO: We'll swap out the create and populate filtered index DAG to use this instead
of the one defined in the legacy_data_refresh.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that there is a separate issue for this, (and moreover the filtered index is going away).

es.create_index(
index_config={
"index": temp_index_name,
"body": index_settings(data_refresh_config.media_type),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change ensures that the data refresh always creates a new index based off the hard-coded index configuration in es_mapping.py, rather than by copying the configuration of the existing index.

This is how the ingestion server does it currently, and was just a miss in the previous PR. It's necessary to ensure that temporary or experimental changes to the indices (for example, created using our create_new_es_index DAGs or similar) are not carried forward in the data refresh without making the changes permanent via a code change/PR.

# only include Slack airflow connections
done < <(env | grep "^AIRFLOW_CONN_SLACK*")
# only include Slack airflow connections and the Sensitive Terms connection
done < <(env | grep "^AIRFLOW_CONN_SLACK*\|AIRFLOW_CONN_SENSITIVE_TERMS")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary to allow testing with the real sensitive terms list when desired, since it uses https.

@@ -116,6 +117,7 @@ def create_api():
_api.add_route("/healthcheck", HealthcheckResource())
_api.add_route("/task", IndexingJobResource(task_tracker))
_api.add_route("/task/{task_id}", TaskStatusResource(task_tracker))
_api.add_static_route("/static", (Path(".") / "static").absolute())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indexer worker serves the mock sensitive terms list, similar to how the ingestion server currently does so. This is okay because in local development, the indexer worker is always running, and in production we use the real sensitive terms list rather than the mock one. It is helpful to have the mock list served this way to make the local filtered_index steps behave as similarly as possible to the way it does in production, just using a different endpoint.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted to do it a different way (to avoid adding this functionality to the indexer worker), we could put the file into minio, and still retrieve it over HTTP. According to this guide about using minio to host a website it shouldn't be too difficult to do later on.

For the sake of simplicity, though, would it be better if the DAG just swapped the location it reads the sensitive terms from based on an additional environment variable? Like:

if USE_MOCK_SENSITIVE_TERMS:
  return ["running", "water", "bird"]
else:
  # retrieve the regular list

I know it's not as seamless as just swapping out the HTTP connection string in the environment file, but it avoids a bit of this hoop-jumping required to make the DAG code "seamless". I guess really we've just hidden the seam somewhere else by doing it this way 😛 (Also: because I wrote the original code for this in the ingestion server, I can say I think it was a bit cheeky/too clever to do it this way and wish I'd done this if/else there to begin with!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We couldn't use the HttpOperator if we wanted to do that, or else we'd have to add branching tasks to get around selecting the right task per environment. Not a big deal but I'd prefer to keep the operators clean. We could use minio, something similar to what @AetherUnbound did in #4836 maybe 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We couldn't use the HttpOperator if we wanted to do that, or else we'd have to add branching tasks to get around selecting the right task per environment

Oh 😕 We can't call the HttpOperator conditionally in a Python @task operator? I figured something like this would be possible:

@task
def get_sensitive_terms(**context):
    if USE_MOCK_SENSITIVE_TERMS:
        return ["running", "water", "bird"]
    else:
        http_op = HttpOperator(...)
        return http_op.execute(**context)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to initialize the operator inside a @task, but it's probably easier to use the HttpHook inside a @task than have an operator-in-operator situation by using HttpOperator.

I think I'm fine going with the new static route approach suggested here, but @stacimc do you mind adding a comment about what the route is for, and that it's only intended to be used for local testing?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, maybe I'm thinking of the hook.

All good, I was just suggesting alternatives if wanted, this approach obviously has worked fine for us 🙂

It will go away in #3336 anyway, which I didn't know when I wrote my original comment or I would have mentioned it!

@stacimc stacimc marked this pull request as ready for review August 30, 2024 23:58
@stacimc stacimc requested a review from a team as a code owner August 30, 2024 23:58
@stacimc stacimc requested review from krysal, AetherUnbound and sarayourfriend and removed request for a team August 30, 2024 23:58
@stacimc
Copy link
Contributor Author

stacimc commented Aug 31, 2024

@sarayourfriend I added you as an additional reviewer because you expressed interest in this PR (and your feedback is always appreciated 😄) but don't feel obligated!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is such a great example of the fact that the productivity should not be measured by the line count :) A couple of lines changed needed so much debugging and explanation. Thank you for great detailed notes, as always. Great to have the local environment close to the production.

Copy link
Collaborator

@sarayourfriend sarayourfriend left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Far out! I haven't tested this locally, just read over the code and process. Very cool! It's so exciting to have this laid out in the graph view, it really helps show the complexity of the data refresh, while also giving a clearer narrative to the different steps. I'm really glad we went forward with this project to remove the ingestion server.

If you'd like me to test this out locally, I'm happy to do so as well, just let me know. Otherwise, I'll leave it to the catalogue reviewers to do that part of it. For my part, this is very cool and exciting.

# Connect to sensitive terms list used for filtered index creation in the data refresh.
# For local development, the mock terms list is served by the catalog indexer worker. Change to the
# full production URL in order to use the production sensitive terms list.
AIRFLOW_CONN_SENSITIVE_TERMS="http://catalog_indexer_worker:8003/static/mock_sensitive_terms.txt"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stacimc will we need an infrastructure change to point this to the correct live location?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not required, since we can set this through the Admin UI (as we have with several other connections/variables). We could make an infrastructure change to set it as an environment variable though; actually, I'm not clear how we've drawn the line between which variables should be configured where 🤔 What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the Admin UI sounds good, I believe that's in line with decisions we've made in the past (https://github.com/WordPress/openverse-infrastructure/issues/215). I forgot that environment variables aren't necessary for configuring these. I realised that I didn't personally have a clear picture of environment variables vs Airflow variables/connections, and that some of our code muddies that distinction... and presents problems for advancements in our Airflow configuration, management, and execution. I've opened #4863 to address those inconsistencies.

I've started a discussion here to work towards an approach more in line with the rest of our applications (using infrastructure as code).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now! One disadvantage of declaring this in the Admin UI is that the password (if needed) for the services isn't codified in the same way in our infrastructure repo. Noting that for now, I'll add more thorough thoughts to that discussion when I get a chance to look at that discussion.

catalog/dags/data_refresh/distributed_reindex.py Outdated Show resolved Hide resolved
logger = logging.getLogger(__name__)


class TableIndex(NamedTuple):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: is the usage of NamedTuple instead of dataclasses due to a limitation in Airflow? I seem to recall limitations in task return types. Otherwise, do you have a preference for NamedTuples over dataclasses? And if so, I'd be curious to know the reason 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe anything json serializable can be passed through XCOMs, if I'm not mistaken a dataclass would work. No strong preference beyond following convention in the project, and given that we don't require anything more complex. I don't think the pros/cons of either option are particularly significant in this use case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what Staci said, according to the documentation:

They can have any (serializable) value

I'm assuming "serializable" in this case means JSON serialization. I was able to return a NamedTuple in the add_rekognition_labels DAG and Airflow hydrated it back into the NamedTuple just fine. In practice (via the UI), it looks like it serializes it to a list:

image

image

Dataclasses, although they have more functionality, are a little heavier-weight. If we essentially just need an object to attach a bunch of data to and reference it more easily/programmatically/in a typed way than a dictionary, NamedTuple is preferred. That said, I did just find this documentation on dataclasses!! 🤩:

As mentioned TaskFlow uses XCom to pass variables to each task. This requires that variables that are used as arguments need to be able to be serialized. Airflow out of the box supports all built-in types (like int or str) and it supports objects that are decorated with @dataclass or @attr.define.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I made an upstream PR to make that clearer here!
apache/airflow#42045

@@ -116,6 +117,7 @@ def create_api():
_api.add_route("/healthcheck", HealthcheckResource())
_api.add_route("/task", IndexingJobResource(task_tracker))
_api.add_route("/task/{task_id}", TaskStatusResource(task_tracker))
_api.add_static_route("/static", (Path(".") / "static").absolute())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted to do it a different way (to avoid adding this functionality to the indexer worker), we could put the file into minio, and still retrieve it over HTTP. According to this guide about using minio to host a website it shouldn't be too difficult to do later on.

For the sake of simplicity, though, would it be better if the DAG just swapped the location it reads the sensitive terms from based on an additional environment variable? Like:

if USE_MOCK_SENSITIVE_TERMS:
  return ["running", "water", "bird"]
else:
  # retrieve the regular list

I know it's not as seamless as just swapping out the HTTP connection string in the environment file, but it avoids a bit of this hoop-jumping required to make the DAG code "seamless". I guess really we've just hidden the seam somewhere else by doing it this way 😛 (Also: because I wrote the original code for this in the ingestion server, I can say I think it was a bit cheeky/too clever to do it this way and wish I'd done this if/else there to begin with!)

@obulat
Copy link
Contributor

obulat commented Sep 5, 2024

I ran the steps from the testing instructions.

The audio refresh ran first. The numbers in the ES (seen in ElasticVue) increased, but the get_after_record_count returned the old counts.
Then, the image refresh ran. The numbers in the ES did not change at all, even though the DAG seemed to have finished successfully

wait_for_reindexing_task finished successfully, but logs an error stating that you cannot pickle a module i in the end. This SO answer suggests that something is wrong with the kwargs object.

The error messages
[2024-09-05, 15:17:29 UTC] {taskinstance.py:3916} ERROR - Error scheduling downstream tasks. Skipping it as this is entirely optional optimisation. There might be various reasons for it, please take a look at the stack trace to figure out if the root cause can be diagnosed and fixed. See the issue https://github.com/apache/***/issues/39717 for details and an example problem. If you would like to get help in solving root cause, open discussion with all details with your managed service support or in Airflow repository.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3912, in schedule_downstream_tasks
    return TaskInstance._schedule_downstream_tasks(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3861, in _schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
                  ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 2646, in partial_subset
    t.task_id: _deepcopy_task(t)
               ^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 2643, in _deepcopy_task
    return copy.deepcopy(t, memo)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1388, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/copy.py", line 151, in deepcopy
    rv = reductor(4)
         ^^^^^^^^^^^
TypeError: cannot pickle 'module' object

There are also two warnings in the beginning of this task's logs:

[2024-09-05, 15:17:29 UTC] {baseoperator.py:405} WARNING - TemplatedConnectionHttpSensor.execute cannot be called outside TaskInstance!
[2024-09-05, 15:17:29 UTC] {logging_mixin.py:190} WARNING - /home/***/.local/lib/python3.12/site-packages/***/task/task_runner/standard_task_runner.py:70 DeprecationWarning: This process (pid=27862) is multi-threaded, use of fork() may lead to deadlocks in the child.

…ting index

The index configuration should be hard-coded here to match the behavior of the ingestion server. This is also important because we require permanent changes to the index configuration to go through the code review process. Other DAGs can be used to rapidly iterate on index configuration, and changes can be persisted here only if wanted.
This isn't possible because dynamically mapping tasks within a dynamically mapped taskgroup is not supported in our version of Airflow.
There was an error in the task dependencies caused by the fact that we were returning the output of transform_index_defs from the remap_indices taskgroup. Normally, the last task in a TaskGroup is the task that is set upstream of any downstream dependencies (in this case, the remap_constraints group). Apparently if you add a return statement to a TaskGroup (which we were doing in order to make the index_mapping the XCOM result for the entire taskgroup), this borks the dependency chain.

As a result, transform_index_defs was being set as the upstream task to get_all_existing_constraints. This meant that if there was an error in create_table_indices, it would NOT stop the remap_constraints tasks from running. By updating the create_table_indices task to return what we want from XCOMs as well, we can avoid this issue.
@stacimc
Copy link
Contributor Author

stacimc commented Sep 5, 2024

Thanks @obulat! Good catch on the record count -- it was getting the counts for the old index rather than the promoted one, there's a quick fix which I'll push shortly!

I cannot reproduce your issues with the image data refresh, though I'll keep trying some more things. Are you positive that the provider DAG had succeeded and finished loading additional records into the catalog DB before the refresh started? 🤔

The error/warning you mentioned seemed like it's related to the way I've overridden templated fields in the Sensor, but the sensor is working so that's strange. I'll play around with it to see if I can get the warnings cleared.

@stacimc stacimc force-pushed the add/filtered-index-and-promotion-steps branch from e12db4a to bd57214 Compare September 5, 2024 22:48
Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wonderful! I'm sorry this last piece took more than expected but it's so nice to finally be there 🥳

I'm not quite done with my review yet, still about 10 more files to edit, but here's what I have so far.

One thing I wanted to note too - we send Slack notifications for nearly every step of the data refresh in its current implementation (that just all happens on the to-be-removed ingestion server). Maybe it might make sense to have an on_success_callback for the data refresh which does something similar? That way we don't have to add it to every step, but still get some reporting. Just something to think about, if you like the idea or didn't already have a plan for Slack updates, we can make an issue for it!

catalog/dags/common/sql.py Show resolved Hide resolved
catalog/dags/common/sql.py Outdated Show resolved Hide resolved
catalog/dags/data_refresh/distributed_reindex.py Outdated Show resolved Hide resolved
catalog/dags/data_refresh/promote_table.py Outdated Show resolved Hide resolved
logger = logging.getLogger(__name__)


class TableIndex(NamedTuple):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what Staci said, according to the documentation:

They can have any (serializable) value

I'm assuming "serializable" in this case means JSON serialization. I was able to return a NamedTuple in the add_rekognition_labels DAG and Airflow hydrated it back into the NamedTuple just fine. In practice (via the UI), it looks like it serializes it to a list:

image

image

Dataclasses, although they have more functionality, are a little heavier-weight. If we essentially just need an object to attach a bunch of data to and reference it more easily/programmatically/in a typed way than a dictionary, NamedTuple is preferred. That said, I did just find this documentation on dataclasses!! 🤩:

As mentioned TaskFlow uses XCom to pass variables to each task. This requires that variables that are used as arguments need to be able to be serialized. Airflow out of the box supports all built-in types (like int or str) and it supports objects that are decorated with @dataclass or @attr.define.

catalog/dags/data_refresh/remap_table_constraints.py Outdated Show resolved Hide resolved
Comment on lines +26 to +31
def fetch_all_tuples(cursor):
try:
rows = cursor.fetchall()
return [ConstraintInfo(row[0], row[1], row[2]) for row in rows]
except Exception as e:
raise ValueError("Unable to extract expected row data from cursor") from e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NEAT

catalog/dags/data_refresh/remap_table_constraints.py Outdated Show resolved Hide resolved
catalog/dags/data_refresh/remap_table_constraints.py Outdated Show resolved Hide resolved
Comment on lines +50 to +51
The constraint to remap may be a constraint directly on the live media table, in
which case it is dropped and then added directly to the temp table. Alternatively,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear, does that mean the live tables won't have those constraints for some time, even while they're still live? That's probably fine, but if something fails part way through, could the constraints be missing on the live table the next time they're tried to be moved over?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. This does mirror the current implementation, so it's not a regression and the (minor) improvement here is that we'd have immediate insight into where a failure happened/if those steps need to be manually undone.

I'll make an issue about this, because it's not simple to work around. We could do something similar to the way we remap the indices, where we give the new constraints a temporary name and then map the names back over in the promote step (if that's even necessary 🤔). But the delete_orphans step and remapping foreign key constraints to the temp table can't be easily postponed or undone.

I don't think we should be too worried about this because it's the very last step before promotion, anyway -- but definitely good to call out, and something we'd need to be aware of if for some reason we wanted to start the data refresh over from the beginning after a failure between this step and promotion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, I think that sounds good! Not blocking in this case because it mirrors the previous implementation.

@openverse-bot
Copy link
Collaborator

Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR:

@krysal
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend1 days, this PR was ready for review 4 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s)2.

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

Copy link
Contributor

@obulat obulat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-ran the testing instructions, and the image refresh ran well and saved 10,000 images to the main image index.

catalog/dags/data_refresh/copy_data.py Show resolved Hide resolved
catalog/dags/data_refresh/remap_table_indices.py Outdated Show resolved Hide resolved
@@ -84,7 +86,7 @@ def response_check_wait_for_completion(response: Response) -> bool:

@task
def get_worker_params(
estimated_record_count: int,
id_range: tuple[int, int],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much clearer variable name!

Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, here's the rest of my feedback!

constraint_statement: str, constraint_table: str, temp_table_name: str
):
"""
Parse a foreign key constraint and generate a `DELETE` statement to
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this great documentation!

Comment on lines +203 to +208
def apply_constraints_to_table(
postgres_conn_id: str, table_name: str, constraints: list[str]
):
logger.info(f"Applying constraints for `{table_name}`.")
for constraint in constraints:
run_sql.function(postgres_conn_id=postgres_conn_id, sql_template=constraint)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about failure retrying and idempotency here...should these maybe each be separate tasks that are run serially? They can probably still be mapped (by both table name and constraint order) 🤔 Not something that needs to be done now, but I'm thinking about the case where this fails half way through for some reason. We could maybe address and fix the issue, then restart it from that line rather than having to restart things further upstream. Just a though, no action needed!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't dynamically map tasks within a dynamically mapped taskgroup, unfortunately (at least yet). I suppose we could have generate_constraints_for_table generate constraints for all tables, and then map over table + constraint together when applying, although I don't like that control flow as much... 🤔 I think we could take this into consideration as part of the same issue I'll be creating for the other constraint concerns, particularly as neither are regressions from existing behavior.

catalog/dags/data_refresh/remap_table_indices.py Outdated Show resolved Hide resolved
catalog/dags/data_refresh/remap_table_indices.py Outdated Show resolved Hide resolved
# Connect to sensitive terms list used for filtered index creation in the data refresh.
# For local development, the mock terms list is served by the catalog indexer worker. Change to the
# full production URL in order to use the production sensitive terms list.
AIRFLOW_CONN_SENSITIVE_TERMS="http://catalog_indexer_worker:8003/static/mock_sensitive_terms.txt"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now! One disadvantage of declaring this in the Admin UI is that the password (if needed) for the services isn't codified in the same way in our infrastructure repo. Noting that for now, I'll add more thorough thoughts to that discussion when I get a chance to look at that discussion.

@@ -116,6 +117,7 @@ def create_api():
_api.add_route("/healthcheck", HealthcheckResource())
_api.add_route("/task", IndexingJobResource(task_tracker))
_api.add_route("/task/{task_id}", TaskStatusResource(task_tracker))
_api.add_static_route("/static", (Path(".") / "static").absolute())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to initialize the operator inside a @task, but it's probably easier to use the HttpHook inside a @task than have an operator-in-operator situation by using HttpOperator.

I think I'm fine going with the new static route approach suggested here, but @stacimc do you mind adding a comment about what the route is for, and that it's only intended to be used for local testing?

@stacimc
Copy link
Contributor Author

stacimc commented Sep 7, 2024

@AetherUnbound I've run out of time today but written myself a TODO to create issues for the constraint error handling concerns you raised as well as Slack reporting. I'm not sure what Slack reporting we'll want yet, since unlike the ingestion server we'll already have much easier visibility into what's happening through Airflow -- but I think we ought to have reporting at least when the copy data and reindexing steps finish, for sure.

Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fantastic work on this, looks great to me! I didn't get a chance to test it but given that other reviewers have, I feel comfortable approving it 😄

@stacimc stacimc merged commit 8b2f037 into main Sep 16, 2024
51 checks passed
@stacimc stacimc deleted the add/filtered-index-and-promotion-steps branch September 16, 2024 18:34
@stacimc stacimc removed the 🧱 stack: ingestion server Related to the ingestion/data refresh server label Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🌟 goal: addition Addition of new feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Add remaining steps to the Data refresh DAGs
5 participants