From acd98654bd86bfadafb76ab99b0e767ec4326bdb Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Mon, 29 Apr 2024 12:13:37 -0700 Subject: [PATCH] refactor: Convert IPCWriter metrics from u64 to usize (#10278) --- datafusion/physical-plan/src/common.rs | 10 +++++----- datafusion/physical-plan/src/sorts/sort.rs | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index f7cad9df4ba1..cdd122cf36fe 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -259,11 +259,11 @@ pub struct IPCWriter { /// inner writer pub writer: FileWriter<File>, /// batches written - pub num_batches: u64, + pub num_batches: usize, /// rows written - pub num_rows: u64, + pub num_rows: usize, /// bytes written - pub num_bytes: u64, + pub num_bytes: usize, } impl IPCWriter { @@ -306,9 +306,9 @@ impl IPCWriter { pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.writer.write(batch)?; self.num_batches += 1; - self.num_rows += batch.num_rows() as u64; + self.num_rows += batch.num_rows(); let num_bytes: usize = batch.get_array_memory_size(); - self.num_bytes += num_bytes as u64; + self.num_bytes += num_bytes; Ok(()) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e4e3d46dfbbc..ebeaf9e471c3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -406,7 +406,7 @@ impl ExternalSorter { let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); - self.metrics.spilled_rows.add(spilled_rows as usize); + self.metrics.spilled_rows.add(spilled_rows); self.spills.push(spill_file); Ok(used) } @@ -674,7 +674,7 @@ async fn spill_sorted_batches( batches: Vec<RecordBatch>, path: &Path, schema: SchemaRef, -) -> Result<u64> { +) -> Result<usize> { let path: PathBuf = path.into(); let task = SpawnedTask::spawn_blocking(move 
|| write_sorted(batches, path, schema)); match task.join().await { @@ -705,7 +705,7 @@ fn write_sorted( batches: Vec<RecordBatch>, path: PathBuf, schema: SchemaRef, -) -> Result<u64> { +) -> Result<usize> { let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; for batch in batches { writer.write(&batch)?; @@ -715,7 +715,7 @@ fn write_sorted( "Spilled {} batches of total {} rows to disk, memory released {}", writer.num_batches, writer.num_rows, - human_readable_size(writer.num_bytes as usize), + human_readable_size(writer.num_bytes), ); Ok(writer.num_rows) }