improvement(final?): use multithreading to utilize all threads
kahlstrm committed Apr 29, 2024
1 parent 60af122 commit ac144c7
Showing 3 changed files with 143 additions and 9 deletions.
52 changes: 52 additions & 0 deletions README.md
@@ -105,6 +105,7 @@ Benchmarks and profiling results shown below are run against a `measurements.txt`
| [a custom hash function](#using-a-custom-hash-function) | 30.427 s ± 0.455 | <strong style="color:lime;"> -8.043 s (-21%)</strong> | Use the custom hash function that the Rust compiler uses for the hashmap |
| [Custom chunked reader](#chunked-reader-to-reduce-memory-footprint--prepare-for-parallellism) | 29.310 s ± 0.276 | <strong style="color:lime;"> -1.117 s (-4%)</strong> | Write a custom chunked file reader to prepare for parallelism and reduce memory allocation |
| [Tweak chunk size](#tweaking-the-chunk_size) | 28.663 s ± 0.546 | <strong style="color:lime;"> -0.647 s (-2%)</strong> | Small tweak to the fixed buffer size introduced in the previous improvement |
| [Multithreading](#multithreading) | 3.747 s ± 0.093 | <strong style="color:lime;"> -24.916 s (-87%)</strong> | Multithread the program to utilize all cores on the system |

### Initial Version

@@ -433,3 +434,54 @@ Benchmark 1: ./target/release/brc-rs
Time (mean ± σ): 28.663 s ± 0.546 s [User: 26.501 s, System: 1.259 s]
Range (min … max): 28.304 s … 29.631 s 5 runs
```

### Multithreading

We have now prepared our program to handle multiple threads, so we can start using them to speed it up.

One might wonder why this wasn't done earlier, but profiling and optimizing a single-threaded program is much more straightforward than a multithreaded one.
As long as we keep in mind that we will eventually use multiple threads, we can optimize the single-threaded program to be as fast as possible, and then simply spawn more threads to do the same work.

We need to somehow divide the work we have so far done sequentially across multiple threads.
This can be achieved by splitting the file into chunks and then processing each chunk in a separate thread.

However, we need to be careful when creating the chunks and make sure that we don't split any line in the middle.
To do this, we can "peek into" the file at `chunk_size`, and then read forwards until we find a newline character.
The next chunk then starts right after that newline character.
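
As a rough sketch of the idea (a hypothetical `chunk_boundaries` helper; the commit's actual implementation is the `chunk_le_file` function in the diff below):

```rust
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};

// Returns byte offsets where each chunk starts; boundaries[i]..boundaries[i + 1]
// is chunk i, and every boundary sits right after a newline.
fn chunk_boundaries(path: &str, chunk_count: u64) -> std::io::Result<Vec<u64>> {
    let file = File::open(path)?;
    let file_len = file.metadata()?.len();
    let chunk_size = file_len / chunk_count + 1;
    let mut reader = BufReader::new(file);
    let mut boundaries = vec![0];
    let mut scratch = Vec::new();
    for i in 1..chunk_count {
        // "peek into" the file at the rough split point...
        reader.seek(SeekFrom::Start(i * chunk_size))?;
        scratch.clear();
        // ...and read forward to the next newline so no line is cut in half
        reader.read_until(b'\n', &mut scratch)?;
        boundaries.push(reader.stream_position()?.min(file_len));
    }
    boundaries.push(file_len);
    Ok(boundaries)
}
```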

After creating the chunks, we can spawn a thread for each chunk and have them process the chunks in parallel.

After a thread is finished, it inserts its results into the shared hashmap, guarded by a [`Mutex`](https://doc.rust-lang.org/std/sync/struct.Mutex.html).
As we are using a `Mutex`, only one thread can access the hashmap at a time, blocking other threads from accessing it until the lock is released.
This isn't a problem, as the time spent holding the lock is minimal compared to the time spent processing the data.
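
As a minimal, self-contained sketch of this lock-and-merge pattern, with toy `u64` counts standing in for the real `WeatherStationStats`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let shared = Arc::new(Mutex::new(HashMap::<String, u64>::new()));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // each worker aggregates into its own private map first...
                let mut local = HashMap::new();
                *local.entry("Helsinki".to_string()).or_insert(0u64) += 1;
                // ...and holds the shared lock only for the brief merge
                let mut merged = shared.lock().unwrap();
                for (k, v) in local {
                    *merged.entry(k).or_insert(0) += v;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(shared.lock().unwrap()["Helsinki"], 4);
}
```

Since each worker takes the lock exactly once, contention stays negligible no matter how many threads we spawn.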

```sh
~/src/github/brc-rs (main*) » ./bench.sh
Benchmark 1: ./target/release/brc-rs
Time (mean ± σ): 3.747 s ± 0.093 s [User: 29.739 s, System: 2.102 s]
Range (min … max): 3.679 s … 3.910 s 5 runs
```

![Flamegraph of the multithreaded program, showing the main thread](multithreaded-main-thread.png)

As we can see from the flamegraph, the main thread mostly waits for the other threads to finish their work.
The other threads do the actual work of reading the file and processing the data.

Looking at the threads doing the processing, we can see sections where some of them sit at 0% CPU usage.
This is due to I/O blocking: the thread is waiting for its part of the file to be read into memory.

A simple trick to reduce this I/O blocking was to create more chunks than there are available threads.
This lets the OS scheduler switch between threads, so an I/O-blocked thread can be swapped out for one whose data is already available.
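
Concretely, the chunk count is derived from the available parallelism; here is that logic from `chunk_le_file` (in the diff below), wrapped in a hypothetical `chunk_count` helper:

```rust
use std::num::NonZeroUsize;

fn chunk_count() -> usize {
    std::thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
        // 4x oversubscription so an I/O-blocked thread can be swapped out
        * 4
}
```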

## Conclusion

The final(?) result is a program that can process 1 billion rows in 3.7 seconds on a 10-core M1 MacBook Pro.
This is a 97% improvement over the initial version.

A big chunk of the improvement came from multithreading in the end, but without the single-core optimizations done first, we wouldn't have been able to parallelize the program as effectively.
Keeping the program single-threaded at first also allowed us to iterate on it more easily.

The program is now utilizing all cores on the system, and the I/O blocking is distributed more evenly throughout the program.
The memory footprint is kept low, and the program is quite optimized.

More optimizations could be made, for instance making line parsing branchless or using memory-mapped files, but the latter would require `unsafe` code or external crates, which were out of scope for this project.
For now, I'm quite happy with the results, and I'll leave it at that (for now?).
Binary file added multithreaded-main-thread.png
100 changes: 91 additions & 9 deletions src/main.rs
@@ -2,8 +2,11 @@ use std::{
    collections::HashMap,
    fs::File,
    hash::{BuildHasherDefault, Hasher},
    io::{BufRead, BufReader, Read, Seek},
    num::NonZeroUsize,
    ops::{Add, BitXor},
    sync::{Arc, Mutex},
    thread,
};

fn main() {
@@ -23,6 +26,18 @@ impl WeatherStationStats {
        self.sum as f64 / 10.0 / self.count as f64
    }
}
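// Merge two stats: component-wise min/max plus summed sum and count.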
impl Add<&mut Self> for WeatherStationStats {
    type Output = Self;

    fn add(self, rhs: &mut Self) -> Self::Output {
        WeatherStationStats {
            min: self.min.min(rhs.min),
            max: self.max.max(rhs.max),
            sum: self.sum + rhs.sum,
            count: self.count + rhs.count,
        }
    }
}
fn parse_line(line: &[u8]) -> (&[u8], i64) {
// we know that the measurement is pure ASCII and is at max 5 characters long
// based on this we can find the semicolon faster by doing at most 6 byte comparisons by iterating the reversed bytes
@@ -46,7 +61,11 @@ fn parse_line(line: &[u8]) -> (&[u8], i64) {
            // reversed index 2, is the first whole number, "shift" it twice to the left with * 100
            (b, 3) => measurement += (b - b'0') as i64 * 100,
            // Data is of incorrect format, since indices 1, 4 and 5 must always be one of the other characters
            (b, _) => panic!(
                "{} , {:#?}",
                String::from_utf8(vec![*b]).unwrap(),
                String::from_utf8(line.to_vec())
            ),
        }
    }
    (
@@ -58,21 +77,84 @@
        },
    )
}

// A newline-aligned slice of the input file, plus a handle to the shared result map.
struct Chunk {
    start_point: u64,
    len: usize,
    outer_map: Arc<Mutex<HashMap<Vec<u8>, WeatherStationStats>>>,
}
// Split the file into newline-aligned chunks, one Chunk per slice.
fn chunk_le_file<T: BufRead + Seek>(
    mut f: T,
    file_len: usize,
    arccimuuteksi: Arc<Mutex<HashMap<Vec<u8>, WeatherStationStats>>>,
) -> Vec<Chunk> {
    let chunk_count = std::thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
        // do a sneaky 4x chunks vs available threads to allow the OS scheduler to switch between threads,
        // potentially enabling I/O-blocked threads to be swapped for threads where I/O is not blocked.
        // 4 was tested to provide the best perf on both an M1 Max MacBook and a Ryzen 5950X
        * 4;
    let chunk_size = file_len / chunk_count + 1;
    // max length of a line: 100-byte station name + ';' + '-99.9' + '\n' = 107 bytes
    let mut tmp_arr = Vec::with_capacity(107);
    let mut res = vec![];
    let mut cur_start = 0;
    for _ in 0..chunk_count {
        // jump ~chunk_size forward, then read on to the next newline so no line is split
        f.seek(std::io::SeekFrom::Current(chunk_size as i64))
            .unwrap();
        f.read_until(b'\n', &mut tmp_arr).unwrap();
        let end_pos = f.stream_position().unwrap();
        res.push(Chunk {
            start_point: cur_start,
            len: (end_pos - cur_start) as usize,
            outer_map: arccimuuteksi.clone(),
        });
        tmp_arr.clear();
        cur_start = end_pos
    }
    res
}
fn calc(file_name: Option<String>) -> String {
    let file_name: Arc<str> = file_name.unwrap_or("measurements.txt".into()).into();
    let f = File::open(file_name.to_string()).unwrap();
    let file_len = f.metadata().unwrap().len() as usize;
    let stations = Arc::new(Mutex::new(HashMap::<Vec<u8>, WeatherStationStats>::new()));
    let chunks = chunk_le_file(BufReader::new(f), file_len, stations.clone());
    // one worker thread per chunk; each opens its own file handle and reads only its slice
    let handles = chunks
        .into_iter()
        .map(|c| {
            let file_name = file_name.clone();
            thread::spawn(move || {
                let mut f = File::open(file_name.to_string()).unwrap();
                f.seek(std::io::SeekFrom::Start(c.start_point)).unwrap();
                let f = f.take(c.len as u64);
                // aggregate this chunk into a thread-local map first...
                let stations_välipala = aggregate_measurements(f);
                // ...then merge it into the shared map in one short critical section
                let mut stations = c.outer_map.lock().unwrap();
                for (k, v) in stations_välipala {
                    match stations.get_mut(&k) {
                        Some(jutska) => *jutska = v + jutska,
                        None => {
                            stations.insert(k, v);
                        }
                    }
                }
            })
        })
        .collect::<Vec<_>>();
    for h in handles {
        h.join().unwrap()
    }
    let lock = stations.lock().unwrap();
    let mut res = lock.iter().collect::<Vec<_>>();
    res.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    String::from("{")
        + &res
            .into_iter()
            .map(|(station, stats)| {
                format!(
                    "{}={:.1}/{:.1}/{:.1}",
                    String::from_utf8_lossy(station),
                    stats.min as f64 / 10.0,
                    stats.mean(),
                    stats.max as f64 / 10.0
