IO with Adapters
In First Steps and More with CSP we used toy data for the streaming workflows. In real workflows, we need to access data stored in particular storage formats. To bring data into or out of a csp graph, we use adapters.
csp has several built-in adapters to access certain types of data such as Kafka and Parquet. You can also write your own adapters for any other data format; for reference, see the various "How-to" guides for historical, real-time and output adapters. I/O adapters form the interface between external data and the time series format used in csp.
In this tutorial, you write to and read from Parquet files on the local file system.
csp has the ParquetWriter and ParquetReader adapters to stream data to and from Parquet files. Check out the complete API in the Reference documentation.
Important
csp graphs can process historical and real-time data with little to no changes in the application code.
A csp.Struct is a basic form of structured data in csp where each field can be accessed as its own time series. It is analogous to a dataclass in Python, and its fields must be type annotated. We will store some example data in a custom struct called Example and then stream the struct to a Parquet file.
from datetime import datetime, timedelta

import csp
from csp.adapters.parquet import ParquetOutputConfig, ParquetWriter, ParquetReader

class Example(csp.Struct):
    int_val: int
    float_val: float

In a graph, create some sample values for Example and use ParquetWriter to stream to a Parquet file.
- The timestamp_column_name is how csp preserves the timestamps on each event. If the timestamp information is not required, you can set the column name argument to None.
- You can provide optional configurations in the ParquetOutputConfig format, which can set allow_overwrite, batch_size, compression, and write_arrow_binary.
- We use publish_struct to publish (write) the time series data to disk.
@csp.graph
def write_struct(file_name: str):
    st = datetime(2020, 1, 1)
    curve = csp.curve(
        Example,
        [
            (st + timedelta(seconds=1), Example(int_val=1, float_val=1.0)),
            (st + timedelta(seconds=2), Example(int_val=2, float_val=2.0)),
            (st + timedelta(seconds=3), Example(int_val=3, float_val=3.0)),
        ],
    )
    writer = ParquetWriter(
        file_name=file_name, timestamp_column_name="csp_time", config=ParquetOutputConfig(allow_overwrite=True)
    )
    writer.publish_struct(curve)

You can use ParquetReader with a time_column to read back the data.
@csp.graph
def read_struct(file_name: str):
    struct_reader = ParquetReader(file_name, time_column="csp_time")
    struct_all = struct_reader.subscribe_all(Example)
    csp.print("struct_all", struct_all)

Go through the complete example at examples/03_using_adapters/parquet/e1_parquet_write_read.py and check out the API reference for more details.
This wiki is autogenerated. To make updates, open a PR against the original source file in docs/wiki.