2
2
3
3
import abc
4
4
import datetime as dt
5
+ import functools
5
6
import logging
6
7
from collections .abc import Generator
7
8
from io import BytesIO
55
56
56
57
def is_zstandard (reader : IO [bytes ]) -> bool :
57
58
"""
58
- Determine if an `IO[bytes]` reader contains zstandard compressed
59
- data.
59
+ Determine if an `IO[bytes]` reader contains zstandard compressed data.
60
60
61
61
Parameters
62
62
----------
@@ -96,7 +96,9 @@ def is_dbn(reader: IO[bytes]) -> bool:
96
96
97
97
98
98
class DataSource (abc .ABC ):
99
- """Abstract base class for backing DBNStore instances with data."""
99
+ """
100
+ Abstract base class for backing DBNStore instances with data.
101
+ """
100
102
101
103
def __init__ (self , source : object ) -> None :
102
104
...
@@ -137,6 +139,11 @@ def __init__(self, source: PathLike[str] | str):
137
139
if not self ._path .is_file () or not self ._path .exists ():
138
140
raise FileNotFoundError (source )
139
141
142
+ if self ._path .stat ().st_size == 0 :
143
+ raise ValueError (
144
+ f"Cannot create data source from empty file: { self ._path .name } " ,
145
+ )
146
+
140
147
self ._name = self ._path .name
141
148
self .__buffer : IO [bytes ] | None = None
142
149
@@ -244,8 +251,8 @@ def nbytes(self) -> int:
244
251
@property
245
252
def reader (self ) -> IO [bytes ]:
246
253
"""
247
- Return a reader for this buffer.
248
- The reader beings at the start of the buffer.
254
+ Return a reader for this buffer. The reader beings at the start of the
255
+ buffer.
249
256
250
257
Returns
251
258
-------
@@ -306,6 +313,11 @@ class DBNStore:
306
313
to_ndarray : np.ndarray
307
314
The data as a numpy `ndarray`.
308
315
316
+ Raises
317
+ ------
318
+ BentoError
319
+ When the data_source does not contain valid DBN data or is corrupted.
320
+
309
321
See Also
310
322
--------
311
323
https://docs.databento.com/knowledge-base/new-users/dbn-encoding
@@ -328,7 +340,7 @@ def __init__(self, data_source: DataSource) -> None:
328
340
buffer = data_source .reader
329
341
else :
330
342
# We don't know how to read this file
331
- raise RuntimeError (
343
+ raise BentoError (
332
344
f"Could not determine compression format of { self ._data_source .name } " ,
333
345
)
334
346
@@ -452,10 +464,6 @@ def _prepare_dataframe(
452
464
df : pd .DataFrame ,
453
465
schema : Schema ,
454
466
) -> pd .DataFrame :
455
- # Setup column ordering and index
456
- df .set_index (self ._get_index_column (schema ), inplace = True )
457
- df = df .reindex (columns = COLUMNS [schema ])
458
-
459
467
if schema == Schema .MBO or schema in DERIV_SCHEMAS :
460
468
df ["flags" ] = df ["flags" ] & 0xFF # Apply bitmask
461
469
df ["side" ] = df ["side" ].str .decode ("utf-8" )
@@ -500,8 +508,8 @@ def _map_symbols(self, df: pd.DataFrame, pretty_ts: bool) -> pd.DataFrame:
500
508
@property
501
509
def compression (self ) -> Compression :
502
510
"""
503
- Return the data compression format (if any).
504
- This is determined by inspecting the data.
511
+ Return the data compression format (if any). This is determined by
512
+ inspecting the data.
505
513
506
514
Returns
507
515
-------
@@ -525,8 +533,8 @@ def dataset(self) -> str:
525
533
@property
526
534
def end (self ) -> pd .Timestamp | None :
527
535
"""
528
- Return the query end for the data.
529
- If None, the end time was not known when the data was generated.
536
+ Return the query end for the data. If None, the end time was not known
537
+ when the data was generated.
530
538
531
539
Returns
532
540
-------
@@ -632,8 +640,7 @@ def reader(self) -> IO[bytes]:
632
640
@property
633
641
def schema (self ) -> Schema | None :
634
642
"""
635
- Return the DBN record schema.
636
- If None, may contain one or more schemas.
643
+ Return the DBN record schema. If None, may contain one or more schemas.
637
644
638
645
Returns
639
646
-------
@@ -664,8 +671,8 @@ def start(self) -> pd.Timestamp:
664
671
@property
665
672
def stype_in (self ) -> SType | None :
666
673
"""
667
- Return the query input symbology type for the data.
668
- If None, the records may contain mixed STypes.
674
+ Return the query input symbology type for the data. If None, the
675
+ records may contain mixed STypes.
669
676
670
677
Returns
671
678
-------
@@ -739,7 +746,9 @@ def from_file(cls, path: PathLike[str] | str) -> DBNStore:
739
746
Raises
740
747
------
741
748
FileNotFoundError
742
- If a empty or non-existant file is specified.
749
+ If a non-existant file is specified.
750
+ ValueError
751
+ If an empty file is specified.
743
752
744
753
"""
745
754
return cls (FileDataSource (path ))
@@ -760,8 +769,8 @@ def from_bytes(cls, data: BytesIO | bytes | IO[bytes]) -> DBNStore:
760
769
761
770
Raises
762
771
------
763
- FileNotFoundError
764
- If a empty or non-existant file is specified.
772
+ ValueError
773
+ If an empty buffer is specified.
765
774
766
775
"""
767
776
return cls (MemoryDataSource (data ))
@@ -941,7 +950,12 @@ def to_df(
941
950
raise ValueError ("a schema must be specified for mixed DBN data" )
942
951
schema = self .schema
943
952
944
- df = pd .DataFrame (self .to_ndarray (schema = schema ))
953
+ df = pd .DataFrame (
954
+ self .to_ndarray (schema ),
955
+ columns = COLUMNS [schema ],
956
+ )
957
+ df .set_index (self ._get_index_column (schema ), inplace = True )
958
+
945
959
df = self ._prepare_dataframe (df , schema )
946
960
947
961
if pretty_ts :
@@ -1049,12 +1063,10 @@ def to_ndarray(
1049
1063
self ,
1050
1064
)
1051
1065
1052
- result = []
1053
- for record in schema_records :
1054
- np_rec = np .frombuffer (
1055
- bytes (record ),
1056
- dtype = STRUCT_MAP [schema ],
1057
- )
1058
- result .append (np_rec [0 ])
1066
+ decoder = functools .partial (np .frombuffer , dtype = STRUCT_MAP [schema ])
1067
+ result = tuple (map (decoder , map (bytes , schema_records )))
1068
+
1069
+ if not result :
1070
+ return np .empty (shape = (0 , 1 ), dtype = STRUCT_MAP [schema ])
1059
1071
1060
- return np .asarray (result )
1072
+ return np .ravel (result )
0 commit comments