Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime module to enable concurrent load of manifest files. #124

Closed
Tracked by #123
liurenjie1024 opened this issue Dec 19, 2023 · 19 comments
Closed
Tracked by #123

Add runtime module to enable concurrent load of manifest files. #124

liurenjie1024 opened this issue Dec 19, 2023 · 19 comments

Comments

@liurenjie1024
Copy link
Contributor

Currently we implement manifest loading in a sequential approach, e.g. load them one by one. We should add load them concurrently. This requires submitting tasks to rust async runtime, and we should be careful to runtime agnostic.

@odysa
Copy link
Contributor

odysa commented Feb 2, 2024

Hi, is this what you refer to? Can you plz explain more about "careful to runtime agnostic"? Is there anything we need to be careful when implementing concurrent scanning?

let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len());
for manifest_list_entry in manifest_list.entries().iter() {
// Data file
let manifest = manifest_list_entry.load_manifest(&self.file_io).await?;
for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) {

@liurenjie1024
Copy link
Contributor Author

Hi, is this what you refer to?

Yes, exactly.

Can you plz explain more about "careful to runtime agnostic"? Is there anything we need to be careful when implementing concurrent scanning?

I mean we may need an extra layer for task scheduling, so that we can be adopted to any async runtime such as tokio, async-std.

@odysa
Copy link
Contributor

odysa commented Feb 2, 2024

I mean we may need an extra layer for task scheduling, so that we can be adopted to any async runtime such as tokio, async-std.

Do you want users to choose their own runtime like sqlx?
They are building an abstraction layer(Runtime) so sqlx can run on many blocking/non-blocking runtime.

# tokio (no TLS)
sqlx = { version = "0.7", features = [ "runtime-tokio" ] }
# async-std (no TLS)
sqlx = { version = "0.7", features = [ "runtime-async-std" ] }

I am interested in this feature, but it will take some time for me to draft a design.

@odysa
Copy link
Contributor

odysa commented Feb 2, 2024

Do you want users to choose their own runtime like sqlx? They are building an abstraction layer(Runtime) so sqlx can run on many blocking/non-blocking runtime.

Follow up on this. The sqlx uses a relatively simple solution. For example, the spawn function.

https://github.com/launchbadge/sqlx/blob/84d576004c93a32133688426eacb50434bb5c5f0/sqlx-core/src/rt/mod.rs#L66-L74

@liurenjie1024
Copy link
Contributor Author

Do you want users to choose their own runtime like sqlx?

Yes, exactly. I don't think we should bind to some specific runtime.

Follow up on this. The sqlx uses a relatively simple solution.

I agree that we may need to think about it carefully, sqlx's solution can only use current runtime.

I am interested in this feature, but it will take some time for me to draft a design.

Yeah, welcome to contribute, we can work together on this.

@marvinlanhenke
Copy link
Contributor

@odysa
Just to follow up on this, any progress regarding some design ideas?

@liurenjie1024
Do we have any reference implementation where we can get "inspired" on how things could be done,
or do you have a particular design idea already in mind?

Is this something we should track in #348 perhaps for the v.0.4.0 release?

@liurenjie1024
Copy link
Contributor Author

It's already tracked here: #123

@marvinlanhenke
Copy link
Contributor

in order to verify my understanding and possibly kick of a design discussion, we could follow the approach of sqlx:

  • have a runtime.rs
    • to define a Runtime trait
    • based on a feature-flag import / re-export the implementors of that trait
  • have a /runtime module with specific runtime implementations
    • that implement the Runtime trait

For our particular use-case (loading manifest / or DataFiles on multiple threads) we could e.g. wrap tokio::spawn with our runtime and we're good to go? Then the Runtime trait would evolve to support more "use-cases"?

@liurenjie1024
Copy link
Contributor Author

liurenjie1024 commented May 2, 2024

Maybe currently we don't need a Runtime trait? From what we have learned, we currently need two methods:

  1. spawn
  2. block_on

I think the method here already provided a good example of what we need. Allowing user to specify Runtime for task scheduling could be treated as an advanced feature.

@marvinlanhenke
Copy link
Contributor

marvinlanhenke commented May 3, 2024

... so as a first step - simply wrap tokio::spawn (for example) like here - and not even use a feature-flag for now; just the most simplest layer of abstraction?

@liurenjie1024
Have you already made up your mind; about how we want to read & filter the manifest files concurrently? An idea would be to:

  • async load ManifestList
  • for each entry spawn a new task
    • apply ManifestEvaluator
    • async load Manifest
    • for each ManifestEntry / DataFile spawn a new task
      • apply ExpressionEvaluator
      • apply InclusiveMetricsEvaluator
      • if data file has not been pruned yet
      • send FileScanTask as a result via channel back to main stream
      • yield FileScanTask

I think this way we can achive maximum parallelism, throughput and performance;
however the tradeoffs are the complexity and the spike in ressource consumption, due to spawning multiple tasks and concurrently loading files in memory.

Here is a toy-example to illustrate the idea for further discussion:

async fn create_stream() -> Result<BoxStream<'static, Result<FileScanTask>>> {
    let (tx, mut rx) = mpsc::channel::<FileScanTask>(32);

    // manifest list with entries
    let manifests = Vec::from_iter(0..12);

    for entry in manifests {
        let sender = tx.clone();

        // for each entry spawn a new task
        tokio::spawn(async move {
            // apply `ManifestEvaluator`
            // if not pruned; load manifest
            println!("loading manifest {}", entry);
            time::sleep(Duration::from_millis(1000)).await;

            let data_files: Vec<_> = (0..48).map(|_| DataFile {}).collect();
            // for each DataFile spawn a new task
            for _ in data_files {
                let sender = sender.clone();
                tokio::spawn(async move {
                    // apply ExpressionEvaluator
                    // apply InclusiveMetricsEvaluator
                    process_data_file(sender).await;
                });
            }
        });
    }
    drop(tx);

    let stream = try_stream! {
        while let Some(file_scan_task) = rx.recv().await {
            yield file_scan_task;
        }
    };

    Ok(stream.boxed())
}

