Commit

Make transformer able to handle larger files by streaming
Can handle larger files by buffering and flushing to S3.

Signed-off-by: 🐼 Samrose Ahmed 🐼 <[email protected]>
Samrose-Ahmed committed Jun 8, 2023

1 parent 8371bd4 commit 42883a5
Showing 1 changed file with 393 additions and 140 deletions.
533 changes: 393 additions & 140 deletions lib/rust/transformer/src/main.rs
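The commit description above boils down to a simple pattern: instead of accumulating one in-memory Avro writer per (table, ts_hour) pair for the whole invocation and uploading everything at the end, the transformer now flushes each buffer to S3 as soon as it crosses a size threshold. A minimal, self-contained sketch of that idea follows; the names (BufferedWriters, TARGET_FLUSH_BYTES) are illustrative and not the ones used in main.rs, which buffers into Avro Writer<Vec<u8>>s and sends the flushed bytes over an mpsc channel to a publish task.

use std::collections::HashMap;
use std::mem;

const TARGET_FLUSH_BYTES: usize = 10 * 1024 * 1024; // ~10 MiB per flushed object

// One growable buffer per (table, ts_hour) partition.
struct BufferedWriters {
    buffers: HashMap<(String, String), Vec<u8>>,
}

impl BufferedWriters {
    fn new() -> Self {
        Self { buffers: HashMap::new() }
    }

    // Append a serialized row; if the partition's buffer is now large enough,
    // hand the bytes back to the caller for upload and start a fresh buffer.
    fn write(&mut self, table: String, ts_hour: String, row: &[u8]) -> Option<Vec<u8>> {
        let buf = self.buffers.entry((table, ts_hour)).or_default();
        buf.extend_from_slice(row);
        if buf.len() >= TARGET_FLUSH_BYTES {
            Some(mem::take(buf))
        } else {
            None
        }
    }

    // Drain whatever is left once the input stream is exhausted.
    fn finish(self) -> Vec<((String, String), Vec<u8>)> {
        self.buffers
            .into_iter()
            .filter(|(_, buf)| !buf.is_empty())
            .collect()
    }
}

In the diff below the same shape appears as TransformerWriters / TransformerTableWriter, with TARGET_AVRO_FILE_SIZE_BYTES as the threshold and publish_transformer_file doing the S3 upload and SNS notification.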
@@ -1,6 +1,7 @@
mod arrow;
mod avro;
mod models;
use apache_avro::AvroResult;
use aws_sdk_sns::model::MessageAttributeValue;
use aws_sdk_sqs::model::SendMessageBatchRequestEntry;
use chrono::Utc;
@@ -663,6 +664,120 @@ impl LineLevelError {
}
}

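// Streams transformed lines into per-(table, ts_hour) Avro buffers and hands
// flushed buffers to a concurrent publish task that uploads them to S3 and
// notifies SNS. Returns the publish results plus any line-level and
// record-level errors collected along the way.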
async fn write_publish_lines(
s3: aws_sdk_s3::Client,
sns: aws_sdk_sns::Client,
download_items: Vec<(DataBatcherOutputRecord, String)>,
mut stream: Pin<Box<dyn Stream<Item = Result<LineResult, LineError>> + Send>>,
) -> Result<(
Vec<(String, String, String, usize, usize)>,
Vec<LineError>,
Vec<SQSLambdaError>,
)> {
let mut error_lines = vec![];

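// Pipeline topology: the input stream feeds LineResults through write_tx/write_rx
// into the writer task; the writer task pushes flushed PublishTasks through a
// second bounded channel to the publisher task.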
let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<LineResult>(1000);

let do_writes = || {
let (publish_sender, mut publish_receiver) = tokio::sync::mpsc::channel::<PublishTask>(35);

let publish_files = || async move {
let mut handles = vec![];

while let Some(task) = publish_receiver.recv().await {
let fut = publish_transformer_file(task, s3.clone(), sns.clone());
handles.push(tokio::spawn(fut));
}

try_join_all(handles).await
};

let publish_handle = tokio::spawn(publish_files());

let mut writers = TransformerWriters::new(download_items);
async move {
let mut write_line_errors = vec![];
let mut write_errors = vec![];

while let Some(l) = write_rx.recv().await {
let table_writer = writers.writer(l.resolved_table_name.clone(), l.ts_hour.clone());
let write_res = table_writer.write(l);
match write_res {
Ok(_) => match table_writer.flush_if_needed() {
Ok(Some(write_task)) => {
debug!("Current publish capacity: {}", publish_sender.capacity());
let res = publish_sender.send(write_task).await;
if let Err(e) = res {
write_errors.push(SQSLambdaError::new(
"Failed to send write task".to_string(),
e.0.record_ids,
));
}
}
Ok(None) => {}
Err(e) => write_errors.push(e),
},
Err(e) => write_line_errors.push(e),
}
}

let final_write_tasks = writers
.flush()
.await
.into_iter()
.flat_map(|r| r.map_err(|e| write_errors.push(e)).ok())
.collect::<Vec<_>>();

for task in final_write_tasks {
let res = publish_sender.send(task).await;
if let Err(e) = res {
write_errors.push(SQSLambdaError::new(
"Failed to send write task".to_string(),
e.0.record_ids,
));
}
}
drop(publish_sender);

let publish_results = publish_handle.await??;
anyhow::Ok((publish_results, write_line_errors, write_errors))
}
};

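// Run the writer/publisher pipeline concurrently while this task drains the
// input stream and feeds lines into it.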
let write_handle = tokio::task::spawn(do_writes());

let mut errors = vec![];
while let Some(line) = stream.next().await {
match line {
Ok(l) => {
write_tx.send(l).await.or_else(|e| {
errors.push(SQSLambdaError::new(
"Failed to send line to write task".to_string(),
vec![e.0.record_id.unwrap()],
));
anyhow::Ok(())
})?;
}
Err(e) => {
error_lines.push(e);
}
};
}
drop(write_tx);

let (publish_results, write_line_errors, mut write_errors) = write_handle.await??;
error_lines.extend(write_line_errors);
errors.extend(write_errors);

let results = publish_results
.into_iter()
.flat_map(|r| r.map_err(|e| errors.push(e)).ok())
.flatten()
.collect::<Vec<_>>();

Ok((results, error_lines, errors))
}

pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
let start = Instant::now();
event.payload.records.first().iter().for_each(|r| {
@@ -686,10 +801,12 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
.flatten()
.collect::<Vec<DataBatcherOutputRecord>>();

let data_batcher_records_copy = data_batcher_records.clone();

let s3_download_items = data_batcher_records
.iter()
.into_iter()
.filter(|d| d.size > 0)
.map(|d| (d, d.log_source.clone()))
.map(|d| (d.clone(), d.log_source.clone()))
.collect::<Vec<_>>();

info!(
@@ -706,7 +823,6 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
let transformed_lines_streams = s3_download_items
.into_iter()
.map(|(item, log_source)| {
let s3 = &s3;

let item_copy = item.clone();

@@ -722,7 +838,7 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
Pin<Box<dyn Stream<Item = Result<Value, anyhow::Error>> + Send>>,
) = events.unwrap();

let reader = raw_lines.map(|r| (r, item.sequencer.clone()));
let reader = raw_lines.map(move |r| (r, item.sequencer.clone()));

let select_table_from_payload_expr = LOG_SOURCES_CONFIG.with(|c| {
let log_sources_config = c.borrow();
@@ -886,10 +1002,6 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
.flatten()
.collect::<Vec<_>>();

let writers_by_table_utc_hour_pair: RefCell<
HashMap<(String, String), (Writer<Vec<u8>>, usize)>,
> = RefCell::new(HashMap::new());

let mut merged: StreamMap<
String,
Pin<Box<dyn Stream<Item = Result<Option<LineResult>, LineError>> + Send>>,
@@ -902,149 +1014,43 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
}

let mut error_lines = vec![];
merged

let merged = merged
.map(|(_, v)| v)
.filter_map(|line| async { result_to_option(line) })
.and_then(|line| {
let mut writers_by_table_utc_hour_pair = writers_by_table_utc_hour_pair.borrow_mut();

let v = line.transformed_line;
let resolved_table_name = line.resolved_table_name;
let ts_hour = line.ts_hour;
async move {
let (writer, rows) = writers_by_table_utc_hour_pair
.entry((resolved_table_name.clone(), ts_hour.clone()))
.or_insert_with(|| {
let avro_schema = AVRO_SCHEMAS
.get(&resolved_table_name)
.expect("Failed to find avro schema");

let writer: Writer<Vec<u8>> = Writer::builder()
.schema(avro_schema)
.writer(Vec::new())
.codec(Codec::Snappy)
.block_size(1 * 1024 * 1024 as usize)
.build();
(writer, 0)
});

*rows += 1;
// IO (well theoretically) TODO: since this actually does non-trivial computation work too (schema validation), we need to figure out a way to prevent this from blocking our main async thread
writer.append_value_ref(&v)
}
.map_err(|e| {
LineError::new_partial(
line.raw_line,
line.log_source,
"AvroSerializationError".to_string(),
format!("{:#}", e),
line.record_id,
)
})
})
.collect::<Vec<_>>()
.await
.into_iter()
.filter_map(|r| r.map_err(|e| error_lines.push(e)).ok())
.for_each(drop);
.boxed();

let (results, write_line_errors, write_errors) = write_publish_lines(
s3.clone(),
sns.clone(),
s3_download_items_copy.clone(),
merged,
)
.await?;
error_lines.extend(write_line_errors);
errors.extend(write_errors);

let mut partial_error_lines = vec![];
error_lines.into_iter().for_each(|e| match e.error_type {
LineErrorType::Total => {
let err = SQSLambdaError::new(e.error, vec![e.record_id.unwrap()]);
errors.push(err);
}
LineErrorType::Partial => {
error!("Line error: {}", &e.error);
partial_error_lines.push(e);
}
});

let writers_by_table = writers_by_table_utc_hour_pair.into_inner();
let futures =
writers_by_table
.into_iter()
.map(|((resolved_table_name, ts_hour), (writer, rows))| {
let matching_record_seqs = s3_download_items_copy
.iter()
.filter_map(|(record, log_source)| {
resolved_table_name
.contains(log_source)
.then_some(record.sequencer.clone())
})
.collect::<Vec<_>>();

async move {
let bytes = writer.into_inner()?;
let bytes_len = bytes.len();
if bytes.len() == 0 || rows == 0 {
return Ok(None);
}
info!("Writing number of Rows: {}", rows);

let uuid = Uuid::new_v4();
let bucket = var("MATANO_REALTIME_BUCKET_NAME")?;
let key = format!(
"transformed/{}/ts_hour={}/{}.snappy.avro",
&resolved_table_name, &ts_hour, uuid
);
info!("Writing Avro file to S3 path: {}/{}", bucket, key);

s3.put_object()
.bucket(&bucket)
.key(&key)
.body(ByteStream::from(bytes))
.content_type("application/avro-binary".to_string())
// .content_encoding("application/zstd".to_string())
.send()
.await
.map_err(|e| anyhow!(e).context(format!("Error putting {} to S3", key)))?;

sns.publish()
.topic_arn(var("MATANO_REALTIME_TOPIC_ARN")?)
.message(
json!({
"bucket": &bucket,
"key": &key,
"resolved_table_name": resolved_table_name,
"ts_hour": ts_hour,
})
.to_string(),
)
.message_attributes(
"resolved_table_name",
MessageAttributeValue::builder()
.data_type("String".to_string())
.string_value(&resolved_table_name)
.build(),
)
.send()
.await
.map_err(|e| {
anyhow!(e).context(format!(
"Error publishing SNS notification for S3 key: {}",
key
))
})?;
anyhow::Ok(Some((resolved_table_name, key, ts_hour, rows, bytes_len)))
}
.map_err(move |e| SQSLambdaError::new(format!("{:#}", e), matching_record_seqs))
});

let results = join_all(futures)
.await
error_lines
.into_iter()
.flat_map(|r| r.map_err(|e| errors.push(e)).ok())
.flatten()
.collect::<Vec<_>>();
.for_each(|e: LineError| match e.error_type {
LineErrorType::Total => {
let err = SQSLambdaError::new(e.error, vec![e.record_id.unwrap()]);
errors.push(err);
}
LineErrorType::Partial => {
error!("Line error: {}", &e.error);
partial_error_lines.push(e);
}
});

// Error handling strategy:
// If all records failed, we return an error and the lambda will retry
// If some records failed, we retry the failed records up to 3 times by sending back to the queue with a retry_depth attribute
// If some records failed and we've retried them 3 times, we send them to the DLQ
let had_error = !errors.is_empty();
let mut failing_log_sources = HashSet::new();
let is_total_failure = errors.len() == data_batcher_records.len();
let is_total_failure = errors.len() == data_batcher_records_copy.len();
if had_error {
errors.iter().for_each(|e| error!("{}", e));

@@ -1053,7 +1059,7 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
.flat_map(|e| e.ids)
.collect::<HashSet<_>>();
let failures = error_ids.iter().map(|error_id| {
data_batcher_records
data_batcher_records_copy
.iter()
.find(|r| &r.sequencer == error_id)
.unwrap()
@@ -1136,6 +1142,253 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
}
}

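// Uploads one flushed Avro buffer to the realtime bucket under
// transformed/{table}/ts_hour={hour}/{uuid}.snappy.avro and publishes an SNS
// notification for it; any failure is wrapped in an SQSLambdaError carrying
// the originating record sequencer ids.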
async fn publish_transformer_file(
task: PublishTask,
s3: aws_sdk_s3::Client,
sns: aws_sdk_sns::Client,
) -> Result<Option<(String, String, String, usize, usize)>, SQSLambdaError> {
let resolved_table_name = task.resolved_table_name;
let ts_hour = task.ts_hour;
let matching_record_seqs = task.record_ids;
let ret = async move {
let old_rows = task.rows;
let bytes = task.bytes;

let bytes_len = bytes.len();
if bytes.len() == 0 || old_rows == 0 {
info!("No rows written, skipping write");
return Ok(None);
}
info!("Writing number of Rows: {}", old_rows);

let uuid = Uuid::new_v4();
let bucket = var("MATANO_REALTIME_BUCKET_NAME")?;
let key = format!(
"transformed/{}/ts_hour={}/{}.snappy.avro",
&resolved_table_name, &ts_hour, uuid
);
info!("Writing Avro file to S3 path: {}/{}", bucket, key);

s3.put_object()
.bucket(&bucket)
.key(&key)
.body(ByteStream::from(bytes))
.content_type("application/avro-binary".to_string())
// .content_encoding("application/zstd".to_string())
.send()
.await
.map_err(|e| anyhow!(e).context(format!("Error putting {} to S3", key)))?;

sns.publish()
.topic_arn(var("MATANO_REALTIME_TOPIC_ARN")?)
.message(
json!({
"bucket": &bucket,
"key": &key,
"resolved_table_name": resolved_table_name,
"ts_hour": ts_hour,
})
.to_string(),
)
.message_attributes(
"resolved_table_name",
MessageAttributeValue::builder()
.data_type("String".to_string())
.string_value(&resolved_table_name)
.build(),
)
.send()
.await
.map_err(|e| {
anyhow!(e).context(format!(
"Error publishing SNS notification for S3 key: {}",
key
))
})?;
anyhow::Ok(Some((
resolved_table_name.to_string(),
key,
ts_hour.to_string(),
old_rows,
bytes_len,
)))
}
.map_err(move |e| SQSLambdaError::new(format!("{:#}", e), matching_record_seqs));
ret.await
}

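// Lazily-created set of per-(table, ts_hour) writers for a single invocation.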
struct TransformerWriters<'a> {
download_items: Vec<(DataBatcherOutputRecord, String)>,
writers: HashMap<(String, String), TransformerTableWriter<'a>>,
}

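// Buffers rows for one (table, ts_hour) partition in an in-memory Avro writer,
// tracking row and byte counts so the buffer can be flushed once it grows past
// the target file size.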
struct TransformerTableWriter<'a> {
resolved_table_name: String,
ts_hour: String,
download_items: Vec<(DataBatcherOutputRecord, String)>,
writer: Writer<'a, Vec<u8>>,
rows: usize,
num_bytes: usize,
}

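// Snappy-compressed Avro writer over an in-memory Vec<u8> with a 1 MiB block size.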
fn create_avro_writer(schema: &Schema) -> Writer<Vec<u8>> {
Writer::builder()
.schema(schema)
.writer(Vec::new())
.codec(Codec::Snappy)
.block_size(1 * 1024 * 1024 as usize)
.build()
}

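// A flushed, serialized Avro buffer plus the metadata needed to upload it and
// publish its notification.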
#[derive(Debug, Clone)]
struct PublishTask {
resolved_table_name: String,
ts_hour: String,
rows: usize,
bytes: Vec<u8>,
record_ids: Vec<String>,
}

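// Flush a partition's buffer once its serialized Avro output reaches roughly
// 10 MiB, so memory stays bounded however large the input files are.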
const TARGET_AVRO_FILE_SIZE_BYTES: usize = 10 * 1024 * 1024;

impl<'a> TransformerTableWriter<'a> {
fn new(
schema: &'a Schema,
download_items: Vec<(DataBatcherOutputRecord, String)>,
resolved_table_name: String,
ts_hour: String,
) -> Self {
Self {
resolved_table_name,
ts_hour,
writer: create_avro_writer(schema),
download_items,
rows: 0,
num_bytes: 0,
}
}

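// Appends one transformed line to the in-memory Avro writer; a serialization
// failure is surfaced as a partial LineError tied to the raw line.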
fn write(&mut self, line: LineResult) -> Result<(), LineError> {
let v = line.transformed_line;

let mut do_write = || {
// IO (well theoretically) TODO: since this actually does non-trivial computation work too (schema validation), we need to figure out a way to prevent this from blocking our main async thread
// crate is written poorly, can't do without Mutex?

let written = self.writer.append_value_ref(&v)?;
self.num_bytes += written;
AvroResult::Ok(())
};
self.rows += 1;

let ret = do_write().map_err(|e| {
LineError::new_partial(
line.raw_line,
line.log_source,
"AvroSerializationError".to_string(),
format!("{:#}", e),
line.record_id,
)
});
ret
}

fn flush_if_needed(&mut self) -> Result<Option<PublishTask>, SQSLambdaError> {
if self.is_flush_needed() {
self.flush().map(|t| Some(t))
} else {
Ok(None)
}
}

fn is_flush_needed(&self) -> bool {
// TODO: this should really only exempt overwrite-mode enrichment tables; fix.
!self.resolved_table_name.starts_with("enrich_")
&& self.num_bytes >= TARGET_AVRO_FILE_SIZE_BYTES
}

fn reset(&mut self) -> Result<(usize, Vec<u8>)> {
let old_rows = self.rows;
let new_writer = create_avro_writer(self.writer.schema());

let old_writer = std::mem::replace(&mut self.writer, new_writer);
self.rows = 0;
self.num_bytes = 0;

let bytes = old_writer.into_inner()?;
Ok((old_rows, bytes))
}

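// Swaps in a fresh Avro writer, takes the serialized bytes, and packages them
// as a PublishTask together with the sequencer ids of the download records
// whose log source feeds this table.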
fn flush(&mut self) -> Result<PublishTask, SQSLambdaError> {
let resolved_table_name = self.resolved_table_name.clone();
let ts_hour = self.ts_hour.clone();

info!(
"Flushing file for table: {}, ts_hour: {}",
&resolved_table_name, ts_hour
);

let matching_record_seqs = self
.download_items
.iter()
.filter_map(|(record, log_source)| {
resolved_table_name
.contains(log_source)
.then_some(record.sequencer.clone())
})
.collect::<Vec<_>>();

let (old_rows, bytes) = self.reset().map_err(|e| {
SQSLambdaError::new(
format!("Avro write failed: {:#}", e),
matching_record_seqs.clone(),
)
})?;

let ret = PublishTask {
resolved_table_name,
ts_hour,
rows: old_rows,
bytes,
record_ids: matching_record_seqs,
};
Ok(ret)
}
}

impl<'a> TransformerWriters<'a> {
fn new(download_items: Vec<(DataBatcherOutputRecord, String)>) -> Self {
Self {
download_items,
writers: HashMap::new(),
}
}

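// Returns the writer for a (table, ts_hour) pair, creating it on first use
// from the table's registered Avro schema.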
fn writer(&mut self, table_name: String, ts_hour: String) -> &mut TransformerTableWriter<'a> {
self.writers
.entry((table_name.clone(), ts_hour.clone()))
.or_insert_with(|| {
let avro_schema = AVRO_SCHEMAS
.get(&table_name)
.expect("Failed to find avro schema");

TransformerTableWriter::new(
avro_schema,
self.download_items.clone(),
table_name.clone(),
ts_hour.clone(),
)
})
}

async fn flush(self) -> Vec<Result<PublishTask, SQSLambdaError>> {
self.writers
.into_iter()
.map(|(_, mut w)| w.flush())
.collect::<Vec<_>>()
}
}

async fn handle_all_partial_failures(
mut retryable: Vec<DataBatcherOutputRecord>,
un_retryable: Vec<DataBatcherOutputRecord>,
