Processing large datasets in a streaming manner #9772
-
In some cases, the data you need to process for an asset is too large to fit into memory. In those cases, it would be useful to process the data chunk by chunk, with each chunk written to external storage before the next chunk is processed.
-
While a bit convoluted, it is possible to use generators for this purpose, provided you use a custom IOManager. You're not able to yield these chunks directly from your asset or op (Dagster only permits yielding structured events such as AssetObservations or Outputs from the body of an op), but you can return a generator, which will then be consumed within the IOManager itself. A quick example of this behavior would look something like the following:
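A minimal sketch of that pattern, with placeholder names (the asset, the chunk source, and the local file target are all illustrative, not from a real pipeline):

from dagster import Definitions, IOManager, Output, asset, io_manager


class StreamingChunkIOManager(IOManager):
    """Drains a generator returned by an asset, writing each chunk before pulling the next."""

    def handle_output(self, context, obj):
        # obj is the generator object returned (not yielded) from the asset body
        path = f"/tmp/{context.asset_key.path[-1]}.csv"  # illustrative target; swap in S3, a database, etc.
        with open(path, "w") as f:
            for chunk_number, chunk in enumerate(obj, start=1):
                f.write(chunk)
                context.log.info(f"Wrote chunk {chunk_number}")

    def load_input(self, context):
        # Downstream assets read back the materialized file, not the generator
        with open(f"/tmp/{context.asset_key.path[-1]}.csv") as f:
            return f.read()


@io_manager
def streaming_io_manager(_init_context):
    return StreamingChunkIOManager()


@asset(io_manager_key="streaming_io_manager")
def large_table():
    def chunk_iterator():
        # Stand-in for pulling a large dataset piece by piece (a DB cursor, paginated API, etc.)
        for i in range(10):
            yield f"row_{i}\n"

    # Return the generator itself; the IO manager above consumes it chunk by chunk
    return Output(value=chunk_iterator())


defs = Definitions(assets=[large_table], resources={"streaming_io_manager": streaming_io_manager})

The key point is that the asset body returns the generator instead of yielding from it, so nothing is materialized until the IO manager starts iterating over it.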
-
This should get more formal support, IMO.
-
I am using
-
Hello, I know that this is an old thread, but I thought I'd share a real-world example of how I have done this. The dataset I am working with isn't huge, but this flow and logic will come in handy when a larger dataset comes along. I take a large(ish) GeoPackage file, break it into batches, turn those batches into Parquet file bytes, and upload each batch to S3 as follows:
Hope this helps! Has there been formal support added for this now?

Asset:

import os
import tempfile
import zipfile
import requests
import polars as pl
import fiona
import io
from typing import List, Dict
from shapely import wkt
from shapely.geometry import shape
from dagster import AssetExecutionContext, Output, asset
from ...utils.variables_helper.url_links import asset_urls
from ...utils.requests_helper.requests_helper import fetch_redirect_url
@asset(
    group_name="location_assets",
    io_manager_key="S3ParquetPartition",
)
def os_built_up_areas_bronze(context: AssetExecutionContext):
    """
    Processes OS Built Up Areas data and returns a generator of parquet bytes for S3 storage.

    Flow:
        ZIP → GPKG → Batch → DataFrame → Parquet Bytes → S3

    Directory structure in S3:
        bucket/
        └── asset_name/
            ├── batch_20240116_0001.parquet
            ├── batch_20240116_0002.parquet
            └── etc
    """
    def generate_batches():
        BATCH_SIZE = 2000
        batch_number = 0
        errors = []
        current_batch = []

        # Fetch and validate URL
        url = asset_urls.get("os_built_up_areas")
        if url is None:
            raise ValueError("No URL provided")

        redirect_url = fetch_redirect_url(url)
        if redirect_url is None:
            raise ValueError("No redirect URL found")

        # Create temporary directory for processing
        with tempfile.TemporaryDirectory() as temp_dir:
            # Download and extract zip
            zip_path = os.path.join(temp_dir, 'temp.zip')
            response = requests.get(redirect_url)
            response.raise_for_status()
            with open(zip_path, 'wb') as zip_file:
                zip_file.write(response.content)

            # Extract GPKG file
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Find GPKG file
            gpkg_file = next(
                (os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if f.endswith('gpkg')),
                None
            )
            if not gpkg_file:
                raise FileNotFoundError("No GPKG file found in zip")

            # Process GPKG file
            with fiona.open(gpkg_file, 'r') as src:
                context.log.info(f"CRS: {src.crs}")
                context.log.info(f"Schema: {src.schema}")
                for i, feature in enumerate(src):
                    try:
                        # Extract properties and add geometry as WKT
                        properties = dict(feature['properties'])
                        geom = shape(feature['geometry'])
                        properties['geometry'] = wkt.dumps(geom)

                        # Ensure all required fields are present with correct types
                        processed_properties = {
                            'gsscode': str(properties.get('gsscode', '')),
                            'name1_text': str(properties.get('name1_text', '')),
                            'name1_language': str(properties.get('name1_language', '')),
                            'name2_text': str(properties.get('name2_text', '')),
                            'name2_language': str(properties.get('name2_language', '')),
                            'areahectares': float(properties.get('areahectares', 0.0)),
                            'geometry_area_m': float(properties.get('geometry_area_m', 0.0)),
                            'geometry': properties['geometry']
                        }
                        current_batch.append(processed_properties)
                    # Handle errors if there are any and log them
                    except Exception as e:
                        error_msg = f"Error processing feature {i}: {e}"
                        context.log.error(error_msg)
                        errors.append(error_msg)
                        continue

                    # When the batch size is reached, convert to parquet bytes and yield
                    if len(current_batch) >= BATCH_SIZE:
                        batch_number += 1

                        # Convert batch to DataFrame then to parquet bytes
                        df_batch = pl.DataFrame(current_batch)
                        context.log.info(f"Batch {batch_number} preview: {df_batch.head(25)}")
                        parquet_buffer = io.BytesIO()
                        df_batch.write_parquet(parquet_buffer)
                        parquet_bytes = parquet_buffer.getvalue()
                        yield parquet_bytes

                        # Reset for next batch
                        current_batch = []
                        errors = []
                        context.log.info(f"Processed batch {batch_number}")
                # Process final batch if any remains
                if current_batch:
                    batch_number += 1
                    df_batch = pl.DataFrame(current_batch)
                    context.log.info(f"Final batch preview: {df_batch.head(25)}")
                    parquet_buffer = io.BytesIO()
                    df_batch.write_parquet(parquet_buffer)
                    parquet_bytes = parquet_buffer.getvalue()
                    yield parquet_bytes
                    context.log.info(f"Processed final batch {batch_number}")

    return Output(
        value=generate_batches(),
        metadata={
            "description": "Generator of parquet bytes for OS built up areas batch processing"
        }
    )

Custom IO Manager:

import io
from datetime import datetime

import boto3
import polars as pl
from botocore.exceptions import ClientError
from dagster import IOManager
# S3BucketError is a custom exception from the project's own utils; import it from wherever it is defined


class S3ParquetManagerPartition(IOManager):
    """
    IO manager to handle reading and writing parquet bytes to S3.
    """

    def __init__(self, bucket_name: str):
        if not bucket_name:
            raise S3BucketError()
        self.bucket_name = bucket_name
        self.aws_client = boto3.client("s3")

    def handle_output(self, context, obj):
        # Define base variables
        asset_name = context.asset_key.path[-1]
        batch_number = 0

        # Handle generator of parquet bytes (bytes objects are themselves iterable,
        # so exclude them explicitly to avoid iterating over raw bytes)
        if hasattr(obj, '__iter__') and not isinstance(obj, (bytes, bytearray)):
            for batch_bytes in obj:
                batch_number += 1
                timestamp = datetime.now().strftime("%Y%m%d")
                object_key = f"{asset_name}/batch_{timestamp}_{batch_number:04d}.parquet"

                # Upload bytes to S3
                try:
                    self.aws_client.put_object(
                        Bucket=self.bucket_name,
                        Key=object_key,
                        Body=batch_bytes,
                        ContentType="application/octet-stream",
                    )
                    context.log.info(
                        f"Batch {batch_number} written to S3 at s3://{self.bucket_name}/{object_key}"
                    )
                except Exception as e:
                    raise e
        else:
            # Handle single batch case
            timestamp = datetime.now().strftime("%Y%m%d")
            object_key = f"{asset_name}/batch_{timestamp}.parquet"
            try:
                self.aws_client.put_object(
                    Bucket=self.bucket_name,
                    Key=object_key,
                    Body=obj,
                    ContentType="application/octet-stream",
                )
                context.log.info(
                    f"Data written to S3 at s3://{self.bucket_name}/{object_key}"
                )
            except Exception as e:
                raise e
    def load_input(self, context) -> pl.DataFrame:
        asset_name = context.asset_key.path[-1]
        # Match the key prefix used in handle_output
        prefix = f"{asset_name}/batch_"
        try:
            # List objects in the bucket with the given prefix
            response = self.aws_client.list_objects_v2(
                Bucket=self.bucket_name, Prefix=prefix
            )
            if "Contents" not in response or not response["Contents"]:
                raise FileNotFoundError(
                    f"No files found with prefix '{prefix}' in bucket '{self.bucket_name}'"
                )

            # Sort the objects by last modified date
            sorted_objects = sorted(
                response["Contents"], key=lambda x: x["LastModified"], reverse=True
            )

            # Load all batches and concatenate
            dfs = []
            for obj in sorted_objects:
                object_key = obj["Key"]
                response = self.aws_client.get_object(
                    Bucket=self.bucket_name, Key=object_key
                )
                parquet_bytes = response["Body"].read()
                df = pl.read_parquet(io.BytesIO(parquet_bytes))
                dfs.append(df)
                context.log.info(f"Loaded batch from: {object_key}")

            # Concatenate all dataframes
            final_df = pl.concat(dfs)
            context.log.info(f"Concatenated {len(dfs)} batches")
            return final_df

        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                raise FileNotFoundError(
                    f"No files found with prefix '{prefix}' in bucket '{self.bucket_name}'"
                )
            else:
                raise e
        except Exception as e:
            raise e
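For completeness, the "S3ParquetPartition" key used on the asset still has to be bound to this IO manager in the code location's Definitions. A sketch of one way to do that (the factory name, bucket name, and Definitions layout here are illustrative placeholders, not part of the pipeline above):

from dagster import Definitions, io_manager


@io_manager(config_schema={"bucket_name": str})
def s3_parquet_partition_io_manager(init_context):
    # Builds the IO manager from run config so the bucket isn't hard-coded
    return S3ParquetManagerPartition(bucket_name=init_context.resource_config["bucket_name"])


defs = Definitions(
    assets=[os_built_up_areas_bronze],
    resources={
        "S3ParquetPartition": s3_parquet_partition_io_manager.configured(
            {"bucket_name": "my-example-bucket"}
        ),
    },
)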