diff --git a/lib/rust/transformer/src/main.rs b/lib/rust/transformer/src/main.rs
index 1bf2bb49..d1c4fc2f 100644
--- a/lib/rust/transformer/src/main.rs
+++ b/lib/rust/transformer/src/main.rs
@@ -1,6 +1,7 @@
 mod arrow;
 mod avro;
 mod models;
+use apache_avro::AvroResult;
 use aws_sdk_sns::model::MessageAttributeValue;
 use aws_sdk_sqs::model::SendMessageBatchRequestEntry;
 use chrono::Utc;
@@ -663,6 +664,135 @@ impl LineLevelError {
     }
 }
 
+async fn write_publish_lines(
+    s3: aws_sdk_s3::Client,
+    sns: aws_sdk_sns::Client,
+    download_items: Vec<(DataBatcherOutputRecord, String)>,
+    mut stream: Pin<Box<dyn Stream<Item = Result<LineResult, LineError>> + Send>>,
+) -> Result<(
+    Vec<(String, String, String, usize, usize)>,
+    Vec<LineError>,
+    Vec<SQSLambdaError>,
+)> {
+    let mut error_lines = vec![];
+
+    // This actually does CPU work, but needs to go through a channel due to the way the avro crate is written. Set to a high limit.
+    let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<LineResult>(50000);
+
+    let process_writes = || {
+        let (publish_sender, mut publish_receiver) = tokio::sync::mpsc::channel::<PublishTask>(35);
+
+        let publish_files = || async move {
+            let mut results = vec![];
+            let mut handles = vec![];
+
+            // We start up to 35 concurrent publish tasks, but await afterwards (causing send() to await).
+            // Ensures we only have max 35 concurrent publish tasks (35 * 10MB compressed = 350MB memory)
+            while let Some(task) = publish_receiver.recv().await {
+                let fut = publish_transformer_file(task, s3.clone(), sns.clone());
+                handles.push(tokio::spawn(fut));
+
+                if handles.len() > 35 {
+                    let old_handles = std::mem::replace(&mut handles, vec![]);
+                    let res = try_join_all(old_handles).await?;
+                    results.extend(res);
+                }
+            }
+
+            if !handles.is_empty() {
+                let res = try_join_all(handles).await?;
+                results.extend(res);
+            }
+
+            anyhow::Ok(results)
+        };
+
+        let publish_handle = tokio::spawn(publish_files());
+
+        let mut writers = TransformerWriters::new(download_items);
+        async move {
+            let mut write_line_errors = vec![];
+            let mut write_errors = vec![];
+
+            while let Some(l) = write_rx.recv().await {
+                let table_writer = writers.writer(l.resolved_table_name.clone(), l.ts_hour.clone());
+                let write_res = table_writer.write(l);
+                match write_res {
+                    Ok(_) => match table_writer.flush_if_needed() {
+                        Ok(Some(write_task)) => {
+                            debug!("Current publish capacity: {}", publish_sender.capacity());
+                            let res = publish_sender.send(write_task).await;
+                            if let Err(e) = res {
+                                write_errors.push(SQSLambdaError::new(
+                                    "Failed to send write task".to_string(),
+                                    e.0.record_ids,
+                                ));
+                            }
+                        }
+                        Ok(None) => {}
+                        Err(e) => write_errors.push(e),
+                    },
+                    Err(e) => write_line_errors.push(e),
+                }
+            }
+
+            let final_write_tasks = writers
+                .flush()
+                .await
+                .into_iter()
+                .flat_map(|r| r.map_err(|e| write_errors.push(e)).ok())
+                .collect::<Vec<_>>();
+
+            for task in final_write_tasks {
+                let res = publish_sender.send(task).await;
+                if let Err(e) = res {
+                    write_errors.push(SQSLambdaError::new(
+                        "Failed to send write task".to_string(),
+                        e.0.record_ids,
+                    ));
+                }
+            }
+            drop(publish_sender);
+
+            let publish_results = publish_handle.await??;
+            anyhow::Ok((publish_results, write_line_errors, write_errors))
+        }
+    };
+
+    let write_handle = tokio::task::spawn(process_writes());
+
+    let mut errors = vec![];
+    while let Some(line) = stream.next().await {
+        match line {
+            Ok(l) => {
+                write_tx.send(l).await.or_else(|e| {
+                    errors.push(SQSLambdaError::new(
+                        "Failed to send line to write task".to_string(),
+                        vec![e.0.record_id.unwrap()],
+                    ));
+                    anyhow::Ok(())
+                })?;
+            }
+            Err(e) => {
+                error_lines.push(e);
+            }
+        };
+    }
+    drop(write_tx);
+
+    let (publish_results, write_line_errors, write_errors) = write_handle.await??;
+    error_lines.extend(write_line_errors);
+    errors.extend(write_errors);
+
+    let results = publish_results
+        .into_iter()
+        .flat_map(|r| r.map_err(|e| errors.push(e)).ok())
+        .flatten()
+        .collect::<Vec<_>>();
+
+    Ok((results, error_lines, errors))
+}
+
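Note on the concurrency cap in `publish_files` above: publish tasks are spawned as they arrive on the channel, and once more than 35 handles are in flight the whole batch is awaited, which also makes the sender's `send().await` back-pressure once the 35-slot channel fills. A minimal, self-contained sketch of that pattern (the names and task payloads below are illustrative, not from this codebase):

```rust
use futures::future::try_join_all;
use tokio::sync::mpsc;

const MAX_IN_FLIGHT: usize = 35;

// Hypothetical stand-in for `publish_transformer_file`: any fallible async job.
async fn publish(task: u64) -> anyhow::Result<u64> {
    Ok(task * 2)
}

async fn drain_with_bounded_concurrency(mut rx: mpsc::Receiver<u64>) -> anyhow::Result<Vec<u64>> {
    let mut results = vec![];
    let mut handles = vec![];

    while let Some(task) = rx.recv().await {
        handles.push(tokio::spawn(publish(task)));

        if handles.len() > MAX_IN_FLIGHT {
            // Awaiting the batch here is what bounds memory: nothing new is
            // spawned (or received) until these publishes finish.
            let batch = std::mem::take(&mut handles);
            for r in try_join_all(batch).await? {
                results.push(r?);
            }
        }
    }

    // Drain whatever is still in flight after the channel closes.
    for r in try_join_all(handles).await? {
        results.push(r?);
    }
    Ok(results)
}
```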
 pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
     let start = Instant::now();
     event.payload.records.first().iter().for_each(|r| {
@@ -686,10 +816,12 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
         .flatten()
         .collect::<Vec<_>>();
 
+    let data_batcher_records_copy = data_batcher_records.clone();
+
     let s3_download_items = data_batcher_records
-        .iter()
+        .into_iter()
         .filter(|d| d.size > 0)
-        .map(|d| (d, d.log_source.clone()))
+        .map(|d| (d.clone(), d.log_source.clone()))
         .collect::<Vec<_>>();
 
     info!(
@@ -706,7 +838,6 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
 
     let transformed_lines_streams = s3_download_items
         .into_iter()
         .map(|(item, log_source)| {
-            let s3 = &s3;
             let item_copy = item.clone();
 
@@ -722,7 +853,7 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
                 Pin<Box<dyn Stream<Item = Result<String, std::io::Error>> + Send>>,
             ) = events.unwrap();
 
-            let reader = raw_lines.map(|r| (r, item.sequencer.clone()));
+            let reader = raw_lines.map(move |r| (r, item.sequencer.clone()));
 
             let select_table_from_payload_expr = LOG_SOURCES_CONFIG.with(|c| {
                 let log_sources_config = c.borrow();
@@ -886,10 +1017,6 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
         .flatten()
         .collect::<Vec<_>>();
 
-    let writers_by_table_utc_hour_pair: RefCell<
-        HashMap<(String, String), (Writer<Vec<u8>>, usize)>,
-    > = RefCell::new(HashMap::new());
-
     let mut merged: StreamMap<
         String,
         Pin<Box<dyn Stream<Item = Result<Option<LineResult>, LineError>> + Send>>,
@@ -902,141 +1029,35 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
     }
 
     let mut error_lines = vec![];
-    merged
+
+    let merged = merged
         .map(|(_, v)| v)
         .filter_map(|line| async { result_to_option(line) })
-        .and_then(|line| {
-            let mut writers_by_table_utc_hour_pair = writers_by_table_utc_hour_pair.borrow_mut();
-
-            let v = line.transformed_line;
-            let resolved_table_name = line.resolved_table_name;
-            let ts_hour = line.ts_hour;
-            async move {
-                let (writer, rows) = writers_by_table_utc_hour_pair
-                    .entry((resolved_table_name.clone(), ts_hour.clone()))
-                    .or_insert_with(|| {
-                        let avro_schema = AVRO_SCHEMAS
-                            .get(&resolved_table_name)
-                            .expect("Failed to find avro schema");
-
-                        let writer: Writer<Vec<u8>> = Writer::builder()
-                            .schema(avro_schema)
-                            .writer(Vec::new())
-                            .codec(Codec::Snappy)
-                            .block_size(1 * 1024 * 1024 as usize)
-                            .build();
-                        (writer, 0)
-                    });
-
-                *rows += 1;
-                // IO (well theoretically) TODO: since this actually does non-trivial comptuuation work too (schema validation), we need to figure out a way to prevent this from blocking our main async thread
-                writer.append_value_ref(&v)
-            }
-            .map_err(|e| {
-                LineError::new_partial(
-                    line.raw_line,
-                    line.log_source,
-                    "AvroSerializationError".to_string(),
-                    format!("{:#}", e),
-                    line.record_id,
-                )
-            })
-        })
-        .collect::<Vec<_>>()
-        .await
-        .into_iter()
-        .filter_map(|r| r.map_err(|e| error_lines.push(e)).ok())
-        .for_each(drop);
+        .boxed();
+
+    let (results, write_line_errors, write_errors) = write_publish_lines(
+        s3.clone(),
+        sns.clone(),
+        s3_download_items_copy.clone(),
+        merged,
+    )
+    .await?;
+    error_lines.extend(write_line_errors);
+    errors.extend(write_errors);
 
     let mut partial_error_lines = vec![];
-    error_lines.into_iter().for_each(|e| match e.error_type {
-        LineErrorType::Total => {
-            let err = SQSLambdaError::new(e.error, vec![e.record_id.unwrap()]);
-            errors.push(err);
-        }
-        LineErrorType::Partial => {
-            error!("Line error: {}", &e.error);
-            partial_error_lines.push(e);
-        }
-    });
-
-    let writers_by_table = writers_by_table_utc_hour_pair.into_inner();
-    let futures =
-        writers_by_table
-            .into_iter()
-            .map(|((resolved_table_name, ts_hour), (writer, rows))| {
-                let matching_record_seqs = s3_download_items_copy
-                    .iter()
-                    .filter_map(|(record, log_source)| {
-                        resolved_table_name
-                            .contains(log_source)
-                            .then_some(record.sequencer.clone())
-                    })
-                    .collect::<Vec<_>>();
-
-                async move {
-                    let bytes = writer.into_inner()?;
-                    let bytes_len = bytes.len();
-                    if bytes.len() == 0 || rows == 0 {
-                        return Ok(None);
-                    }
-                    info!("Writing number of Rows: {}", rows);
-
-                    let uuid = Uuid::new_v4();
-                    let bucket = var("MATANO_REALTIME_BUCKET_NAME")?;
-                    let key = format!(
-                        "transformed/{}/ts_hour={}/{}.snappy.avro",
-                        &resolved_table_name, &ts_hour, uuid
-                    );
-                    info!("Writing Avro file to S3 path: {}/{}", bucket, key);
-
-                    s3.put_object()
-                        .bucket(&bucket)
-                        .key(&key)
-                        .body(ByteStream::from(bytes))
-                        .content_type("application/avro-binary".to_string())
-                        // .content_encoding("application/zstd".to_string())
-                        .send()
-                        .await
-                        .map_err(|e| anyhow!(e).context(format!("Error putting {} to S3", key)))?;
-
-                    sns.publish()
-                        .topic_arn(var("MATANO_REALTIME_TOPIC_ARN")?)
-                        .message(
-                            json!({
-                                "bucket": &bucket,
-                                "key": &key,
-                                "resolved_table_name": resolved_table_name,
-                                "ts_hour": ts_hour,
-                            })
-                            .to_string(),
-                        )
-                        .message_attributes(
-                            "resolved_table_name",
-                            MessageAttributeValue::builder()
-                                .data_type("String".to_string())
-                                .string_value(&resolved_table_name)
-                                .build(),
-                        )
-                        .send()
-                        .await
-                        .map_err(|e| {
-                            anyhow!(e).context(format!(
-                                "Error publishing SNS notification for S3 key: {}",
-                                key
-                            ))
-                        })?;
-                    anyhow::Ok(Some((resolved_table_name, key, ts_hour, rows, bytes_len)))
-                }
-                .map_err(move |e| SQSLambdaError::new(format!("{:#}", e), matching_record_seqs))
-            });
-
-    let results = join_all(futures)
-        .await
+    error_lines
         .into_iter()
-        .flat_map(|r| r.map_err(|e| errors.push(e)).ok())
-        .flatten()
-        .collect::<Vec<_>>();
+        .for_each(|e: LineError| match e.error_type {
+            LineErrorType::Total => {
+                let err = SQSLambdaError::new(e.error, vec![e.record_id.unwrap()]);
+                errors.push(err);
+            }
+            LineErrorType::Partial => {
+                error!("Line error: {}", &e.error);
+                partial_error_lines.push(e);
+            }
+        });
 
     // Error handling strategy:
     // If all records failed, we return an error and the lambda will retry
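The handler-side change above is mostly plumbing: instead of accumulating Avro writers in a `RefCell`-backed map inside the stream pipeline, the merged stream is type-erased with `.boxed()` and handed to `write_publish_lines`, which now owns the writers. A small illustrative sketch (not project code) of what `.boxed()` does to the stream type:

```rust
use futures::stream::{self, BoxStream, StreamExt};

// `.boxed()` turns any concrete `Stream + Send` into the trait-object form
// `Pin<Box<dyn Stream<Item = T> + Send>>` (aliased as `BoxStream<'_, T>`), so
// a helper function can accept it without being generic over the upstream
// combinator chain.
fn as_boxed_lines(
    values: Vec<Result<String, std::io::Error>>,
) -> BoxStream<'static, Result<String, std::io::Error>> {
    stream::iter(values).boxed()
}
```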
@@ -1044,7 +1065,7 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
     // If some records failed and we've retried them 3 times, we send them to the DLQ
     let had_error = !errors.is_empty();
     let mut failing_log_sources = HashSet::new();
-    let is_total_failure = errors.len() == data_batcher_records.len();
+    let is_total_failure = errors.len() == data_batcher_records_copy.len();
 
     if had_error {
         errors.iter().for_each(|e| error!("{}", e));
@@ -1053,7 +1074,7 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
             .flat_map(|e| e.ids)
             .collect::<HashSet<_>>();
         let failures = error_ids.iter().map(|error_id| {
-            data_batcher_records
+            data_batcher_records_copy
                 .iter()
                 .find(|r| &r.sequencer == error_id)
                 .unwrap()
@@ -1136,6 +1157,253 @@ pub(crate) async fn handler(event: LambdaEvent<SqsEvent>) -> Result<()> {
     }
 }
 
+async fn publish_transformer_file(
+    task: PublishTask,
+    s3: aws_sdk_s3::Client,
+    sns: aws_sdk_sns::Client,
+) -> Result<Option<(String, String, String, usize, usize)>, SQSLambdaError> {
+    let resolved_table_name = task.resolved_table_name;
+    let ts_hour = task.ts_hour;
+    let matching_record_seqs = task.record_ids;
+    let ret = async move {
+        let old_rows = task.rows;
+        let bytes = task.bytes;
+
+        let bytes_len = bytes.len();
+        if bytes.len() == 0 || old_rows == 0 {
+            info!("No rows written, skipping write");
+            return Ok(None);
+        }
+        info!("Writing number of Rows: {}", old_rows);
+
+        let uuid = Uuid::new_v4();
+        let bucket = var("MATANO_REALTIME_BUCKET_NAME")?;
+        let key = format!(
+            "transformed/{}/ts_hour={}/{}.snappy.avro",
+            &resolved_table_name, &ts_hour, uuid
+        );
+        info!("Writing Avro file to S3 path: {}/{}", bucket, key);
+
+        s3.put_object()
+            .bucket(&bucket)
+            .key(&key)
+            .body(ByteStream::from(bytes))
+            .content_type("application/avro-binary".to_string())
+            // .content_encoding("application/zstd".to_string())
+            .send()
+            .await
+            .map_err(|e| anyhow!(e).context(format!("Error putting {} to S3", key)))?;
+
+        sns.publish()
+            .topic_arn(var("MATANO_REALTIME_TOPIC_ARN")?)
+            .message(
+                json!({
+                    "bucket": &bucket,
+                    "key": &key,
+                    "resolved_table_name": resolved_table_name,
+                    "ts_hour": ts_hour,
+                })
+                .to_string(),
+            )
+            .message_attributes(
+                "resolved_table_name",
+                MessageAttributeValue::builder()
+                    .data_type("String".to_string())
+                    .string_value(&resolved_table_name)
+                    .build(),
+            )
+            .send()
+            .await
+            .map_err(|e| {
+                anyhow!(e).context(format!(
+                    "Error publishing SNS notification for S3 key: {}",
+                    key
+                ))
+            })?;
+        anyhow::Ok(Some((
+            resolved_table_name.to_string(),
+            key,
+            ts_hour.to_string(),
+            old_rows,
+            bytes_len,
+        )))
+    }
+    .map_err(move |e| SQSLambdaError::new(format!("{:#}", e), matching_record_seqs));
+    ret.await
+}
+
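The `resolved_table_name` SNS message attribute set in `publish_transformer_file` is what lets downstream consumers subscribe selectively. As a hypothetical illustration (the table name below is made up), a subscriber could attach a filter policy on that attribute:

```rust
use serde_json::json;

// Hypothetical SNS subscription filter policy: deliver only notifications
// whose `resolved_table_name` message attribute matches one of these values.
fn example_filter_policy() -> serde_json::Value {
    json!({ "resolved_table_name": ["example_table"] })
}
```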
+struct TransformerWriters<'a> {
+    download_items: Vec<(DataBatcherOutputRecord, String)>,
+    writers: HashMap<(String, String), TransformerTableWriter<'a>>,
+}
+
+struct TransformerTableWriter<'a> {
+    resolved_table_name: String,
+    ts_hour: String,
+    download_items: Vec<(DataBatcherOutputRecord, String)>,
+    writer: Writer<'a, Vec<u8>>,
+    rows: usize,
+    num_bytes: usize,
+}
+
+fn create_avro_writer(schema: &Schema) -> Writer<Vec<u8>> {
+    Writer::builder()
+        .schema(schema)
+        .writer(Vec::new())
+        .codec(Codec::Snappy)
+        .block_size(1 * 1024 * 1024 as usize)
+        .build()
+}
+
+#[derive(Debug, Clone)]
+struct PublishTask {
+    resolved_table_name: String,
+    ts_hour: String,
+    rows: usize,
+    bytes: Vec<u8>,
+    record_ids: Vec<String>,
+}
+
+const TARGET_AVRO_FILE_SIZE_BYTES: usize = 10 * 1024 * 1024;
+
+impl<'a> TransformerTableWriter<'a> {
+    fn new(
+        schema: &'a Schema,
+        download_items: Vec<(DataBatcherOutputRecord, String)>,
+        resolved_table_name: String,
+        ts_hour: String,
+    ) -> Self {
+        Self {
+            resolved_table_name,
+            ts_hour,
+            writer: create_avro_writer(schema),
+            download_items,
+            rows: 0,
+            num_bytes: 0,
+        }
+    }
+
+    fn write(&mut self, line: LineResult) -> Result<(), LineError> {
+        let v = line.transformed_line;
+
+        let mut do_write = || {
+            // IO (well theoretically) TODO: since this actually does non-trivial computation work too (schema validation), we need to figure out a way to prevent this from blocking our main async thread
+            // crate is written poorly, can't do without Mutex?
+
+            let written = self.writer.append_value_ref(&v)?;
+            self.num_bytes += written;
+            AvroResult::Ok(())
+        };
+        self.rows += 1;
+
+        let ret = do_write().map_err(|e| {
+            LineError::new_partial(
+                line.raw_line,
+                line.log_source,
+                "AvroSerializationError".to_string(),
+                format!("{:#}", e),
+                line.record_id,
+            )
+        });
+        ret
+    }
+
+    fn flush_if_needed(&mut self) -> Result<Option<PublishTask>, SQSLambdaError> {
+        if self.is_flush_needed() {
+            self.flush().map(|t| Some(t))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn is_flush_needed(&self) -> bool {
+        // TODO: actually only overwrite mode enrichment tables, fix.
+        !self.resolved_table_name.starts_with("enrich_")
+            && self.num_bytes >= TARGET_AVRO_FILE_SIZE_BYTES
+    }
+
+    fn reset(&mut self) -> Result<(usize, Vec<u8>)> {
+        let old_rows = self.rows;
+        let new_writer = create_avro_writer(self.writer.schema());
+
+        let old_writer = std::mem::replace(&mut self.writer, new_writer);
+        self.rows = 0;
+        self.num_bytes = 0;
+
+        let bytes = old_writer.into_inner()?;
+        Ok((old_rows, bytes))
+    }
+
+    fn flush(&mut self) -> Result<PublishTask, SQSLambdaError> {
+        let resolved_table_name = self.resolved_table_name.clone();
+        let ts_hour = self.ts_hour.clone();
+
+        info!(
+            "Flushing file for table: {}, ts_hour: {}",
+            &resolved_table_name, ts_hour
+        );
+
+        let matching_record_seqs = self
+            .download_items
+            .iter()
+            .filter_map(|(record, log_source)| {
+                resolved_table_name
+                    .contains(log_source)
+                    .then_some(record.sequencer.clone())
+            })
+            .collect::<Vec<_>>();
+
+        let (old_rows, bytes) = self.reset().map_err(|e| {
+            SQSLambdaError::new(
+                format!("Avro write failed: {:#}", e),
+                matching_record_seqs.clone(),
+            )
+        })?;
+
+        let ret = PublishTask {
+            resolved_table_name,
+            ts_hour,
+            rows: old_rows,
+            bytes,
+            record_ids: matching_record_seqs,
+        };
+        Ok(ret)
+    }
+}
+
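`TransformerTableWriter::reset` relies on a swap-and-drain pattern: `std::mem::replace` installs a fresh Avro writer so serialization can continue, while the filled writer is consumed into bytes for publishing. A self-contained sketch of the same idea with plain buffers (illustrative types only, not the project's):

```rust
// Illustrative only: track bytes written and hand back the full buffer once a
// size target is crossed, swapping in an empty one so writes can continue.
struct SizedBuffer {
    buf: Vec<u8>,
    target_bytes: usize,
}

impl SizedBuffer {
    fn new(target_bytes: usize) -> Self {
        Self { buf: Vec::new(), target_bytes }
    }

    /// Returns `Some(filled_buffer)` when the target size is reached.
    fn push(&mut self, chunk: &[u8]) -> Option<Vec<u8>> {
        self.buf.extend_from_slice(chunk);
        if self.buf.len() >= self.target_bytes {
            Some(std::mem::replace(&mut self.buf, Vec::new()))
        } else {
            None
        }
    }
}
```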
+impl<'a> TransformerWriters<'a> {
+    fn new(download_items: Vec<(DataBatcherOutputRecord, String)>) -> Self {
+        Self {
+            download_items,
+            writers: HashMap::new(),
+        }
+    }
+
+    fn writer(&mut self, table_name: String, ts_hour: String) -> &mut TransformerTableWriter<'a> {
+        self.writers
+            .entry((table_name.clone(), ts_hour.clone()))
+            .or_insert_with(|| {
+                let avro_schema = AVRO_SCHEMAS
+                    .get(&table_name)
+                    .expect("Failed to find avro schema");
+
+                TransformerTableWriter::new(
+                    avro_schema,
+                    self.download_items.clone(),
+                    table_name.clone(),
+                    ts_hour.clone(),
+                )
+            })
+    }
+
+    async fn flush(self) -> Vec<Result<PublishTask, SQSLambdaError>> {
+        self.writers
+            .into_iter()
+            .map(|(_, mut w)| w.flush())
+            .collect::<Vec<_>>()
+    }
+}
+
 async fn handle_all_partial_failures(
     mut retryable: Vec<DataBatcherOutputRecord>,
     un_retryable: Vec<DataBatcherOutputRecord>,
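`TransformerWriters::writer` leans on the map entry API so that a writer for a given `(table, ts_hour)` pair is created lazily on first use and reused afterwards. A minimal sketch of that lookup-or-create pattern (illustrative value type, not the project's writer):

```rust
use std::collections::HashMap;

// Illustrative: get-or-create a per-(table, ts_hour) value, mirroring how
// `TransformerWriters::writer` lazily builds one Avro writer per key.
fn counter<'m>(
    map: &'m mut HashMap<(String, String), usize>,
    table: &str,
    ts_hour: &str,
) -> &'m mut usize {
    map.entry((table.to_string(), ts_hour.to_string()))
        .or_insert_with(|| 0)
}
```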