-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathpagerank_timely.rs
81 lines (65 loc) · 2.25 KB
/
pagerank_timely.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::time::Instant;
use rand::prelude::*;
use renoir::prelude::*;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
const INIT: i64 = 6_000_000;
const HOP: i64 = 1_000_000;
fn main() {
env_logger::init();
let (config, args) = RuntimeConfig::from_args();
// command-line args: numbers of nodes and edges in the random graph.
let nodes: u64 = args[1].parse().unwrap();
let edges: u64 = args[2].parse().unwrap();
let max_iter: usize = 10000;
config.spawn_remote_workers();
let env = StreamContext::new(config);
let source = env
.stream_par_iter(move |index, peers| {
let mut rng1: SmallRng = SeedableRng::seed_from_u64(index);
(0..(edges / peers)).map(move |_| (rng1.gen_range(0..nodes), rng1.gen_range(0..nodes)))
})
.batch_mode(BatchMode::fixed(1024));
let mut split = source.split(2);
let adj_list = split
.pop()
.unwrap()
.group_by(|(x, _y)| *x)
.fold(Vec::new(), |edges, (_x, y)| edges.push(y))
.unkey();
let init = split
.pop()
.unwrap()
.flat_map(|(x, y)| [x, y])
.group_by_fold(|x| *x, (), |_, _| (), |_, _| ())
.unkey()
.left_join(adj_list, |x| x.0, |x| x.0)
.map(|(_, (_, vec))| (INIT, vec.map(|(_, v)| v).unwrap_or_default()));
let out = init.delta_iterate(
max_iter,
|_, (rank, _), delta_rank| *rank += delta_rank,
|x, (rank, adj_list)| {
let mut update = Vec::with_capacity(adj_list.len() + 1);
if !adj_list.is_empty() {
let degree = adj_list.len() as i64;
let new_share = (*rank * 5) / (6 * degree);
for adj in adj_list {
update.push((*adj, new_share));
}
}
update.push((*x, HOP - *rank));
update
},
|_, (rank, _)| rank,
|u| *u != 0,
move |s| s.flatten().drop_key().group_by_sum(|(x, _)| *x, |x| x.1),
);
out.for_each(|x| {
println!("{}:{}", x.0, x.1);
core::hint::black_box(x);
});
let start = Instant::now();
env.execute_blocking();
let elapsed = start.elapsed();
eprintln!("Elapsed {elapsed:?}");
}