Looking for a tip on how to upload multipart files from in-memory strings. #26

Open
CptSpaceToaster opened this issue Aug 21, 2023 · 3 comments

@CptSpaceToaster

CptSpaceToaster commented Aug 21, 2023

Hiya, this library has been a lifesaver.

I'm currently storing some network responses as JSON in memory. I am additionally converting the JSON into dictionaries so they can be written as a CSV.

These in-memory structures tend to be fairly large and are a perfect use case for multipart file uploads. (In fact, I'm getting OS socket errors if I just try to write the resulting strings with a single client.put().) But I'm not seeing a good path forward for converting these in-memory structures into something I can use with client.put_file_multipart() without either duplicating them in memory a second time, or calling put_multipart directly and wiring up something funky with an ephemeral io.StringIO file stream and an overridden file_sender.

I could settle for writing these in-memory structures to disk and then sending those with the library directly; however, I'd like this code to end up in an AWS Lambda, and I'd have to stay under the 512 MB storage limit. That might be what I end up doing in the meantime if I can't think of anything better.

Is there a more elegant way to keep this all in memory somehow?

I had been using smart_open previously:
import csv
import io
import json
from typing import List

import smart_open


async def write_json(bucket_name: str, filename: str, results: dict, content_type: str = 'text/json'):
    params = {'client_kwargs': {'S3.Client.create_multipart_upload': {'ContentType': content_type}}}
    with smart_open.open(f's3://{bucket_name}/{filename}', 'w', transport_params=params) as fout:
        json.dump(results, fout)


async def write_csv(bucket_name: str, filename: str, fieldnames: List[str], results: List[dict], content_type: str = 'text/csv'):
    params = {'client_kwargs': {'S3.Client.create_multipart_upload': {'ContentType': content_type}}}
    with smart_open.open(f's3://{bucket_name}/{filename}', 'w', transport_params=params) as fout:
        stream = io.StringIO()  # scratch buffer: DictWriter writes text here, then we copy it to fout
        writer = csv.DictWriter(stream, fieldnames=fieldnames)
        writer.writeheader()
        fout.write(stream.getvalue())

        for row in results:
            stream.seek(0)
            stream.truncate(0)
            writer.writerow(row)
            fout.write(stream.getvalue())
    stream.close()

But I'm at the point where I think I need aiohttp/asyncio speeds. Should I read more on AsyncIterables? I'm still learning quite a lot.

@CptSpaceToaster
Author

CptSpaceToaster commented Aug 21, 2023

This is where I ended up...

import io
import csv
import json
from typing import List

from aiohttp_s3_client import S3Client


PART_SIZE = 5 * 1024 * 1024  # 5MB


async def write_json(client: S3Client, filename: str, results: dict, content_type: str = 'text/json'):
    def dict_sender(results: dict, chunk_size: int):
        # Serialize once into an in-memory bytes buffer, then yield it in part-sized chunks
        with io.BytesIO(json.dumps(results).encode('utf-8')) as buffer:
            while True:
                data = buffer.read(chunk_size)
                if not data:
                    break
                yield data

    await client.put_multipart(
        filename,
        dict_sender(
            results,
            chunk_size=PART_SIZE,
        ),
        headers={'Content-Type': content_type},
    )

async def write_csv(client: S3Client, filename: str, fieldnames: List[str], results: List[dict], content_type: str = 'text/csv'):
    def csv_sender(results: List[dict], fieldnames: List[str], chunk_size: int):
        with io.BytesIO() as buffer:
            # CSV writer needs to write strings. TextIOWrapper gets us back to bytes
            sb = io.TextIOWrapper(buffer, 'utf-8', newline='')
            writer = csv.DictWriter(sb, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)

            sb.flush()
            buffer.seek(0)

            while True:
                data = buffer.read(chunk_size)
                if not data:
                    break
                yield data
            sb.close()

    await client.put_multipart(
        filename,
        csv_sender(
            results,
            fieldnames,
            chunk_size=PART_SIZE,
        ),
        headers={'Content-Type': content_type},
    )

I would imagine this can be done more efficiently without duplicating the file contents entirely in a byte stream... something that writes roughly 5MB of text at a time, and then yields it...

But for now, memory is slightly more available than disk in the environment I'm targeting, so I think I can try giving this a go.
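
As a rough sketch of that idea, here is a minimal incremental generator (csv_chunks and its exact shape are just for illustration, assuming put_multipart will accept any plain generator of bytes the way csv_sender above is used): it encodes rows into a small scratch buffer and yields roughly chunk_size bytes at a time, so the full CSV never has to exist in memory at once.

import csv
import io
from typing import Iterable, Iterator, List


def csv_chunks(results: Iterable[dict], fieldnames: List[str], chunk_size: int) -> Iterator[bytes]:
    # Encode rows into a small scratch buffer and yield it whenever it reaches
    # roughly chunk_size bytes, instead of building the whole CSV up front.
    buffer = io.BytesIO()
    text = io.TextIOWrapper(buffer, 'utf-8', newline='')
    writer = csv.DictWriter(text, fieldnames=fieldnames)

    writer.writeheader()
    for row in results:
        writer.writerow(row)
        text.flush()
        if buffer.tell() >= chunk_size:
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)

    # Yield whatever is left over (the last part is allowed to be smaller).
    text.flush()
    if buffer.tell():
        yield buffer.getvalue()
    text.close()  # also closes the underlying buffer

This would slot into the write_csv above in place of csv_sender, e.g. client.put_multipart(filename, csv_chunks(results, fieldnames, chunk_size=PART_SIZE), headers={'Content-Type': content_type}).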

I would love some feedback if you have the time to spare, but I understand that may not be possible. Thank you so much for maintaining this library! Feel free to close this issue if you don't have time to respond.

@mosquito
Collaborator

mosquito commented Aug 22, 2023

@CptSpaceToaster you have to know the exact object size before uploading. It's an S3 API requirement, so IMHO the simplest way to do this is to write a temporary file and send it as a file.
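
For reference, a minimal sketch of that temporary-file approach might look like the following; the helper name is illustrative, and it assumes put_file_multipart takes the destination key plus a local path (check the library's docs for the exact signature and any part-size or header arguments).

import json
import tempfile

from aiohttp_s3_client import S3Client


async def write_json_via_tempfile(client: S3Client, object_name: str, results: dict):
    # Serialize to a temporary file first so the object size is known up front,
    # then let the library stream the file from disk part by part.
    with tempfile.NamedTemporaryFile('w', suffix='.json') as tmp:
        json.dump(results, tmp)
        tmp.flush()
        # Exact keyword arguments (part size, headers) are not shown here; see the docs.
        await client.put_file_multipart(object_name, tmp.name)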

@CptSpaceToaster
Author

Hmmm... It sounds like my in-memory solution above might be a decent compromise then. I don't mind writing the content out to a file, but I'd rather live in the world of memory for other reasons... and I'm currently under the impression that the code I wrote above isn't completely awful (but I bet it can be improved!).

I suppose the real next step would involve profiling the garbage I have above, working through a solution that writes content out to a tempfile, and seeing if there's a meaningful difference.

Because the content is being processed in parallel, I'm doing my very best to keep the solution simple. I'd like to avoid juggling 20 parallel file operations to disk... but it's something I could reasonably investigate as well.
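
One possible middle ground for that memory-versus-disk tradeoff is Python's tempfile.SpooledTemporaryFile, which stays in memory until it grows past max_size and only then rolls over to an actual file on disk. A small sketch of the idea (the helper name is illustrative, and whether the resulting object fits the library's file-based helpers directly is untested; with put_multipart you could read chunks from it as in the earlier code):

import json
import tempfile


def spooled_json_buffer(results: dict, max_size: int = 64 * 1024 * 1024):
    # Keeps the serialized JSON in memory up to max_size bytes; beyond that it
    # transparently spills to a temporary file on disk.
    buffer = tempfile.SpooledTemporaryFile(max_size=max_size, mode='w+b')
    buffer.write(json.dumps(results).encode('utf-8'))
    buffer.seek(0)
    return buffer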
