Skip to content

Commit

Permalink
Fix large exports
Browse files Browse the repository at this point in the history
closes pulp#5375

Co-authored-by: Tobias Grigo <[email protected]>
  • Loading branch information
2 people authored and dralley committed May 23, 2024
1 parent 914c3e3 commit f3277fe
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGES/5375.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added Pulp-side batching to fix large exports that were failing due to changes in psycopg.
68 changes: 37 additions & 31 deletions pulpcore/app/importexport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.db.models.query import QuerySet

from pulpcore.app.apps import get_plugin_config
from pulpcore.app.models.content import Artifact
from pulpcore.app.models.progress import ProgressReport
from pulpcore.app.models.repository import Repository
from pulpcore.app.modelresource import (
Expand Down Expand Up @@ -50,25 +51,21 @@ def _write_export(the_tarfile, resource, dest_dir=None):
temp_file.write("[")

def process_batch(batch):
dataset = resource.export(batch)
model = resource.queryset.model
queryset = model.objects.filter(pk__in=batch)
dataset = resource.export(queryset)
# Strip "[" and "]" as we are writing the dataset in batch
temp_file.write(dataset.json.lstrip("[").rstrip("]"))

batch = []
needs_comma = False
for item in resource.queryset.iterator(chunk_size=EXPORT_BATCH_SIZE):
batch.append(item)
if needs_comma:
# Write "," if not last loop
temp_file.write(", ")
needs_comma = False

if len(batch) >= EXPORT_BATCH_SIZE:
process_batch(batch)
batch.clear()
needs_comma = True
first_loop = True
resource_pks = resource.queryset.values_list("pk", flat=True)
for offset in range(0, len(resource_pks), EXPORT_BATCH_SIZE):
batch = resource_pks[offset : offset + EXPORT_BATCH_SIZE]

if batch:
if not first_loop:
temp_file.write(", ")
else:
first_loop = False
process_batch(batch)

temp_file.write("]")
Expand Down Expand Up @@ -102,39 +99,48 @@ def export_versions(export, version_info):
export.tarfile.addfile(info, io.BytesIO(version_json))


def export_artifacts(export, artifacts):
def export_artifacts(export, artifact_pks):
"""
Export a set of Artifacts, ArtifactResources, and RepositoryResources
Args:
export (django.db.models.PulpExport): export instance that's doing the export
artifacts (django.db.models.Artifacts): List of artifacts in all repos being exported
artifact_pks (django.db.models.Artifacts): List of artifact_pks in all repos being exported
Raises:
ValidationError: When path is not in the ALLOWED_EXPORT_PATHS setting
"""
data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifacts))
data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifact_pks))
with ProgressReport(**data) as pb:
pb.BATCH_INTERVAL = 5000

if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
with tempfile.TemporaryDirectory(dir=".") as temp_dir:
for artifact in pb.iter(artifacts.only("file").iterator()):
with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
# TODO: this looks like a memory usage threat
# TODO: it's also probably horrificaly slow, going one-by-one over the net
# TODO: probably we could skip the temp file entirely and add
# artifact.file.read() directly to the tarfile with tarfile.addfile()
temp_file.write(artifact.file.read())
temp_file.flush()
artifact.file.close()
export.tarfile.add(temp_file.name, artifact.file.name)
for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
batch_qs = Artifact.objects.filter(pk__in=batch).only("file")

for artifact in pb.iter(batch_qs.iterator()):
with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
# TODO: this looks like a memory usage threat
# TODO: it's probably very slow, going one-by-one over the net
# TODO: probably we could skip the temp file entirely and add
# artifact.file.read() directly to the tarfile with
# tarfile.addfile()
temp_file.write(artifact.file.read())
temp_file.flush()
artifact.file.close()
export.tarfile.add(temp_file.name, artifact.file.name)
else:
for artifact in pb.iter(artifacts.only("file").iterator()):
export.tarfile.add(artifact.file.path, artifact.file.name)
for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
batch_qs = Artifact.objects.filter(pk__in=batch).only("file")

for artifact in pb.iter(batch_qs.iterator()):
export.tarfile.add(artifact.file.path, artifact.file.name)

resource = ArtifactResource()
resource.queryset = artifacts
resource.queryset = Artifact.objects.filter(pk__in=artifact_pks)
_write_export(export.tarfile, resource)

resource = RepositoryResource()
Expand Down
4 changes: 2 additions & 2 deletions pulpcore/app/tasks/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
RepositoryVersion,
Task,
)
from pulpcore.app.models.content import Artifact, ContentArtifact
from pulpcore.app.models.content import ContentArtifact
from pulpcore.app.serializers import PulpExportSerializer

from pulpcore.app.util import compute_file_hash, Crc32Hasher
Expand Down Expand Up @@ -509,7 +509,7 @@ def _do_export(pulp_exporter, tar, the_export):

# Export the top-level entities (artifacts and repositories)
# Note: we've already handled "what about incrementals" when building the 'artifacts' list
export_artifacts(the_export, Artifact.objects.filter(pk__in=artifact_pks))
export_artifacts(the_export, list(artifact_pks))
del artifact_pks

# Export the repository-version data, per-version
Expand Down

0 comments on commit f3277fe

Please sign in to comment.