refactor: get_deletes_for_data_file. Change tokio::spawn to crate::runtime::spawn. Add missing license

sdd committed Dec 21, 2024
1 parent 07037d1 commit b41a6b1
Showing 1 changed file with 39 additions and 44 deletions.
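
For context, the `crate::runtime::spawn` helper referenced in the commit message is not part of this diff. A minimal sketch of what such a runtime-abstraction wrapper might look like, assuming a plain tokio backend (the actual module is hypothetical here and may gate executors behind feature flags or use its own JoinHandle type):

// Hypothetical sketch only; the real crate::runtime module is not shown in this diff.
use std::future::Future;

pub fn spawn<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // Centralising the spawn call means swapping executors later only
    // touches this module, not every call site.
    tokio::spawn(future)
}

Routing all task spawning through one crate-local function keeps the executor choice out of individual call sites such as the one changed in this diff.
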
crates/iceberg/src/delete_file_index.rs: 39 additions & 44 deletions
@@ -1,10 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
use std::collections::HashMap;
use std::sync::Arc;

use futures::channel::mpsc;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::watch;

+use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
use crate::Result;
@@ -29,7 +47,7 @@ impl DeleteFileIndex {
let (tx, rx) = watch::channel(None);

let delete_file_stream = receiver.boxed();
-tokio::spawn(async move {
+spawn(async move {
let delete_files = delete_file_stream.try_collect::<Vec<_>>().await;
let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files);
let delete_file_index = Arc::new(delete_file_index);
@@ -107,59 +125,36 @@ impl DeleteFileIndex {
data_file: &DataFile,
seq_num: Option<i64>,
) -> Vec<FileScanTaskDeleteFile> {
-let mut results = vec![];
+let mut deletes_queue = vec![];

if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) {
-deletes
-.iter()
-.filter(|&delete| {
-seq_num
-.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
-.unwrap_or_else(|| true)
-})
-.for_each(|delete| {
-results.push(FileScanTaskDeleteFile {
-file_path: delete.manifest_entry.file_path().to_string(),
-file_type: delete.manifest_entry.content_type(),
-partition_spec_id: delete.partition_spec_id,
-})
-});
+deletes_queue.extend(deletes.iter());
}

if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
-deletes
-.iter()
-.filter(|&delete| {
-seq_num
-.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
-.unwrap_or_else(|| true)
-})
-.for_each(|delete| {
-results.push(FileScanTaskDeleteFile {
-file_path: delete.manifest_entry.file_path().to_string(),
-file_type: delete.manifest_entry.content_type(),
-partition_spec_id: delete.partition_spec_id,
-})
-});
+deletes_queue.extend(deletes.iter());
}

if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
-deletes
-.iter()
-.filter(|&delete| {
-seq_num
-.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
-.unwrap_or_else(|| true)
-})
-.for_each(|delete| {
-results.push(FileScanTaskDeleteFile {
-file_path: delete.manifest_entry.file_path().to_string(),
-file_type: delete.manifest_entry.content_type(),
-partition_spec_id: delete.partition_spec_id,
-})
-});
+deletes_queue.extend(deletes.iter());
}

+let mut results = vec![];
+deletes_queue
+.iter()
+.filter(|&delete| {
+seq_num
+.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+.unwrap_or_else(|| true)
+})
+.for_each(|delete| {
+results.push(FileScanTaskDeleteFile {
+file_path: delete.manifest_entry.file_path().to_string(),
+file_type: delete.manifest_entry.content_type(),
+partition_spec_id: delete.partition_spec_id,
+})
+});
+
results
}
}
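
After the refactor, all three delete-file lookups feed one shared deletes_queue, and the sequence-number check runs once instead of being repeated in each branch. A standalone sketch of that check, using a hypothetical helper name (the real code inlines it as a closure over Option<i64> values):

// Hypothetical helper illustrating the consolidated sequence-number filter;
// the actual code inlines this logic inside get_deletes_for_data_file.
fn delete_applies(delete_seq: Option<i64>, data_seq: Option<i64>) -> bool {
    data_seq
        // Keep the delete only if it is strictly newer than the data file.
        .map(|seq| delete_seq > Some(seq))
        // No sequence number on the data file: every candidate delete applies.
        .unwrap_or(true)
}

fn main() {
    assert!(delete_applies(Some(5), Some(3)));  // newer delete: kept
    assert!(!delete_applies(Some(3), Some(5))); // older delete: skipped
    assert!(!delete_applies(None, Some(3)));    // missing delete seq: skipped (None < Some)
    assert!(delete_applies(Some(3), None));     // no data seq to compare against: kept
}

Note that Option's ordering treats None as less than any Some, so a delete entry without a sequence number never passes the comparison when the data file has one.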
