Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2233ddc
Introduce ElasticsearchRemoteLogIO
Owen-CH-Leung Jul 28, 2025
9bf1cf4
Fix mypy error. Fix ruff format error. Remove test_write_to_es. Fix b…
Owen-CH-Leung Jul 29, 2025
68c5546
Fix mypy error. Fix ruff format error. Fix spelling mistakes
Owen-CH-Leung Jul 29, 2025
b407075
Fix ruff check
Owen-CH-Leung Jul 29, 2025
65f78c5
Fix ruff check on testEsTaskhandler
Owen-CH-Leung Jul 29, 2025
960166b
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Jul 31, 2025
3e59474
Add ElasticsearchRemoteIO into TaskHandler. Refactor to handle read/w…
Owen-CH-Leung Aug 2, 2025
a60a315
Fix existing unit tests
Owen-CH-Leung Aug 3, 2025
aff9a30
Add unit test for ElasticsearchRemoteIO. Add testcontainer as new dep…
Owen-CH-Leung Aug 7, 2025
b1cb760
Fix provider test
Owen-CH-Leung Aug 7, 2025
f3ce8e7
Fix typo
Owen-CH-Leung Aug 7, 2025
d28d40a
Fix long running provider test
Owen-CH-Leung Aug 7, 2025
998aeee
Remove leftover comments. Move testcontainer dependency to devel-common
Owen-CH-Leung Aug 8, 2025
c17147d
Patch REMOTE_TASK_LOG for airflow 3. Return message / event field dep…
Owen-CH-Leung Aug 12, 2025
3c6d652
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Aug 12, 2025
9e815bd
Use _render_log_id for both reading & writing remote log
Owen-CH-Leung Aug 20, 2025
a74d7dc
use hasattr to return hit field or message field, instead of relying …
Owen-CH-Leung Aug 31, 2025
f1562c0
Move concat_logs under ESTaskHandler. Revise sample_log_response to h…
Owen-CH-Leung Sep 6, 2025
a7f6dd5
Rebase
Owen-CH-Leung Sep 6, 2025
4c3a303
Fix ruff. get log id before creating runtime instance
Owen-CH-Leung Sep 6, 2025
136ee8f
Fix failing CI
Owen-CH-Leung Sep 6, 2025
d86b92a
Use global session instead of creating a new session
Owen-CH-Leung Sep 14, 2025
19a264c
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Sep 14, 2025
2fe3f6a
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Sep 20, 2025
b0aede8
Do not fetch log_id_template from DB. Get it from config instead.
Owen-CH-Leung Sep 20, 2025
98af510
Remove DB session. Fix compatibility CI
Owen-CH-Leung Sep 20, 2025
d456f2c
fix getting log_id_template_from config
Owen-CH-Leung Sep 21, 2025
fbdc3c6
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 4, 2025
f850dd8
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 5, 2025
1c797d1
fix timeout error when connecting to elasticsearch
Owen-CH-Leung Oct 5, 2025
f4a02dd
Revert "fix timeout error when connecting to elasticsearch"
Owen-CH-Leung Oct 5, 2025
f9f359a
Set higher memory for testcontainer
Owen-CH-Leung Oct 5, 2025
a01e7ad
Do not refresh all index. Extend timeout to 30 seconds
Owen-CH-Leung Oct 5, 2025
6dad32c
Prove es to be write-ready before starting the test
Owen-CH-Leung Oct 6, 2025
4ae74d5
Revert "Prove es to be write-ready before starting the test"
Owen-CH-Leung Oct 6, 2025
e8bd766
Use fileLock to prevent contention
Owen-CH-Leung Oct 6, 2025
6cc9cf8
Increase client timeout to 60s with 5 retry
Owen-CH-Leung Oct 6, 2025
9b2e05c
Set TESTCONTAINERS_HOST_OVERRIDE to host.docker.internal
Owen-CH-Leung Oct 9, 2025
d2808a5
Revert "Set TESTCONTAINERS_HOST_OVERRIDE to host.docker.internal"
Owen-CH-Leung Oct 9, 2025
52fcfd4
Add BulkIndexError to debug timeout
Owen-CH-Leung Oct 9, 2025
ce4a161
Fix mypy. Add TESTCONTAINER hostname override in docker compose
Owen-CH-Leung Oct 9, 2025
f3ff086
revert docker-compose changes. Add mem_reservation for testcontainer.
Owen-CH-Leung Oct 10, 2025
f0a893e
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 10, 2025
059906e
Increase timeout to 120. Disable unnecessary modules of ES
Owen-CH-Leung Oct 10, 2025
132922d
Extend pytest execution and setup timeout to 300
Owen-CH-Leung Oct 10, 2025
f822f86
Combine read write es into one test. Pre-create index and wait for sh…
Owen-CH-Leung Oct 10, 2025
e4731ed
Print container stats and indices to debug
Owen-CH-Leung Oct 13, 2025
aec7374
Disable disk-based allocation of shard
Owen-CH-Leung Oct 13, 2025
1ad3376
Remove debug lines
Owen-CH-Leung Oct 13, 2025
e598b4e
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 15, 2025
0711079
Merge branch 'apache:main' into fix-write-to-es-feature
Owen-CH-Leung Oct 15, 2025
b5d7e01
Fix infinite read
Owen-CH-Leung Oct 16, 2025
765e7bc
Fix failing CI
Owen-CH-Leung Oct 17, 2025
18d39a6
Fix failing CI
Owen-CH-Leung Oct 17, 2025
3694219
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 17, 2025
49fca58
Merge branch 'main' into fix-write-to-es-feature
Owen-CH-Leung Oct 18, 2025
d9b2444
Update default helm value file
Owen-CH-Leung Oct 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions airflow-core/src/airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,35 +279,29 @@ def _default_conn_name_from(mod_path, hook_name):
)
remote_task_handler_kwargs = {}
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO

ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")

REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
host=ELASTICSEARCH_HOST,
target_index=ELASTICSEARCH_TARGET_INDEX,
write_stdout=ELASTICSEARCH_WRITE_STDOUT,
write_to_es=ELASTICSEARCH_WRITE_TO_ES,
offset_field=ELASTICSEARCH_OFFSET_FIELD,
host_field=ELASTICSEARCH_HOST_FIELD,
base_log_folder=BASE_LOG_FOLDER,
delete_local_copy=delete_local_copy,
json_format=ELASTICSEARCH_JSON_FORMAT,
log_id_template=ELASTICSEARCH_LOG_ID_TEMPLATE,
)

ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
"task": {
"class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
"formatter": "airflow",
"base_log_folder": BASE_LOG_FOLDER,
"end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
"host": ELASTICSEARCH_HOST,
"frontend": ELASTICSEARCH_FRONTEND,
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
"write_to_es": ELASTICSEARCH_WRITE_TO_ES,
"target_index": ELASTICSEARCH_TARGET_INDEX,
"json_format": ELASTICSEARCH_JSON_FORMAT,
"json_fields": ELASTICSEARCH_JSON_FIELDS,
"host_field": ELASTICSEARCH_HOST_FIELD,
"offset_field": ELASTICSEARCH_OFFSET_FIELD,
},
}

DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
elif OPENSEARCH_HOST:
OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
Expand Down
2 changes: 1 addition & 1 deletion chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2918,7 +2918,7 @@ config:
run_duration: 41460
elasticsearch:
json_format: 'True'
log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
log_id_template: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
elasticsearch_configs:
max_retries: 3
timeout: 30
Expand Down
1 change: 1 addition & 0 deletions devel-common/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ dependencies = [
"pytest-unordered>=0.6.1",
"pytest-xdist>=3.5.0",
"pytest>=8.3.3",
"testcontainers>=4.12.0",
]
"sentry" = [
"blinker>=1.7.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import Any


def _wrap(val):
Expand All @@ -25,6 +26,33 @@ def _wrap(val):
return val


def resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
"""
Resolve nested hits from Elasticsearch by iteratively navigating the `_nested` field.

The result is used to fetch the appropriate document class to handle the hit.

This method can be used with nested Elasticsearch fields which are structured
as dictionaries with "field" and "_nested" keys.
"""
doc_class = Hit

nested_path: list[str] = []
nesting = hit["_nested"]
while nesting and "field" in nesting:
nested_path.append(nesting["field"])
nesting = nesting.get("_nested")
nested_path_str = ".".join(nested_path)

if hasattr(parent_class, "_index"):
nested_field = parent_class._index.resolve_field(nested_path_str)

if nested_field is not None:
return nested_field._doc_class

return doc_class


class AttributeList:
"""Helper class to provide attribute like access to List objects."""

Expand Down
Loading
Loading