Skip to content

Commit

Permalink
LITE-28260: sync stream command
Browse files Browse the repository at this point in the history
  • Loading branch information
gab832 committed Aug 18, 2023
1 parent 826de2e commit aae2f6f
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 1 deletion.
30 changes: 30 additions & 0 deletions connect/cli/plugins/commerce/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
display_streams_table,
export_stream,
get_destination_account,
print_errors,
print_results,
sync_stream,
)


Expand Down Expand Up @@ -165,5 +167,33 @@ def cmd_clone_stream(
print_results(results)


@grp_commerce_streams.command(
name='sync',
short_help='Synchronize a stream from an excel file.',
)
@click.argument('input_file', metavar='input_file', nargs=1, required=True) # noqa: E304
@pass_config
def cmd_sync_stream(config, input_file):
stream_id = None
if '.xlsx' not in input_file:
stream_id = input_file
input_file = f'{input_file}/{input_file}.xlsx'
else:
stream_id = input_file.split('/')[-1].split('.')[0]
results, errors = sync_stream(
account=config.active,
stream_id=stream_id,
input_file=input_file,
)

console.echo('')

print_results(results)

console.echo('')

print_errors(errors)


def get_group():
return grp_commerce
283 changes: 282 additions & 1 deletion connect/cli/plugins/commerce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from click import ClickException
from connect.cli.core.terminal import console
from connect.client import ClientError
from openpyxl import Workbook
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.styles.colors import WHITE, Color

Expand Down Expand Up @@ -349,6 +349,8 @@ def export_stream(

wb.save(output_file)

console.echo('')

console.secho(
f'Stream {stream_id} exported properly to {output_file}.',
fg='green',
Expand Down Expand Up @@ -634,6 +636,20 @@ def print_results(results):
)


def print_errors(errors):
if errors:
console.confirm(
'Are you sure you want to display errors?',
abort=True,
)
console.echo('')
for error in errors:
console.secho(
error,
fg='red',
)


def clone_stream(
origin_account,
stream_id,
Expand Down Expand Up @@ -761,3 +777,268 @@ def clone_stream(
)

return destination_stream_id, results


def _validate_headers(
current_headers,
expected_headers,
sheet_name,
):
for header in expected_headers:
if header not in current_headers:
raise ClickException(
f'The {sheet_name} sheet header does not contain `{header}` header.'
)


def _validate_headers(wb):
_validate_headers(
[c.value for c in wb['General Information'][1]],
(
'Stream information',
None,
),
'General Information',
)
_validate_headers(
[c.value for c in wb['Columns'][1]],
(
'ID',
'Name',
'Description',
'Type',
'Position',
'Required',
'Output',
),
'Columns',
)
_validate_headers(
[c.value for c in wb['Transformations'][1]],
(
'ID',
'Function ID',
'Function Name',
'Description',
'Overview',
'Input columns',
'Output columns',
'Position',
'Settings',
),
'Transformations',
)
_validate_headers(
[c.value for c in wb['Attachments'][1]],
(
'ID',
'Name',
),
'Attachments',
)


def get_work_book(input_file):
if not os.path.exists(input_file):
raise ClickException(f'The file {input_file} does not exists.')
if 'xlsx' not in input_file:
raise ClickException(
f'The file {input_file} has invalid format, must be xlsx.'
)
wb = load_workbook(input_file, read_only=True)
if (
'General Information',
'Columns',
'Transformations',
'Attachments',
) not in wb.sheetnames:
raise ClickException(
f'The file must contain `General Information`, `Columns`, `Transformations` and `Attachments` sheets.'
)

_validate_headers(wb)

return wb


def update_general_information(
client,
collection,
stream_id,
sheet,
results,
errors,
progress,
):
task = progress.add_task('Updating general information', total=5)

stream = (
client.ns(collection)
.streams.filter(id=stream_id)
.select(
'context',
'samples',
'sources',
)
.first()
)
body = {}
updated = 0
errors_on_update = 0
for n in range(sheet.max_row + 1):
h, v = sheet[n]
if h.value == 'Stream Name' and stream['name'] != v.value:
body['name'] = v.value
updated += 1
elif h.value == 'Stream Description' and stream['description'] != v.value:
body['description'] = v.value
updated += 1
elif h.value == 'Product ID' and stream.get('context', {}).get('product', {}).get('id', None) != v.value:
if 'context' in body:
body['context'].update({'product': {'id': v.value}})
else:
body['context'] = {'product': {'id': v.value}}
updated += 1
elif h.value == 'Partner ID' and stream.get('context', {}).get('account', {}).get('id', None) != v.value:
if 'context' in body:
body['context'].update({'account': v.value})
else:
body['context'] = {'account': v.value}
updated += 1
elif h.value == 'Marketplace ID' and stream.get('context', {}).get('marketplace', {}).get('id', None) != v.value:
if 'context' in body:
body['context'].update({'marketplace': {'id': v.value}})
else:
body['context'] = {'marketplace': {'id': v.value}}
updated += 1

if updated:
try:
client.ns(collection).streams[stream_id].update(
json=body,
)
except ClientError as e:
errors.append(str(e.value))
updated = 0
errors_on_update = 5

results.append(('General information', 5, 0, updated, 0, 0, errors_on_update))
progress.update(task, advance=5 )


def update_transformations(
client,
collection,
stream_id,
sheet,
results,
errors,
progress,
):

task = progress.add_task('Updating transformation information', total=sheet.max_row - 1)

updated = 0
errors_on_update = 0
ids = []
for n in range(1, sheet.max_row + 1):
id, fid, fname, descr, over, input, output, position, settings = sheet[n]
ids.append(id.value)
try:
if client.ns(collection).streams[stream_id].transformations.filter(id=id.value).count() == 0:
client.ns(collection).streams[stream_id].transformations.create(
json={
#TODO: Too much data to use on create. We need to get the columns and the output cols we do not have any info, so impossible.
#TODO: We could notify/warn/error if we found this case.
},
)
origin_trf = client.ns(collection).streams[stream_id].transformations[id.value]
#TODO: maybe we could update other fields like description or position?
if origin_trf['settings'] != settings.value:
client.ns(collection).streams[stream_id].transformations[id.value].update(
json={'settings': settings.value}
)
updated += 1
progress.update(task, advance=1)
except ClientError as e:
errors.append(str(e.value))
errors_on_update += 1

try:
to_delete = client.ns(collection).streams[stream_id].transformations.exclude(id__in=ids)
ids = [t['id'] for t in to_delete]
client.ns(collection).streams[stream_id].transformations.filter(id__in=ids).delete()
except ClientError as e:
errors.append(str(e.value))

results.append(('General information', sheet.max_row - 1, 0, updated, 0, 0, errors_on_update))


def update_attachments(
client,
collection,
stream_id,
sheet,
results,
errors,
progress,
):
#TODO: Update from Attachments
#TODO: If exists, we skip
#TODO: if not exists we create
#TODO: We delete the attachments that are not in the sheet (but are in the server)
pass


def sync_stream(
account,
stream_id,
input_file,
):
wb = get_work_book(input_file)

collection = guess_if_billing_or_pricing_stream(account.client, stream_id)
if not collection:
raise ClickException(
f'Stream {stream_id} not found for the current account {account.id}.'
)

results = []
errors = []

with console.status_progress() as (status, progress):

status.update('Updating general information', fg='blue')
update_general_information(
client=account.client,
collection=collection,
stream_id=stream_id,
sheet=wb['General Information'],
results=results,
errors=errors,
progress=progress,
)

status.update('Updating transformations', fg='blue')
update_transformations(
client=account.client,
collection=collection,
stream_id=stream_id,
sheet=wb['Transformations'],
results=results,
errors=errors,
progress=progress,
)

status.update('Updating attachments', fg='blue')
update_attachments(
client=account.client,
collection=collection,
stream_id=stream_id,
sheet=wb['Attachments'],
results=results,
errors=errors,
progress=progress,
)

return results, errors

0 comments on commit aae2f6f

Please sign in to comment.