Commit 0e7f3bc
feat(airbyte-cdk) - Async job salesforce (airbytehq#45673)
1 parent f517c3b commit 0e7f3bc

19 files changed (+752 −75)

airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py (+1 −1)

@@ -47,4 +47,4 @@ def update_status(self, status: AsyncJobStatus) -> None:
         self._status = status
 
     def __repr__(self) -> str:
-        return f"AsyncJob(data={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"
+        return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"

airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (+229 −27)

Large diffs are not rendered by default.

airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job_tracker.py (new file, +57)

@@ -0,0 +1,57 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+import logging
+import threading
+import uuid
+from typing import Set
+
+from airbyte_cdk.logger import lazy_log
+
+LOGGER = logging.getLogger("airbyte")
+
+
+class ConcurrentJobLimitReached(Exception):
+    pass
+
+
+class JobTracker:
+    def __init__(self, limit: int):
+        self._jobs: Set[str] = set()
+        self._limit = limit
+        self._lock = threading.Lock()
+
+    def try_to_get_intent(self) -> str:
+        lazy_log(LOGGER, logging.DEBUG, lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...")
+        with self._lock:
+            if self._has_reached_limit():
+                raise ConcurrentJobLimitReached("Can't allocate more jobs right now: limit already reached")
+            intent = f"intent_{str(uuid.uuid4())}"
+            lazy_log(LOGGER, logging.DEBUG, lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!")
+            self._jobs.add(intent)
+            return intent
+
+    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
+        if intent_or_job_id not in self._jobs:
+            raise ValueError(f"Can't add job: Unknown intent or job id, known values are {self._jobs}")
+
+        if intent_or_job_id == job_id:
+            # Nothing to do here as the ID to replace is the same
+            return
+
+        lazy_log(
+            LOGGER, logging.DEBUG, lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!"
+        )
+        with self._lock:
+            self._jobs.add(job_id)
+            self._jobs.remove(intent_or_job_id)
+
+    def remove_job(self, job_id: str) -> None:
+        """
+        If the job is not allocated as a running job, this method does nothing and it won't raise.
+        """
+        lazy_log(LOGGER, logging.DEBUG, lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}")
+        with self._lock:
+            self._jobs.discard(job_id)
+
+    def _has_reached_limit(self) -> bool:
+        return len(self._jobs) >= self._limit
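
JobTracker hands out opaque "intents" so a thread can reserve a slot before the API-side job id exists, then swaps the intent for the real id. A minimal sketch of that lifecycle (the job id is invented; the real call sites live in job_orchestrator.py, whose diff is not rendered above):

    from airbyte_cdk.sources.declarative.async_job.job_tracker import ConcurrentJobLimitReached, JobTracker

    tracker = JobTracker(limit=1)
    intent = tracker.try_to_get_intent()    # reserve the single slot before creating the job
    try:
        tracker.try_to_get_intent()         # a second reservation exceeds the limit
    except ConcurrentJobLimitReached:
        pass                                # caller retries once a slot frees up
    tracker.add_job(intent, "api-job-123")  # swap the intent for the id the API returned
    tracker.remove_job("api-job-123")       # job done: the slot is free again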

airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py (+12)

@@ -19,3 +19,15 @@ def update_jobs_status(self, jobs: Set[AsyncJob]) -> None:
     @abstractmethod
     def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
         pass
+
+    @abstractmethod
+    def abort(self, job: AsyncJob) -> None:
+        """
+        Called when we need to stop the job on the API side. This method can raise NotImplementedError as not all APIs support
+        aborting jobs.
+        """
+        raise NotImplementedError("Either the API or the AsyncJobRepository implementation does not support aborting jobs")
+
+    @abstractmethod
+    def delete(self, job: AsyncJob) -> None:
+        pass
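
The two new abstract methods widen the contract for every repository implementation; abort implementations may raise NotImplementedError when the API offers no way to cancel jobs. A minimal conforming subclass might look like this (hypothetical class, for illustration; the preexisting abstract methods are elided):

    class NoopCleanupJobRepository(AsyncJobRepository):
        ...  # start / update_jobs_status / fetch_records elided

        def abort(self, job: AsyncJob) -> None:
            # Permitted to raise NotImplementedError instead, for APIs
            # that cannot cancel a running job.
            pass

        def delete(self, job: AsyncJob) -> None:
            pass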

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml (+15)

@@ -2435,6 +2435,21 @@ definitions:
         anyOf:
           - "$ref": "#/definitions/CustomRequester"
           - "$ref": "#/definitions/HttpRequester"
+      download_paginator:
+        description: Paginator component that describes how to navigate through the API's pages during download.
+        anyOf:
+          - "$ref": "#/definitions/DefaultPaginator"
+          - "$ref": "#/definitions/NoPagination"
+      abort_requester:
+        description: Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it has timed out from the source's perspective.
+        anyOf:
+          - "$ref": "#/definitions/CustomRequester"
+          - "$ref": "#/definitions/HttpRequester"
+      delete_requester:
+        description: Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted.
+        anyOf:
+          - "$ref": "#/definitions/CustomRequester"
+          - "$ref": "#/definitions/HttpRequester"
       partition_router:
         title: Partition Router
         description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.
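
For illustration only, a manifest using the new fields might look roughly like this (hedged sketch: the endpoints and paths are invented, and the required AsyncRetriever fields are elided):

    retriever:
      type: AsyncRetriever
      # creation_requester, polling_requester, download_requester,
      # status_extractor, status_mapping and urls_extractor elided
      download_paginator:
        type: DefaultPaginator
        # pagination strategy for the download endpoint goes here
      abort_requester:
        type: HttpRequester
        url_base: "https://api.example.com/"
        path: "jobs/abort"    # invented endpoint
        http_method: POST
      delete_requester:
        type: HttpRequester
        url_base: "https://api.example.com/"
        path: "jobs/delete"   # invented endpoint
        http_method: DELETE   # DELETE is added to HttpMethod later in this commit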

airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py (+3 −1)

@@ -4,5 +4,7 @@
 
 from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
 from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder
+from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
 
-__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder"]
+
+__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder"]
airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/noop_decoder.py (new file, +17)

@@ -0,0 +1,17 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+
+import logging
+from typing import Any, Generator, Mapping
+
+import requests
+from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
+
+logger = logging.getLogger("airbyte")
+
+
+class NoopDecoder(Decoder):
+    def is_stream_response(self) -> bool:
+        return False
+
+    def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
+        yield from [{}]
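
NoopDecoder yields a single empty mapping no matter what the response contains. That suits requesters whose response body is irrelevant or handled elsewhere (for example abort/delete calls, or downloads whose payload goes through ResponseToFileExtractor rather than a JSON decoder). A quick sketch of the behavior (the response object is fabricated locally, never sent over the wire):

    import requests

    from airbyte_cdk.sources.declarative.decoders import NoopDecoder

    response = requests.Response()
    response._content = b"anything at all"  # ignored: decode never reads the body
    assert list(NoopDecoder().decode(response)) == [{}]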

airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py (+12)

@@ -1653,6 +1653,18 @@ class AsyncRetriever(BaseModel):
         ...,
         description='Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.',
     )
+    download_paginator: Optional[Union[DefaultPaginator, NoPagination]] = Field(
+        None,
+        description="Paginator component that describes how to navigate through the API's pages during download.",
+    )
+    abort_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
+        None,
+        description="Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it has timed out from the source's perspective.",
+    )
+    delete_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
+        None,
+        description='Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted.',
+    )
     partition_router: Optional[
         Union[
             CustomPartitionRouter,
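
All three new fields are optional and default to None, so previously valid AsyncRetriever manifests keep parsing unchanged. A quick check against the generated pydantic (v1) model:

    from airbyte_cdk.sources.declarative.models.declarative_component_schema import AsyncRetriever

    for name in ("download_paginator", "abort_requester", "delete_requester"):
        field = AsyncRetriever.__fields__[name]
        assert not field.required and field.default is None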

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (+43 −5)

@@ -12,6 +12,7 @@
 
 from airbyte_cdk.models import FailureType, Level
 from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
+from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
 from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
 from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
 from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
@@ -30,7 +31,7 @@
 from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 from airbyte_cdk.sources.declarative.decoders import Decoder, IterableDecoder, JsonDecoder, JsonlDecoder
-from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
+from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector, ResponseToFileExtractor
 from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator
 from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.incremental import (
@@ -158,7 +159,7 @@
 from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository
 from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
 from airbyte_cdk.sources.types import Config
-from airbyte_cdk.sources.utils.transform import TypeTransformer
+from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from isodate import parse_duration
 from pydantic.v1 import BaseModel
@@ -1298,22 +1299,59 @@ def create_async_retriever(
         polling_requester = self._create_component_from_model(
             model=model.polling_requester, decoder=decoder, config=config, name=f"job polling - {name}"
         )
+        job_download_components_name = f"job download - {name}"
         download_requester = self._create_component_from_model(
-            model=model.download_requester, decoder=decoder, config=config, name=f"job download - {name}"
+            model=model.download_requester, decoder=decoder, config=config, name=job_download_components_name
+        )
+        download_retriever = SimpleRetriever(
+            requester=download_requester,
+            record_selector=RecordSelector(
+                extractor=ResponseToFileExtractor(),
+                record_filter=None,
+                transformations=[],
+                schema_normalization=TypeTransformer(TransformConfig.NoTransform),
+                config=config,
+                parameters={},
+            ),
+            primary_key=None,
+            name=job_download_components_name,
+            paginator=self._create_component_from_model(model=model.download_paginator, decoder=decoder, config=config, url_base="")
+            if model.download_paginator
+            else NoPagination(parameters={}),
+            config=config,
+            parameters={},
+        )
+        abort_requester = (
+            self._create_component_from_model(model=model.abort_requester, decoder=decoder, config=config, name=f"job abort - {name}")
+            if model.abort_requester
+            else None
+        )
+        delete_requester = (
+            self._create_component_from_model(model=model.delete_requester, decoder=decoder, config=config, name=f"job delete - {name}")
+            if model.delete_requester
+            else None
         )
         status_extractor = self._create_component_from_model(model=model.status_extractor, decoder=decoder, config=config, name=name)
         urls_extractor = self._create_component_from_model(model=model.urls_extractor, decoder=decoder, config=config, name=name)
         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
             creation_requester=creation_requester,
             polling_requester=polling_requester,
-            download_requester=download_requester,
+            download_retriever=download_retriever,
+            abort_requester=abort_requester,
+            delete_requester=delete_requester,
             status_extractor=status_extractor,
             status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
             urls_extractor=urls_extractor,
         )
 
         return AsyncRetriever(
-            job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(job_repository, stream_slices),
+            job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
+                job_repository,
+                stream_slices,
+                JobTracker(1),  # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
+                self._message_repository,
+                has_bulk_parent=False,  # FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
+            ),
            record_selector=record_selector,
            stream_slicer=stream_slicer,
            config=config,

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_job_repository.py (+41 −12)

@@ -2,18 +2,21 @@
 import logging
 import uuid
 from dataclasses import dataclass, field
+from datetime import timedelta
 from typing import Any, Dict, Iterable, Mapping, Optional
 
 import requests
+from airbyte_cdk import AirbyteMessage
 from airbyte_cdk.logger import lazy_log
-from airbyte_cdk.models import FailureType
+from airbyte_cdk.models import FailureType, Type
 from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
 from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
 from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
 from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor, RecordExtractor
 from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ResponseToFileExtractor
 from airbyte_cdk.sources.declarative.requesters.requester import Requester
-from airbyte_cdk.sources.types import StreamSlice
+from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
+from airbyte_cdk.sources.types import Record, StreamSlice
 from airbyte_cdk.utils import AirbyteTracedException
 from requests import Response
@@ -24,11 +27,14 @@
 class AsyncHttpJobRepository(AsyncJobRepository):
     creation_requester: Requester
     polling_requester: Requester
-    download_requester: Requester
+    download_retriever: SimpleRetriever
+    abort_requester: Optional[Requester]
+    delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
     urls_extractor: DpathExtractor
 
+    job_timeout: Optional[timedelta] = None
     record_extractor: RecordExtractor = field(init=False, repr=False, default_factory=lambda: ResponseToFileExtractor())
 
     def __post_init__(self) -> None:
@@ -118,7 +124,7 @@ def start(self, stream_slice: StreamSlice) -> AsyncJob:
         job_id: str = str(uuid.uuid4())
         self._create_job_response_by_id[job_id] = response
 
-        return AsyncJob(api_job_id=job_id, job_parameters=stream_slice)
+        return AsyncJob(api_job_id=job_id, job_parameters=stream_slice, timeout=self.job_timeout)
 
     def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None:
         """
@@ -135,15 +141,14 @@ def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None:
             None
         """
         for job in jobs:
-            stream_slice = StreamSlice(
-                partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]},
-                cursor_slice={},
-            )
+            stream_slice = self._get_create_job_stream_slice(job)
             polling_response: requests.Response = self._get_validated_polling_response(stream_slice)
             job_status: AsyncJobStatus = self._get_validated_job_status(polling_response)
 
             if job_status != job.status():
                 lazy_log(LOGGER, logging.DEBUG, lambda: f"Status of job {job.api_job_id()} changed from {job.status()} to {job_status}")
+            else:
+                lazy_log(LOGGER, logging.DEBUG, lambda: f"Status of job {job.api_job_id()} is still {job.status()}")
 
             job.update_status(job_status)
             if job_status == AsyncJobStatus.COMPLETED:
@@ -163,15 +168,39 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
 
         for url in self.urls_extractor.extract_records(self._polling_job_response_by_id[job.api_job_id()]):
             stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={})
-            # FIXME salesforce will require pagination here
-            response = self.download_requester.send_request(stream_slice=stream_slice)
-            if response:
-                yield from self.record_extractor.extract_records(response)
+            for message in self.download_retriever.read_records({}, stream_slice):
+                if isinstance(message, Record):
+                    yield message.data
+                elif isinstance(message, AirbyteMessage):
+                    if message.type == Type.RECORD:
+                        yield message.record.data  # type: ignore  # message.record won't be None here as the message is a record
+                elif isinstance(message, (dict, Mapping)):
+                    yield message
+                else:
+                    raise TypeError(f"Unknown type `{type(message)}` for message")
 
         yield from []
 
+    def abort(self, job: AsyncJob) -> None:
+        if not self.abort_requester:
+            return
+
+        self.abort_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
+
+    def delete(self, job: AsyncJob) -> None:
+        if not self.delete_requester:
+            return
+
+        self.delete_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
         self._clean_up_job(job.api_job_id())
 
     def _clean_up_job(self, job_id: str) -> None:
         del self._create_job_response_by_id[job_id]
         del self._polling_job_response_by_id[job_id]
+
+    def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
+        stream_slice = StreamSlice(
+            partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]},
+            cursor_slice={},
+        )
+        return stream_slice
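
Both new methods guard on their optional requester, so the orchestrator can call them unconditionally. A hedged sketch of that behavior (every constructor argument except the two None requesters is a hypothetical placeholder built beforehand):

    repo = AsyncHttpJobRepository(
        creation_requester=create_requester,    # hypothetical
        polling_requester=poll_requester,       # hypothetical
        download_retriever=download_retriever,  # the SimpleRetriever built by the factory
        abort_requester=None,
        delete_requester=None,
        status_extractor=status_extractor,      # hypothetical
        status_mapping=status_mapping,          # hypothetical
        urls_extractor=urls_extractor,          # hypothetical
    )
    repo.abort(job)   # no abort_requester: returns immediately
    repo.delete(job)  # no delete_requester: also returns early, which skips _clean_up_job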

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py (+2)

@@ -17,7 +17,9 @@ class HttpMethod(Enum):
     Http Method to use when submitting an outgoing HTTP request
     """
 
+    DELETE = "DELETE"
     GET = "GET"
+    PATCH = "PATCH"
     POST = "POST"