Skip to content

Commit

Permalink
Remove logpipe. Quieten fast-import
Browse files Browse the repository at this point in the history
This is a better fix for #124.
The previous fix appears to cause hangs in some situations.
But it turns out that git fast-import doesn't *have* to be really noisy,
we were just telling it to. So this tells it not to :)
  • Loading branch information
craigds committed Jul 3, 2020
1 parent 6d08db3 commit 11c2ad2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 117 deletions.
24 changes: 0 additions & 24 deletions sno/output_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
import json
import sys

from contextlib import contextmanager
import os
from threading import Thread


JSON_PARAMS = {
"compact": {},
Expand Down Expand Up @@ -83,23 +79,3 @@ def is_empty_stream(stream):
return True
stream.seek(pos)
return False


@contextmanager
def logpipe(logger, level):
"""
Context manager.
Yields a writable file-like object that pipes text to a logger.
Uses threads to avoid deadlock when this is passed to a subprocess.
"""
fd_read, fd_write = os.pipe()

def run():
with os.fdopen(fd_read) as fo_read:
for line in iter(fo_read.readline, ''):
logger.log(level, line.strip('\n'))

Thread(target=run).start()
with os.fdopen(fd_write, 'w') as f_write:
yield f_write
176 changes: 83 additions & 93 deletions sno/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from . import core, gpkg
from .exceptions import NotFound, NO_COMMIT
from .output_util import logpipe

L = logging.getLogger("sno.structure")

Expand Down Expand Up @@ -47,104 +46,95 @@ def fast_import_tables(
if path in head_tree:
raise ValueError(f"{path}/ already exists")

with logpipe(L, logging.INFO) as log_stderr:
click.echo("Starting git-fast-import...")
p = subprocess.Popen(
[
"git",
"fast-import",
"--date-format=now",
"--done",
"--stats",
f"--max-pack-size={max_pack_size}",
],
cwd=repo.path,
stdin=subprocess.PIPE,
stderr=log_stderr,
click.echo("Starting git-fast-import...")
p = subprocess.Popen(
[
"git",
"fast-import",
"--date-format=now",
"--done",
f"--max-pack-size={max_pack_size}",
],
cwd=repo.path,
stdin=subprocess.PIPE,
)
try:
user = repo.default_signature

if message is None:
if len(sources) == 1:
for path, source in sources.items():
message = f"Import from {Path(source.source).name} to {path}/"
else:
message = f"Import {len(sources)} datasets from '{Path(source.source).name}':\n"
for path, source in sources.items():
if path == source.table:
message += f"\n* {path}/"
else:
message += f"\n* {path} (from {source.table})"

# FIXME: this shouldn't be hardcoded to master.. right?
header = (
"commit refs/heads/master\n"
f"committer {user.name} <{user.email}> now\n"
f"data {len(message.encode('utf8'))}\n{message}\n"
)
try:
user = repo.default_signature
p.stdin.write(header.encode("utf8"))

if message is None:
if len(sources) == 1:
for path, source in sources.items():
message = f"Import from {Path(source.source).name} to {path}/"
if head_tree is not None:
# start with the existing tree/contents
p.stdin.write(b"from refs/heads/master^0\n")
for path, source in sources.items():
dataset = DatasetStructure.for_version(version)(tree=None, path=path)

with source:
if limit:
num_rows = min(limit, source.row_count)
click.echo(
f"Importing {num_rows:,d} of {source.row_count:,d} features from {source} to {path}/ ..."
)
else:
message = f"Import {len(sources)} datasets from '{Path(source.source).name}':\n"
for path, source in sources.items():
if path == source.table:
message += f"\n* {path}/"
else:
message += f"\n* {path} (from {source.table})"

# FIXME: this shouldn't be hardcoded to master.. right?
header = (
"commit refs/heads/master\n"
f"committer {user.name} <{user.email}> now\n"
f"data {len(message.encode('utf8'))}\n{message}\n"
)
p.stdin.write(header.encode("utf8"))

if head_tree is not None:
# start with the existing tree/contents
p.stdin.write(b"from refs/heads/master^0\n")
for path, source in sources.items():
dataset = DatasetStructure.for_version(version)(tree=None, path=path)

with source:
if limit:
num_rows = min(limit, source.row_count)
click.echo(
f"Importing {num_rows:,d} of {source.row_count:,d} features from {source} to {path}/ ..."
)
else:
num_rows = source.row_count
click.echo(
f"Importing {num_rows:,d} features from {source} to {path}/ ..."
)

for i, blob_path in write_blobs_to_stream(
p.stdin, dataset.import_iter_meta_blobs(repo, source)
):
pass

# features
t1 = time.monotonic()
src_iterator = source.iter_features()

for i, blob_path in write_blobs_to_stream(
p.stdin, dataset.import_iter_feature_blobs(src_iterator, source)
):
if i and i % 100000 == 0:
click.echo(
f" {i:,d} features... @{time.monotonic()-t1:.1f}s"
)

if limit is not None and i == (limit - 1):
click.secho(
f" Stopping at {limit:,d} features", fg="yellow"
)
break
t2 = time.monotonic()
click.echo(f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s")
num_rows = source.row_count
click.echo(
f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)"
f"Importing {num_rows:,d} features from {source} to {path}/ ..."
)

p.stdin.write(b"\ndone\n")
except BrokenPipeError as e:
# if git-fast-import dies early, we get an EPIPE here
# we'll deal with it below
pass
else:
p.stdin.close()
p.wait()
if p.returncode != 0:
raise subprocess.CalledProcessError(
f"Error! {p.returncode}", "git-fast-import"
)
t3 = time.monotonic()
click.echo(f"Closed in {(t3-t2):.0f}s")
for i, blob_path in write_blobs_to_stream(
p.stdin, dataset.import_iter_meta_blobs(repo, source)
):
pass

# features
t1 = time.monotonic()
src_iterator = source.iter_features()

for i, blob_path in write_blobs_to_stream(
p.stdin, dataset.import_iter_feature_blobs(src_iterator, source)
):
if i and i % 100000 == 0:
click.echo(f" {i:,d} features... @{time.monotonic()-t1:.1f}s")

if limit is not None and i == (limit - 1):
click.secho(f" Stopping at {limit:,d} features", fg="yellow")
break
t2 = time.monotonic()
click.echo(f"Added {num_rows:,d} Features to index in {t2-t1:.1f}s")
click.echo(
f"Overall rate: {(num_rows/(t2-t1 or 1E-3)):.0f} features/s)"
)

p.stdin.write(b"\ndone\n")
except BrokenPipeError as e:
# if git-fast-import dies early, we get an EPIPE here
# we'll deal with it below
pass
else:
p.stdin.close()
p.wait()
if p.returncode != 0:
raise subprocess.CalledProcessError(f"Error! {p.returncode}", "git-fast-import")
t3 = time.monotonic()
click.echo(f"Closed in {(t3-t2):.0f}s")


class RepositoryStructure:
Expand Down

0 comments on commit 11c2ad2

Please sign in to comment.