Merge pull request #3162 from raft-tech/3137-queryset-iterator
#3137 (+ #3138) - worker crash/timeout on csv export
jtimpe authored Sep 20, 2024
2 parents db12f08 + e423b1b commit 3ac1dcd
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 50 deletions.
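In outline, the fix replaces a synchronous StreamingHttpResponse built in the web worker with an offloaded Celery task: the admin action serializes the selected queryset to raw SQL, enqueues the task, and returns immediately, while a worker streams the rows into a gzipped CSV in s3. A minimal sketch of the dispatch pattern (names mirror the diff; the task body is elided here and shown in full in tasks.py below):

from celery import shared_task

@shared_task
def export_queryset_to_s3_csv(sql, params, field_names, model_name, s3_filename):
    ...  # rebuild the queryset and stream it to s3 (see tasks.py below)

# In the admin action: enqueue and return, instead of streaming the response.
sql, params = queryset.query.sql_with_params()
export_queryset_to_s3_csv.delay(sql, params, field_names, meta.model_name, file_path)
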
70 changes: 20 additions & 50 deletions tdrs-backend/tdpservice/search_indexes/admin/mixins.py
@@ -1,54 +1,13 @@
"""Mixin classes supproting custom functionality."""
from django.contrib import admin
from django.http import StreamingHttpResponse
import csv
from datetime import datetime
from django.contrib import admin
from tdpservice.search_indexes.tasks import export_queryset_to_s3_csv
from tdpservice.core.utils import log
from tdpservice.users.models import User

class ExportCsvMixin:
    """Mixin class to support CSV exporting."""

    class Echo:
        """An object that implements just the write method of the file-like interface."""

        def write(self, value):
            """Write the value by returning it, instead of storing in a buffer."""
            return value

    class RowIterator:
        """Iterator class to support custom CSV row generation."""

        def __init__(self, field_names, queryset):
            self.field_names = field_names
            self.queryset = queryset
            self.writer = csv.writer(ExportCsvMixin.Echo())
            self.is_header_row = True
            self.header_row = self.__init_header_row(field_names)

        def __init_header_row(self, field_names):
            """Generate custom header row."""
            header_row = []
            for name in field_names:
                header_row.append(name)
                if name == "datafile":
                    header_row.append("STT")
            return header_row

        def __iter__(self):
            """Yield the next row in the csv export."""
            for obj in self.queryset:
                row = []

                if self.is_header_row:
                    self.is_header_row = False
                    yield self.writer.writerow(self.header_row)

                for field_name in self.field_names:
                    field = getattr(obj, field_name)
                    row.append(field)
                    if field_name == "datafile":
                        row.append(field.stt.stt_code)
                yield self.writer.writerow(row)

    def export_as_csv(self, request, queryset):
        """Convert queryset to CSV."""
        meta = self.model._meta
@@ -59,12 +18,23 @@ def export_as_csv(self, request, queryset):
        datafile_name = f"{meta}_FY{model.datafile.year}{model.datafile.quarter}_" +\
            str(datetime.now().strftime("%d%m%y-%H-%M-%S"))

        iterator = ExportCsvMixin.RowIterator(field_names, queryset)
        sql, params = queryset.query.sql_with_params()
        file_path = f'exports/{datafile_name}.csv.gz'

        export_queryset_to_s3_csv.delay(
            sql,
            params,
            field_names,
            meta.model_name,
            file_path,
        )

        self.message_user(request, f'Your s3 file url is: {file_path}')

        return StreamingHttpResponse(
            iterator,
            content_type="text/csv",
            headers={"Content-Disposition": f'attachment; filename="{datafile_name}.csv"'},
        )
        system_user, _ = User.objects.get_or_create(username='system')
        log(
            f'Beginning export of {queryset.count()} {meta.model_name} objects to s3: {file_path}',
            {'user_id': system_user.pk, 'object_id': None, 'object_repr': ''}
        )

    export_as_csv.short_description = "Export Selected as CSV"
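
A queryset object can't be handed to a Celery task directly, so the action ships the plain values from queryset.query.sql_with_params() and the worker rebuilds an equivalent lazy queryset from them. A minimal sketch of that round-trip (the concrete model name is illustrative):

from django.apps import apps

# Admin side: reduce the queryset to serializable values.
sql, params = queryset.query.sql_with_params()  # (sql: str, params: tuple)

# Worker side: look the model up by name and rebuild a lazy RawQuerySet.
Model = apps.get_model('search_indexes', 'tanf_t1')  # 'tanf_t1' is illustrative
rebuilt = Model.objects.raw(sql, params)  # no rows are fetched until iterated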
80 changes: 80 additions & 0 deletions tdrs-backend/tdpservice/search_indexes/tasks.py
@@ -2,10 +2,16 @@

from __future__ import absolute_import
import time
import gzip
import os
from tdpservice.users.models import User
from celery import shared_task
from botocore.exceptions import ClientError
from tdpservice.core.utils import log
import subprocess
from tdpservice.data_files.s3_client import S3Client
from django.conf import settings
from django.apps import apps


def prettify_time_delta(start, end):
@@ -63,3 +69,77 @@ def reindex_elastic_documents():
        'object_id': None,
        'object_repr': ''
    })


class RowIterator:
    """Iterator class to support custom CSV row generation."""

    def __init__(self, field_names, queryset):
        self.field_names = field_names
        self.queryset = queryset

    def _get_header(self, field_names):
        """Generate custom header row."""
        header_row = ""
        for name in field_names:
            header_row += f"{name},"
            if name == "datafile":
                header_row += "STT,"
        return header_row[:-1] + "\n"

    def __iter__(self):
        """Yield the next row in the csv export."""
        yield self._get_header(self.field_names)

        # queryset.iterator simply 'forgets' the record after iterating over it, instead of caching it
        # this is okay here since we only write out the contents and don't need the record again.
        for obj in self.queryset.iterator():
            row = ""
            for field_name in self.field_names:
                field = getattr(obj, field_name)
                row += f"{field},"
                if field and field_name == "datafile":
                    row += f"{field.stt.stt_code},"
            yield row[:-1] + "\n"


@shared_task
def export_queryset_to_s3_csv(query_str, query_params, field_names, model_name, s3_filename):
    """
    Export a selected queryset to a csv file stored in s3.

    @param query_str: a sql string obtained via queryset.query.sql_with_params().
    @param query_params: sql query params obtained via queryset.query.sql_with_params().
    @param field_names: a list of field names for the csv header.
    @param model_name: the `model._meta.model_name` of the model to export.
    @param s3_filename: a string representing the file path/name in s3.
    """
    system_user, _ = User.objects.get_or_create(username='system')
    Model = apps.get_model('search_indexes', model_name)
    queryset = Model.objects.raw(query_str, query_params)
    iterator = RowIterator(field_names, queryset)
    s3 = S3Client()

    tmp_filename = 'file.csv.gz'
    record_count = -1  # offset row count to account for the header
    with gzip.open(tmp_filename, 'wt') as f:
        for row in iterator:
            record_count += 1
            f.write(row)

    local_filename = os.path.basename(tmp_filename)

    try:
        s3.client.upload_file(local_filename, settings.AWS_S3_DATAFILES_BUCKET_NAME, s3_filename)
    except ClientError as e:
        log(
            f'Export failed: {s3_filename}. {e}',
            {'user_id': system_user.pk, 'object_id': None, 'object_repr': ''},
            'error'
        )
    else:
        log(
            f'Export of {record_count} {model_name} objects complete: {s3_filename}',
            {'user_id': system_user.pk, 'object_id': None, 'object_repr': ''}
        )
        os.remove(local_filename)
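
The comment inside RowIterator.__iter__ is the heart of the #3137 fix: plain iteration over a queryset populates its result cache, so memory grows with the row count, whereas .iterator() streams rows and discards each one after it is yielded. A small sketch of the contrast, assuming an arbitrary model MyModel and a row-writing helper:

# Plain iteration: Django caches every fetched row on the queryset.
for obj in MyModel.objects.all():
    write_row(obj)  # memory grows linearly with the table

# .iterator(): rows are fetched in chunks and never cached.
for obj in MyModel.objects.all().iterator(chunk_size=2000):
    write_row(obj)  # memory stays roughly flat

Writing through gzip.open(tmp_filename, 'wt') keeps the output side streaming as well: each row is compressed as it is written, so neither the full record set nor the uncompressed CSV is ever held in memory.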