@liurenjie1024
Copy link
Contributor Author

Hi, @marvinlanhenke After #233 got merged, we will have a basic runtime framework.

Have you already made up your mind;

Not yet.

I think you solution generally LGTM. Creating on task for each manifest entry may look too much for me. Though spawing one task is lightweight in rust, it still consumes some memory. How do you feel starting with one task for one manifest file?

@Fokko
Copy link
Contributor

Fokko commented May 6, 2024

With Iceberg, the manifests are written to a target size (8 megabyte) by default. Each manifest is bound to the same schema and partition, so you can re-use the evaluators here. I would not go overboard with the parallelism, and just create one task per manifest, and not spawn a task per manifest-entry.

@marvinlanhenke
Copy link
Contributor

How do you feel starting with one task for one manifest file

you mean:

  • spawn a new task for each manifest, load the manifest (entry.load_manifest(...).await?)
  • and handle DataFiles synchronously on the same task

so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?

@Fokko
Copy link
Contributor

Fokko commented May 6, 2024

so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?

That is correct 👍 I think there might be some confusion around the naming. In the spec we have the Manifest List that contains Manifests. Within the Manifest there are manifest-entries that each point to one Datafile.

@liurenjie1024
Copy link
Contributor Author

How do you feel starting with one task for one manifest file

you mean:

  • spawn a new task for each manifest, load the manifest (entry.load_manifest(...).await?)
  • and handle DataFiles synchronously on the same task

so if we have a manifest_list with e.g. 5 entries, 1 is pruned (ManifestEvaluator) we'd effectively spawn 4 tasks, to load the manifest and handle all the data files; is this correct?

Yeah, that's exactly what I mean.

@sdd
Copy link
Contributor

sdd commented May 13, 2024

Using try_for_each_concurrent here rather than just spawning in a for loop will allow us to tune the concurrncy as it accepts a max concurrent tasks argument. I'd advocate for a data-driven approach to determine a sensible default value for this, alongside the capability to override the default with optional config, probably a with_max_concurrency() method on the builder and some guidance in the docs.

@liurenjie1024
Copy link
Contributor Author

try_for_each_concurrent

Do you meam this method? I think it's ok to me.

@liurenjie1024
Copy link
Contributor Author

Close by #233

@liurenjie1024
Copy link
Contributor Author

Closed by #373

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

No branches or pull requests

5 participants