Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl RleV1Encoder for integer #37

Merged
merged 15 commits into from
Jan 19, 2025
1 change: 0 additions & 1 deletion src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl ByteRleEncoder {
self.tail_run_length = 1;
} else if let Some(run_value) = self.run_value {
// Run mode

if value == run_value {
// Continue buffering for Run sequence, flushing if reaching max length
self.num_literals += 1;
Expand Down
245 changes: 228 additions & 17 deletions src/encoding/integer/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,30 @@

//! Handling decoding of Integer Run Length Encoded V1 data in ORC files

use std::{io::Read, marker::PhantomData};
use std::{io::Read, marker::PhantomData, ops::RangeInclusive};

use bytes::{BufMut, BytesMut};
use snafu::OptionExt;

use crate::{
encoding::{
rle::GenericRle,
util::{read_u8, try_read_u8},
PrimitiveValueEncoder,
},
error::{OutOfSpecSnafu, Result},
memory::EstimateMemory,
};

use super::{util::read_varint_zigzagged, EncodingSign, NInt};
use super::{
util::{read_varint_zigzagged, write_varint_zigzagged},
EncodingSign, NInt,
};

const MAX_RUN_LENGTH: usize = 130;
/// Shortest sequence that is encoded as a run. `write_run` stores the run length
/// minus 3 in its header byte, which matches this value.
const MIN_RUN_LENGTH: usize = 3;
/// Longest run a single run header is allowed to describe; a run reaching this
/// length is written out immediately by the encoder.
const MAX_RUN_LENGTH: usize = 127 + MIN_RUN_LENGTH;
/// Maximum number of values collected in one literal group before it is written out.
const MAX_LITERAL_LENGTH: usize = 128;
/// Deltas a run may use. The delta is stored in the run as an `i8`, so it has to
/// fit a single signed byte.
// NOTE(review): the name looks like a misspelling of `DELTA_RANGE`; renaming it
// requires updating its use site in the encoder as well.
const DELAT_RANGE: RangeInclusive<i64> = -128..=127;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EncodingType {
Expand Down Expand Up @@ -147,6 +156,190 @@ impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S>
}
}

/// Represents the state of the RLE V1 encoder.
///
/// The encoder can be in one of three states:
///
/// 1. `Empty`: The buffer is empty and there are no values to encode.
/// 2. `Literal`: The encoder is in literal mode, with values saved in buffer.
/// 3. `Run`: The encoder is in run mode, with a run value, delta, and length.
#[derive(Debug, Clone, Eq, PartialEq)]
enum RleV1EncodingState<N: NInt> {
    /// Nothing is pending; the next value starts a new literal group.
    Empty,
    /// Pending values are held in the encoder's buffer (not in this variant).
    Literal,
    /// A run is being accumulated.
    ///
    /// * `value` - first value of the run.
    /// * `delta` - constant difference between consecutive values of the run.
    /// * `length` - number of values the run currently covers.
    Run { value: N, delta: i8, length: usize },
}

impl<N: NInt> Default for RleV1EncodingState<N> {
fn default() -> Self {
Self::Empty
}
}

/// `RleV1Encoder` is responsible for encoding a stream of integers using the Run Length Encoding (RLE) version 1 format.
pub struct RleV1Encoder<N: NInt, S: EncodingSign> {
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
writer: BytesMut,
state: RleV1EncodingState<N>,
buffer: Vec<N>,
sign: PhantomData<S>,
}

impl<N: NInt, S: EncodingSign> RleV1Encoder<N, S> {
    /// Processes a given value and updates the encoder state accordingly.
    ///
    /// The function handles the three possible states of the encoder:
    ///
    /// 1. `RleV1EncodingState::Empty`:
    ///    - Transitions to the `Literal` state with the given value as the only element in the
    ///      buffer.
    ///
    /// 2. `RleV1EncodingState::Literal`:
    ///    - The value is appended to the buffer.
    ///    - If the buffer holds at least `MIN_RUN_LENGTH` values and the last `MIN_RUN_LENGTH`
    ///      values have a constant delta inside `DELAT_RANGE`, any literals preceding them are
    ///      written out and the state transitions to `Run`.
    ///    - Otherwise, if the buffer length reaches `MAX_LITERAL_LENGTH`, the buffer is written
    ///      out and the state transitions to `Empty`.
    ///    - Otherwise the state remains `Literal`.
    ///
    /// 3. `RleV1EncodingState::Run`:
    ///    - If the value continues the current run (it equals `value + delta * length`), the run
    ///      length is incremented. If the run length reaches `MAX_RUN_LENGTH`, the run is written
    ///      out and the state transitions to `Empty`.
    ///    - If the value does not continue the current run, the existing run is written out and
    ///      the state transitions to `Literal` with the new value as the only buffered element.
    ///
    /// All delta arithmetic is done with checked `i64` operations: a difference or sum that does
    /// not fit in an `i64` can never describe a valid run, so it is treated as "not a run"
    /// instead of panicking (debug builds) or wrapping (release builds).
    fn process_value(&mut self, value: N) {
        match &mut self.state {
            RleV1EncodingState::Empty => {
                // Start a new literal group containing only this value.
                self.buffer.clear();
                self.buffer.push(value);
                self.state = RleV1EncodingState::Literal;
            }
            RleV1EncodingState::Literal => {
                let buf = &mut self.buffer;
                buf.push(value);
                let length = buf.len();

                // A run can start only when the last MIN_RUN_LENGTH values share one delta that
                // fits the single signed byte of the run header.
                let run_delta = if length >= MIN_RUN_LENGTH {
                    let middle = buf[length - 2].as_i64();
                    let newest = value.as_i64().checked_sub(middle);
                    let previous = middle.checked_sub(buf[length - 3].as_i64());
                    match (newest, previous) {
                        (Some(newest), Some(previous))
                            if newest == previous && DELAT_RANGE.contains(&newest) =>
                        {
                            // Lossless: DELAT_RANGE is exactly the i8 range.
                            Some(newest as i8)
                        }
                        _ => None,
                    }
                } else {
                    None
                };

                if let Some(delta) = run_delta {
                    if length > MIN_RUN_LENGTH {
                        // Write the literals that precede the values forming the run.
                        write_literals::<_, S>(&mut self.writer, &buf[..(length - MIN_RUN_LENGTH)]);
                    }
                    self.state = RleV1EncodingState::Run {
                        value: buf[length - MIN_RUN_LENGTH],
                        delta,
                        length: MIN_RUN_LENGTH,
                    }
                } else if length == MAX_LITERAL_LENGTH {
                    // Buffer limit reached: write the literals and start over.
                    write_literals::<_, S>(&mut self.writer, buf);
                    self.state = RleV1EncodingState::Empty;
                }
                // Otherwise stay in literal mode.
            }
            RleV1EncodingState::Run {
                value: run_value,
                delta,
                length,
            } => {
                // `delta * length` is bounded by 128 * MAX_RUN_LENGTH, so only the addition can
                // overflow; an overflowing expectation cannot match any value.
                let expected = run_value
                    .as_i64()
                    .checked_add(i64::from(*delta) * (*length as i64));
                if expected == Some(value.as_i64()) {
                    // The value continues the run.
                    *length += 1;
                    if *length == MAX_RUN_LENGTH {
                        // Run limit reached: write it and start over.
                        write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
                        self.state = RleV1EncodingState::Empty;
                    }
                } else {
                    // The run is broken: write it and restart in literal mode with this value.
                    write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
                    self.buffer.clear();
                    self.buffer.push(value);
                    self.state = RleV1EncodingState::Literal;
                }
            }
        }
    }

    /// Flushes the current state of the encoder, writing out any pending values.
    ///
    /// This function handles the three possible states of the encoder:
    ///
    /// 1. `RleV1EncodingState::Empty`:
    ///    - No action is needed as there are no pending values to write.
    ///
    /// 2. `RleV1EncodingState::Literal`:
    ///    - Writes out the buffered literal values.
    ///
    /// 3. `RleV1EncodingState::Run`:
    ///    - Writes out the current run of values.
    ///
    /// After calling this function, the encoder state is reset to `Empty`, so calling it again
    /// without new input writes nothing.
    fn flush(&mut self) {
        // Taking the state leaves `Empty` (the default) behind.
        let state = std::mem::take(&mut self.state);
        match state {
            RleV1EncodingState::Empty => {}
            RleV1EncodingState::Literal => {
                write_literals::<_, S>(&mut self.writer, &self.buffer);
            }
            RleV1EncodingState::Run {
                value,
                delta,
                length,
            } => {
                write_run::<_, S>(&mut self.writer, value, delta, length);
            }
        }
    }
}

/// Appends one encoded run to `writer`.
///
/// The output consists of a header byte holding `length` minus 3, a byte holding `delta`
/// reinterpreted as unsigned, and finally `value` (the first value of the run) written through
/// `write_varint_zigzagged` for the sign encoding `S`.
fn write_run<N: NInt, S: EncodingSign>(writer: &mut BytesMut, value: N, delta: i8, length: usize) {
    // Header: run length relative to the minimum run length of 3.
    let header = length as u8 - 3;
    // Two's-complement reinterpretation of the signed delta.
    let delta_byte = delta as u8;
    writer.put_u8(header);
    writer.put_u8(delta_byte);
    // Base value of the run.
    write_varint_zigzagged::<_, S>(writer, value);
}

/// Appends one literal group to `writer`.
///
/// The output consists of a header byte holding the negated number of literals (as a
/// two's-complement byte), followed by every value of `buffer` written through
/// `write_varint_zigzagged` for the sign encoding `S`.
///
/// `buffer` is expected to hold between 1 and 128 values. An empty slice writes nothing,
/// because a header byte of 0 would denote a run rather than a literal group.
fn write_literals<N: NInt, S: EncodingSign>(writer: &mut BytesMut, buffer: &[N]) {
    if buffer.is_empty() {
        return;
    }
    debug_assert!(
        buffer.len() <= 128,
        "a literal group holds at most 128 values"
    );
    // Negate in `u8` space with wrapping semantics: for 128 literals the header is 0x80
    // (-128 as a signed byte). Negating `128 as i8` directly would overflow `i8` and panic
    // in debug builds.
    writer.put_u8((buffer.len() as u8).wrapping_neg());
    // Write the literals themselves.
    for literal in buffer {
        write_varint_zigzagged::<_, S>(writer, *literal);
    }
}

impl<N: NInt, S: EncodingSign> EstimateMemory for RleV1Encoder<N, S> {
    /// Reports the number of encoded bytes currently held by the output writer.
    ///
    /// Values still pending in the literal buffer or in an unfinished run are not counted.
    fn estimate_memory_size(&self) -> usize {
        let encoded = &self.writer;
        encoded.len()
    }
}

impl<N: NInt, S: EncodingSign> PrimitiveValueEncoder<N> for RleV1Encoder<N, S> {
fn new() -> Self {
Self {
writer: BytesMut::new(),
state: Default::default(),
buffer: Vec::with_capacity(MAX_LITERAL_LENGTH),
sign: Default::default(),
}
}

fn write_one(&mut self, value: N) {
self.process_value(value);
}

fn take_inner(&mut self) -> bytes::Bytes {
self.flush();
std::mem::take(&mut self.writer).into()
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand All @@ -155,32 +348,50 @@ mod tests {

use super::*;

fn test_helper(data: &[u8], expected: &[i64]) {
let mut reader = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(data));
let mut actual = vec![0; expected.len()];
reader.decode(&mut actual).unwrap();
assert_eq!(actual, expected);
fn test_helper(original: &[i64], encoded: &[u8]) {
let mut encoder = RleV1Encoder::<i64, UnsignedEncoding>::new();
encoder.write_slice(original);
encoder.flush();
let actual_encoded = encoder.take_inner();
assert_eq!(actual_encoded, encoded);

let mut decoder = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(encoded));
let mut actual_decoded = vec![0; original.len()];
decoder.decode(&mut actual_decoded).unwrap();
assert_eq!(actual_decoded, original);
}

#[test]
fn test_run() -> Result<()> {
let data = [0x61, 0x00, 0x07];
let expected = [7; 100];
test_helper(&data, &expected);
let original = [7; 100];
let encoded = [0x61, 0x00, 0x07];
test_helper(&original, &encoded);

let original = (1..=100).rev().collect::<Vec<_>>();
let encoded = [0x61, 0xff, 0x64];
test_helper(&original, &encoded);

let data = [0x61, 0xff, 0x64];
let expected = (1..=100).rev().collect::<Vec<_>>();
test_helper(&data, &expected);
let original = (1..=150).rev().collect::<Vec<_>>();
let encoded = [0x7f, 0xff, 0x96, 0x01, 0x11, 0xff, 0x14];
test_helper(&original, &encoded);

let original = [2, 4, 6, 8, 1, 3, 5, 7, 255];
let encoded = [0x01, 0x02, 0x02, 0x01, 0x02, 0x01, 0xff, 0xff, 0x01];
test_helper(&original, &encoded);
Ok(())
}

#[test]
fn test_literal() -> Result<()> {
let data = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
let expected = vec![2, 3, 6, 7, 11];
test_helper(&data, &expected);
let original = vec![2, 3, 6, 7, 11];
let encoded = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
test_helper(&original, &encoded);

let original = vec![2, 3, 6, 7, 11, 1, 2, 3, 0, 256];
let encoded = [
0xfb, 0x02, 0x03, 0x06, 0x07, 0x0b, 0x00, 0x01, 0x01, 0xfe, 0x00, 0x80, 0x02,
];
test_helper(&original, &encoded);
Ok(())
}
}
Loading