From bb5aabfb3fdc47ffefaafc73922ba28b10cc3798 Mon Sep 17 00:00:00 2001 From: caoshengdong Date: Thu, 21 Mar 2024 02:51:25 +0800 Subject: [PATCH 1/5] add dreame sap source connector --- .../source-dreame-sap/.dockerignore | 6 + .../connectors/source-dreame-sap/Dockerfile | 41 +++ .../connectors/source-dreame-sap/README.md | 132 +++++++++ .../acceptance-test-config.yml | 30 ++ .../acceptance-test-docker.sh | 16 ++ .../connectors/source-dreame-sap/build.gradle | 9 + .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 ++ .../integration_tests/catalog.json | 39 +++ .../integration_tests/configured_catalog.json | 22 ++ .../integration_tests/invalid_config.json | 3 + .../integration_tests/sample_config.json | 3 + .../integration_tests/sample_state.json | 5 + .../connectors/source-dreame-sap/main.py | 13 + .../source-dreame-sap/requirements.txt | 2 + .../sample_files/configured_catalog.json | 150 ++++++++++ .../connectors/source-dreame-sap/setup.py | 29 ++ .../source_dreame_sap/__init__.py | 8 + .../source_dreame_sap/schemas/TODO.md | 25 ++ .../source_dreame_sap/schemas/bom.json | 49 ++++ .../schemas/inventory_data.json | 27 ++ .../schemas/material_master_data.json | 33 +++ .../schemas/purchase_order.json | 62 +++++ .../schemas/purchasing_strategy.json | 56 ++++ .../source_dreame_sap/source.py | 257 ++++++++++++++++++ .../source_dreame_sap/spec.yaml | 16 ++ .../source-dreame-sap/unit_tests/__init__.py | 3 + .../unit_tests/test_incremental_streams.py | 59 ++++ .../unit_tests/test_source.py | 22 ++ .../unit_tests/test_streams.py | 83 ++++++ 31 files changed, 1224 insertions(+) create mode 100644 airbyte-integrations/connectors/source-dreame-sap/.dockerignore create mode 100644 airbyte-integrations/connectors/source-dreame-sap/Dockerfile create mode 100644 airbyte-integrations/connectors/source-dreame-sap/README.md create mode 100644 
airbyte-integrations/connectors/source-dreame-sap/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-dreame-sap/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-dreame-sap/build.gradle create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/main.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/requirements.txt create mode 100644 airbyte-integrations/connectors/source-dreame-sap/sample_files/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/setup.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/__init__.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/TODO.md create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/inventory_data.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/material_master_data.json create mode 100644 
airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchase_order.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchasing_strategy.json create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml create mode 100644 airbyte-integrations/connectors/source-dreame-sap/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_incremental_streams.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_streams.py diff --git a/airbyte-integrations/connectors/source-dreame-sap/.dockerignore b/airbyte-integrations/connectors/source-dreame-sap/.dockerignore new file mode 100644 index 000000000000..5fc7dcd81a46 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_dreame_sap +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-dreame-sap/Dockerfile b/airbyte-integrations/connectors/source-dreame-sap/Dockerfile new file mode 100644 index 000000000000..c5335e9d8915 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/Dockerfile @@ -0,0 +1,41 @@ +FROM python:3.9.11-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install "cython<3.0.0" wheel +RUN pip install PyYAML~=5.4 --no-build-isolation +RUN pip install --prefix=/install . 
+ +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash +RUN pip install PyYAML~=5.4 --no-build-isolation + +# copy payload code only +COPY main.py ./ +COPY source_dreame_sap ./source_dreame_sap + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source_dreame_sap diff --git a/airbyte-integrations/connectors/source-dreame-sap/README.md b/airbyte-integrations/connectors/source-dreame-sap/README.md new file mode 100644 index 000000000000..90ae1c45662c --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/README.md @@ -0,0 +1,132 @@ +# Dreame SAP Source + +This is the repository for the Dreame SAP source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/dreame-sap). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. 
To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-dreame-sap:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/dreame-sap) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_dreame_sap/spec.yaml` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source dreame-sap test creds` +and place them into `secrets/config.json`. 
+ +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-dreame-sap:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-dreame-sap:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-dreame-sap:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-dreame-sap:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-dreame-sap:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-dreame-sap:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). 
+ +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-dreame-sap:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-dreame-sap:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. 
Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-config.yml b/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-config.yml new file mode 100644 index 000000000000..6e9b7fea2a75 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-config.yml @@ -0,0 +1,30 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: caoshengdong/fons-somnium-succus:0.0.6 +tests: + spec: + - spec_path: "source_dreame_sap/spec.yaml" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + # TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file a file + # expect_records: + # path: "integration_tests/expected_records.txt" + # extra_fields: no + # exact_order: no + # extra_records: yes + incremental: # TODO if your connector does not implement incremental sync, remove this block + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-docker.sh new file mode 
100644 index 000000000000..c51577d10690 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-dreame-sap/build.gradle b/airbyte-integrations/connectors/source-dreame-sap/build.gradle new file mode 100644 index 000000000000..ad693c8cea21 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_dreame_sap' +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/__init__.py b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..52b0f2c2118f --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "todo-abnormal-value" + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/acceptance.py new file mode 100644 index 000000000000..1302b2f57e10 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. 
otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/catalog.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/catalog.json new file mode 100644 index 000000000000..6799946a6851 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/catalog.json @@ -0,0 +1,39 @@ +{ + "streams": [ + { + "name": "TODO fix this file", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": "column1", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + } + } + } + }, + { + "name": "table1", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + } + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..36f0468db0d8 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/configured_catalog.json @@ -0,0 +1,22 @@ +{ + "streams": [ + { + "stream": { + "name": "customers", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "employees", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git 
a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/invalid_config.json new file mode 100644 index 000000000000..f3732995784f --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "todo-wrong-field": "this should be an incomplete config file, used in standard tests" +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_config.json new file mode 100644 index 000000000000..ecc4913b84c7 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_config.json @@ -0,0 +1,3 @@ +{ + "fix-me": "TODO" +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_state.json new file mode 100644 index 000000000000..3587e579822d --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "value" + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/main.py b/airbyte-integrations/connectors/source-dreame-sap/main.py new file mode 100644 index 000000000000..5faaef41e1b8 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_dreame_sap import SourceDreameSAP + +if __name__ == "__main__": + source = SourceDreameSAP() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-dreame-sap/requirements.txt b/airbyte-integrations/connectors/source-dreame-sap/requirements.txt new file mode 100644 index 000000000000..0411042aa091 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-dreame-sap/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-dreame-sap/sample_files/configured_catalog.json new file mode 100644 index 000000000000..cf9a04642c20 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/sample_files/configured_catalog.json @@ -0,0 +1,150 @@ +{ + "streams": [ + { + "stream": { + "name": "yili_middle_platform", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "z_year_month": { + "type": "string" + }, + "product_no": { + "type": "string" + }, + "product_name": { + "type": "string" + }, + "logical_node_no": { + "type": "string" + }, + "logical_node_name": { + "type": "string" + }, + "z_plan_prod_qty": { + "type": "string" + }, + "z_adj_prod_qty": { + "type": "string" + }, + "z_plan_prod_day1": { + "type": "string" + }, + "z_plan_prod_day2": { + "type": "string" + }, + "z_plan_prod_day3": { + "type": "string" + }, + "z_plan_prod_day4": { + "type": "string" + }, + "z_plan_prod_day5": { + "type": "string" + }, + "z_plan_prod_day6": { + "type": "string" + }, + "z_plan_prod_day7": { + "type": "string" + }, + "z_plan_prod_day8": { + "type": "string" + }, + "z_plan_prod_day9": { + "type": "string" + }, + "z_plan_prod_day10": { + "type": "string" + }, + "z_plan_prod_day11": { + "type": "string" + }, + "z_plan_prod_day12": { + "type": "string" + 
}, + "z_plan_prod_day13": { + "type": "string" + }, + "z_plan_prod_day14": { + "type": "string" + }, + "z_plan_prod_day15": { + "type": "string" + }, + "z_plan_prod_day16": { + "type": "string" + }, + "z_plan_prod_day17": { + "type": "string" + }, + "z_plan_prod_day18": { + "type": "string" + }, + "z_plan_prod_day19": { + "type": "string" + }, + "z_plan_prod_day20": { + "type": "string" + }, + "z_plan_prod_day21": { + "type": "string" + }, + "z_plan_prod_day22": { + "type": "string" + }, + "z_plan_prod_day23": { + "type": "string" + }, + "z_plan_prod_day24": { + "type": "string" + }, + "z_plan_prod_day25": { + "type": "string" + }, + "z_plan_prod_day26": { + "type": "string" + }, + "z_plan_prod_day27": { + "type": "string" + }, + "z_plan_prod_day28": { + "type": "string" + }, + "z_plan_prod_day29": { + "type": "string" + }, + "z_plan_prod_day30": { + "type": "string" + }, + "z_plan_prod_day31": { + "type": "string" + }, + "deletion_flag": { + "type": "string" + }, + "create_user": { + "type": "string" + }, + "create_time": { + "type": "string" + }, + "update_user": { + "type": "string" + }, + "update_time": { + "type": "string" + }, + "ds": { + "type": "string" + } + } + }, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/setup.py b/airbyte-integrations/connectors/source-dreame-sap/setup.py new file mode 100644 index 000000000000..e39f5ee472dc --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/setup.py @@ -0,0 +1,29 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.1.56", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source-dreame-sap", + description="Source implementation for Dreame SAP.", + author="Convect AI", + author_email="hi@convect.ai", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/__init__.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/__init__.py new file mode 100644 index 000000000000..b6de04ab8795 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceDreameSAP + +__all__ = ["SourceDreameSAP"] diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/TODO.md b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/TODO.md new file mode 100644 index 000000000000..cf1efadb3c9c --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. 
+ +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. 
Up to you :) diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json new file mode 100644 index 000000000000..691ed3f60c69 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json @@ -0,0 +1,49 @@ +{ + "$schema" : "http://json-schema.org/draft-07/schema#", + "type" : "object", + "properties" : { + "WERKS" : { + "type" : [ + "null", + "string" + ] + }, + "NAME1" : { + "type" : [ + "null", + "string" + ] + }, + "MATNR" : { + "type" : [ + "null", + "string" + ] + }, + "MAKTX_MATNR" : { + "type" : [ + "null", + "string" + ] + }, + "IDNRK" : { + "type" : [ + "null", + "string" + ] + }, + "MAKTX_IDNRK" : { + "type" : [ + "null", + "string" + ] + }, + "MNGLG" : { + "type" : [ + "null", + "string", + "number" + ] + } + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/inventory_data.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/inventory_data.json new file mode 100644 index 000000000000..8f0d56afbbf9 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/inventory_data.json @@ -0,0 +1,27 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "MATNR": { + "type": ["null", "string"] + }, + "MAKTX": { + "type": ["null", "string"] + }, + "WERKS": { + "type": ["null", "string"] + }, + "NAME1": { + "type": ["null", "string"] + }, + "LGORT": { + "type": ["null", "string"] + }, + "LGOBE": { + "type": ["null", "string"] + }, + "LABST": { + "type": ["null", "string", "number"] + } + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/material_master_data.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/material_master_data.json new file mode 100644 index 
000000000000..85911690439c --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/material_master_data.json @@ -0,0 +1,33 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "MATNR": { + "type": ["null", "string"] + }, + "MAKTX": { + "type": ["null", "string"] + }, + "ERSDA": { + "type": ["null", "string"] + }, + "MTART": { + "type": ["null", "string"] + }, + "MTBEZ": { + "type": ["null", "string"] + }, + "MEINS": { + "type": ["null", "string"] + }, + "STPRS": { + "type": ["null", "string", "number"] + }, + "ZSFYXCG": { + "type": ["null", "string"] + }, + "LVORM": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchase_order.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchase_order.json new file mode 100644 index 000000000000..a771a70c2884 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchase_order.json @@ -0,0 +1,62 @@ +{ + "$schema" : "http://json-schema.org/draft-07/schema#", + "type" : "object", + "properties" : { + "EBELN" : { + "type" : [ + "null", + "string" + ] + }, + "EBELP" : { + "type" : [ + "null", + "string", + "number" + ] + }, + "LIFNR" : { + "type" : [ + "null", + "string" + ] + }, + "WERKS" : { + "type" : [ + "null", + "string" + ] + }, + "NAME1" : { + "type" : [ + "null", + "string" + ] + }, + "MATNR" : { + "type" : [ + "null", + "string" + ] + }, + "MAKTX" : { + "type" : [ + "null", + "string" + ] + }, + "EINDT" : { + "type" : [ + "null", + "string" + ] + }, + "ZWJHSL" : { + "type" : [ + "null", + "string", + "number" + ] + } + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchasing_strategy.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchasing_strategy.json new file mode 100644 index 
000000000000..1fff05c1035e --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/purchasing_strategy.json @@ -0,0 +1,56 @@ +{ + "$schema" : "http://json-schema.org/draft-07/schema#", + "type" : "object", + "properties" : { + "WERKS" : { + "type" : [ + "null", + "string" + ] + }, + "NAME1" : { + "type" : [ + "null", + "string" + ] + }, + "MATNR" : { + "type" : [ + "null", + "string" + ] + }, + "MAKTX" : { + "type" : [ + "null", + "string" + ] + }, + "MTART" : { + "type" : [ + "null", + "string" + ] + }, + "MTBEZ" : { + "type" : [ + "null", + "string" + ] + }, + "STPRS" : { + "type" : [ + "null", + "string", + "number" + ] + }, + "PLIFZ" : { + "type" : [ + "null", + "string", + "number" + ] + } + } +} diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py new file mode 100644 index 000000000000..214b8deb0b6c --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py @@ -0,0 +1,257 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import base64 +import json +from abc import ABC +from datetime import datetime +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union + +import requests +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import NoAuth + +""" +TODO: Most comments in this class are instructive and should be deleted after the source is implemented. + +This file provides a stubbed example of how to use the Airbyte CDK to develop both a source connector which supports full refresh or and an +incremental syncs from an HTTP API. 
+ +The various TODOs are both implementation hints and steps - fulfilling all the TODOs should be sufficient to implement one basic and one incremental +stream from a source. This pattern is the same one used by Airbyte internally to implement connectors. + +The approach here is not authoritative, and devs are free to use their own judgement. + +There are additional required TODOs in the files within the integration_tests folder and the spec.yaml file. +""" + + +class DreameSAPStream(HttpStream, ABC): + url_base = 'http://dreamepodap01.dreame.tech:50000/' + primary_key = None + http_method = 'POST' + input_name = None + input_params = None + + def __init__(self, config: Mapping[str, Any], **kwargs): + super().__init__() + # url_base based on the env of the spec + if config['env'] == 'prod': + self.url_base = 'http://dreamepopap01.dreame.tech:50000/' + self.host = 'dreamepopap01.dreame.tech:50000' + if config['env'] == 'dev': + self.url_base = 'http://dreamepodap01.dreame.tech:50000/' + self.host = 'dreamepodap01.dreame.tech:50000' + if config['env'] == 'uat': + self.url_base = 'http://dreamepoqap01.dreame.tech:50000/' + self.host = 'dreamepoqap01.dreame.tech:50000' + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + return f'/RESTAdapter/Ext_ERP002/' + + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + user = 'PO_USER' + password = 'zm123456' + user_pass = f"{user}:{password}" + user_pass_encoded = base64.b64encode(user_pass.encode()).decode() + return { + 'Content-Type': 'application/json', + 'Authorization': 'Basic ' + user_pass_encoded + } + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + return 
{} + + def request_body_data( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Optional[Union[Mapping, str]]: + input_params = self.input_params + if next_page_token: + input_params["IMPORT"].update({"IV_PAGENO": str(next_page_token["page"]), "IV_PAGESIZE": str(next_page_token["page_size"])}) + body = {"IS_INPUT": { + "NAME": self.input_name, + "RECEIVER": "SAP", + "SENDER": "flow", + "INPUT": json.dumps(input_params) + }} + return json.dumps(body) + + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + result = response.json() + output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) + data = output['TABLES']['T_DATA'] + return data + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + +class IncrementalDreameSAPStream(DreameSAPStream, ABC): + + def __init__(self, config: Mapping[str, Any], **kwargs): + super().__init__(config) + self.page_size = 10000 + self.current_page = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + result = response.json() + output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) + data = output['TABLES']['T_DATA'] + if len(data) < self.page_size: + return None + self.current_page += 1 + return {"page": self.current_page, "page_size": self.page_size} + + +class ChildStreamMixin: + parent_stream_class: Optional[Any] = None + + def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + items = self.parent_stream_class(config=self.config).read_records(sync_mode=sync_mode) + for item in items: + yield {"parent_id": item["id"]} + + +class MaterialMasterData(DreameSAPStream): + 
input_name = "ZFMMM_073" + primary_key = "MATNR" + + def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: + self.input_params = { + "IMPORT": {"IV_DATUM": datetime.now().strftime('%Y%m%d')}, + "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]} + } + return super().request_body_data(**kwargs) + + +class InventoryData(IncrementalDreameSAPStream): + input_name = "ZFMMM_074" + primary_key = ["MATNR", "WERKS"] + + def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: + self.input_params = { + "TABLES": { + "T_WERKS": [ + {"LOW": "1100"}, + {"LOW": "1101"}, + {"LOW": "1102"}, + {"LOW": "1111"} + ], + "T_LGORT": [ + {"LOW": "1014"}, + {"LOW": "1015"}, + {"LOW": "1016"}, + {"LOW": "3007"}, + {"LOW": "3018"}, + {"LOW": "3029"}, + {"LOW": "3000"}, + {"LOW": "1017"}, + {"LOW": "3000"}, + {"LOW": "3050"}, + {"LOW": "3002"}, + {"LOW": "3069"}, + {"LOW": "3055"} + ] + } + } + return super().request_body_data(**kwargs) + + +class PurchasingStrategy(DreameSAPStream): + input_name = "ZFMMM_076" + primary_key = ["WERKS", "MATNR"] + + def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: + self.input_params = {"TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}} + return super().request_body_data(**kwargs) + + +class PurchaseOrder(DreameSAPStream): + input_name = "ZFMMM_075" + primary_key = ["EBELN", "EBELP"] + + def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: + # IV_START today YYYY-MM-DD + iv_start = datetime.now().strftime('%Y-%m-%d') + # IV_END today + 2 years YYYY-MM-DD + iv_end = (datetime.now().replace(year=datetime.now().year + 2)).strftime('%Y-%m-%d') + # 省略实现细节,应根据具体API调整参数 + self.input_params = {"IMPORT": {"IV_START": iv_start, "IV_END": iv_end}, + "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}} + return super().request_body_data(**kwargs) + + +class Bom(DreameSAPStream): 
+ input_name = "ZFMPP_020" + primary_key = ["WERKS", "MATNR", "IDNRK"] + parent_stream_class = MaterialMasterData + factories = ['1100', '1101', '1102', '1111'] + + def __init__(self, config: Mapping[str, Any], **kwargs): + super().__init__(config) + self.config = config + + def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + # 直接从父流读取记录,并应用筛选逻辑 + parent_stream = self.parent_stream_class(config=self.config) + for item in parent_stream.read_records(sync_mode=sync_mode): + if item.get('MTART') == 'Z001': + # 为每个 iv_matnr 生成 4 个 slices,每个对应一个工厂 + for factory in self.factories: + yield {"parent_id": item["MATNR"], "factory": factory} + + def request_body_json(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Optional[Mapping]: + # 使用 stream_slice 中的 parent_id 作为 IV_MATNR + iv_matnr = stream_slice["parent_id"] if stream_slice else None + factory = stream_slice["factory"] if stream_slice else None + self.input_params = { + "IMPORT": { + "IV_MATNR": iv_matnr, + "IV_WERKS": factory + } + } + return super().request_body_json(**kwargs) + + +# Source +class SourceDreameSAP(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + # NoAuth just means there is no authentication required for this API and is included for completeness. + # Skip passing an authenticator if no authentication is required. + # Other authenticators are available for API token-based auth and Oauth2. 
+ auth = NoAuth() + return [ + MaterialMasterData(authenticator=auth, config=config), + InventoryData(authenticator=auth, config=config), + PurchasingStrategy(authenticator=auth, config=config), + PurchaseOrder(authenticator=auth, config=config), + Bom(authenticator=auth, config=config) + ] diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml new file mode 100644 index 000000000000..89354f80ff9c --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml @@ -0,0 +1,16 @@ +documentationUrl: https://docs.airbyte.com/integrations/sources/dreame_sap +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Dreame SAP + type: object + required: + - env + properties: + env: + type: string + order: 1 + examples: + - dev + - uat + - prod + diff --git a/airbyte-integrations/connectors/source-dreame-sap/unit_tests/__init__.py b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_incremental_streams.py new file mode 100644 index 000000000000..539fa0f64d93 --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_incremental_streams.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_dreame_sap.source import IncrementalYiliMiddlePlatformStream + + +@fixture +def patch_incremental_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(IncrementalYiliMiddlePlatformStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalYiliMiddlePlatformStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalYiliMiddlePlatformStream, "__abstractmethods__", set()) + + +def test_cursor_field(patch_incremental_base_class): + stream = IncrementalYiliMiddlePlatformStream() + # TODO: replace this with your expected cursor field + expected_cursor_field = [] + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(patch_incremental_base_class): + stream = IncrementalYiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"current_stream_state": None, "latest_record": None} + # TODO: replace this with your expected updated stream state + expected_state = {} + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(patch_incremental_base_class): + stream = IncrementalYiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} + # TODO: replace this with your expected stream slices list + expected_stream_slice = [None] + assert stream.stream_slices(**inputs) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker): + mocker.patch.object(IncrementalYiliMiddlePlatformStream, "cursor_field", "dummy_field") + stream = IncrementalYiliMiddlePlatformStream() + assert stream.supports_incremental + + +def test_source_defined_cursor(patch_incremental_base_class): + stream = IncrementalYiliMiddlePlatformStream() + assert stream.source_defined_cursor + + +def 
test_stream_checkpoint_interval(patch_incremental_base_class): + stream = IncrementalYiliMiddlePlatformStream() + # TODO: replace this with your expected checkpoint interval + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_source.py b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_source.py new file mode 100644 index 000000000000..6497f6c5fb4a --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_source.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from source_dreame_sap.source import SourceDreameSAP + + +def test_check_connection(mocker): + source = SourceYiliMiddlePlatform() + logger_mock, config_mock = MagicMock(), MagicMock() + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +def test_streams(mocker): + source = SourceYiliMiddlePlatform() + config_mock = MagicMock() + streams = source.streams(config_mock) + # TODO: replace this with your streams number + expected_streams_number = 2 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_streams.py new file mode 100644 index 000000000000..746b735690fc --- /dev/null +++ b/airbyte-integrations/connectors/source-dreame-sap/unit_tests/test_streams.py @@ -0,0 +1,83 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_dreame_sap.source import DreameSAPStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(YiliMiddlePlatformStream, "path", "v0/example_endpoint") + mocker.patch.object(YiliMiddlePlatformStream, "primary_key", "test_primary_key") + mocker.patch.object(YiliMiddlePlatformStream, "__abstractmethods__", set()) + + +def test_request_params(patch_base_class): + stream = YiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request parameters + expected_params = {} + assert stream.request_params(**inputs) == expected_params + + +def test_next_page_token(patch_base_class): + stream = YiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected next page token + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class): + stream = YiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"response": MagicMock()} + # TODO: replace this with your expected parced object + expected_parsed_object = {} + assert next(stream.parse_response(**inputs)) == expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = YiliMiddlePlatformStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request headers + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = YiliMiddlePlatformStream() + # TODO: replace this with your expected http 
request method + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = YiliMiddlePlatformStream() + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = YiliMiddlePlatformStream() + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time From 08b33584351f7cd765e501061303d17c4e24d18d Mon Sep 17 00:00:00 2001 From: caoshengdong Date: Wed, 27 Mar 2024 02:28:07 +0800 Subject: [PATCH 2/5] use db data to retrieve bom header --- .../connectors/source-dreame-sap/setup.py | 1 + .../source_dreame_sap/schemas/bom.json | 21 ++++++ .../source_dreame_sap/source.py | 68 ++++++++++++++----- .../source_dreame_sap/spec.yaml | 56 ++++++++++++++- 4 files changed, 127 insertions(+), 19 deletions(-) diff --git a/airbyte-integrations/connectors/source-dreame-sap/setup.py b/airbyte-integrations/connectors/source-dreame-sap/setup.py index e39f5ee472dc..640a976b54e5 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/setup.py +++ b/airbyte-integrations/connectors/source-dreame-sap/setup.py @@ -7,6 +7,7 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk~=0.1.56", + "psycopg2-binary" ] TEST_REQUIREMENTS = [ diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json index 691ed3f60c69..0b8893cb96cb 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json +++ 
b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/schemas/bom.json @@ -44,6 +44,27 @@ "string", "number" ] + }, + "FMATNR" : { + "type" : [ + "null", + "string", + "number" + ] + }, + "FMAKTX" : { + "type" : [ + "null", + "string", + "number" + ] + }, + "STUFE" : { + "type" : [ + "null", + "string", + "number" + ] } } } diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py index 214b8deb0b6c..bdf3b6d5fd51 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py @@ -9,6 +9,8 @@ from datetime import datetime from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +import psycopg2 +import psycopg2.extras import requests from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream @@ -85,8 +87,6 @@ def request_body_data( next_page_token: Mapping[str, Any] = None, ) -> Optional[Union[Mapping, str]]: input_params = self.input_params - if next_page_token: - input_params["IMPORT"].update({"IV_PAGENO": str(next_page_token["page"]), "IV_PAGESIZE": str(next_page_token["page_size"])}) body = {"IS_INPUT": { "NAME": self.input_name, "RECEIVER": "SAP", @@ -105,7 +105,7 @@ def parse_response( result = response.json() output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) data = output['TABLES']['T_DATA'] - return data + yield from data def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None @@ -118,14 +118,30 @@ def __init__(self, config: Mapping[str, Any], **kwargs): self.page_size = 10000 self.current_page = 1 + def request_body_data( + self, + **kwargs + ) -> Optional[Union[Mapping, str]]: + # Add pagination parameters to the request + input_params = self.input_params or 
{} + input_params["IMPORT"] = input_params.get("IMPORT", {}) + input_params["IMPORT"].update({"IV_PAGENO": str(self.current_page), "IV_PAGESIZE": str(self.page_size)}) + self.input_params = input_params + return super().request_body_data(**kwargs) + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - result = response.json() - output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) - data = output['TABLES']['T_DATA'] - if len(data) < self.page_size: + try: + result = response.json() + output = json.loads(result.get('ES_OUTPUT', {}).get('OUTPUT', "{}").replace('\n', '').replace('\r', '').replace('\t', '')) + data = output.get('TABLES', {}).get('T_DATA', []) + except (ValueError, KeyError) as e: + print(f"Error parsing response: {e}") return None - self.current_page += 1 - return {"page": self.current_page, "page_size": self.page_size} + + if len(data) >= self.page_size: + self.current_page += 1 + return {"page": self.current_page, "page_size": self.page_size} + return None class ChildStreamMixin: @@ -200,7 +216,6 @@ def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: iv_start = datetime.now().strftime('%Y-%m-%d') # IV_END today + 2 years YYYY-MM-DD iv_end = (datetime.now().replace(year=datetime.now().year + 2)).strftime('%Y-%m-%d') - # 省略实现细节,应根据具体API调整参数 self.input_params = {"IMPORT": {"IV_START": iv_start, "IV_END": iv_end}, "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}} return super().request_body_data(**kwargs) @@ -209,21 +224,38 @@ def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: class Bom(DreameSAPStream): input_name = "ZFMPP_020" primary_key = ["WERKS", "MATNR", "IDNRK"] - parent_stream_class = MaterialMasterData factories = ['1100', '1101', '1102', '1111'] def __init__(self, config: Mapping[str, Any], **kwargs): super().__init__(config) self.config = config + self.database_config = { + 
"host": self.config.get('db_host'), + "dbname": self.config.get('db_name'), + "user": self.config.get('db_user'), + "password": self.config.get('db_password'), + "port": self.config.get('db_port', 5432), + } + self.material_master_data_table = self.config.get('material_master_data_table', 'material_master_data') + + def get_material_master_data(self) -> Iterable[Mapping[str, Any]]: + """从数据库中获取 Material Master Data 的全量数据""" + # 这里假设配置文件包含数据库的连接信息 + conn_info = self.database_config + query = f"SELECT sku_code FROM {self.material_master_data_table} WHERE status_code = 'Z001'" + + with psycopg2.connect(**conn_info) as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(query) + for record in cur: + yield record def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - # 直接从父流读取记录,并应用筛选逻辑 - parent_stream = self.parent_stream_class(config=self.config) - for item in parent_stream.read_records(sync_mode=sync_mode): - if item.get('MTART') == 'Z001': - # 为每个 iv_matnr 生成 4 个 slices,每个对应一个工厂 - for factory in self.factories: - yield {"parent_id": item["MATNR"], "factory": factory} + # 从数据库获取 Material Master Data 记录 + for item in self.get_material_master_data(): + # 为每个 iv_matnr 生成 4 个 slices,每个对应一个工厂 + for factory in self.factories: + yield {"parent_id": item["sku_code"], "factory": factory} def request_body_json(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Optional[Mapping]: # 使用 stream_slice 中的 parent_id 作为 IV_MATNR diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml index 89354f80ff9c..9df26340ca64 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/spec.yaml @@ -5,12 +5,66 @@ connectionSpecification: type: object required: - env + - db_host + - db_name + - 
db_user + - db_password + - db_port + - material_master_data_table properties: env: type: string order: 1 - examples: + enum: - dev - uat - prod + db_host: + type: string + order: 2 + title: Database Host + description: The host of the database. + examples: + - localhost + + db_name: + type: string + order: 3 + title: Database Name + description: The name of the database. + examples: + - odp-user-space + + db_user: + type: string + order: 4 + title: Database User + description: The user of the database. + examples: + - user + + db_password: + type: string + order: 5 + title: Database Password + description: The password of the database. + examples: + - password + + db_port: + type: integer + order: 6 + title: Database Port + description: The port of the database. + examples: + - 5432 + + material_master_data_table: + type: string + order: 7 + title: Material Master Data Table + description: The table name of the material master data. + examples: + - material_master_data_table + From 698cb10d2e9a7278af2d9c8ed25eb1f794467a73 Mon Sep 17 00:00:00 2001 From: caoshengdong Date: Sun, 31 Mar 2024 16:04:44 +0800 Subject: [PATCH 3/5] trim the data on every field --- .../connectors/source-dreame-sap/source_dreame_sap/source.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py index bdf3b6d5fd51..ad111debb8bd 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py @@ -105,6 +105,9 @@ def parse_response( result = response.json() output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) data = output['TABLES']['T_DATA'] + # trim the data on every field + for record in data: + yield {k: v.strip() if isinstance(v, str) else v for k, v in record.items()} yield 
from data def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: From 095684f7b203ba41fd6bb59f92f86b8513456d3d Mon Sep 17 00:00:00 2001 From: caoshengdong Date: Tue, 30 Jul 2024 11:24:34 +0800 Subject: [PATCH 4/5] update url base and authentication --- .../source_dreame_sap/source.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py index ad111debb8bd..117e2e3b2ede 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py @@ -63,8 +63,8 @@ def path( def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: - user = 'PO_USER' - password = 'zm123456' + user = 'po_convertai_user' + password = r'G3c\43}1mX{Ix' user_pass = f"{user}:{password}" user_pass_encoded = base64.b64encode(user_pass.encode()).decode() return { @@ -240,6 +240,23 @@ def __init__(self, config: Mapping[str, Any], **kwargs): "port": self.config.get('db_port', 5432), } self.material_master_data_table = self.config.get('material_master_data_table', 'material_master_data') + if config['env'] == 'prod': + self.url_base = 'https://esbpi.dreame.tech/' + self.host = 'esbpe.dreame.tech' + if config['env'] == 'dev': + self.url_base = 'https://esbpi.dreame.tech:8000/' + self.host = 'esbpe.dreame.tech:8000' + if config['env'] == 'uat': + self.url_base = 'http://esbqi.dreame.tech:8000/' + self.host = 'esbpe.dreame.tech:8000' + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + return f'/sap/0012/' def get_material_master_data(self) -> Iterable[Mapping[str, 
Any]]: """从数据库中获取 Material Master Data 的全量数据""" From 15794074e0fd3a8203134d13e661bf95dcc6aa36 Mon Sep 17 00:00:00 2001 From: caoshengdong Date: Tue, 30 Jul 2024 16:00:02 +0800 Subject: [PATCH 5/5] refactor code --- .../source_dreame_sap/source.py | 199 +++++++----------- 1 file changed, 74 insertions(+), 125 deletions(-) diff --git a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py index 117e2e3b2ede..045d8c531a25 100644 --- a/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py +++ b/airbyte-integrations/connectors/source-dreame-sap/source_dreame_sap/source.py @@ -1,8 +1,3 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - import base64 import json from abc import ABC @@ -17,115 +12,80 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import NoAuth -""" -TODO: Most comments in this class are instructive and should be deleted after the source is implemented. - -This file provides a stubbed example of how to use the Airbyte CDK to develop both a source connector which supports full refresh or and an -incremental syncs from an HTTP API. - -The various TODOs are both implementation hints and steps - fulfilling all the TODOs should be sufficient to implement one basic and one incremental -stream from a source. This pattern is the same one used by Airbyte internally to implement connectors. - -The approach here is not authoritative, and devs are free to use their own judgement. - -There are additional required TODOs in the files within the integration_tests folder and the spec.yaml file. 
-""" - class DreameSAPStream(HttpStream, ABC): - url_base = 'http://dreamepodap01.dreame.tech:50000/' + url_base = "http://dreamepodap01.dreame.tech:50000/" primary_key = None - http_method = 'POST' + http_method = "POST" input_name = None input_params = None def __init__(self, config: Mapping[str, Any], **kwargs): - super().__init__() - # url_base based on the env of the spec - if config['env'] == 'prod': - self.url_base = 'http://dreamepopap01.dreame.tech:50000/' - self.host = 'dreamepopap01.dreame.tech:50000' - if config['env'] == 'dev': - self.url_base = 'http://dreamepodap01.dreame.tech:50000/' - self.host = 'dreamepodap01.dreame.tech:50000' - if config['env'] == 'uat': - self.url_base = 'http://dreamepoqap01.dreame.tech:50000/' - self.host = 'dreamepoqap01.dreame.tech:50000' + super().__init__(config=config, **kwargs) + if config["env"] == "prod": + self.url_base = "http://dreamepopap01.dreame.tech:50000/" + self.host = "dreamepopap01.dreame.tech:50000" + elif config["env"] == "dev": + self.url_base = "http://dreamepodap01.dreame.tech:50000/" + self.host = "dreamepodap01.dreame.tech:50000" + elif config["env"] == "uat": + self.url_base = "http://dreamepoqap01.dreame.tech:50000/" + self.host = "dreamepoqap01.dreame.tech:50000" def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: - return f'/RESTAdapter/Ext_ERP002/' + return "/RESTAdapter/Ext_ERP002/" def request_headers( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: - user = 'po_convertai_user' - password = r'G3c\43}1mX{Ix' + user = "po_convertai_user" + 
password = r"G3c\43}1mX{Ix" user_pass = f"{user}:{password}" user_pass_encoded = base64.b64encode(user_pass.encode()).decode() - return { - 'Content-Type': 'application/json', - 'Authorization': 'Basic ' + user_pass_encoded - } + return {"Content-Type": "application/json", "Authorization": "Basic " + user_pass_encoded} def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: return {} def request_body_data( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Optional[Union[Mapping, str]]: - input_params = self.input_params - body = {"IS_INPUT": { - "NAME": self.input_name, - "RECEIVER": "SAP", - "SENDER": "flow", - "INPUT": json.dumps(input_params) - }} + input_params = self.input_params or {} + body = {"IS_INPUT": {"NAME": self.input_name, "RECEIVER": "SAP", "SENDER": "flow", "INPUT": json.dumps(input_params)}} return json.dumps(body) def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + self, + response: requests.Response, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, ) -> Iterable[Mapping]: - result = response.json() - output = json.loads(result['ES_OUTPUT']['OUTPUT'].replace('\n', '').replace('\r', '').replace('\t', '')) - data = output['TABLES']['T_DATA'] - # trim the data on every field - for record in data: - yield {k: v.strip() if isinstance(v, str) else v for k, v in record.items()} - yield from data + try: + 
result = response.json() + output = json.loads(result["ES_OUTPUT"]["OUTPUT"].replace("\n", "").replace("\r", "").replace("\t", "")) + data = output["TABLES"]["T_DATA"] + for record in data: + yield {k: v.strip() if isinstance(v, str) else v for k, v in record.items()} + except (ValueError, KeyError) as e: + self.logger.error(f"Error parsing response: {e}") + return [] def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None class IncrementalDreameSAPStream(DreameSAPStream, ABC): - def __init__(self, config: Mapping[str, Any], **kwargs): - super().__init__(config) + super().__init__(config=config, **kwargs) self.page_size = 10000 self.current_page = 1 - def request_body_data( - self, - **kwargs - ) -> Optional[Union[Mapping, str]]: - # Add pagination parameters to the request + def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: input_params = self.input_params or {} input_params["IMPORT"] = input_params.get("IMPORT", {}) input_params["IMPORT"].update({"IV_PAGENO": str(self.current_page), "IV_PAGESIZE": str(self.page_size)}) @@ -135,10 +95,10 @@ def request_body_data( def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: try: result = response.json() - output = json.loads(result.get('ES_OUTPUT', {}).get('OUTPUT', "{}").replace('\n', '').replace('\r', '').replace('\t', '')) - data = output.get('TABLES', {}).get('T_DATA', []) + output = json.loads(result.get("ES_OUTPUT", {}).get("OUTPUT", "{}").replace("\n", "").replace("\r", "").replace("\t", "")) + data = output.get("TABLES", {}).get("T_DATA", []) except (ValueError, KeyError) as e: - print(f"Error parsing response: {e}") + self.logger.error(f"Error parsing response: {e}") return None if len(data) >= self.page_size: @@ -150,7 +110,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, class ChildStreamMixin: parent_stream_class: Optional[Any] = None - def stream_slices(self, sync_mode, 
**kwargs) -> Iterable[Optional[Mapping[str, any]]]: + def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: items = self.parent_stream_class(config=self.config).read_records(sync_mode=sync_mode) for item in items: yield {"parent_id": item["id"]} @@ -162,8 +122,8 @@ class MaterialMasterData(DreameSAPStream): def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: self.input_params = { - "IMPORT": {"IV_DATUM": datetime.now().strftime('%Y%m%d')}, - "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]} + "IMPORT": {"IV_DATUM": datetime.now().strftime("%Y%m%d")}, + "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}, } return super().request_body_data(**kwargs) @@ -175,12 +135,7 @@ class InventoryData(IncrementalDreameSAPStream): def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: self.input_params = { "TABLES": { - "T_WERKS": [ - {"LOW": "1100"}, - {"LOW": "1101"}, - {"LOW": "1102"}, - {"LOW": "1111"} - ], + "T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}], "T_LGORT": [ {"LOW": "1014"}, {"LOW": "1015"}, @@ -194,8 +149,8 @@ def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: {"LOW": "3050"}, {"LOW": "3002"}, {"LOW": "3069"}, - {"LOW": "3055"} - ] + {"LOW": "3055"}, + ], } } return super().request_body_data(**kwargs) @@ -216,47 +171,46 @@ class PurchaseOrder(DreameSAPStream): def request_body_data(self, **kwargs) -> Optional[Union[Mapping, str]]: # IV_START today YYYY-MM-DD - iv_start = datetime.now().strftime('%Y-%m-%d') + iv_start = datetime.now().strftime("%Y-%m-%d") # IV_END today + 2 years YYYY-MM-DD - iv_end = (datetime.now().replace(year=datetime.now().year + 2)).strftime('%Y-%m-%d') - self.input_params = {"IMPORT": {"IV_START": iv_start, "IV_END": iv_end}, - "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}} + iv_end = 
(datetime.now().replace(year=datetime.now().year + 2)).strftime("%Y-%m-%d") + self.input_params = { + "IMPORT": {"IV_START": iv_start, "IV_END": iv_end}, + "TABLES": {"T_WERKS": [{"LOW": "1100"}, {"LOW": "1101"}, {"LOW": "1102"}, {"LOW": "1111"}]}, + } return super().request_body_data(**kwargs) class Bom(DreameSAPStream): input_name = "ZFMPP_020" primary_key = ["WERKS", "MATNR", "IDNRK"] - factories = ['1100', '1101', '1102', '1111'] + factories = ["1100", "1101", "1102", "1111"] def __init__(self, config: Mapping[str, Any], **kwargs): super().__init__(config) self.config = config self.database_config = { - "host": self.config.get('db_host'), - "dbname": self.config.get('db_name'), - "user": self.config.get('db_user'), - "password": self.config.get('db_password'), - "port": self.config.get('db_port', 5432), + "host": self.config.get("db_host"), + "dbname": self.config.get("db_name"), + "user": self.config.get("db_user"), + "password": self.config.get("db_password"), + "port": self.config.get("db_port", 5432), } - self.material_master_data_table = self.config.get('material_master_data_table', 'material_master_data') - if config['env'] == 'prod': - self.url_base = 'https://esbpi.dreame.tech/' - self.host = 'esbpe.dreame.tech' - if config['env'] == 'dev': - self.url_base = 'https://esbpi.dreame.tech:8000/' - self.host = 'esbpe.dreame.tech:8000' - if config['env'] == 'uat': - self.url_base = 'http://esbqi.dreame.tech:8000/' - self.host = 'esbpe.dreame.tech:8000' + self.material_master_data_table = self.config.get("material_master_data_table", "material_master_data") + if config["env"] == "prod": + self.url_base = "https://esbpi.dreame.tech/" + self.host = "esbpe.dreame.tech" + if config["env"] == "dev": + self.url_base = "https://esbpi.dreame.tech:8000/" + self.host = "esbpe.dreame.tech:8000" + if config["env"] == "uat": + self.url_base = "http://esbqi.dreame.tech:8000/" + self.host = "esbpe.dreame.tech:8000" def path( - self, - stream_state: Mapping[str, Any] = None, 
- stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None - ) -> str: - return f'/sap/0012/' + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return f"/sap/0012/" def get_material_master_data(self) -> Iterable[Mapping[str, Any]]: """从数据库中获取 Material Master Data 的全量数据""" @@ -281,12 +235,7 @@ def request_body_json(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> # 使用 stream_slice 中的 parent_id 作为 IV_MATNR iv_matnr = stream_slice["parent_id"] if stream_slice else None factory = stream_slice["factory"] if stream_slice else None - self.input_params = { - "IMPORT": { - "IV_MATNR": iv_matnr, - "IV_WERKS": factory - } - } + self.input_params = {"IMPORT": {"IV_MATNR": iv_matnr, "IV_WERKS": factory}} return super().request_body_json(**kwargs) @@ -305,5 +254,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: InventoryData(authenticator=auth, config=config), PurchasingStrategy(authenticator=auth, config=config), PurchaseOrder(authenticator=auth, config=config), - Bom(authenticator=auth, config=config) + Bom(authenticator=auth, config=config), ]