Skip to content

Commit

Permalink
Fix AstroCustomXcomBackend circular import issue (#1942)
Browse files Browse the repository at this point in the history
When using `AstroCustomXcomBackend` and Python SDK 1.5, we were raising
the exception:
```
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/usr/local/lib/python3.9/site-packages/airflow/__init__.py", line 55, in <module>
    settings.initialize()
  File "/usr/local/lib/python3.9/site-packages/airflow/settings.py", line 567, in initialize
    import_local_settings()
  File "/usr/local/lib/python3.9/site-packages/airflow/settings.py", line 524, in import_local_settings
    import airflow_local_settings
  File "/usr/local/airflow/airflow_local_settings.py", line 10, in <module>
    from airflow.models.baseoperator import BaseOperator
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 75, in <module>
    from airflow.models.taskinstance import TaskInstance, clear_task_instances
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 93, in <module>
    from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
  File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 835, in <module>
    XCom = resolve_xcom_backend()
  File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 818, in resolve_xcom_backend
    clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
  File "/usr/local/lib/python3.9/site-packages/airflow/configuration.py", line 809, in getimport
    return import_string(full_qualified_path)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/module_loading.py", line 33, in import_string
    module = import_module(module_path)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "/usr/local/lib/python3.9/site-packages/astro/custom_backend/astro_custom_backend.py", line 6, in <module>
    from astro.custom_backend.serializer import deserialize, serialize
  File "/usr/local/lib/python3.9/site-packages/astro/custom_backend/serializer.py", line 12, in <module>
    if airflow.__version__ >= "2.3":
AttributeError: partially initialized module 'airflow' has no attribute '__version__' (most likely due to a circular import)
```

Relates to: apache/airflow#31379

Closes: #1939
  • Loading branch information
tatiana authored May 25, 2023
1 parent fe4e83d commit dcdf864
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions python-sdk/src/astro/custom_backend/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@
from json import JSONDecodeError
from typing import Any

import airflow
import numpy as np
import pandas
import sqlalchemy
from packaging import version

if airflow.__version__ >= "2.3":
try:
from sqlalchemy.engine.row import LegacyRow as SQLAlcRow
else:
except ImportError:
from sqlalchemy.engine.result import RowProxy as SQLAlcRow


from astro.files import File
from astro.table import Table, TempTable

log = logging.getLogger("astro.utils.serializer")


def is_newer_sqlalchemy():
return version(sqlalchemy.__version__) >= version.parse("1.4.0")


def serialize(obj: Table | File | Any) -> dict | Any: # noqa
"""
Serialize astro SDK objects (tables, files and dataframes) into json safe dictionary
Expand Down Expand Up @@ -52,7 +58,7 @@ def serialize(obj: Table | File | Any) -> dict | Any: # noqa
"key_map": obj._keymap, # skipcq PYL-W021
"key_style": obj._key_style, # skipcq PYL-W021
}
if airflow.__version__ >= "2.3":
if is_newer_sqlalchemy():
serialized_obj["data"] = obj._data # skipcq PYL-W021
return serialized_obj

Expand Down Expand Up @@ -90,7 +96,7 @@ def deserialize(obj: dict | str | list) -> Table | File | Any: # noqa
log.debug("Found file dictionary %s, will attempt to deserialize", obj)
return _deserialize_file(obj)
elif obj["class"] == "SQLAlcRow":
if airflow.__version__ >= "2.3":
if is_newer_sqlalchemy():
return SQLAlcRow(None, None, obj["key_map"], obj["key_style"], obj["data"])
else:
return SQLAlcRow(None, None, obj["key_map"], obj["key_style"])
Expand Down

0 comments on commit dcdf864

Please sign in to comment.