diff --git a/airbyte/cli.py b/airbyte/cli.py
index 5d3082c1..5f606ebf 100644
--- a/airbyte/cli.py
+++ b/airbyte/cli.py
@@ -14,7 +14,8 @@
 pyab --help
 ```
 
-You can also use the fast and powerful `uv` tool to run the CLI without pre-installing:
+You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
+without pre-installing:
 
 ```
 # Install `uv` if you haven't already:
@@ -23,6 +24,43 @@
 # Run the PyAirbyte CLI using `uvx`:
 uvx --from=airbyte pyab --help
 ```
+
+Example `benchmark` Usage:
+
+    ```
+    # PyAirbyte System Benchmark (no-op):
+    pyab benchmark --num-records=2.4e6
+
+    # Source Benchmark:
+    pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
+    pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
+    pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields
+
+    # Source Benchmark from Docker Image:
+    pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
+    pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'
+
+    # Destination Benchmark:
+    pyab benchmark --destination=destination-dev-null --config=/path/to/config.json
+
+    # Benchmark a Local Python Source (source-s3):
+    pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
+    # Equivalent to:
+    LOCAL_EXECUTABLE=$(poetry run which source-s3)
+    CONFIG_PATH=$(realpath ./secrets/config.json)
+    pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
+    ```
+
+Example Usage with `uv`:
+
+
+
+Example `validate` Usage:
+
+    ```
+    pyab validate --connector=source-hardcoded-records
+    pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
+    ```
 """
 
 from __future__ import annotations
@@ -118,6 +156,22 @@ def _inject_secrets(config_dict: dict[str, Any]) -> None:
     return config_dict
 
 
+def _is_docker_image(image: str | None) -> bool:
+    """Check if the source or destination is a docker image."""
+    return image is not None and ":" in image
+
+
+def _is_executable_path(connector_str: str) -> bool:
+    return connector_str.startswith(".") or "/" in connector_str
+
+
+def _get_connector_name(connector: str) -> str:
+    if _is_docker_image(connector):
+        return connector.split(":")[0].split("/")[-1]
+
+    return connector
+
+
 def _resolve_source_job(
     *,
     source: str | None = None,
@@ -127,16 +181,29 @@ def _resolve_source_job(
     """Resolve the source job into a configured Source object.
 
     Args:
-        source: The source name, with an optional version declaration.
+        source: The source name or source reference.
             If a path is provided, the source will be loaded from the local path.
-            If the string `'.'` is provided, the source will be loaded from the current
-            working directory.
+            If the source contains a colon (':'), it will be interpreted as a docker image and tag.
         config: The path to a configuration file for the named source or destination.
         streams: A comma-separated list of stream names to select for reading. If set to "*",
             all streams will be selected. If not provided, all streams will be selected.
""" + config_dict = _resolve_config(config) if config else None + streams_list: str | list[str] = streams or "*" + if isinstance(streams, str) and streams != "*": + streams_list = [stream.strip() for stream in streams.split(",")] + source_obj: Source - if source and (source.startswith(".") or "/" in source): + if source and _is_docker_image(source): + source_obj = get_source( + name=_get_connector_name(source), + docker_image=source, + config=config_dict, + streams=streams_list, + ) + return source_obj + + if source and _is_executable_path(source): # Treat the source as a path. source_executable = Path(source) if not source_executable.exists(): @@ -149,26 +216,22 @@ def _resolve_source_job( source_obj = get_source( name=source_executable.stem, local_executable=source_executable, + config=config_dict, + streams=streams_list, ) return source_obj - if not config: - raise PyAirbyteInputError( - message="No configuration found.", - ) + if not source or not source.startswith("source-"): raise PyAirbyteInputError( - message="Expected a source name or path to executable.", + message="Expected a source name, docker image, or path to executable.", input_value=source, ) source_name: str = source - streams_list: str | list[str] = streams or "*" - if isinstance(streams, str) and streams != "*": - streams_list = [stream.strip() for stream in streams.split(",")] return get_source( name=source_name, - config=_resolve_config(config) if config else {}, + config=config_dict, streams=streams_list, ) @@ -181,10 +244,10 @@ def _resolve_destination_job( """Resolve the destination job into a configured Destination object. Args: - destination: The destination name, with an optional version declaration. - If a path is provided, the destination will be loaded from the local path. - If the string `'.'` is provided, the destination will be loaded from the current - working directory. + destination: The destination name or source reference. + If a path is provided, the source will be loaded from the local path. + If the destination contains a colon (':'), it will be interpreted as a docker image + and tag. config: The path to a configuration file for the named source or destination. """ if not config: @@ -236,62 +299,27 @@ def _resolve_destination_job( required=False, help=CONFIG_HELP, ) -@click.option( - "--install", - is_flag=True, - default=False, - help=( - "Whether to install the connector if it is not available locally. " - "Defaults to False, meaning the connector is expected to be already be installed." - ), -) def validate( connector: str | None = None, config: str | None = None, - *, - install: bool = False, ) -> None: """Validate the connector.""" - local_executable: Path | None = None if not connector: raise PyAirbyteInputError( message="No connector provided.", ) - if connector.startswith(".") or "/" in connector: - # Treat the connector as a path. - local_executable = Path(connector) - if not local_executable.exists(): - raise PyAirbyteInputError( - message="Connector executable not found.", - context={ - "connector": connector, - }, - ) - connector_name = local_executable.stem - else: - connector_name = connector - - if not connector_name.startswith("source-") and not connector_name.startswith("destination-"): - raise PyAirbyteInputError( - message=( - "Expected a connector name or path to executable. " - "Connector names are expected to begin with 'source-' or 'destination-'." 
-            ),
-            input_value=connector,
-        )
 
     connector_obj: Source | Destination
-    if connector_name.startswith("source-"):
-        connector_obj = get_source(
-            name=connector_name,
-            local_executable=local_executable,
-            install_if_missing=install,
+    if "source-" in connector:
+        connector_obj = _resolve_source_job(
+            source=connector,
+            config=None,
+            streams=None,
         )
     else:  # destination
-        connector_obj = get_destination(
-            name=connector_name,
-            local_executable=local_executable,
-            install_if_missing=install,
+        connector_obj = _resolve_destination_job(
+            destination=connector,
+            config=None,
         )
 
     print("Getting `spec` output from connector...")
@@ -310,7 +338,7 @@ def validate(
     type=str,
     help=(
         "The source name, with an optional version declaration. "
-        "If a path is provided, it will be interpreted as a path to the local executable. "
+        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
     ),
 )
 @click.option(
diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py
index f4a07893..3f69c013 100644
--- a/examples/run_perf_test_reads.py
+++ b/examples/run_perf_test_reads.py
@@ -23,22 +23,25 @@
 ```
 # Run with 5_000 records
-poetry run python ./examples/run_perf_test_reads.py -n=1e3
+poetry run python ./examples/run_perf_test_reads.py -n=5e3
 
 # Run with 500_000 records
-poetry run python ./examples/run_perf_test_reads.py -n=1e5
+poetry run python ./examples/run_perf_test_reads.py -n=5e5
+
+# Load 1 million records to Snowflake cache
+poetry run python ./examples/run_perf_test_reads.py -n=1e6 --cache=snowflake
 
-# Load 5_000 records to Snowflake
-poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake
+# Load 1 million records to Snowflake destination
+poetry run python ./examples/run_perf_test_reads.py -n=1e6 --destination=snowflake
 
 # Load 5_000 records to BigQuery
-poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery
+poetry run python ./examples/run_perf_test_reads.py -n=5e3 --cache=bigquery
 ```
 
 You can also use this script to test destination load performance:
 
 ```bash
 # Load 5_000 records to BigQuery
-poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e
+poetry run python ./examples/run_perf_test_reads.py -n=5e3 --destination=e2e
 ```
 
 Testing raw PyAirbyte throughput with and without caching:
@@ -74,6 +77,7 @@
 from airbyte.secrets.google_gsm import GoogleGSMSecretManager
 from airbyte.sources import get_benchmark_source
 from typing_extensions import Literal
+from ulid import ULID
 
 if TYPE_CHECKING:
     from airbyte.sources.base import Source
@@ -82,6 +86,12 @@
 AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
 
 
+def _random_suffix() -> str:
+    """Generate a random suffix for use in test environments, using ULIDs."""
+    ulid = str(ULID())
+    return ulid[:6] + ulid[-3:]
+
+
 def get_gsm_secret_json(secret_name: str) -> dict:
     secret_mgr = GoogleGSMSecretManager(
         project=AIRBYTE_INTERNAL_GCP_PROJECT,
@@ -95,25 +105,26 @@ def get_gsm_secret_json(secret_name: str) -> dict:
 
 
 def get_cache(
-    cache_type: Literal["duckdb", "snowflake", "bigquery", False],
+    cache_type: Literal["duckdb", "snowflake", "bigquery", "disabled", False],
 ) -> CacheBase | Literal[False]:
-    if cache_type is False:
+    if cache_type is False or cache_type == "disabled":
         return False
 
     if cache_type == "duckdb":
         return ab.new_local_cache()
 
     if cache_type == "snowflake":
-        secret_config = get_gsm_secret_json(
+        snowflake_config = get_gsm_secret_json(
             secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
         )
         return SnowflakeCache(
-            account=secret_config["account"],
-            username=secret_config["username"],
-            password=secret_config["password"],
-            database=secret_config["database"],
-            warehouse=secret_config["warehouse"],
-            role=secret_config["role"],
+            account=snowflake_config["account"],
+            username=snowflake_config["username"],
+            password=snowflake_config["password"],
+            database=snowflake_config["database"],
+            warehouse=snowflake_config["warehouse"],
+            role=snowflake_config["role"],
+            schema_name=f"INTEGTEST_{_random_suffix()}",
         )
 
     if cache_type == "bigquery":
@@ -171,12 +182,26 @@ def get_destination(destination_type: str) -> ab.Destination:
     if destination_type in ["e2e", "noop"]:
         return get_noop_destination()
 
+    if destination_type.removeprefix("destination-") == "snowflake":
+        snowflake_config = get_gsm_secret_json(
+            secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
+        )
+        snowflake_config["host"] = (
+            f"{snowflake_config['account']}.snowflakecomputing.com"
+        )
+        snowflake_config["schema"] = f"INTEGTEST_{_random_suffix()}"
+        return ab.get_destination(
+            "destination-snowflake",
+            config=snowflake_config,
+            docker_image=True,
+        )
+
     raise ValueError(f"Unknown destination type: {destination_type}")  # noqa: TRY003
 
 
 def main(
     n: int | str = "5e5",
-    cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb",
+    cache_type: Literal["duckdb", "bigquery", "snowflake", "disabled"] = "disabled",
     source_alias: str = "e2e",
     destination_type: str | None = None,
 ) -> None:
@@ -222,8 +247,8 @@ def main(
         "--cache",
         type=str,
         help="The cache type to use.",
-        choices=["duckdb", "snowflake", "bigquery"],
-        default="duckdb",
+        choices=["duckdb", "snowflake", "bigquery", "disabled"],
+        default="disabled",
     )
     parser.add_argument(
         "--no-cache",
@@ -244,20 +269,20 @@
             "hardcoded",
             "faker",
         ],
-        default="hardcoded",
+        default="benchmark",
     )
     parser.add_argument(
         "--destination",
         type=str,
         help=("The destination to use (optional)."),
-        choices=["e2e"],
+        choices=["e2e", "noop", "snowflake"],
         default=None,
     )
 
     args = parser.parse_args()
     main(
         n=args.n,
-        cache_type=args.cache if not args.no_cache else False,
+        cache_type=args.cache if not args.no_cache else "disabled",
        source_alias=args.source,
         destination_type=args.destination,
     )
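For reference, a minimal sketch (not part of the patch) of how the new `_is_docker_image()` and `_get_connector_name()` helpers added to `airbyte/cli.py` above interpret a docker-image connector reference; the image string used here is illustrative only:

```python
# Sketch only: mirrors the helper logic added to airbyte/cli.py in this diff.
# The image reference below is just an example value.
image = "airbyte/source-hardcoded-records:latest"

# _is_docker_image(): any non-null reference containing ':' is treated as image:tag.
is_docker_image = image is not None and ":" in image

# _get_connector_name(): strip the tag, then keep the last path segment.
connector_name = image.split(":")[0].split("/")[-1]

print(is_docker_image)  # True
print(connector_name)   # source-hardcoded-records
```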