Skip to content

Commit

Permalink
Use concurrent_cdk
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Oct 11, 2023
1 parent 25fc396 commit bda839b
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from typing import Any, List, Mapping, MutableMapping, Tuple

import pendulum
import stripe
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.entrypoint import logger as entrypoint_logger
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.utils import AirbyteTracedException
from source_stripe.streams import (
Expand All @@ -31,6 +33,9 @@


class SourceStripe(AbstractSource):

message_repository = InMemoryMessageRepository(entrypoint_logger.level)

@staticmethod
def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
start_date, lookback_window_days, slice_range = (
Expand Down Expand Up @@ -158,7 +163,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
],
**args,
)
return [
legacy_streams = [
CheckoutSessionsLineItems(**incremental_args),
CustomerBalanceTransactions(**args),
Events(**incremental_args),
Expand Down Expand Up @@ -414,3 +419,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
**args,
),
]
# return legacy_streams
return [StreamFacade.create_from_stream(stream, self, entrypoint_logger, 4) for stream in legacy_streams]

0 comments on commit bda839b

Please sign in to comment.