+dlt.sources
+
+ Module with built in sources and source building blocks
+From 45d70eacf0912699759e07eb5f54a83df2fb1e95 Mon Sep 17 00:00:00 2001
From: AstrakhantsevaAA Module with built in sources and source building blocks Initialize the default requests client from config Retry for given response status codes Abstract base class for retry strategies. Wait strategy that applies exponential backoff. It allows for a customized multiplier and an ability to restrict the
+upper and lower limits to some maximum and minimum value. The intervals are fixed (i.e. there is no jitter), so this strategy is
+suitable for balancing retries against latency when a required resource is
+unavailable for an unknown duration, but not suitable for resolving
+contention between multiple processes for a shared resource. Use
+wait_random_exponential for the latter case. Wrapper for Create a You can provide one or more custom predicates for specific retry condition. The predicate is called after every request with the resulting response and/or exception.
+For example, this will trigger a retry when the response text is The retry is triggered when either any of the predicates or the default conditions based on status code/exception are Update session/retry settings from RunConfiguration Requests session which by default adds a timeout to all requests and calls Constructs a A filter that takes only first A filter that skips first
+dlt
+
+
+dlt
+
+
+
+
+
+
+ 1from dlt.common.configuration.specs import GcpServiceAccountCredentials, GcpOAuthCredentials, GcpCredentials
+2from dlt.common.configuration.specs import ConnectionStringCredentials
+3from dlt.common.configuration.specs import OAuth2Credentials
+4from dlt.common.configuration.specs import CredentialsConfiguration, configspec
+
+dlt
+
+
+
+
+
+
+dlt
+
+
+
+
+
+
+ 1from tenacity import RetryError
+ 2from requests import (
+ 3 Request, Response,
+ 4 ConnectionError,
+ 5 ConnectTimeout,
+ 6 FileModeWarning,
+ 7 HTTPError,
+ 8 ReadTimeout,
+ 9 RequestException,
+10 Timeout,
+11 TooManyRedirects,
+12 URLRequired,
+13)
+14from requests.exceptions import ChunkedEncodingError
+15from dlt.sources.helpers.requests.retry import Client
+16from dlt.sources.helpers.requests.session import Session
+17from dlt.common.configuration.specs import RunConfiguration
+18
+19client = Client()
+20
+21get, post, put, patch, delete, options, head, request = (
+22 client.get, client.post, client.put, client.patch, client.delete, client.options, client.head, client.request
+23)
+24
+25
+26def init(config: RunConfiguration) -> None:
+27 """Initialize the default requests client from config"""
+28 client.update_from_config(config)
+
27def init(config: RunConfiguration) -> None:
+28 """Initialize the default requests client from config"""
+29 client.update_from_config(config)
+
+dlt
+
+
+
+
+
+
+ 1from email.utils import parsedate_tz, mktime_tz
+ 2import re
+ 3import time
+ 4from typing import Optional, cast, Callable, Type, Union, Sequence, Tuple, List, TYPE_CHECKING, Any, Dict
+ 5from threading import local
+ 6
+ 7from requests import Response, HTTPError, Session as BaseSession
+ 8from requests.exceptions import ConnectionError, Timeout, ChunkedEncodingError
+ 9from requests.adapters import HTTPAdapter
+ 10from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, RetryCallState, retry_any, wait_exponential
+ 11from tenacity.retry import retry_base
+ 12
+ 13from dlt.sources.helpers.requests.session import Session, DEFAULT_TIMEOUT
+ 14from dlt.sources.helpers.requests.typing import TRequestTimeout
+ 15from dlt.common.typing import TimedeltaSeconds
+ 16from dlt.common.configuration.specs import RunConfiguration
+ 17from dlt.common.configuration import with_config
+ 18
+ 19
+ 20DEFAULT_RETRY_STATUS = (429, *range(500, 600))
+ 21DEFAULT_RETRY_EXCEPTIONS = (ConnectionError, Timeout, ChunkedEncodingError)
+ 22
+ 23RetryPredicate = Callable[[Optional[Response], Optional[BaseException]], bool]
+ 24
+ 25
+ 26def _get_retry_response(retry_state: RetryCallState) -> Optional[Response]:
+ 27 ex = retry_state.outcome.exception()
+ 28 if ex:
+ 29 if isinstance(ex, HTTPError):
+ 30 return cast(Response, ex.response)
+ 31 return None
+ 32 result = retry_state.outcome.result()
+ 33 return result if isinstance(result, Response) else None
+ 34
+ 35
+ 36class retry_if_status(retry_base):
+ 37 """Retry for given response status codes"""
+ 38
+ 39 def __init__(self, status_codes: Sequence[int]) -> None:
+ 40 self.status_codes = set(status_codes)
+ 41
+ 42 def __call__(self, retry_state: RetryCallState) -> bool:
+ 43 response = _get_retry_response(retry_state)
+ 44 if response is None:
+ 45 return False
+ 46 result = response.status_code in self.status_codes
+ 47 return result
+ 48
+ 49
+ 50class retry_if_predicate(retry_base):
+ 51 def __init__(self, predicate: RetryPredicate) -> None:
+ 52 self.predicate = predicate
+ 53
+ 54 def __call__(self, retry_state: RetryCallState) -> bool:
+ 55 response = _get_retry_response(retry_state)
+ 56 exception = retry_state.outcome.exception()
+ 57 return self.predicate(response, exception)
+ 58
+ 59
+ 60class wait_exponential_retry_after(wait_exponential):
+ 61 def _parse_retry_after(self, retry_after: str) -> Optional[float]:
+ 62 # Borrowed from urllib3
+ 63 seconds: float
+ 64 # Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
+ 65 if re.match(r"^\s*[0-9]+\s*$", retry_after):
+ 66 seconds = int(retry_after)
+ 67 else:
+ 68 retry_date_tuple = parsedate_tz(retry_after)
+ 69 if retry_date_tuple is None:
+ 70 return None
+ 71 retry_date = mktime_tz(retry_date_tuple)
+ 72 seconds = retry_date - time.time()
+ 73 return max(self.min, min(self.max, seconds))
+ 74
+ 75 def _get_retry_after(self, retry_state: RetryCallState) -> Optional[float]:
+ 76 response = _get_retry_response(retry_state)
+ 77 if response is None:
+ 78 return None
+ 79 header = response.headers.get("Retry-After")
+ 80 if not header:
+ 81 return None
+ 82 return self._parse_retry_after(header)
+ 83
+ 84 def __call__(self, retry_state: RetryCallState) -> float:
+ 85 retry_after = self._get_retry_after(retry_state)
+ 86 if retry_after is not None:
+ 87 return retry_after
+ 88 return super().__call__(retry_state)
+ 89
+ 90
+ 91def _make_retry(
+ 92 status_codes: Sequence[int],
+ 93 exceptions: Sequence[Type[Exception]],
+ 94 max_attempts: int,
+ 95 condition: Union[RetryPredicate, Sequence[RetryPredicate], None],
+ 96 backoff_factor: float,
+ 97 respect_retry_after_header: bool,
+ 98 max_delay: TimedeltaSeconds,
+ 99)-> Retrying:
+100 retry_conds = [retry_if_status(status_codes), retry_if_exception_type(tuple(exceptions))]
+101 if condition is not None:
+102 if callable(condition):
+103 retry_condition = [condition]
+104 retry_conds.extend([retry_if_predicate(c) for c in retry_condition])
+105
+106 wait_cls = wait_exponential_retry_after if respect_retry_after_header else wait_exponential
+107 return Retrying(
+108 wait=wait_cls(multiplier=backoff_factor, max=max_delay),
+109 retry=(retry_any(*retry_conds)),
+110 stop=stop_after_attempt(max_attempts),
+111 reraise=True,
+112 retry_error_callback=lambda state: state.outcome.result(),
+113 )
+114
+115
+116class Client:
+117 """Wrapper for `requests` to create a `Session` with configurable retry functionality.
+118
+119 ### Summary
+120 Create a `requests.Session` which automatically retries requests in case of error.
+121 By default retries are triggered for `5xx` and `429` status codes and when the server is unreachable or drops connection.
+122
+123 ### Custom retry condition
+124 You can provide one or more custom predicates for specific retry condition. The predicate is called after every request with the resulting response and/or exception.
+125 For example, this will trigger a retry when the response text is `error`:
+126
+127 >>> from typing import Optional
+128 >>> from requests import Response
+129 >>>
+130 >>> def should_retry(response: Optional[Response], exception: Optional[BaseException]) -> bool:
+131 >>> if response is None:
+132 >>> return False
+133 >>> return response.text == 'error'
+134
+135 The retry is triggered when either any of the predicates or the default conditions based on status code/exception are `True`.
+136
+137 ### Args:
+138 request_timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+139 max_connections: Max connections per host in the HTTPAdapter pool
+140 raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+141 session: Optional `requests.Session` instance to add the retry handler to. A new session is created by default.
+142 status_codes: Retry when response has any of these status codes. Default `429` and all `5xx` codes. Pass an empty list to disable retry based on status.
+143 exceptions: Retry on exception of given type(s). Default `(requests.Timeout, requests.ConnectionError)`. Pass an empty list to disable retry on exceptions.
+144 request_max_attempts: Max number of retry attempts before giving up
+145 retry_condition: A predicate or a list of predicates to decide whether to retry. If any predicate returns `True` the request is retried
+146 request_backoff_factor: Multiplier used for exponential delay between retries
+147 request_max_retry_delay: Maximum delay when using exponential backoff
+148 respect_retry_after_header: Whether to use the `Retry-After` response header (when available) to determine the retry delay
+149 session_attrs: Extra attributes that will be set on the session instance, e.g. `{headers: {'Authorization': 'api-key'}}` (see `requests.sessions.Session` for possible attributes)
+150 """
+151 _session_attrs: Dict[str, Any]
+152
+153 @with_config(spec=RunConfiguration)
+154 def __init__(
+155 self,
+156 request_timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+157 max_connections: int = 50,
+158 raise_for_status: bool = True,
+159 status_codes: Sequence[int] = DEFAULT_RETRY_STATUS,
+160 exceptions: Sequence[Type[Exception]] = DEFAULT_RETRY_EXCEPTIONS,
+161 request_max_attempts: int = RunConfiguration.request_max_attempts,
+162 retry_condition: Union[RetryPredicate, Sequence[RetryPredicate], None] = None,
+163 request_backoff_factor: float = RunConfiguration.request_backoff_factor,
+164 request_max_retry_delay: TimedeltaSeconds = RunConfiguration.request_max_retry_delay,
+165 respect_retry_after_header: bool = True,
+166 session_attrs: Optional[Dict[str, Any]] = None,
+167 ) -> None:
+168 self._adapter = HTTPAdapter(pool_maxsize=max_connections)
+169 self._local = local()
+170 self._session_kwargs = dict(timeout=request_timeout, raise_for_status=raise_for_status)
+171 self._retry_kwargs: Dict[str, Any] = dict(
+172 status_codes=status_codes,
+173 exceptions=exceptions,
+174 max_attempts=request_max_attempts,
+175 condition=retry_condition,
+176 backoff_factor=request_backoff_factor,
+177 respect_retry_after_header=respect_retry_after_header,
+178 max_delay=request_max_retry_delay
+179 )
+180 self._session_attrs = session_attrs or {}
+181
+182 if TYPE_CHECKING:
+183 self.get = self.session.get
+184 self.post = self.session.post
+185 self.put = self.session.put
+186 self.patch = self.session.patch
+187 self.delete = self.session.delete
+188 self.head = self.session.head
+189 self.options = self.session.options
+190 self.request = self.session.request
+191
+192 self.get = lambda *a, **kw: self.session.get(*a, **kw)
+193 self.post = lambda *a, **kw: self.session.post(*a, **kw)
+194 self.put = lambda *a, **kw: self.session.put(*a, **kw)
+195 self.patch = lambda *a, **kw: self.session.patch(*a, **kw)
+196 self.delete = lambda *a, **kw: self.session.delete(*a, **kw)
+197 self.head = lambda *a, **kw: self.session.head(*a, **kw)
+198 self.options = lambda *a, **kw: self.session.options(*a, **kw)
+199 self.request = lambda *a, **kw: self.session.request(*a, **kw)
+200
+201 self._config_version: int = 0 # Incrementing marker to ensure per-thread sessions are recreated on config changes
+202
+203 def update_from_config(self, config: RunConfiguration) -> None:
+204 """Update session/retry settings from RunConfiguration"""
+205 self._session_kwargs['timeout'] = config.request_timeout
+206 self._retry_kwargs['backoff_factor'] = config.request_backoff_factor
+207 self._retry_kwargs['max_delay'] = config.request_max_retry_delay
+208 self._retry_kwargs['max_attempts'] = config.request_max_attempts
+209 self._config_version += 1
+210
+211 def _make_session(self) -> Session:
+212 session = Session(**self._session_kwargs) # type: ignore[arg-type]
+213 for key, value in self._session_attrs.items():
+214 setattr(session, key, value)
+215 session.mount('http://', self._adapter)
+216 session.mount('https://', self._adapter)
+217 retry = _make_retry(**self._retry_kwargs)
+218 session.request = retry.wraps(session.request) # type: ignore[method-assign]
+219 return session
+220
+221 @property
+222 def session(self) -> Session:
+223 session: Optional[Session] = getattr(self._local, 'session', None)
+224 version = self._config_version
+225 if session is not None:
+226 version = self._local.config_version
+227 if session is None or version != self._config_version:
+228 # Create a new session if config has changed
+229 session = self._local.session = self._make_session()
+230 self._local.config_version = self._config_version
+231 return session
+
37class retry_if_status(retry_base):
+38 """Retry for given response status codes"""
+39
+40 def __init__(self, status_codes: Sequence[int]) -> None:
+41 self.status_codes = set(status_codes)
+42
+43 def __call__(self, retry_state: RetryCallState) -> bool:
+44 response = _get_retry_response(retry_state)
+45 if response is None:
+46 return False
+47 result = response.status_code in self.status_codes
+48 return result
+
51class retry_if_predicate(retry_base):
+52 def __init__(self, predicate: RetryPredicate) -> None:
+53 self.predicate = predicate
+54
+55 def __call__(self, retry_state: RetryCallState) -> bool:
+56 response = _get_retry_response(retry_state)
+57 exception = retry_state.outcome.exception()
+58 return self.predicate(response, exception)
+
61class wait_exponential_retry_after(wait_exponential):
+62 def _parse_retry_after(self, retry_after: str) -> Optional[float]:
+63 # Borrowed from urllib3
+64 seconds: float
+65 # Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
+66 if re.match(r"^\s*[0-9]+\s*$", retry_after):
+67 seconds = int(retry_after)
+68 else:
+69 retry_date_tuple = parsedate_tz(retry_after)
+70 if retry_date_tuple is None:
+71 return None
+72 retry_date = mktime_tz(retry_date_tuple)
+73 seconds = retry_date - time.time()
+74 return max(self.min, min(self.max, seconds))
+75
+76 def _get_retry_after(self, retry_state: RetryCallState) -> Optional[float]:
+77 response = _get_retry_response(retry_state)
+78 if response is None:
+79 return None
+80 header = response.headers.get("Retry-After")
+81 if not header:
+82 return None
+83 return self._parse_retry_after(header)
+84
+85 def __call__(self, retry_state: RetryCallState) -> float:
+86 retry_after = self._get_retry_after(retry_state)
+87 if retry_after is not None:
+88 return retry_after
+89 return super().__call__(retry_state)
+
Inherited Members
+
+
+ 117class Client:
+118 """Wrapper for `requests` to create a `Session` with configurable retry functionality.
+119
+120 ### Summary
+121 Create a `requests.Session` which automatically retries requests in case of error.
+122 By default retries are triggered for `5xx` and `429` status codes and when the server is unreachable or drops connection.
+123
+124 ### Custom retry condition
+125 You can provide one or more custom predicates for specific retry condition. The predicate is called after every request with the resulting response and/or exception.
+126 For example, this will trigger a retry when the response text is `error`:
+127
+128 >>> from typing import Optional
+129 >>> from requests import Response
+130 >>>
+131 >>> def should_retry(response: Optional[Response], exception: Optional[BaseException]) -> bool:
+132 >>> if response is None:
+133 >>> return False
+134 >>> return response.text == 'error'
+135
+136 The retry is triggered when either any of the predicates or the default conditions based on status code/exception are `True`.
+137
+138 ### Args:
+139 request_timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+140 max_connections: Max connections per host in the HTTPAdapter pool
+141 raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+142 session: Optional `requests.Session` instance to add the retry handler to. A new session is created by default.
+143 status_codes: Retry when response has any of these status codes. Default `429` and all `5xx` codes. Pass an empty list to disable retry based on status.
+144 exceptions: Retry on exception of given type(s). Default `(requests.Timeout, requests.ConnectionError)`. Pass an empty list to disable retry on exceptions.
+145 request_max_attempts: Max number of retry attempts before giving up
+146 retry_condition: A predicate or a list of predicates to decide whether to retry. If any predicate returns `True` the request is retried
+147 request_backoff_factor: Multiplier used for exponential delay between retries
+148 request_max_retry_delay: Maximum delay when using exponential backoff
+149 respect_retry_after_header: Whether to use the `Retry-After` response header (when available) to determine the retry delay
+150 session_attrs: Extra attributes that will be set on the session instance, e.g. `{headers: {'Authorization': 'api-key'}}` (see `requests.sessions.Session` for possible attributes)
+151 """
+152 _session_attrs: Dict[str, Any]
+153
+154 @with_config(spec=RunConfiguration)
+155 def __init__(
+156 self,
+157 request_timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+158 max_connections: int = 50,
+159 raise_for_status: bool = True,
+160 status_codes: Sequence[int] = DEFAULT_RETRY_STATUS,
+161 exceptions: Sequence[Type[Exception]] = DEFAULT_RETRY_EXCEPTIONS,
+162 request_max_attempts: int = RunConfiguration.request_max_attempts,
+163 retry_condition: Union[RetryPredicate, Sequence[RetryPredicate], None] = None,
+164 request_backoff_factor: float = RunConfiguration.request_backoff_factor,
+165 request_max_retry_delay: TimedeltaSeconds = RunConfiguration.request_max_retry_delay,
+166 respect_retry_after_header: bool = True,
+167 session_attrs: Optional[Dict[str, Any]] = None,
+168 ) -> None:
+169 self._adapter = HTTPAdapter(pool_maxsize=max_connections)
+170 self._local = local()
+171 self._session_kwargs = dict(timeout=request_timeout, raise_for_status=raise_for_status)
+172 self._retry_kwargs: Dict[str, Any] = dict(
+173 status_codes=status_codes,
+174 exceptions=exceptions,
+175 max_attempts=request_max_attempts,
+176 condition=retry_condition,
+177 backoff_factor=request_backoff_factor,
+178 respect_retry_after_header=respect_retry_after_header,
+179 max_delay=request_max_retry_delay
+180 )
+181 self._session_attrs = session_attrs or {}
+182
+183 if TYPE_CHECKING:
+184 self.get = self.session.get
+185 self.post = self.session.post
+186 self.put = self.session.put
+187 self.patch = self.session.patch
+188 self.delete = self.session.delete
+189 self.head = self.session.head
+190 self.options = self.session.options
+191 self.request = self.session.request
+192
+193 self.get = lambda *a, **kw: self.session.get(*a, **kw)
+194 self.post = lambda *a, **kw: self.session.post(*a, **kw)
+195 self.put = lambda *a, **kw: self.session.put(*a, **kw)
+196 self.patch = lambda *a, **kw: self.session.patch(*a, **kw)
+197 self.delete = lambda *a, **kw: self.session.delete(*a, **kw)
+198 self.head = lambda *a, **kw: self.session.head(*a, **kw)
+199 self.options = lambda *a, **kw: self.session.options(*a, **kw)
+200 self.request = lambda *a, **kw: self.session.request(*a, **kw)
+201
+202 self._config_version: int = 0 # Incrementing marker to ensure per-thread sessions are recreated on config changes
+203
+204 def update_from_config(self, config: RunConfiguration) -> None:
+205 """Update session/retry settings from RunConfiguration"""
+206 self._session_kwargs['timeout'] = config.request_timeout
+207 self._retry_kwargs['backoff_factor'] = config.request_backoff_factor
+208 self._retry_kwargs['max_delay'] = config.request_max_retry_delay
+209 self._retry_kwargs['max_attempts'] = config.request_max_attempts
+210 self._config_version += 1
+211
+212 def _make_session(self) -> Session:
+213 session = Session(**self._session_kwargs) # type: ignore[arg-type]
+214 for key, value in self._session_attrs.items():
+215 setattr(session, key, value)
+216 session.mount('http://', self._adapter)
+217 session.mount('https://', self._adapter)
+218 retry = _make_retry(**self._retry_kwargs)
+219 session.request = retry.wraps(session.request) # type: ignore[method-assign]
+220 return session
+221
+222 @property
+223 def session(self) -> Session:
+224 session: Optional[Session] = getattr(self._local, 'session', None)
+225 version = self._config_version
+226 if session is not None:
+227 version = self._local.config_version
+228 if session is None or version != self._config_version:
+229 # Create a new session if config has changed
+230 session = self._local.session = self._make_session()
+231 self._local.config_version = self._config_version
+232 return session
+
requests
to create a Session
with configurable retry functionality.Summary
+
+requests.Session
which automatically retries requests in case of error.
+By default retries are triggered for 5xx
and 429
status codes and when the server is unreachable or drops connection.Custom retry condition
+
+error
:
+>>> from typing import Optional
+>>> from requests import Response
+>>>
+>>> def should_retry(response: Optional[Response], exception: Optional[BaseException]) -> bool:
+>>> if response is None:
+>>> return False
+>>> return response.text == 'error'
+
True
.Args:
+
+
+request_timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+max_connections: Max connections per host in the HTTPAdapter pool
+raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+session: Optional `requests.Session` instance to add the retry handler to. A new session is created by default.
+status_codes: Retry when response has any of these status codes. Default `429` and all `5xx` codes. Pass an empty list to disable retry based on status.
+exceptions: Retry on exception of given type(s). Default `(requests.Timeout, requests.ConnectionError)`. Pass an empty list to disable retry on exceptions.
+request_max_attempts: Max number of retry attempts before giving up
+retry_condition: A predicate or a list of predicates to decide whether to retry. If any predicate returns `True` the request is retried
+request_backoff_factor: Multiplier used for exponential delay between retries
+request_max_retry_delay: Maximum delay when using exponential backoff
+respect_retry_after_header: Whether to use the `Retry-After` response header (when available) to determine the retry delay
+session_attrs: Extra attributes that will be set on the session instance, e.g. `{headers: {'Authorization': 'api-key'}}` (see `requests.sessions.Session` for possible attributes)
+
154 @with_config(spec=RunConfiguration)
+155 def __init__(
+156 self,
+157 request_timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+158 max_connections: int = 50,
+159 raise_for_status: bool = True,
+160 status_codes: Sequence[int] = DEFAULT_RETRY_STATUS,
+161 exceptions: Sequence[Type[Exception]] = DEFAULT_RETRY_EXCEPTIONS,
+162 request_max_attempts: int = RunConfiguration.request_max_attempts,
+163 retry_condition: Union[RetryPredicate, Sequence[RetryPredicate], None] = None,
+164 request_backoff_factor: float = RunConfiguration.request_backoff_factor,
+165 request_max_retry_delay: TimedeltaSeconds = RunConfiguration.request_max_retry_delay,
+166 respect_retry_after_header: bool = True,
+167 session_attrs: Optional[Dict[str, Any]] = None,
+168 ) -> None:
+169 self._adapter = HTTPAdapter(pool_maxsize=max_connections)
+170 self._local = local()
+171 self._session_kwargs = dict(timeout=request_timeout, raise_for_status=raise_for_status)
+172 self._retry_kwargs: Dict[str, Any] = dict(
+173 status_codes=status_codes,
+174 exceptions=exceptions,
+175 max_attempts=request_max_attempts,
+176 condition=retry_condition,
+177 backoff_factor=request_backoff_factor,
+178 respect_retry_after_header=respect_retry_after_header,
+179 max_delay=request_max_retry_delay
+180 )
+181 self._session_attrs = session_attrs or {}
+182
+183 if TYPE_CHECKING:
+184 self.get = self.session.get
+185 self.post = self.session.post
+186 self.put = self.session.put
+187 self.patch = self.session.patch
+188 self.delete = self.session.delete
+189 self.head = self.session.head
+190 self.options = self.session.options
+191 self.request = self.session.request
+192
+193 self.get = lambda *a, **kw: self.session.get(*a, **kw)
+194 self.post = lambda *a, **kw: self.session.post(*a, **kw)
+195 self.put = lambda *a, **kw: self.session.put(*a, **kw)
+196 self.patch = lambda *a, **kw: self.session.patch(*a, **kw)
+197 self.delete = lambda *a, **kw: self.session.delete(*a, **kw)
+198 self.head = lambda *a, **kw: self.session.head(*a, **kw)
+199 self.options = lambda *a, **kw: self.session.options(*a, **kw)
+200 self.request = lambda *a, **kw: self.session.request(*a, **kw)
+201
+202 self._config_version: int = 0 # Incrementing marker to ensure per-thread sessions are recreated on config changes
+
204 def update_from_config(self, config: RunConfiguration) -> None:
+205 """Update session/retry settings from RunConfiguration"""
+206 self._session_kwargs['timeout'] = config.request_timeout
+207 self._retry_kwargs['backoff_factor'] = config.request_backoff_factor
+208 self._retry_kwargs['max_delay'] = config.request_max_retry_delay
+209 self._retry_kwargs['max_attempts'] = config.request_max_attempts
+210 self._config_version += 1
+
+dlt
+
+
+
+
+
+
+ 1from requests import Session as BaseSession
+ 2from tenacity import Retrying, retry_if_exception_type
+ 3from typing import Optional, TYPE_CHECKING, Sequence, Union, Tuple, Type, TypeVar
+ 4
+ 5from dlt.sources.helpers.requests.typing import TRequestTimeout
+ 6from dlt.common.typing import TimedeltaSeconds
+ 7from dlt.common.time import to_seconds
+ 8from dlt.version import __version__
+ 9
+10
+11TSession = TypeVar("TSession", bound=BaseSession)
+12
+13
+14DEFAULT_TIMEOUT = 60
+15
+16
+17def _timeout_to_seconds(timeout: TRequestTimeout) -> Optional[Union[Tuple[float, float], float]]:
+18 return (to_seconds(timeout[0]), to_seconds(timeout[1])) if isinstance(timeout, tuple) else to_seconds(timeout)
+19
+20
+21class Session(BaseSession):
+22 """Requests session which by default adds a timeout to all requests and calls `raise_for_status()` on response
+23
+24 ### Args
+25 timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+26 May be a single value or a tuple for separate (connect, read) timeout.
+27 raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+28 """
+29 def __init__(
+30 self,
+31 timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+32 raise_for_status: bool = True,
+33 ) -> None:
+34 super().__init__()
+35 self.timeout = _timeout_to_seconds(timeout)
+36 self.raise_for_status = raise_for_status
+37 self.headers.update({
+38 "User-Agent": f"dlt/{__version__}",
+39 })
+40
+41 if TYPE_CHECKING:
+42 request = BaseSession.request
+43
+44 def request(self, *args, **kwargs): # type: ignore[no-untyped-def,no-redef]
+45 kwargs.setdefault('timeout', self.timeout)
+46 resp = super().request(*args, **kwargs)
+47 if self.raise_for_status:
+48 resp.raise_for_status()
+49 return resp
+
22class Session(BaseSession):
+23 """Requests session which by default adds a timeout to all requests and calls `raise_for_status()` on response
+24
+25 ### Args
+26 timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+27 May be a single value or a tuple for separate (connect, read) timeout.
+28 raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+29 """
+30 def __init__(
+31 self,
+32 timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+33 raise_for_status: bool = True,
+34 ) -> None:
+35 super().__init__()
+36 self.timeout = _timeout_to_seconds(timeout)
+37 self.raise_for_status = raise_for_status
+38 self.headers.update({
+39 "User-Agent": f"dlt/{__version__}",
+40 })
+41
+42 if TYPE_CHECKING:
+43 request = BaseSession.request
+44
+45 def request(self, *args, **kwargs): # type: ignore[no-untyped-def,no-redef]
+46 kwargs.setdefault('timeout', self.timeout)
+47 resp = super().request(*args, **kwargs)
+48 if self.raise_for_status:
+49 resp.raise_for_status()
+50 return resp
+
raise_for_status()
on responseArgs
+
+
+timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.
+ May be a single value or a tuple for separate (connect, read) timeout.
+raise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)
+
30 def __init__(
+31 self,
+32 timeout: Optional[Union[TimedeltaSeconds, Tuple[TimedeltaSeconds, TimedeltaSeconds]]] = DEFAULT_TIMEOUT,
+33 raise_for_status: bool = True,
+34 ) -> None:
+35 super().__init__()
+36 self.timeout = _timeout_to_seconds(timeout)
+37 self.raise_for_status = raise_for_status
+38 self.headers.update({
+39 "User-Agent": f"dlt/{__version__}",
+40 })
+
45 def request(self, *args, **kwargs): # type: ignore[no-untyped-def,no-redef]
+46 kwargs.setdefault('timeout', self.timeout)
+47 resp = super().request(*args, **kwargs)
+48 if self.raise_for_status:
+49 resp.raise_for_status()
+50 return resp
+
Request <Request>
, prepares it and sends it.
+Returns Response <Response>
object.Parameters
+
+
+
+Request
object.Request
object.Request
.Request
.Request
.Request
.Request
.'filename'**: file-like-objects
+for multipart encoding upload.(connect timeout,
+read timeout) <timeouts>
tuple.False
.True
. When set to
+False
, requests will accept any TLS certificate presented by
+the server, and will ignore hostname mismatches and/or expired
+certificates, which will make your application vulnerable to
+man-in-the-middle (MitM) attacks. Setting verify to False
+may be useful during local development or testing.Inherited Members
+
+
+
+dlt
+
+
+
+
+
+
+
+
+
+
+dlt
+
+
+
+
+
+
+ 1from dlt.common.typing import TDataItem
+ 2from dlt.extract.typing import ItemTransformFunctionNoMeta
+ 3
+ 4
+ 5def take_first(max_items: int) -> ItemTransformFunctionNoMeta[bool]:
+ 6 """A filter that takes only first `max_items` from a resource"""
+ 7 count: int = 0
+ 8 def _filter(_: TDataItem) -> bool:
+ 9 nonlocal count
+10 count += 1
+11 return count <= max_items
+12 return _filter
+13
+14
+15def skip_first(max_items: int) -> ItemTransformFunctionNoMeta[bool]:
+16 """A filter that skips first `max_items` from a resource"""
+17 count: int = 0
+18 def _filter(_: TDataItem) -> bool:
+19 nonlocal count
+20 count += 1
+21 return count > max_items
+22 return _filter
+
6def take_first(max_items: int) -> ItemTransformFunctionNoMeta[bool]:
+ 7 """A filter that takes only first `max_items` from a resource"""
+ 8 count: int = 0
+ 9 def _filter(_: TDataItem) -> bool:
+10 nonlocal count
+11 count += 1
+12 return count <= max_items
+13 return _filter
+
max_items
from a resource16def skip_first(max_items: int) -> ItemTransformFunctionNoMeta[bool]:
+17 """A filter that skips first `max_items` from a resource"""
+18 count: int = 0
+19 def _filter(_: TDataItem) -> bool:
+20 nonlocal count
+21 count += 1
+22 return count > max_items
+23 return _filter
+
max_items
from a resource
Module with built in sources and source building blocks
\n"}, {"fullname": "dlt.sources.helpers", "modulename": "dlt.sources.helpers", "kind": "module", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests", "modulename": "dlt.sources.helpers.requests", "kind": "module", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.client", "modulename": "dlt.sources.helpers.requests", "qualname": "client", "kind": "variable", "doc": "\n", "default_value": "<dlt.sources.helpers.requests.retry.Client object>"}, {"fullname": "dlt.sources.helpers.requests.init", "modulename": "dlt.sources.helpers.requests", "qualname": "init", "kind": "function", "doc": "Initialize the default requests client from config
\n", "signature": "(\tconfig: dlt.common.configuration.specs.run_configuration.RunConfiguration) -> None:", "funcdef": "def"}, {"fullname": "dlt.sources.helpers.requests.retry", "modulename": "dlt.sources.helpers.requests.retry", "kind": "module", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.DEFAULT_RETRY_STATUS", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "DEFAULT_RETRY_STATUS", "kind": "variable", "doc": "\n", "default_value": "(429, 500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582, 583, 584, 585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597, 598, 599)"}, {"fullname": "dlt.sources.helpers.requests.retry.DEFAULT_RETRY_EXCEPTIONS", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "DEFAULT_RETRY_EXCEPTIONS", "kind": "variable", "doc": "\n", "default_value": "(<class 'requests.exceptions.ConnectionError'>, <class 'requests.exceptions.Timeout'>, <class 'requests.exceptions.ChunkedEncodingError'>)"}, {"fullname": "dlt.sources.helpers.requests.retry.RetryPredicate", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "RetryPredicate", "kind": "variable", "doc": "\n", "default_value": "typing.Callable[[typing.Optional[requests.models.Response], typing.Optional[BaseException]], bool]"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_status", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_status", "kind": "class", "doc": "Retry for given response status codes
\n", "bases": "tenacity.retry.retry_base"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_status.__init__", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_status.__init__", "kind": "function", "doc": "\n", "signature": "(status_codes: Sequence[int])"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_status.status_codes", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_status.status_codes", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_predicate", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_predicate", "kind": "class", "doc": "Abstract base class for retry strategies.
\n", "bases": "tenacity.retry.retry_base"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_predicate.__init__", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_predicate.__init__", "kind": "function", "doc": "\n", "signature": "(\tpredicate: Callable[[Optional[requests.models.Response], Optional[BaseException]], bool])"}, {"fullname": "dlt.sources.helpers.requests.retry.retry_if_predicate.predicate", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "retry_if_predicate.predicate", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.wait_exponential_retry_after", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "wait_exponential_retry_after", "kind": "class", "doc": "Wait strategy that applies exponential backoff.
\n\nIt allows for a customized multiplier and an ability to restrict the\nupper and lower limits to some maximum and minimum value.
\n\nThe intervals are fixed (i.e. there is no jitter), so this strategy is\nsuitable for balancing retries against latency when a required resource is\nunavailable for an unknown duration, but not suitable for resolving\ncontention between multiple processes for a shared resource. Use\nwait_random_exponential for the latter case.
\n", "bases": "tenacity.wait.wait_exponential"}, {"fullname": "dlt.sources.helpers.requests.retry.Client", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client", "kind": "class", "doc": "Wrapper for requests
to create a Session
with configurable retry functionality.
Create a requests.Session
which automatically retries requests in case of error.\nBy default retries are triggered for 5xx
and 429
status codes and when the server is unreachable or drops connection.
You can provide one or more custom predicates for specific retry condition. The predicate is called after every request with the resulting response and/or exception.\nFor example, this will trigger a retry when the response text is error
:
>>> from typing import Optional\n>>> from requests import Response\n>>>\n>>> def should_retry(response: Optional[Response], exception: Optional[BaseException]) -> bool:\n>>> if response is None:\n>>> return False\n>>> return response.text == 'error'\n
\nThe retry is triggered when either any of the predicates or the default conditions based on status code/exception are True
.
request_timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.\nmax_connections: Max connections per host in the HTTPAdapter pool\nraise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)\nsession: Optional `requests.Session` instance to add the retry handler to. A new session is created by default.\nstatus_codes: Retry when response has any of these status codes. Default `429` and all `5xx` codes. Pass an empty list to disable retry based on status.\nexceptions: Retry on exception of given type(s). Default `(requests.Timeout, requests.ConnectionError)`. Pass an empty list to disable retry on exceptions.\nrequest_max_attempts: Max number of retry attempts before giving up\nretry_condition: A predicate or a list of predicates to decide whether to retry. If any predicate returns `True` the request is retried\nrequest_backoff_factor: Multiplier used for exponential delay between retries\nrequest_max_retry_delay: Maximum delay when using exponential backoff\nrespect_retry_after_header: Whether to use the `Retry-After` response header (when available) to determine the retry delay\nsession_attrs: Extra attributes that will be set on the session instance, e.g. `{headers: {'Authorization': 'api-key'}}` (see `requests.sessions.Session` for possible attributes)\n
\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.__init__", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.__init__", "kind": "function", "doc": "\n", "signature": "(\trequest_timeout: Union[int, float, datetime.timedelta, Tuple[Union[int, float, datetime.timedelta], Union[int, float, datetime.timedelta]], NoneType] = 60,\tmax_connections: int = 50,\traise_for_status: bool = True,\tstatus_codes: Sequence[int] = (429, 500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582, 583, 584, 585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597, 598, 599),\texceptions: Sequence[Type[Exception]] = (<class 'requests.exceptions.ConnectionError'>, <class 'requests.exceptions.Timeout'>, <class 'requests.exceptions.ChunkedEncodingError'>),\trequest_max_attempts: int = 5,\tretry_condition: Union[Callable[[Optional[requests.models.Response], Optional[BaseException]], bool], Sequence[Callable[[Optional[requests.models.Response], Optional[BaseException]], bool]], NoneType] = None,\trequest_backoff_factor: float = 1,\trequest_max_retry_delay: Union[int, float, datetime.timedelta] = 300,\trespect_retry_after_header: bool = True,\tsession_attrs: Optional[Dict[str, Any]] = None)"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.get", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.get", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.post", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.post", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.put", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.put", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.patch", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.patch", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.delete", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.delete", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.head", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.head", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.options", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.options", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.request", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.request", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.update_from_config", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.update_from_config", "kind": "function", "doc": "Update session/retry settings from RunConfiguration
\n", "signature": "(\tself,\tconfig: dlt.common.configuration.specs.run_configuration.RunConfiguration) -> None:", "funcdef": "def"}, {"fullname": "dlt.sources.helpers.requests.retry.Client.session", "modulename": "dlt.sources.helpers.requests.retry", "qualname": "Client.session", "kind": "variable", "doc": "\n", "annotation": ": dlt.sources.helpers.requests.session.Session"}, {"fullname": "dlt.sources.helpers.requests.session", "modulename": "dlt.sources.helpers.requests.session", "kind": "module", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.session.DEFAULT_TIMEOUT", "modulename": "dlt.sources.helpers.requests.session", "qualname": "DEFAULT_TIMEOUT", "kind": "variable", "doc": "\n", "default_value": "60"}, {"fullname": "dlt.sources.helpers.requests.session.Session", "modulename": "dlt.sources.helpers.requests.session", "qualname": "Session", "kind": "class", "doc": "Requests session which by default adds a timeout to all requests and calls raise_for_status()
on response
timeout: Timeout for requests in seconds. May be passed as `timedelta` or `float/int` number of seconds.\n May be a single value or a tuple for separate (connect, read) timeout.\nraise_for_status: Whether to raise exception on error status codes (using `response.raise_for_status()`)\n
\n", "bases": "requests.sessions.Session"}, {"fullname": "dlt.sources.helpers.requests.session.Session.__init__", "modulename": "dlt.sources.helpers.requests.session", "qualname": "Session.__init__", "kind": "function", "doc": "\n", "signature": "(\ttimeout: Union[int, float, datetime.timedelta, Tuple[Union[int, float, datetime.timedelta], Union[int, float, datetime.timedelta]], NoneType] = 60,\traise_for_status: bool = True)"}, {"fullname": "dlt.sources.helpers.requests.session.Session.timeout", "modulename": "dlt.sources.helpers.requests.session", "qualname": "Session.timeout", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.session.Session.raise_for_status", "modulename": "dlt.sources.helpers.requests.session", "qualname": "Session.raise_for_status", "kind": "variable", "doc": "\n"}, {"fullname": "dlt.sources.helpers.requests.session.Session.request", "modulename": "dlt.sources.helpers.requests.session", "qualname": "Session.request", "kind": "function", "doc": "Constructs a Request <Request>
, prepares it and sends it.\nReturns Response <Response>
object.
Request
object.Request
object.Request
.Request
.Request
.Request
.Request
.'filename'**: file-like-objects
\nfor multipart encoding upload.(connect timeout,\nread timeout) <timeouts>
tuple.False
.True
. When set to\nFalse
, requests will accept any TLS certificate presented by\nthe server, and will ignore hostname mismatches and/or expired\ncertificates, which will make your application vulnerable to\nman-in-the-middle (MitM) attacks. Setting verify to False
\nmay be useful during local development or testing.A filter that takes only first max_items
from a resource
A filter that skips first max_items
from a resource