Skip to content

Commit

Permalink
+25% throughput with segmented arrays (#188)
Browse files Browse the repository at this point in the history
* +25% throughput with segmented arrays

For every updated actor in the network body, there are two hash
functions run. One to lookup the actor id to grab their object id, and
another to look up that object id's attributes. Even with the fastest
hash function (FNV), this is still slow when executed a 100k times in a
replay.

The solution is to transition the hashmap to a segmented array where
small indices (which are the vast majority of them) are looked up
directly in a sparse array with a static size. Indices that fall outside
the array are still stored in a hashmap.

Benchmarks showed a +25% throughput improvement, which imo is kinda
incredible that such performance improvement are still on the table.

* ...

* update perf numbers

* Update error model
  • Loading branch information
nickbabcock authored Dec 24, 2024
1 parent 0803cfd commit 4b29649
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 168 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ with the following options and the estimated elapsed time.

| Header | Corruption Check | Body | Output JSON | Elapsed | Throughput |
| - | - | - | - | - | - |
|| | | | 68.0 µs | |
|||| | 6.6 ms | 223 MiB/s |
|| || | 6.3 ms | 232 MiB/s |
||||| 35 ms | 531 MiB/s ^1 |
|| | | | 32 µs | |
|||| | 5.1 ms | 290 MiB/s |
|| || | 4.8 ms | 315 MiB/s |
||||| 31 ms | 600 MiB/s ^1 |

^1: JSON serialization throughput includes the amount of JSON produced

Expand Down
196 changes: 94 additions & 102 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::data::ATTRIBUTES;
use crate::network::{ActorId, Frame, NewActor, ObjectId, StreamId, UpdatedAttribute};
use crate::{AttributeTag, CacheInfo};
use fnv::FnvHashMap;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -105,12 +106,12 @@ impl Display for AttributeError {

#[derive(PartialEq, Debug, Clone)]
pub struct FrameContext {
pub objects: Vec<String>,
pub object_attributes: FnvHashMap<ObjectId, FnvHashMap<StreamId, ObjectId>>,
pub frames: Vec<Frame>,
pub actors: FnvHashMap<ActorId, ObjectId>,
pub new_actors: Vec<NewActor>,
pub updated_actors: Vec<UpdatedAttribute>,
pub(crate) objects: Vec<String>,
pub(crate) object_attributes: Vec<Option<CacheInfo>>,
pub(crate) frames: Vec<Frame>,
pub(crate) actors: FnvHashMap<ActorId, ObjectId>,
pub(crate) new_actors: Vec<NewActor>,
pub(crate) updated_actors: Vec<UpdatedAttribute>,
}

impl FrameContext {
Expand All @@ -124,55 +125,48 @@ impl FrameContext {
}

fn display_new_actor(&self, f: &mut fmt::Formatter<'_>, actor: &NewActor) -> fmt::Result {
write!(
writeln!(f)?;
writeln!(f, "Last new actor:")?;
writeln!(f, "{}", "-".repeat("Last new actor:".len()))?;
writeln!(f, "actor id: {}", actor.actor_id)?;
writeln!(
f,
"(id: {}, nameId: {}, objId: {}, objName: {}, initial trajectory: {:?})",
actor.actor_id,
"name id: {}",
actor
.name_id
.map(|x| x.to_string())
.unwrap_or_else(|| String::from("<none>")),
actor.object_id,
self.object_ind_to_string(actor.object_id),
actor.initial_trajectory
)
}

fn display_update(&self, f: &mut fmt::Formatter<'_>, attr: &UpdatedAttribute) -> fmt::Result {
let actor_entry = self.actors.get(&attr.actor_id);
let actor_obj_name = actor_entry.and_then(|x| self.objects.get(usize::from(*x)));
let stream_obj_name = self.objects.get(usize::from(attr.object_id));

write!(
.unwrap_or_else(|| String::from("<none>"))
)?;
writeln!(f, "object id: {}", actor.object_id)?;
writeln!(
f,
"(actor stream id / object id / name: {} / ",
attr.actor_id
"object name: {}",
self.object_ind_to_string(actor.object_id)
)?;
if let Some(actor_id) = actor_entry {
write!(f, "{} / ", actor_id)
} else {
write!(f, "<none> / ")
}?;
writeln!(f, "location: {:?}", actor.initial_trajectory.location)?;
writeln!(f, "rotation: {:?}", actor.initial_trajectory.rotation)
}

if let Some(name) = actor_obj_name {
write!(f, "{}, ", name)
fn display_update(&self, f: &mut fmt::Formatter<'_>, attr: &UpdatedAttribute) -> fmt::Result {
writeln!(f)?;
writeln!(f, "Last actor update:")?;
writeln!(f, "{}", "-".repeat("Last actor update:".len()))?;

writeln!(f, "actor id: {}", attr.actor_id)?;
if let Some(object_id) = self.actors.get(&attr.actor_id) {
writeln!(f, "object id: {}", object_id)?;
writeln!(f, "object name: {}", self.object_ind_to_string(*object_id))?;
} else {
write!(f, "<none>, ")
}?;

write!(
writeln!(f, "object id: <none>")?;
};
writeln!(f, "attribute stream id: {}", attr.stream_id)?;
writeln!(f, "attribute object id: {}", attr.object_id)?;
writeln!(
f,
"attribute stream id / object id / name: {} / {} / ",
attr.stream_id, attr.object_id
"attribute object name: {}",
self.object_ind_to_string(attr.object_id)
)?;

if let Some(name) = stream_obj_name {
write!(f, "{}", name)
} else {
write!(f, "<none>")
}?;

write!(f, ", attribute: {:?})", attr.attribute)
writeln!(f, "attribute: {:?}", attr.attribute)
}

fn most_recent_frame_with_data(&self) -> Option<(usize, &Frame)> {
Expand All @@ -187,32 +181,37 @@ impl FrameContext {
impl fmt::Display for FrameContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let last_frame = self.frames.last();
write!(
writeln!(
f,
"on frame: {} (time: {} delta: {}), ",
"Current frame: {} (time: {} delta: {})",
self.frames.len(),
last_frame.map(|x| x.time).unwrap_or_default(),
last_frame.map(|x| x.delta).unwrap_or_default()
)?;
if let Some(updated) = self.updated_actors.last() {
write!(f, "last updated actor: ")?;
self.display_update(f, updated)
return self.display_update(f, updated);
} else if let Some(new) = self.new_actors.last() {
write!(f, "last new actor: ")?;
return self.display_new_actor(f, new);
}

writeln!(f, "- No actor information decoded")?;

let Some((frame_idx, frame)) = self.most_recent_frame_with_data() else {
return Ok(());
};

writeln!(
f,
"Retrace frame: {} (time: {}, delta: {})",
frame_idx, frame.time, frame.delta
)?;

if let Some(updated) = frame.updated_actors.last() {
self.display_update(f, updated)
} else if let Some(new) = frame.new_actors.last() {
self.display_new_actor(f, new)
} else if let Some((frame_idx, frame)) = self.most_recent_frame_with_data() {
write!(f, "backtracking to frame {}, ", frame_idx)?;
if let Some(updated) = frame.updated_actors.last() {
write!(f, "last updated actor: ")?;
self.display_update(f, updated)
} else if let Some(new) = frame.new_actors.last() {
write!(f, "last new actor: ")?;
self.display_new_actor(f, new)
} else {
write!(f, "it didn't decode anything")
}
} else {
write!(f, "it didn't decode anything")
writeln!(f, "- No actor information decoded")
}
}
}
Expand Down Expand Up @@ -278,35 +277,28 @@ impl FrameError {
let objs_with_attr = context
.object_attributes
.iter()
.flat_map(|(obj_id, attrs)| {
attrs
.iter()
.map(move |(attr_id, attr_obj_id)| (obj_id, attr_obj_id, attr_id))
})
.filter(|&(_, _, stream_id)| stream_id == attribute_stream)
.filter_map(|x| x.as_ref())
.filter_map(|c| c.attributes.get(*attribute_stream))
.collect::<Vec<_>>();

let obj = context
.object_attributes
.get(actor_object)
.and_then(|x| x.get(attribute_stream));

if let Some(attr_obj_id) = obj {
if let Some(name) = context.objects.get(usize::from(*attr_obj_id)) {
if let Some(attr) = ATTRIBUTES.get(name.as_str()) {
write!(
f,
"found attribute {} ({:?}) on {} in network cache data. ",
name, attr, actor_obj_name
)?;
} else {
write!(f, "found attribute {} (unknown to boxcars) on {} in network cache data. This is likely due to a rocket league update or an atypical replay. File a bug report!", name, actor_obj_name)?;

// No need for further context so we return early.
return Ok(());
}
.get(usize::from(*actor_object))
.and_then(|x| x.as_ref())
.and_then(|x| x.attributes.get(*attribute_stream));

if let Some(attr_obj) = obj {
if attr_obj.attribute != AttributeTag::NotImplemented {
write!(
f,
"found attribute ({:?}) on {} in network cache data. ",
attr_obj.attribute, actor_obj_name
)?;
} else {
write!(f, "found attribute on {} in network cache data, but not in replay object data, ", actor_obj_name)?;
writeln!(f, "attribute: {}", context.objects.get(usize::from(attr_obj.object_id)).map_or("<unknown>", |v| v))?;

// No need for further context so we return early.
return Ok(());
}
} else {
write!(
Expand All @@ -318,25 +310,14 @@ impl FrameError {

let mut obj_attr_names = objs_with_attr
.iter()
.filter_map(|&(obj_id, attr_obj_id, _)| {
context
.objects
.get(usize::from(*obj_id))
.and_then(|obj_name| {
context
.objects
.get(usize::from(*attr_obj_id))
.map(|attr_name| (obj_name, attr_name))
})
})
.filter_map(|attr| context.objects.get(usize::from(attr.object_id)))
.collect::<Vec<_>>();

obj_attr_names.sort();
obj_attr_names.dedup();

let mut unknown_attributes = obj_attr_names
.iter()
.map(|(_obj_name, attr_name)| attr_name)
.filter(|x| !ATTRIBUTES.contains_key(x.as_str()))
.cloned()
.cloned()
Expand All @@ -347,7 +328,7 @@ impl FrameError {

let stringify_names = obj_attr_names
.iter()
.map(|(obj_name, attr_name)| format!("({}: {})", obj_name, attr_name))
.map(|attr_name| format!("({})", attr_name))
.collect::<Vec<_>>();

write!(f, "searching all attributes with the same stream id, ")?;
Expand Down Expand Up @@ -386,7 +367,13 @@ impl Display for FrameError {
FrameError::ObjectIdOutOfRange {obj} => write!(f, "new actor object id out of range: {}", obj),
FrameError::MissingActor {actor} => write!(f, "attribute update references unknown actor: {}", actor),
FrameError::MissingCache {actor, actor_object} => write!(f, "no known attributes found for actor id / object id: {} / {}", actor, actor_object),
FrameError::MissingAttribute {actor, actor_object, attribute_stream} => write!(f, "attribute unknown or not implemented: actor id / actor object id / attribute id: {} / {} / {}", actor, actor_object, attribute_stream),
FrameError::MissingAttribute {actor, actor_object, attribute_stream} =>{
writeln!(f, "attribute unknown or not implemented:")?;
writeln!(f, "{}", "-".repeat(10))?;
writeln!(f, "actor id: {}", actor)?;
writeln!(f, "actor object id: {}", actor_object)?;
writeln!(f, "attribute stream id: {}", attribute_stream)
},
FrameError::AttributeError {actor, actor_object, attribute_stream, error} => write!(f, "attribute decoding error encountered: {} for actor id / actor object id / attribute id: {} / {} / {}", error, actor, actor_object, attribute_stream),
}
}
Expand Down Expand Up @@ -436,9 +423,14 @@ impl Display for NetworkError {
),
NetworkError::TooManyFrames(size) => write!(f, "Too many frames to decode: {}", size),
NetworkError::FrameError(err, context) => {
write!(f, "Error decoding frame: {}. ", err)?;
write!(f, "Error decoding frame: {}", err)?;
if !matches!(err, FrameError::MissingAttribute { .. }) {
write!(f, ". ")?;
}

err.contextualize(f, context)?;
write!(f, " Context: {}", context)
writeln!(f)?;
write!(f, "{}", context)
}
}
}
Expand Down
Loading

0 comments on commit 4b29649

Please sign in to comment.