TDL-20697: Move JSONL code base to singer-encodings #57

Status: Open (wants to merge 4 commits into base: master)

Changes from all commits
tap_s3_csv/__init__.py (27 changes: 16 additions & 11 deletions)

@@ -3,19 +3,20 @@
 import singer
[Review comment from a Contributor on the imports below]
Update the singer-encoding version in the setup.py file.
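A minimal sketch of the change the reviewer is asking for, assuming the tap pins singer-encodings in install_requires; the version numbers below are illustrative, not the PR's actual pins:

    # setup.py (illustrative snippet; the pinned versions are hypothetical)
    from setuptools import setup

    setup(
        name='tap-s3-csv',
        py_modules=['tap_s3_csv'],
        install_requires=[
            'singer-python==5.12.1',    # illustrative pin
            'singer-encodings==5.0.0',  # bump to the release that ships the moved JSONL code
        ],
    )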


 from singer import metadata
+from singer_encodings.csv import SKIP_FILES_COUNT
 from tap_s3_csv.discover import discover_streams
-from tap_s3_csv import s3
-from tap_s3_csv.sync import sync_stream
+from tap_s3_csv.s3 import S3Client
+from tap_s3_csv.sync import sync_stream, skipped_files_count
 from tap_s3_csv.config import CONFIG_CONTRACT

 LOGGER = singer.get_logger()

 REQUIRED_CONFIG_KEYS = ["start_date", "bucket", "account_id", "external_id", "role_name"]


-def do_discover(config):
+def do_discover(s3_client):
     LOGGER.info("Starting discover")
-    streams = discover_streams(config)
+    streams = discover_streams(s3_client)
     if not streams:
         raise Exception("No streams found")
     catalog = {"streams": streams}
@@ -27,13 +28,13 @@ def stream_is_selected(mdata):
     return mdata.get((), {}).get('selected', False)


-def do_sync(config, catalog, state):
+def do_sync(s3_client, catalog, state):
     LOGGER.info('Starting sync.')

     for stream in catalog['streams']:
         stream_name = stream['tap_stream_id']
         mdata = metadata.to_map(stream['metadata'])
-        table_spec = next(s for s in config['tables'] if s['table_name'] == stream_name)
+        table_spec = next(s for s in s3_client.config['tables'] if s['table_name'] == stream_name)
         if not stream_is_selected(mdata):
             LOGGER.info("%s: Skipping - not selected", stream_name)
             continue
@@ -43,9 +44,12 @@ def do_sync(config, catalog, state):
         singer.write_schema(stream_name, stream['schema'], key_properties)

         LOGGER.info("%s: Starting sync", stream_name)
-        counter_value = sync_stream(config, state, table_spec, stream)
+        counter_value = sync_stream(s3_client, state, table_spec, stream)
         LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter_value)

+    if SKIP_FILES_COUNT:
+        LOGGER.warning("%s files got skipped during the last sampling.", SKIP_FILES_COUNT)
+
     LOGGER.info('Done syncing.')

 def validate_table_config(config):
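One thing worth flagging on the hunk above: SKIP_FILES_COUNT arrives via a from-import, which copies the binding at import time. If singer_encodings rebinds that module attribute as files are skipped (an unverified assumption about the library's internals; a plain int reassignment would behave this way), the name imported here would never see the updates. A self-contained illustration of the semantics, using a stand-in module:

    import types

    mod = types.ModuleType("mod")  # stand-in for singer_encodings.csv
    mod.SKIP_FILES_COUNT = 0

    SKIP_FILES_COUNT = mod.SKIP_FILES_COUNT  # what 'from mod import SKIP_FILES_COUNT' does

    mod.SKIP_FILES_COUNT = 3       # the library rebinds its module attribute
    print(SKIP_FILES_COUNT)        # 0: the importer kept the old binding
    print(mod.SKIP_FILES_COUNT)    # 3: reading through the module sees the update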
@@ -74,17 +78,18 @@ def main():

     config['tables'] = validate_table_config(config)

+    s3_client = S3Client(config)
     try:
-        for page in s3.list_files_in_bucket(config):
+        for page in s3_client.list_files_in_bucket():
             break
         LOGGER.warning("I have direct access to the bucket without assuming the configured role.")
     except:
-        s3.setup_aws_client(config)
+        s3_client.setup_aws_client()

     if args.discover:
-        do_discover(args.config)
+        do_discover(s3_client)
     elif args.properties:
-        do_sync(config, args.properties, args.state)
+        do_sync(s3_client, args.properties, args.state)


 if __name__ == '__main__':
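For orientation: the calls in main() imply that tap_s3_csv/s3.py now exposes an S3Client class roughly shaped like the sketch below. This is inferred from the call sites in this diff; the bodies are stubs, not the PR's implementation.

    # Sketch of the S3Client surface implied by the call sites above (stub bodies).
    class S3Client:
        def __init__(self, config):
            self.config = config  # do_sync reads s3_client.config['tables']

        def setup_aws_client(self):
            """Assume the configured IAM role, as the old s3.setup_aws_client(config) did."""

        def list_files_in_bucket(self):
            """Yield pages of objects from self.config['bucket']."""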
tap_s3_csv/discover.py (30 changes: 18 additions & 12 deletions)

@@ -1,22 +1,28 @@
 from singer import metadata
-from tap_s3_csv import s3
+from singer_encodings import json_schema
+from tap_s3_csv.s3 import SDC_EXTRA_COLUMN, SDC_SOURCE_BUCKET_COLUMN, SDC_SOURCE_FILE_COLUMN, SDC_SOURCE_LINENO_COLUMN

-
-def discover_streams(config):
+def get_s3_sdc_columns():
+    """Override 'get_sdc_columns' from 'json_schema' as per the S3 code"""
+    return {
+        SDC_SOURCE_BUCKET_COLUMN: {'type': 'string'},
+        SDC_SOURCE_FILE_COLUMN: {'type': 'string'},
+        SDC_SOURCE_LINENO_COLUMN: {'type': 'integer'},
+        SDC_EXTRA_COLUMN: {'type': 'array', 'items': {
+            'anyOf': [{'type': 'object', 'properties': {}}, {'type': 'string'}]}}
+    }
+
+json_schema.get_sdc_columns = get_s3_sdc_columns
+
+def discover_streams(s3_client):
     streams = []

-    for table_spec in config['tables']:
-        schema = discover_schema(config, table_spec)
+    for table_spec in s3_client.config['tables']:
+        schema = json_schema.get_schema_for_table(s3_client, table_spec, sample_rate=5)
         streams.append({'stream': table_spec['table_name'], 'tap_stream_id': table_spec['table_name'],
                         'schema': schema, 'metadata': load_metadata(table_spec, schema)})
     return streams


-def discover_schema(config, table_spec):
-    sampled_schema = s3.get_sampled_schema_for_table(config, table_spec)
-    return sampled_schema
-
-
 def load_metadata(table_spec, schema):
     mdata = metadata.new()
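A note on the json_schema.get_sdc_columns = get_s3_sdc_columns override above: it works because Python resolves a module attribute at call time, so any library code that looks up get_sdc_columns through the json_schema module will pick up the replacement. A toy, self-contained demonstration of that pattern (the module and function names below are stand-ins, not the real singer_encodings API):

    import types

    # Stand-in for the singer_encodings.json_schema module.
    lib = types.ModuleType("lib")

    def _default_sdc_columns():
        return {}  # library default

    def _build_schema():
        # Looked up through the module at call time, so overrides take effect.
        return {'properties': lib.get_sdc_columns()}

    lib.get_sdc_columns = _default_sdc_columns
    lib.build_schema = _build_schema

    def custom_sdc_columns():
        return {'_sdc_source_bucket': {'type': 'string'}}

    lib.get_sdc_columns = custom_sdc_columns  # the override, as in the diff
    print(lib.build_schema())  # {'properties': {'_sdc_source_bucket': {'type': 'string'}}}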