Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Successful Airbyte syncs reporting as failed in Prefect, causing workflows to show as failed #64

Closed
mikebaldry-acu opened this issue Nov 6, 2023 · 5 comments

Comments

@mikebaldry-acu
Copy link

We are experiencing intermittent "false negatives" (where an Airbyte sync succeeds but is reported as failed by the Prefect server) when triggering Airbyte connections in Prefect workflows. These failures occur more often when running multiple concurrent syncs, but aren't limited to only those times. The errors appear to be driven by timeouts, where the Airbyte server doesn't report status back to Prefect in a given timeframe.

Technical specs for the tools we're currently running:

  • Prefect server version 2.10.17
  • Airbyte server version 0.50.7
  • prefect-airbyte v0.3.0
Downloading flow code from storage at '/usr/local/src/data_engineering'
08:12:11 AM
prefect.flow_runs
Created task run 'Update <Redacted> Airbyte Connection-0' for task 'Update <Redacted> Airbyte Connection'
08:13:00 AM
prefect.flow_runs
Executing 'Update <Redacted> Airbyte Connection-0' immediately...
08:13:00 AM
prefect.flow_runs
Got source file <Redacted>
08:13:01 AM
Update <Redacted> Airbyte Connection-0
prefect.task_runs
Successfully updated source file <Redacted>
08:13:01 AM
Update <Redacted> Airbyte Connection-0
prefect.task_runs
Finished in state Completed()
08:13:01 AM
Update <Redacted> Airbyte Connection-0
prefect.task_runs
Created task run 'Run Airbyte Sync-0' for task 'Run Airbyte Sync'
08:13:01 AM
prefect.flow_runs
Executing 'Run Airbyte Sync-0' immediately...
08:13:01 AM
prefect.flow_runs
Starting Airbyte sync for <Redacted>
08:13:04 AM
Run Airbyte Sync-0
prefect.task_runs
Triggering Airbyte Connection <Redacted>, in workspace at '<Redacted>/api/v1'
08:13:04 AM
Run Airbyte Sync-0
prefect.task_runs
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
    with anyio.fail_after(timeout):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 54, in run_airbyte_sync
    job_run = connection.trigger()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
    return self.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
    (job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
    response = await self._client.post(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
08:13:09 AM
Run Airbyte Sync-0
prefect.task_runs
Finished in state Failed('Task run encountered an exception ReadTimeout: ')
08:13:12 AM
Run Airbyte Sync-0
prefect.task_runs
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
    with anyio.fail_after(timeout):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 73, in <Redacted>_sftp_sync
    res = run_airbyte_sync(file.connection_id)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
    return await future._result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 54, in run_airbyte_sync
    job_run = connection.trigger()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
    return self.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
    (job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
    response = await self._client.post(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
08:13:12 AM
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception. ReadTimeout: ')
@davidfromtandym
Copy link

Also running into this issue, seemingly out of nowhere. I've tried increasing the timeout parameter for the trigger_sync well beyond what the poll interval for the Airbyte server is set at. That seemed to help a bit, but still encountering frequent errors...

  • Prefect server version 2.14.8
  • Airbyte server version 0.50.34
  • prefect-airbyte v0.3.0

@hawkaa
Copy link

hawkaa commented Mar 25, 2024

We are also seeing this issue on our end!

@a-monteiro
Copy link

Any updates here? This happens quite a lot and is especially an issue for larger syncs.

@Ishankoradia
Copy link

Ishankoradia commented Jul 2, 2024

Any updates on this issue, we are also facing this.

@zzstoatzz
Copy link
Contributor

sorry folks - we failed to update this repo to indicate it is no longer maintained. I would encourage any users of this library to fork it or otherwise yank the logic from here that you find useful.

you can find the actively maintained prefect integrations here in prefecthq/prefect

sorry again about the confusion

@zzstoatzz zzstoatzz reopened this Oct 12, 2024
@zzstoatzz zzstoatzz closed this as not planned Won't fix, can't repro, duplicate, stale Oct 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants