Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 5, 2024
1 parent ec62932 commit 8dc99b1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
33 changes: 18 additions & 15 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use num_traits::Pow;
use polars_core::prelude::*;
use polars_core::{config, POOL};
use polars_error::feature_gated;
use polars_utils::index::{Bounded, Indexable};
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;

Expand Down Expand Up @@ -387,11 +387,11 @@ impl<'a> Iterator for SplitLines<'a> {
type Item = &'a [u8];

#[inline]
#[cfg(not(feature = "simd"))]
fn next(&mut self) -> Option<&'a [u8]> {
if self.v.is_empty() {
return None;
}
#[cfg(not(feature = "simd"))]
{
let mut pos = 0u32;
let mut iter = self.v.iter();
Expand Down Expand Up @@ -433,8 +433,14 @@ impl<'a> Iterator for SplitLines<'a> {
ret
}
}
}

#[cfg(feature = "simd")]
#[inline]
#[cfg(feature = "simd")]
fn next(&mut self) -> Option<&'a [u8]> {
if self.v.is_empty() {
return None;
}
{
self.total_index = 0;
let mut not_in_field_previous_iter = true;
Expand Down Expand Up @@ -548,20 +554,16 @@ impl<'a> Iterator for SplitLines<'a> {
}
}


pub(super) struct CountLines {
quote_char: u8,
eol_char: u8,
#[cfg(feature = "simd")]
simd_eol_char: SimdVec,
#[cfg(feature = "simd")]
simd_quote_char: SimdVec,
#[cfg(feature = "simd")]
previous_valid_eols: u64,
quoting: bool,
}


impl CountLines {
pub(super) fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
let quoting = quote_char.is_some();
Expand All @@ -577,8 +579,6 @@ impl CountLines {
simd_eol_char,
#[cfg(feature = "simd")]
simd_quote_char,
#[cfg(feature = "simd")]
previous_valid_eols: 0,
quoting,
}
}
Expand Down Expand Up @@ -612,8 +612,7 @@ impl CountLines {
if not_in_field_previous_iter {
not_in_quote_field = !not_in_quote_field;
}
not_in_field_previous_iter =
(not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
eol_mask & not_in_quote_field
} else {
eol_mask
Expand All @@ -625,10 +624,9 @@ impl CountLines {
debug_assert_eq!(original_bytes[position], self.eol_char)
}
total_idx += SIMD_SIZE;

} else if bytes.is_empty() {
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
return (count, position)
return (count, position);
} else {
let (c, o) = self.count_no_simd(bytes);

Expand All @@ -639,13 +637,18 @@ impl CountLines {
};
debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

return (count, position)
return (count, position);
}
}
}

#[cfg(not(feature = "simd"))]
pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
self.count_no_simd(bytes)
}

fn count_no_simd(&self, bytes: &[u8]) -> (usize, usize) {
let mut iter = bytes.iter();
let iter = bytes.iter();
let mut in_field = false;
let mut count = 0;
let mut position = 0;
Expand Down
45 changes: 22 additions & 23 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ use polars_core::utils::{accumulate_dataframes_vertical, handle_casting_failures
use polars_core::POOL;
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;

use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValues, NullValuesCompiled};
use super::parser::{is_comment_line, next_line_position, next_line_position_naive, parse_lines, skip_bom, skip_line_ending, skip_this_line, CountLines, SplitLines};
use super::parser::{
is_comment_line, next_line_position, next_line_position_naive, parse_lines, skip_bom,
skip_line_ending, skip_this_line, CountLines,
};
use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
use super::utils::decompress;
Expand Down Expand Up @@ -369,7 +373,7 @@ impl<'a> CoreReader<'a> {
}

fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
let (mut bytes, starting_point_offset) =
let (bytes, starting_point_offset) =
self.find_starting_point(bytes, self.quote_char, self.eol_char)?;

let projection = self.get_projection()?;
Expand Down Expand Up @@ -399,18 +403,12 @@ impl<'a> CoreReader<'a> {
let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
let n_parts_hint = n_threads * 32;
let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 1024 * 128);
let mut chunk_size = std::cmp::max(chunk_size, 32 );
let mut chunk_size = std::cmp::max(chunk_size, 32);
let mut total_bytes_offset = 0;

let results = Arc::new(Mutex::new(vec![]));
let mut total_line_count = 0;

// let t = std::time::Instant::now();
// let mut counter = CountLines::new(self.quote_char, self.eol_char);
// let c= counter.count(bytes);
// dbg!(c);
// dbg!(t.elapsed());
// std::process::exit(0);
#[cfg(not(target_family = "wasm"))]
let pool;
#[cfg(not(target_family = "wasm"))]
Expand All @@ -426,29 +424,37 @@ impl<'a> CoreReader<'a> {
#[cfg(target_family = "wasm")]
let pool = &POOL;

let mut counter = CountLines::new(self.quote_char, self.eol_char);
let counter = CountLines::new(self.quote_char, self.eol_char);
let mut total_offset = 0;

pool.scope(|s| {
loop {
let b = &bytes[total_offset..std::cmp::min(total_offset + chunk_size, bytes.len())];
let b = unsafe {
bytes.get_unchecked_release(
total_offset..std::cmp::min(total_offset + chunk_size, bytes.len()),
)
};
if b.is_empty() {
break;
}
debug_assert!(total_offset == 0 || bytes[total_offset -1] == self.eol_char);
debug_assert!(total_offset == 0 || bytes[total_offset - 1] == self.eol_char);
let (count, position) = counter.count(b);
debug_assert!(count == 0 || b[position] == self.eol_char);

let (b, count) = if count == 0 && unsafe { b.as_ptr().add(b.len()) == bytes.as_ptr().add(bytes.len()) } {
let (b, count) = if count == 0
&& unsafe { b.as_ptr().add(b.len()) == bytes.as_ptr().add(bytes.len()) }
{
total_offset = bytes.len();
(b, 1)
} else {
if count == 0 {
chunk_size *= 2;
continue
continue;
}

let b = &bytes[total_offset..total_offset + position];
let b = unsafe {
bytes.get_unchecked_release(total_offset..total_offset + position)
};
total_offset += position + 1;
(b, count)
};
Expand All @@ -461,14 +467,7 @@ impl<'a> CoreReader<'a> {
let slf = &(*self);
s.spawn(move |_| {
let result = slf
.read_chunk(
b,
projection,
0,
count,
starting_point_offset,
b.len(),
)
.read_chunk(b, projection, 0, count, starting_point_offset, b.len())
.and_then(|mut df| {
debug_assert!(df.height() <= count);

Expand Down
1 change: 0 additions & 1 deletion crates/polars/tests/it/io/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,6 @@ fn test_try_parse_dates() -> PolarsResult<()> {
.into_reader_with_file_handle(file)
.finish()?;

dbg!(&df);
assert_eq!(df.dtypes(), &[DataType::Date]);
assert_eq!(df.column("date")?.null_count(), 1);
Ok(())
Expand Down

0 comments on commit 8dc99b1

Please sign in to comment.