Conversation

@Owen-CH-Leung
Contributor

@Owen-CH-Leung Owen-CH-Leung commented Jul 28, 2025

fixes: #50349
Fixes: #51456

ElasticsearchTaskHandler has a feature that allows task logs to be written directly to Elasticsearch. This feature is broken in Airflow 3.

The root cause is that in Airflow 3, the write path for remote logging has changed. There's a detailed description here by @jason810496 that illustrates how it works in Airflow 3. (Thank you, Jason!)

In summary, the solution is to add a RemoteLogIO for Elasticsearch as well. When write_to_es is set to true, Airflow will initialize an ElasticsearchRemoteLogIO, which handles writing task logs to Elasticsearch.
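For illustration only, here is a minimal sketch of the writer shape described above. The class name ElasticsearchRemoteLogIO matches the PR, but the method names, fields, and the in-memory list standing in for the Elasticsearch client are assumptions of this sketch, not the actual provider implementation.

```python
from dataclasses import dataclass, field


@dataclass
class ElasticsearchRemoteLogIO:
    """Illustrative sketch: buffers task-log lines and records them as documents.

    The real provider class talks to an Elasticsearch cluster; here a plain
    Python list stands in for the index so the read/write flow is easy to follow.
    """

    index: str = "airflow-task-logs"
    _docs: list = field(default_factory=list)

    def upload(self, log_id: str, lines: list) -> int:
        """Record one document per log line, tagged with the task's log_id."""
        for offset, line in enumerate(lines):
            self._docs.append(
                {"_index": self.index, "log_id": log_id, "offset": offset, "message": line}
            )
        return len(lines)

    def read(self, log_id: str) -> list:
        """Return the stored lines for a log_id, in offset order."""
        hits = sorted(
            (d for d in self._docs if d["log_id"] == log_id),
            key=lambda d: d["offset"],
        )
        return [d["message"] for d in hits]
```

With `write_to_es = true`, the handler would hand finished log lines to `upload()` on task completion, and the webserver's read path would call `read()` with the same log_id.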

@Owen-CH-Leung Owen-CH-Leung marked this pull request as ready for review July 29, 2025 12:48
Member

@jason810496 jason810496 left a comment


Nice! Big thanks for helping out!

Only some backward-compatibility issues need to be fixed, and a unit test for the new ElasticsearchRemoteLogIO is required.
LGTM overall.

@jason810496 jason810496 requested review from ashb and dstandish July 30, 2025 03:55
base_log_folder: Path = attrs.field(converter=Path)
delete_local_copy: bool

processors = ()
Member


This should probably be set up as a processor so that log messages get sent to Elasticsearch as soon as they are written, rather than just once when the log is "closed". See the CloudWatch logging handler for the only example we have right now.

That can be a future PR though.

Contributor


@ashb I took a quick look at how we'd make this a processor in the future, and I got stuck on the fact that while the upload() method takes the ti as an argument, the process function takes only the event_dict, and we need the ti to construct the log_id, which has to be on every log event that gets indexed to ES/OS.

Can we expect each log event from the task logger to contain the ti or otherwise give us the log_id components? (Or is that a necessary future change?)
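To make the signature gap concrete: a structlog-style processor is called as `processor(logger, method_name, event_dict)` and never sees the TaskInstance. One conceivable workaround, sketched below with hypothetical names, is to build the processor as a closure once the log_id is known, so every event gets stamped before indexing. This is an illustration of the constraint being discussed, not code from the PR.

```python
# Hypothetical sketch: the processor only ever receives the event_dict,
# so the log_id must be captured at setup time via a closure.
def make_log_id_processor(log_id: str):
    def processor(logger, method_name, event_dict):
        # Stamp every event with the log_id so it can be indexed to ES/OS.
        event_dict["log_id"] = log_id
        return event_dict

    return processor


proc = make_log_id_processor("mydag-mytask-2025-07-28--1")
event = proc(None, "info", {"event": "task started"})
```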

Member


Sorry for the late reply. Here are some updates that might answer your question:

The LogTemplate DB model will be removed for ES, which means "we need the ti to construct the log_id which needs to be on every log event that gets indexed to ES/OS" will no longer be necessary. We will retrieve log_id_template directly from airflow.conf instead of fetching it from the LogTemplate DB model.

Details in https://lists.apache.org/thread/nlmhs1plo77qnlp7rqk27mkb2hs41f1p
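The config-driven approach above amounts to rendering the log_id from a template string read from airflow.cfg instead of a LogTemplate DB row. A sketch follows; the template shape mirrors the provider's default `[elasticsearch] log_id_template`, but treat the exact keys and the helper name as assumptions here.

```python
# Sketch: build the log_id from a configured template string rather than
# fetching a LogTemplate row from the DB. Template shape is an assumption
# based on the provider's documented default.
LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"


def render_log_id(dag_id, task_id, run_id, map_index, try_number):
    return LOG_ID_TEMPLATE.format(
        dag_id=dag_id,
        task_id=task_id,
        run_id=run_id,
        map_index=map_index,
        try_number=try_number,
    )


log_id = render_log_id("mydag", "mytask", "manual__2025-07-28", -1, 1)
```

Because the template comes from static config, any component (handler, processor, or the read path in the webserver) can reconstruct the same log_id without a DB round trip.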

Member

@ashb ashb left a comment


cc @potiuk, regarding a recent Slack discussion we had: this is a prime example of why I'm sad we have separate providers for OpenSearch and ElasticSearch. What we have to do in one we should really do in both.

@Owen-CH-Leung
Contributor Author

@jason810496 @ashb I added unit tests using Testcontainers so that remote read and write can be tested against a real Elasticsearch instance. Could you review again? Thanks!

Member

@jason810496 jason810496 left a comment


Nice! Thanks for the update!
It would be great to verify whether the TaskInstanceLog UI page displays correctly with this change. If so, this PR should be good to go. Thank you!

@Owen-CH-Leung
Contributor Author

Owen-CH-Leung commented Aug 8, 2025

> Nice! Thanks for the update! It would be great to verify whether the TaskInstanceLog UI page displays correctly with this change. If so, this PR should be good to go. Thank you!

I'd love to verify that as well. Do you know which relevant file/class I should look at? I'm not very familiar with the frontend codebase.

No worries, I think I found it. Let me expand the testing logic.

@jason810496
Member

jason810496 commented Aug 10, 2025

> I'd love to verify that as well. Do you know which relevant file / class I should look at ? Not very familiar with the frontend codebase
>
> No worries I think I found it. Let me expand the testing logic

Sorry for the late reply.

The following command is helpful for setting up Airflow with Elasticsearch in Breeze:

breeze start-airflow --python 3.10 --backend postgres --integration elasticsearch --mount-sources providers-and-tests --use-airflow-version <version>

It would be great to test with the following version matrix to check compatibility.
Replace <version> with each of:

  • 3.0.3
  • 3.0.4
  • main (for the unreleased 3.1.x version, but this one is a bit more involved)
    1. Switch to main
    2. Run rm -rf dist/*
    3. Run breeze release-management prepare-airflow-distributions
    4. Run breeze release-management prepare-task-sdk-distributions
    5. Switch back to the branch of this PR (fix-write-to-es-feature in this case)
    6. Run breeze start-airflow --use-airflow-version wheel --mount-sources providers-and-tests --integration elasticsearch instead of the command above.

Thanks a lot!

Member

@jason810496 jason810496 left a comment


Hi @Owen-CH-Leung

I saw your DM on Slack; the root cause of the CI failure is that the Breeze container can't connect to the Testcontainers-managed container, since they are not in the same Docker network.

The ElasticsearchContainer will fail on start(), which also calls _connect:

https://github.com/testcontainers/testcontainers-python/blob/5c1504c217d8cd3debd99dee54db826e49bfa579/modules/elasticsearch/testcontainers/elasticsearch/__init__.py#L102

Member

@jason810496 jason810496 left a comment


Nice! The integration tests with Testcontainers LGTM.

It would be nice to test the change with the start-airflow command against the different Airflow core versions mentioned above to confirm the user-facing behavior. Thanks!

@potiuk potiuk requested a review from ashb October 18, 2025 23:06
@Owen-CH-Leung
Contributor Author

@jason810496 Thanks. I can confirm that the write-to-es feature works for versions 3.0.6 and 3.1.0. The log-reading feature in Airflow 2.11 also works as before.

@jhgoebbert Sorry for my late reply! I've adopted your suggestion to capture the BulkIndexError exception and changed the values file as well.
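For context, BulkIndexError is the exception the official elasticsearch-py client's bulk helpers raise when some documents fail to index. Below is a hedged sketch of the catch pattern; a stand-in exception class is defined locally so the snippet stays self-contained, and the helper names are illustrative, not the PR's actual code.

```python
# Stand-in for elasticsearch.helpers.BulkIndexError so this sketch is
# self-contained; the provider would import the real exception from the
# official client instead.
class BulkIndexError(Exception):
    def __init__(self, message, errors):
        super().__init__(message)
        self.errors = errors


def safe_bulk(bulk_fn, actions):
    """Run a bulk indexing call, reporting partial failures instead of crashing.

    Returns (number_indexed, per_document_errors).
    """
    try:
        return bulk_fn(actions), []
    except BulkIndexError as exc:
        # Surface the per-document failures rather than aborting task teardown.
        return 0, exc.errors


def failing_bulk(actions):
    # Simulates a bulk call where every document is rejected by the cluster.
    raise BulkIndexError("document(s) failed to index", [{"index": {"status": 400}}])


indexed, errors = safe_bulk(failing_bulk, [{"message": "hi"}])
```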



Successfully merging this pull request may close these issues:

  • airflow no writing logs to Elasticsearch
  • Logs are not integrating with elasticsearch
