+25% throughput with segmented arrays
For every updated actor in the network body, two hash lookups are performed:
one on the actor id to grab its object id, and another on that object id to
look up its attributes. Even with a fast hash function (FNV), this is still
slow when executed 100k times in a replay.

The solution is to transition the hashmap to a segmented array, where small
indices (the vast majority of them) are looked up directly in a fixed-size
sparse array, and indices that fall outside the array are still stored in a
hashmap (sketched below).

Benchmarks showed a +25% throughput improvement. Imo it's kinda incredible
that performance improvements like this are still on the table.
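
A condensed, runnable sketch of the idea (the committed SegmentedArray in
frame_decoder.rs below is the real implementation; this standalone version
swaps FnvHashMap for std's HashMap so it compiles without dependencies, and
the keys and values in main are made up for illustration):

use std::collections::HashMap;

// Small keys index straight into a fixed-size sparse array with no hashing;
// the rare keys that fall outside the array spill over into a hashmap.
struct SegmentedArray<T> {
    array: Vec<Option<T>>,
    map: HashMap<usize, T>,
}

impl<T> SegmentedArray<T> {
    fn new(size: usize) -> Self {
        let mut array = Vec::with_capacity(size);
        array.resize_with(size, || None);
        Self { array, map: HashMap::new() }
    }

    fn insert(&mut self, key: usize, value: T) {
        match self.array.get_mut(key) {
            // Fast path: a bounds check and a plain index write.
            Some(slot) => *slot = Some(value),
            // Slow path: the key is beyond the array, fall back to hashing.
            None => { self.map.insert(key, value); }
        }
    }

    fn get(&self, key: usize) -> Option<&T> {
        match self.array.get(key) {
            Some(slot) => slot.as_ref(),
            None => self.map.get(&key),
        }
    }
}

fn main() {
    // Actor ids in a replay are small and dense, so with a 200-slot array
    // (the size the decoder uses) nearly every lookup takes the array path.
    let mut actors = SegmentedArray::new(200);
    actors.insert(3, "octane");
    actors.insert(5_000, "outlier");
    assert_eq!(actors.get(3), Some(&"octane"));
    assert_eq!(actors.get(5_000), Some(&"outlier"));
}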
nickbabcock committed Dec 17, 2024
1 parent dbaa174 commit d77dc00
Showing 2 changed files with 111 additions and 47 deletions.
100 changes: 77 additions & 23 deletions src/network/frame_decoder.rs
@@ -10,14 +10,58 @@ use crate::network::models::{
 use crate::network::{CacheInfo, VersionTriplet};
 use crate::parser::ReplayBody;
 
+#[derive(Debug)]
+pub(crate) struct SegmentedArray<T> {
+    array: Vec<Option<T>>,
+    map: FnvHashMap<usize, T>,
+}
+
+impl<T> SegmentedArray<T> {
+    pub(crate) fn new(size: usize) -> Self {
+        let mut array = Vec::with_capacity(size);
+        array.resize_with(size, || None);
+        Self {
+            array,
+            map: FnvHashMap::default(),
+        }
+    }
+
+    pub(crate) fn insert(&mut self, key: usize, value: T) {
+        match self.array.get_mut(key) {
+            Some(entry) => {
+                *entry = Some(value);
+            }
+            None => {
+                self.map.insert(key, value);
+            }
+        };
+    }
+
+    pub(crate) fn get(&self, key: usize) -> Option<&T> {
+        match self.array.get(key) {
+            Some(x) => x.as_ref(),
+            None => self.map.get(&key),
+        }
+    }
+
+    pub(crate) fn delete(&mut self, key: usize) {
+        match self.array.get(key) {
+            Some(_) => {} // skip removing
+            None => {
+                self.map.remove(&key);
+            }
+        };
+    }
+}
+
 pub(crate) struct FrameDecoder<'a, 'b: 'a> {
     pub frames_len: usize,
     pub product_decoder: ProductValueDecoder,
     pub max_channels: u32,
     pub channel_bits: u32,
     pub body: &'a ReplayBody<'b>,
     pub spawns: &'a Vec<SpawnTrajectory>,
-    pub object_ind_attributes: FnvHashMap<ObjectId, CacheInfo<'a>>,
+    pub object_ind_attributes: Vec<Option<CacheInfo>>,
     pub version: VersionTriplet,
     pub is_lan: bool,
     pub is_rl_223: bool,
@@ -68,12 +112,12 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
         })
     }
 
-    fn decode_frame(
-        &self,
+    fn decode_frame<'c>(
+        &'c self,
         attr_decoder: &AttributeDecoder,
         bits: &mut LittleEndianReader<'_>,
         buf: &mut [u8],
-        actors: &mut FnvHashMap<ActorId, ObjectId>,
+        actors: &mut SegmentedArray<(ObjectId, &'c CacheInfo)>,
         new_actors: &mut Vec<NewActor>,
         deleted_actors: &mut Vec<ActorId>,
         updated_actors: &mut Vec<UpdatedAttribute>,
@@ -123,24 +167,24 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
                 // Insert the new actor so we can keep track of it for attribute
                 // updates. It's common for an actor id to already exist, so we
                 // overwrite it.
-                actors.insert(actor.actor_id, actor.object_id);
+                let cache_info = self
+                    .object_ind_attributes
+                    .get(actor.object_id.0 as usize)
+                    .and_then(|x| x.as_ref())
+                    .ok_or(FrameError::MissingCache {
+                        actor: actor_id,
+                        actor_object: actor.object_id,
+                    })?;
+
+                actors.insert(actor.actor_id.0 as usize, (actor.object_id, cache_info));
                 new_actors.push(actor);
             } else {
                 // We'll be updating an existing actor with some attributes so we need
-                // to track down what the actor's type is
-                let object_id = actors
-                    .get(&actor_id)
+                // to track down what the actor's type is and what attributes are available
+                let (object_id, cache_info) = actors
+                    .get(actor_id.0 as usize)
                     .ok_or(FrameError::MissingActor { actor: actor_id })?;
 
-                // Once we have the type we need to look up what attributes are
-                // available for said type
-                let cache_info = self.object_ind_attributes.get(object_id).ok_or(
-                    FrameError::MissingCache {
-                        actor: actor_id,
-                        actor_object: *object_id,
-                    },
-                )?;
-
                 // While there are more attributes to update for our actor:
                 while bits
                     .read_bit()
@@ -164,7 +208,7 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
                     // decoding function. Experience has told me replays that fail to
                     // parse, fail to do so here, so a large chunk is dedicated to
                     // generating an error message with context
-                    let attr = cache_info.attributes.get(&stream_id).ok_or(
+                    let attr = cache_info.attributes.get(stream_id.0 as usize).ok_or(
                         FrameError::MissingAttribute {
                             actor: actor_id,
                             actor_object: *object_id,
Expand Down Expand Up @@ -198,7 +242,7 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
}
} else {
deleted_actors.push(actor_id);
actors.remove(&actor_id);
actors.delete(actor_id.0 as usize);
}
}

@@ -219,7 +263,7 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
        };
 
        let mut frames: Vec<Frame> = Vec::with_capacity(self.frames_len);
-        let mut actors = FnvHashMap::default();
+        let mut actors = SegmentedArray::new(200);
        let mut bits = LittleEndianReader::new(self.body.network_data);
        let mut new_actors = Vec::new();
        let mut updated_actors = Vec::new();
@@ -245,19 +289,29 @@ impl<'a, 'b> FrameDecoder<'a, 'b> {
                 object_attributes: self
                     .object_ind_attributes
                     .iter()
+                    .enumerate()
+                    .flat_map(|(i, x)| Some((i, x.as_ref()?)))
                     .map(|(key, value)| {
                         (
-                            *key,
+                            ObjectId(key as i32),
                             value
                                 .attributes
+                                .map
                                 .iter()
-                                .map(|(key2, value)| (*key2, value.object_id))
+                                .enumerate()
+                                .map(|(key2, value)| {
+                                    (StreamId(key2 as i32), value.1.object_id)
+                                })
                                 .collect(),
                         )
                     })
                     .collect(),
                 frames: frames.clone(),
-                actors: actors.clone(),
+                actors: actors
+                    .map
+                    .iter()
+                    .map(|(k, (o, _))| (ActorId(*k as i32), *o))
+                    .collect(),
                 new_actors: new_actors.clone(),
                 updated_actors: updated_actors.clone(),
             }),
58 changes: 34 additions & 24 deletions src/network/mod.rs
@@ -14,13 +14,14 @@ use crate::models::*;
 use crate::network::frame_decoder::FrameDecoder;
 use crate::parser::ReplayBody;
 use fnv::FnvHashMap;
+use frame_decoder::SegmentedArray;
 use std::cmp;
 
 #[derive(Debug)]
-pub(crate) struct CacheInfo<'a> {
+pub(crate) struct CacheInfo {
     max_prop_id: u32,
     prop_id_bits: u32,
-    attributes: &'a FnvHashMap<StreamId, ObjectAttribute>,
+    attributes: SegmentedArray<ObjectAttribute>,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -134,28 +135,37 @@ pub(crate) fn parse(header: &Header, body: &ReplayBody) -> Result<NetworkFrames, NetworkError> {
         );
     }
 
-    let object_ind_attributes: FnvHashMap<ObjectId, CacheInfo> = object_ind_attrs
-        .iter()
-        .map(|(obj_id, attrs)| {
-            let id = *obj_id;
-            let max = attrs
-                .keys()
-                .map(|&x| i32::from(x))
-                .max()
-                .unwrap_or(2)
-                .saturating_add(1);
-
-            let max_bit_width = crate::bits::bit_width(max as u64);
-            Ok((
-                id,
-                CacheInfo {
-                    max_prop_id: max as u32,
-                    prop_id_bits: cmp::max(max_bit_width, 1) - 1,
-                    attributes: attrs,
-                },
-            ))
-        })
-        .collect::<Result<FnvHashMap<_, _>, NetworkError>>()?;
+    let mut object_ind_attributes: Vec<Option<CacheInfo>> = Vec::with_capacity(body.objects.len());
+    object_ind_attributes.resize_with(body.objects.len(), || None);
+
+    let iter = object_ind_attrs.into_iter().map(|(obj_id, attrs)| {
+        let id = obj_id;
+        let max = attrs
+            .keys()
+            .map(|&x| i32::from(x))
+            .max()
+            .unwrap_or(2)
+            .saturating_add(1);
+        let mut attributes = SegmentedArray::new(64);
+        for (k, v) in attrs {
+            attributes.insert(k.0 as usize, v);
+        }
+
+        let max_bit_width = crate::bits::bit_width(max as u64);
+        Ok((
+            id,
+            CacheInfo {
+                max_prop_id: max as u32,
+                prop_id_bits: cmp::max(max_bit_width, 1) - 1,
+                attributes,
+            },
+        ))
+    });
+
+    for x in iter {
+        let (object, cache) = x?;
+        object_ind_attributes[object.0 as usize] = Some(cache);
+    }
 
     let product_decoder = ProductValueDecoder::create(version, &object_index);
 
