From 11c2ad20ac369cbef927b756bc7b5b82144508bf Mon Sep 17 00:00:00 2001 From: Craig de Stigter Date: Fri, 3 Jul 2020 16:37:16 +1200 Subject: [PATCH] Remove logpipe. Quieten fast-import This is a better fix for #124. The previous fix appears to cause hangs in some situations. But it turns out that git fast-import doesn't *have* to be really noisy, we were just telling it to. So this tells it not to :) --- sno/output_util.py | 24 ------- sno/structure.py | 176 +++++++++++++++++++++------------------------ 2 files changed, 83 insertions(+), 117 deletions(-) diff --git a/sno/output_util.py b/sno/output_util.py index 043b900fc..5aa86d8be 100644 --- a/sno/output_util.py +++ b/sno/output_util.py @@ -2,10 +2,6 @@ import json import sys -from contextlib import contextmanager -import os -from threading import Thread - JSON_PARAMS = { "compact": {}, @@ -83,23 +79,3 @@ def is_empty_stream(stream): return True stream.seek(pos) return False - - -@contextmanager -def logpipe(logger, level): - """ - Context manager. - Yields a writable file-like object that pipes text to a logger. - - Uses threads to avoid deadlock when this is passed to a subprocess. - """ - fd_read, fd_write = os.pipe() - - def run(): - with os.fdopen(fd_read) as fo_read: - for line in iter(fo_read.readline, ''): - logger.log(level, line.strip('\n')) - - Thread(target=run).start() - with os.fdopen(fd_write, 'w') as f_write: - yield f_write diff --git a/sno/structure.py b/sno/structure.py index 14c4756d6..592b1a8c7 100644 --- a/sno/structure.py +++ b/sno/structure.py @@ -13,7 +13,6 @@ from . import core, gpkg from .exceptions import NotFound, NO_COMMIT -from .output_util import logpipe L = logging.getLogger("sno.structure") @@ -47,104 +46,95 @@ def fast_import_tables( if path in head_tree: raise ValueError(f"{path}/ already exists") - with logpipe(L, logging.INFO) as log_stderr: - click.echo("Starting git-fast-import...") - p = subprocess.Popen( - [ - "git", - "fast-import", - "--date-format=now", - "--done", - "--stats", - f"--max-pack-size={max_pack_size}", - ], - cwd=repo.path, - stdin=subprocess.PIPE, - stderr=log_stderr, + click.echo("Starting git-fast-import...") + p = subprocess.Popen( + [ + "git", + "fast-import", + "--date-format=now", + "--done", + f"--max-pack-size={max_pack_size}", + ], + cwd=repo.path, + stdin=subprocess.PIPE, + ) + try: + user = repo.default_signature + + if message is None: + if len(sources) == 1: + for path, source in sources.items(): + message = f"Import from {Path(source.source).name} to {path}/" + else: + message = f"Import {len(sources)} datasets from '{Path(source.source).name}':\n" + for path, source in sources.items(): + if path == source.table: + message += f"\n* {path}/" + else: + message += f"\n* {path} (from {source.table})" + + # FIXME: this shouldn't be hardcoded to master.. right? + header = ( + "commit refs/heads/master\n" + f"committer {user.name} <{user.email}> now\n" + f"data {len(message.encode('utf8'))}\n{message}\n" ) - try: - user = repo.default_signature + p.stdin.write(header.encode("utf8")) - if message is None: - if len(sources) == 1: - for path, source in sources.items(): - message = f"Import from {Path(source.source).name} to {path}/" + if head_tree is not None: + # start with the existing tree/contents + p.stdin.write(b"from refs/heads/master^0\n") + for path, source in sources.items(): + dataset = DatasetStructure.for_version(version)(tree=None, path=path) + + with source: + if limit: + num_rows = min(limit, source.row_count) + click.echo( + f"Importing {num_rows:,d} of {source.row_count:,d} features from {source} to {path}/ ..." + ) else: - message = f"Import {len(sources)} datasets from '{Path(source.source).name}':\n" - for path, source in sources.items(): - if path == source.table: - message += f"\n* {path}/" - else: - message += f"\n* {path} (from {source.table})" - - # FIXME: this shouldn't be hardcoded to master.. right? - header = ( - "commit refs/heads/master\n" - f"committer {user.name} <{user.email}> now\n" - f"data {len(message.encode('utf8'))}\n{message}\n" - ) - p.stdin.write(header.encode("utf8")) - - if head_tree is not None: - # start with the existing tree/contents - p.stdin.write(b"from refs/heads/master^0\n") - for path, source in sources.items(): - dataset = DatasetStructure.for_version(version)(tree=None, path=path) - - with source: - if limit: - num_rows = min(limit, source.row_count) - click.echo( - f"Importing {num_rows:,d} of {source.row_count:,d} features from {source} to {path}/ ..." - ) - else: - num_rows = source.row_count - click.echo( - f"Importing {num_rows:,d} features from {source} to {path}/ ..." - ) - - for i, blob_path in write_blobs_to_stream( - p.stdin, dataset.import_iter_meta_blobs(repo, source) - ): - pass - - # features - t1 = time.monotonic() - src_iterator = source.iter_features() - - for i, blob_path in write_blobs_to_stream( - p.stdin, dataset.import_iter_feature_blobs(src_iterator, source) - ): - if i and i % 100000 == 0: - click.echo( - f" {i:,d} features... @{time.monotonic()-t1:.1f}s" - ) - - if limit is not None and i == (limit - 1): - click.secho( - f" Stopping at {limit:,d} features", fg="yellow" - ) - break - t2 = time.monotonic() - click.echo(f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s") + num_rows = source.row_count click.echo( - f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)" + f"Importing {num_rows:,d} features from {source} to {path}/ ..." ) - p.stdin.write(b"\ndone\n") - except BrokenPipeError as e: - # if git-fast-import dies early, we get an EPIPE here - # we'll deal with it below - pass - else: - p.stdin.close() - p.wait() - if p.returncode != 0: - raise subprocess.CalledProcessError( - f"Error! {p.returncode}", "git-fast-import" - ) - t3 = time.monotonic() - click.echo(f"Closed in {(t3-t2):.0f}s") + for i, blob_path in write_blobs_to_stream( + p.stdin, dataset.import_iter_meta_blobs(repo, source) + ): + pass + + # features + t1 = time.monotonic() + src_iterator = source.iter_features() + + for i, blob_path in write_blobs_to_stream( + p.stdin, dataset.import_iter_feature_blobs(src_iterator, source) + ): + if i and i % 100000 == 0: + click.echo(f" {i:,d} features... @{time.monotonic()-t1:.1f}s") + + if limit is not None and i == (limit - 1): + click.secho(f" Stopping at {limit:,d} features", fg="yellow") + break + t2 = time.monotonic() + click.echo(f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s") + click.echo( + f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)" + ) + + p.stdin.write(b"\ndone\n") + except BrokenPipeError as e: + # if git-fast-import dies early, we get an EPIPE here + # we'll deal with it below + pass + else: + p.stdin.close() + p.wait() + if p.returncode != 0: + raise subprocess.CalledProcessError(f"Error! {p.returncode}", "git-fast-import") + t3 = time.monotonic() + click.echo(f"Closed in {(t3-t2):.0f}s") class RepositoryStructure: