From 315d79118a522d080d91da43d6b6bbfd5d4726a1 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 15 Nov 2024 11:31:40 -0700 Subject: [PATCH] refactor: consistent pipeline table names and schemas (#587) * refactor: consistent pipeline table names and schemas * fix: address feedback, use bytes for state With this change we use bytes instead of string for the state column. Casting to a string is easy and straightforward. * fix clippy * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * fix: address review feedback * clarify tips * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * Update pipeline/DESIGN.md Co-authored-by: Spencer T Brody * fix: rename chain_proofs to time_conclusions --------- Co-authored-by: Spencer T Brody --- arrow-test/src/lib.rs | 20 +- event-svc/src/event/service.rs | 4 +- event-svc/src/tests/event.rs | 48 +-- flight/tests/server.rs | 36 +-- pipeline/DESIGN.md | 243 +++++++++++++++ pipeline/src/aggregator/ceramic_patch.rs | 47 ++- pipeline/src/aggregator/mod.rs | 373 +++++++++++------------ pipeline/src/conclusion/event.rs | 26 +- pipeline/src/conclusion/table.rs | 4 +- pipeline/src/lib.rs | 42 +-- pipeline/src/schemas.rs | 23 +- 11 files changed, 553 insertions(+), 313 deletions(-) create mode 100644 pipeline/DESIGN.md diff --git a/arrow-test/src/lib.rs b/arrow-test/src/lib.rs index 0353c1b4..3676f02b 100644 --- a/arrow-test/src/lib.rs +++ b/arrow-test/src/lib.rs @@ -11,26 +11,25 @@ use datafusion::{ logical_expr::{col, expr::ScalarFunction, Cast, Expr, ScalarUDF}, }; -/// Applies various transformations on a record batch of conclusion_feed data to make it easier to +/// Applies various transformations on a record batch of conclusion_events data to make it easier to /// read. -/// Useful in conjuction with expect_test. 
-pub async fn pretty_feed_from_batch(batch: RecordBatch) -> Vec { +/// Useful in conjunction with expect_test. +pub async fn pretty_conclusion_events_from_batch(batch: RecordBatch) -> Vec { let ctx = SessionContext::new(); - ctx.register_batch("conclusion_feed", batch).unwrap(); + ctx.register_batch("conclusion_events", batch).unwrap(); - pretty_feed(ctx.table("conclusion_feed").await.unwrap()).await + pretty_conclusion_events(ctx.table("conclusion_events").await.unwrap()).await } -/// Applies various transformations on a dataframe of conclusion_feed data to make it easier to +/// Applies various transformations on a dataframe of conclusion_events data to make it easier to /// read. -/// Useful in conjuction with expect_test. -pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec { +/// Useful in conjunction with expect_test. +pub async fn pretty_conclusion_events(conclusion_events: DataFrame) -> Vec { let cid_string = Arc::new(ScalarUDF::from(CidString::new())); let cid_string_list = Arc::new(ScalarUDF::from(CidStringList::new())); - conclusion_feed + conclusion_events .select(vec![ col("index"), - col("event_type"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string.clone(), vec![col("stream_cid")], @@ -44,6 +43,7 @@ pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec { vec![col("event_cid")], )) .alias("event_cid"), + col("event_type"), Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string_list, diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index bd23caea..d2095666 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -490,7 +490,7 @@ impl EventService { #[serde(rename_all = "camelCase")] struct MIDDataContainer<'a> { metadata: BTreeMap, - data: Option<&'a Ipld>, + content: Option<&'a Ipld>, } impl<'a> MIDDataContainer<'a> { @@ -501,7 +501,7 @@ impl<'a> MIDDataContainer<'a> { 
BTreeMap::from([("shouldIndex".to_string(), should_index.into())]) }) .unwrap_or_default(), - data, + content: data, } } } diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index 04ae1cd2..25e43ad5 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -629,36 +629,36 @@ async fn test_conclusion_events_since() -> Result<(), Box let conclusion_events = service.conclusion_events_since(0, 6).await?; expect![[r#" - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 1 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", 
controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | {"metadata":{"shouldIndex":true},"data":null} | [] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 2 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | {"metadata":{"shouldIndex":true},"data":null} | [] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 3 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | 
did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma | {"metadata":{"shouldIndex":false},"data":{"stream2":"data_1"}} | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | {"metadata":{},"data":{"stream_1":"data_1"}} | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] | - 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreicor5mbc2ioqiub6dixpedkmjvp22dehayay3bfpe65zlcmlfw4ga | | [Cid(bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma)] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ - | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: 
"6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | {"metadata":{},"data":{"stream_1":"data_2"}} | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------------+ + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | + 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 1 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | {"metadata":{"shouldIndex":true},"content":null} | [] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 2 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: 
"6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | {"metadata":{"shouldIndex":true},"content":null} | [] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 3 | Data | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma | {"metadata":{"shouldIndex":false},"content":{"stream2":"data_1"}} | [Cid(bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 4 | Data | 
bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | {"metadata":{},"content":{"stream_1":"data_1"}} | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreicor5mbc2ioqiub6dixpedkmjvp22dehayay3bfpe65zlcmlfw4ga | | [Cid(bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma)] | + 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ + | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | {"metadata":{},"content":{"stream_1":"data_2"}} | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------------------+----------------------------------------------------------------------+ "#]].assert_eq(&events_to_table(&conclusion_events)); // Fetch conclusion events, with non zero watermark let conclusion_events = service.conclusion_events_since(3, 6).await?; expect![[r#" - 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------+ - | 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | {"metadata":{},"data":{"stream_1":"data_1"}} | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] | - 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------+ - | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreicor5mbc2ioqiub6dixpedkmjvp22dehayay3bfpe65zlcmlfw4ga | | [Cid(bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma)] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------+ - | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | 
bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | {"metadata":{},"data":{"stream_1":"data_2"}} | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] | - +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------+ + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------+ + | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | + 
+-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------+ + | 4 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq | {"metadata":{},"content":{"stream_1":"data_1"}} | [Cid(bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------+ + | 5 | Time | bafyreihum3smvdc36yl2qnbl4gqv3nfxwdxj7v2zdozcrrqgmtc3zfhb7i | 2 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce01040171710b0009686d6f64656c2d7631", controller: 
"6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bafyreicor5mbc2ioqiub6dixpedkmjvp22dehayay3bfpe65zlcmlfw4ga | | [Cid(bagcqcerarwe3qynbdke6u3gckhnbwzpxbg6ehov262cq5pbwxsc63kqkhlma)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------+ + | 6 | Data | bagcqcerahx5i27vqxigdq3xulceu5qv6yzdvxzamfueubsyxam5kmjcpp45q | 3 | did:key:z6Mkk3rtfoKDMMG4zyarNGwCQs44GSQ49pcYKQspHJPXSnVw | [model: "ce0102015512204bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a", controller: "6469643a6b65793a7a364d6b6b337274666f4b444d4d47347a7961724e4777435173343447535134397063594b517370484a5058536e5677", context: ""] | bagcqceraccgbaicjznz45ov4wgc3wnr62zqwba24sxzreqerlzdklidysfdq | {"metadata":{},"content":{"stream_1":"data_2"}} | [Cid(bagcqcerakug4jvwbhisuo4zlhzkinwfca2dbcv63ea7jan27zlwhzpxyrleq)] | + +-------+------------+---------------------------------------------------------------+-------------+----------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------+----------------------------------------------------------------------+ 
"#]].assert_eq(&events_to_table(&conclusion_events)); Ok(()) diff --git a/flight/tests/server.rs b/flight/tests/server.rs index c6b8b07e..f85cea12 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -5,7 +5,7 @@ use arrow::{compute::concat_batches, util::pretty::pretty_format_batches}; use arrow_array::RecordBatch; use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; use arrow_schema::Schema; -use ceramic_arrow_test::pretty_feed_from_batch; +use ceramic_arrow_test::pretty_conclusion_events_from_batch; use ceramic_flight::server::new_server; use ceramic_pipeline::{ ConclusionData, ConclusionEvent, ConclusionFeed, ConclusionInit, ConclusionTime, @@ -164,19 +164,19 @@ async fn test_simple() -> Result<()> { let mut client = start_server(feed).await; let info = client - .execute("SELECT * FROM conclusion_feed".to_string(), None) + .execute("SELECT * FROM conclusion_events".to_string(), None) .await?; let batch = execute_flight(&mut client, info).await?; - let batches = pretty_feed_from_batch(batch).await; + let batches = pretty_conclusion_events_from_batch(batch).await; let formatted = pretty_format_batches(&batches).unwrap().to_string(); expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - 
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ - | 0 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | | - | 1 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - | 2 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); Ok(()) } @@ -191,19 +191,19 @@ async fn test_push_down_predicate() -> Result<()> { let info = client .execute( - "SELECT * FROM conclusion_feed WHERE index > 42 LIMIT 2".to_string(), + "SELECT * FROM conclusion_events WHERE index > 42 LIMIT 2".to_string(), None, ) .await?; let batch = execute_flight(&mut client, info).await?; - let batches = pretty_feed_from_batch(batch).await; + let batches = pretty_conclusion_events_from_batch(batch).await; let formatted = pretty_format_batches(&batches).unwrap().to_string(); expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+ - | 42 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | | - | 43 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | 
did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+ + | 42 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | | + | 43 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted); Ok(()) } diff --git a/pipeline/DESIGN.md b/pipeline/DESIGN.md new file mode 100644 index 00000000..d25f4b68 --- /dev/null +++ b/pipeline/DESIGN.md @@ -0,0 +1,243 @@ +# Pipeline Architecture + +The pipeline architecture allows for defining features over Ceramic data as a series of transformations over that data. +What follows is a descriptions of each phase of the ceramic pipeline, the features it enables, and its dependencies. + +## Overview + +Data enters Ceramic via its API or is discovered over the network and is stored into the raw_events table. +From there various transformations are applied producing various intermediate tables. +Each table schema is considered public API and provides access to arbitrary queries against the data. + +```mermaid +graph LR; + raw_events; + streams; + time_conclusions; + conclusion_events; + model_schemas; + event_states; + stream_tips; + stream_states; + + raw_events --> conclusion_events; + streams --> conclusion_events; + time_conclusions --> conclusion_events; + conclusion_events --> event_states; + model_schemas --> event_states; + event_states --> stream_tips; + stream_tips --> stream_states; +``` + +## Tables + +### raw_events + +The raw_events table contains a row for each event in a stream and contains the raw CAR data of the event. 
+ +#### Features + +* Access to the raw event data that can be used to validate signatures + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| stream_cid | bytes | Cid of the stream | +| event_cid | bytes | Cid of the event | +| car | bytes | CAR data of the event | + +### streams + +The streams table contains a row for each stream and contains the dimensions and controller of the stream. + +#### Features + +* Access to the identifying information for streams + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Dimensions of the stream | + +### time_conclusions + +The time_conclusions table contains a row for each conclusion about a CID existing on chain. + +TBD how this table is populated and its schema. + +### conclusion_events + +The conclusion_events table contains a row for each event in a stream and represents a raw input event after various conclusions have been made. + +#### Features + +* Access to events, allowing in-order access to the events within a stream +* Access to pre-aggregated data for users building their own aggregation system +* Validation of the event signatures +* Validation of timestamp inclusion + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event.
Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | Type of the event, see [event type values](#event-types) | +| data | bytes | The event payload, content is stream type specific | +| previous | list(bytes) | Ordered list of CIDs previous to this event. Meaning of the order is stream type dependent | + + +#### Transformation + +Raw events are transformed into a flattened structure where stream dimensions and conclusions about data are added to the data. + +Conclusions include: + +* The event has a valid signature +* The dimensions and controller of the event +* The timestamp of the event + +This table joins the raw_events, time_conclusions, and streams tables in order to make the conclusions about the raw events. + +### model_schemas + +The model_schemas table contains a row for each model known to the node and contains the complete resolved schema of the model. + +TBD how this table is populated and its schema. +This table may be able to be combined with the streams table. Should we? + +### event_states + +The event_states table contains a row for each event in a stream and the state of the document at that point in the stream. + +#### Features + +* Access to the full history of states for a stream +* Validation of the model schema + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event.
Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | Type of the event, see [event type values](#event-types) | +| data | bytes | The event payload, content is stream type specific | + +#### Transformation + +This table computes the aggregated state for each conclusion event. +Additionally, it validates that the aggregated state matches the model schema of the stream. + +### stream_tips + +The stream_tips table contains a row for each tip of each stream representing the canonical state for each branch of the stream. +The tip represents the most recent event in each branch of the stream, where _recent_ is a stream type specific definition. + +#### Features + +* Access to the multiple tips of streams for users building their own conflict resolution + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | Type of the event, see [event type values](#event-types) | +| data | bytes | The event payload, content is stream type specific | + +#### Transformation + +This table computes the aggregated state for each conclusion event. +Additionally, it validates that the aggregated state matches the model schema of the stream.
+ +### stream_states + +The stream_states table contains a row for each stream representing the canonical state of the stream. + +#### Features + +* Access to the canonical state of streams for users relying on built-in conflict resolution + +#### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | Type of the event, see [event type values](#event-types) | +| data | bytes | The event payload, content is stream type specific | + +#### Transformation + +Computes the singular tip that is the canonical state of the stream. + + +## Stream Types + +The pipeline process varies by stream type.
+Each loadable stream type defines: + +* the content of the event payload, +* the rules for validating an event, +* the rules for determining a canonical tip for a stream + +| Name | Code | Description | Specification | +| ---- | ---- | ----------- | ------------- | +| Tile | 0x00 | (Deprecated) A stream type representing a JSON document | https://cips.ceramic.network/CIPs/cip-8 | +| CAIP-10 Link | 0x01 | (Deprecated) Link blockchain accounts to DIDs | https://cips.ceramic.network/CIPs/cip-7 | +| Model | 0x02 | Defines a schema shared by a group of documents in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model | +| Model Instance Document | 0x03 | Represents a JSON document in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model-instance | +| UNLOADABLE | 0x04 | A stream that is not meant to be loaded | https://github.com/ceramicnetwork/js-ceramic/blob/main/packages/stream-model/src/model.ts#L163-L165 | +| EventId | 0x05 | An event id encoded as a cip-124 EventID. Also unloadable | https://cips.ceramic.network/CIPs/cip-124 | + +Source: https://cips.ceramic.network/CIPs/cip-59#streamid-multicodec + +### Model + +Model streams represent the definition of a composable model. + +The content of a model stream payload is a DAG-JSON description of its schema. +Models are immutable and updates to the stream are not supported. +Therefore the canonical tip for a model stream is always the init event. + +### Model Instance Document + +Model instance document streams represent an instance of a model defined via a model stream. +An instance must conform to the schema definition of the model. + +The content of a model instance document event payload is a DAG-JSON encoding of a JSON PATCH document. +The stream state is determined by applying the JSON patches in sequence.
+ +The canonical tip for a stream follows these rules: + +TODO(https://github.com/ceramicnetwork/rust-ceramic/issues/588) write out rules + +## Event Types + +| Name | Code | Description | +| ---- | ---- | ----------- | +| Data | 0x00 | An event containing data for the stream | +| Time | 0x01 | An event about the temporal status of the stream | diff --git a/pipeline/src/aggregator/ceramic_patch.rs b/pipeline/src/aggregator/ceramic_patch.rs index 4f888edd..449d9c84 100644 --- a/pipeline/src/aggregator/ceramic_patch.rs +++ b/pipeline/src/aggregator/ceramic_patch.rs @@ -1,14 +1,11 @@ use std::{collections::BTreeMap, sync::Arc}; use arrow::{ - array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder}, + array::{Array as _, ArrayBuilder as _, ArrayRef, BinaryBuilder}, datatypes::DataType, }; use datafusion::{ - common::{ - cast::{as_binary_array, as_string_array}, - exec_datafusion_err, Result, - }, + common::{cast::as_binary_array, exec_datafusion_err, Result}, logical_expr::{ PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl, }, @@ -29,10 +26,14 @@ impl CeramicPatch { Self { signature: Signature::new( TypeSignature::Exact(vec![ + // Event CID + DataType::Binary, + // Previous CID + DataType::Binary, + // Previous State DataType::Binary, + // State/Patch DataType::Binary, - DataType::Utf8, - DataType::Utf8, ]), Volatility::Immutable, ), @@ -54,7 +55,7 @@ impl WindowUDFImpl for CeramicPatch { } fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(DataType::Utf8) + Ok(DataType::Binary) } fn partition_evaluator(&self) -> Result> { @@ -72,7 +73,7 @@ impl WindowUDFImpl for CeramicPatch { #[serde(rename_all = "camelCase")] struct MIDDataContainer { metadata: BTreeMap, - data: D, + content: D, } type MIDDataContainerPatch = MIDDataContainer>; @@ -82,20 +83,19 @@ type MIDDataContainerState = MIDDataContainer; struct CeramicPatchEvaluator; impl CeramicPatchEvaluator { - fn apply_patch(patch: &str, previous_state: &str) -> Result 
{ - let patch: MIDDataContainerPatch = serde_json::from_str(patch) + fn apply_patch(patch: &[u8], previous_state: &[u8]) -> Result> { + let patch: MIDDataContainerPatch = serde_json::from_slice(patch) .map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?; - let mut state: MIDDataContainerState = serde_json::from_str(previous_state) + let mut state: MIDDataContainerState = serde_json::from_slice(previous_state) .map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?; // If the state is null use an empty object in order to apply the patch to a valid object. - if serde_json::Value::Null == state.data { - state.data = serde_json::Value::Object(serde_json::Map::default()); + if serde_json::Value::Null == state.content { + state.content = serde_json::Value::Object(serde_json::Map::default()); } state.metadata.extend(patch.metadata); - json_patch::patch(&mut state.data, &patch.data) + json_patch::patch(&mut state.content, &patch.content) .map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?; - serde_json::to_string(&state) - .map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}")) + serde_json::to_vec(&state).map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}")) } } @@ -121,9 +121,9 @@ impl PartitionEvaluator for CeramicPatchEvaluator { fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result { let event_cids = as_binary_array(&values[0])?; let previous_cids = as_binary_array(&values[1])?; - let previous_states = as_string_array(&values[2])?; - let patches = as_string_array(&values[3])?; - let mut new_states = StringBuilder::new(); + let previous_states = as_binary_array(&values[2])?; + let patches = as_binary_array(&values[3])?; + let mut new_states = BinaryBuilder::new(); for i in 0..num_rows { if previous_cids.is_valid(i) { if let Some(previous_state) = if !previous_states.is_null(i) { @@ -150,7 +150,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator { // So we need to 
copy the data to a new location before we can copy it back // into the new_states. #[allow(clippy::unnecessary_to_owned)] - new_states.append_value(previous_state.to_string()); + new_states.append_value(previous_state.to_owned()); } else { new_states.append_value(CeramicPatchEvaluator::apply_patch( patches.value(i), @@ -176,13 +176,12 @@ impl PartitionEvaluator for CeramicPatchEvaluator { } } -fn value_at(builder: &StringBuilder, idx: usize) -> &str { +fn value_at(builder: &BinaryBuilder, idx: usize) -> &[u8] { let start = builder.offsets_slice()[idx] as usize; let stop = if idx < builder.len() { builder.offsets_slice()[idx + 1] as usize } else { builder.values_slice().len() }; - std::str::from_utf8(&builder.values_slice()[start..stop]) - .expect("new states should always be valid utf8") + &builder.values_slice()[start..stop] } diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index 08f4f270..f0138fad 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -5,10 +5,7 @@ mod ceramic_patch; use anyhow::Context; -use arrow::{ - array::{RecordBatch, UInt64Array}, - datatypes::DataType, -}; +use arrow::array::{RecordBatch, UInt64Array}; use ceramic_patch::CeramicPatch; use datafusion::{ common::JoinType, @@ -18,19 +15,21 @@ use datafusion::{ functions_aggregate::min_max::max, functions_array::extract::array_element, logical_expr::{ - col, expr::WindowFunction, lit, Cast, Expr, ExprFunctionExt as _, LogicalPlanBuilder, + col, expr::WindowFunction, lit, Expr, ExprFunctionExt as _, LogicalPlanBuilder, WindowFunctionDefinition, }, physical_plan::collect_partitioned, - sql::TableReference, }; use std::{future::Future, sync::Arc}; use tracing::{debug, error, instrument, Level}; -use crate::{schemas, Result, DOC_STATE_MEM_TABLE, DOC_STATE_PERSISTENT_TABLE}; +use crate::{ + schemas, Result, CONCLUSION_EVENTS_TABLE, EVENT_STATES_MEM_TABLE, + EVENT_STATES_PERSISTENT_TABLE, EVENT_STATES_TABLE, +}; // Maximum number of rows to 
fetch per pass of the aggregator. -// Minimum number of rows to have proccessed before writing a batch to object store. +// Minimum number of rows to have processed before writing a batch to object store. const BATCH_SIZE: usize = 10_000; pub async fn run(ctx: SessionContext, shutdown_signal: impl Future) -> Result<()> { @@ -48,20 +47,20 @@ async fn run_continuous_stream( loop { tokio::select! { - _ = &mut shutdown_signal => { - debug!("Received shutdown signal, stopping continuous stream processing"); - break; - } - result = processor.process_batch(limit) => { - match result { - Ok(()) => { - // Batch processed successfully, continue to next iteration - continue; - } - Err(e) => { - error!("Error processing batch: {:?}", e); - return Err(e); - } + _ = &mut shutdown_signal => { + debug!("Received shutdown signal, stopping continuous stream processing"); + break; + } + result = processor.process_batch(limit) => { + match result { + Ok(()) => { + // Batch processed successfully, continue to next iteration + continue; + } + Err(err) => { + error!(%err, "error processing batch"); + return Err(err); + } } } } @@ -69,7 +68,7 @@ async fn run_continuous_stream( Ok(()) } -/// Represents a processor for continuous stream processing of conclusion feed data. +/// Represents a processor for continuous stream processing of conclusion event data. struct ContinuousStreamProcessor { ctx: SessionContext, last_processed_index: Option, @@ -78,7 +77,7 @@ struct ContinuousStreamProcessor { impl ContinuousStreamProcessor { async fn new(ctx: SessionContext) -> Result { let max_index = ctx - .table("doc_state") + .table(EVENT_STATES_TABLE) .await? .select_columns(&["index"])? .aggregate(vec![], vec![max(col("index"))])? 
@@ -103,28 +102,25 @@ impl ContinuousStreamProcessor { #[instrument(skip(self), ret ( level = Level::DEBUG ))] async fn process_batch(&mut self, limit: usize) -> Result<()> { - // Fetch the conclusion feed DataFrame - let mut conclusion_feed = self - .ctx - .table(TableReference::full("ceramic", "v0", "conclusion_feed")) - .await? - .select(vec![ - col("index"), - col("event_type"), - col("stream_cid"), - col("controller"), - col("conclusion_feed.event_cid"), - col("dimensions"), - Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), - col("previous"), - ])?; + // Fetch the conclusion events DataFrame + let mut conclusion_events = self.ctx.table(CONCLUSION_EVENTS_TABLE).await?.select(vec![ + col("index"), + col("stream_cid"), + col("stream_type"), + col("controller"), + col("dimensions"), + col("event_cid"), + col("event_type"), + col("data"), + col("previous"), + ])?; if let Some(last_index) = self.last_processed_index { - conclusion_feed = conclusion_feed.filter(col("index").gt(lit(last_index)))?; + conclusion_events = conclusion_events.filter(col("index").gt(lit(last_index)))?; } - let batch = conclusion_feed.limit(0, Some(limit))?; + let batch = conclusion_events.limit(0, Some(limit))?; - // Caching the data frame to use it to caluclate the max index - // We need to cache it because we do 2 passes over the data frame, once for process feed batch and once for calculating the max index + // Caching the data frame to use it to calculate the max index + // We need to cache it because we do 2 passes over the data frame, once for process_conclusion_events_batch and once for calculating the max index // We are not using batch.cache() because this loses table name information let batch_plan = batch.clone().create_physical_plan().await?; let task_ctx = Arc::new(batch.task_ctx()); @@ -133,13 +129,13 @@ impl ContinuousStreamProcessor { let df = DataFrame::new( self.ctx.state(), LogicalPlanBuilder::scan( - "conclusion_feed", + 
CONCLUSION_EVENTS_TABLE, provider_as_source(Arc::new(cached_memtable)), None, )? .build()?, ); - process_feed_batch(self.ctx.clone(), df.clone(), limit).await?; + process_conclusion_events_batch(self.ctx.clone(), df.clone(), limit).await?; // Fetch the highest index from the cached DataFrame let highest_index = df @@ -160,97 +156,94 @@ impl ContinuousStreamProcessor { } } -// Process events from the conclusion feed, producing a new document state for each input event. -// The session context must have a registered a `doc_state` table with stream_cid, event_cid, and -// state columns. +// Process a batch of conclusion events, producing a new document state for each input event. +// The session context must have a registered a `event_states` table with appropriate schema. // -// The events in the conclusion feed must: -// * have stream_cid, event_cid, previous, and data columns, -// * have previous CIDs that either already exist in `doc_state` or be contained within the -// current conclusion_feed batch, -// * be valid JSON patch data documents. -// * use a qualified table name of `conclusion_feed`. -#[instrument(skip(ctx, conclusion_feed), ret ( level = Level::DEBUG ))] -async fn process_feed_batch( +// The events in the conclusion_events batch have a schema of the conclusion_events table. +#[instrument(skip(ctx, conclusion_events), ret ( level = Level::DEBUG ))] +async fn process_conclusion_events_batch( ctx: SessionContext, - conclusion_feed: DataFrame, + conclusion_events: DataFrame, max_cached_rows: usize, ) -> Result<()> { - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? 
- .select_columns(&["stream_cid", "event_cid", "state"]) - .context("reading doc_state")?; + .select_columns(&["stream_cid", "event_cid", "data"]) + .context("reading event_states")?; - conclusion_feed + conclusion_events // MID only ever use the first previous, so we can optimize the join by selecting the // first element of the previous array. .select(vec![ col("index"), col("event_type"), col("stream_cid"), + col("stream_type"), col("controller"), col("dimensions"), col("event_cid"), - Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), + col("data"), array_element(col("previous"), lit(1)).alias("previous"), ])? .join_on( - doc_state, + event_states, JoinType::Left, - [col("previous").eq(col("doc_state.event_cid"))], + [col("previous").eq(col(EVENT_STATES_TABLE.to_string() + ".event_cid"))], )? .select(vec![ - col("conclusion_feed.index").alias("index"), - col("conclusion_feed.event_type").alias("event_type"), - col("conclusion_feed.stream_cid").alias("stream_cid"), - col("conclusion_feed.controller").alias("controller"), - col("conclusion_feed.dimensions").alias("dimensions"), - col("conclusion_feed.event_cid").alias("event_cid"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".index").alias("index"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".event_type").alias("event_type"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".stream_cid").alias("stream_cid"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".stream_type").alias("stream_type"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".controller").alias("controller"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".dimensions").alias("dimensions"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".event_cid").alias("event_cid"), col("previous"), - col("doc_state.state").alias("previous_state"), - col("data"), + col(EVENT_STATES_TABLE.to_string() + ".data").alias("previous_data"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".data").alias("data"), ])? 
.window(vec![Expr::WindowFunction(WindowFunction::new( WindowFunctionDefinition::WindowUDF(Arc::new(CeramicPatch::new_udwf())), vec![ col("event_cid"), col("previous"), - col("previous_state"), + col("previous_data"), col("data"), ], )) .partition_by(vec![col("stream_cid")]) .order_by(vec![col("index").sort(true, true)]) .build()? - .alias("new_state")])? - // Rename columns to match doc_state table schema + .alias("new_data")])? + // Rename columns to match event_states table schema .select_columns(&[ "index", "stream_cid", - "event_type", + "stream_type", "controller", "dimensions", "event_cid", - "new_state", + "event_type", + "new_data", ])? - .with_column_renamed("new_state", "state")? - // Write states to the in memory doc_state table - .write_table(DOC_STATE_MEM_TABLE, DataFrameWriteOptions::new()) + .with_column_renamed("new_data", "data")? + // Write states to the in memory event_states table + .write_table(EVENT_STATES_MEM_TABLE, DataFrameWriteOptions::new()) .await - .context("computing states")?; + .context("computing data")?; - let count = ctx.table(DOC_STATE_MEM_TABLE).await?.count().await?; + let count = ctx.table(EVENT_STATES_MEM_TABLE).await?.count().await?; // If we have enough data cached in memory write it out to persistent store if count >= max_cached_rows { - ctx.table(DOC_STATE_MEM_TABLE) + ctx.table(EVENT_STATES_MEM_TABLE) .await? - .write_table(DOC_STATE_PERSISTENT_TABLE, DataFrameWriteOptions::new()) + .write_table(EVENT_STATES_PERSISTENT_TABLE, DataFrameWriteOptions::new()) .await?; // Clear all data in the memory batch, by writing an empty batch - ctx.read_batch(RecordBatch::new_empty(schemas::doc_state()))? + ctx.read_batch(RecordBatch::new_empty(schemas::event_states()))? 
.write_table( - DOC_STATE_MEM_TABLE, + EVENT_STATES_MEM_TABLE, DataFrameWriteOptions::new().with_overwrite(true), ) .await?; @@ -265,11 +258,12 @@ mod tests { use super::*; use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; + use arrow_schema::DataType; use ceramic_core::StreamIdType; use cid::Cid; use datafusion::{ datasource::{provider_as_source, MemTable}, - logical_expr::{expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF}, + logical_expr::{cast, expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF}, }; use expect_test::{expect, Expect}; use object_store::memory::InMemory; @@ -279,11 +273,11 @@ mod tests { use crate::{ cid_string::CidString, conclusion_events_to_record_batch, schemas, session_from_config, tests::MockConclusionFeed, ConclusionData, ConclusionEvent, ConclusionFeedSource, - ConclusionInit, ConclusionTime, Config, CONCLUSION_FEED_TABLE, + ConclusionInit, ConclusionTime, Config, CONCLUSION_EVENTS_TABLE, }; - async fn do_test(conclusion_feed: RecordBatch) -> anyhow::Result { - do_pass(init_ctx().await?, conclusion_feed, 1_000).await + async fn do_test(conclusion_events: RecordBatch) -> anyhow::Result { + do_pass(init_ctx().await?, conclusion_events, 1_000).await } async fn init_ctx() -> anyhow::Result { @@ -295,10 +289,10 @@ mod tests { .await } - async fn init_ctx_cont(conclusion_feed: RecordBatch) -> anyhow::Result { + async fn init_ctx_cont(conclusion_events: RecordBatch) -> anyhow::Result { session_from_config(Config { conclusion_feed: ConclusionFeedSource::::InMemory( - MemTable::try_new(schemas::conclusion_feed(), vec![vec![conclusion_feed]])?, + MemTable::try_new(schemas::conclusion_events(), vec![vec![conclusion_events]])?, ), object_store_bucket_name: "test_bucket".to_string(), object_store: Arc::new(InMemory::new()), @@ -308,24 +302,25 @@ mod tests { async fn do_pass( ctx: SessionContext, - conclusion_feed: RecordBatch, + conclusion_events: RecordBatch, max_cached_rows: usize, ) -> anyhow::Result { - // Setup 
conclusion_feed table from RecordBatch - let provider = MemTable::try_new(conclusion_feed.schema(), vec![vec![conclusion_feed]])?; - let conclusion_feed = DataFrame::new( + // Setup conclusion_events table from RecordBatch + let provider = + MemTable::try_new(conclusion_events.schema(), vec![vec![conclusion_events]])?; + let conclusion_events = DataFrame::new( ctx.state(), LogicalPlanBuilder::scan( - CONCLUSION_FEED_TABLE, + CONCLUSION_EVENTS_TABLE, provider_as_source(Arc::new(provider)), None, )? .build()?, ); - process_feed_batch(ctx.clone(), conclusion_feed, max_cached_rows).await?; + process_conclusion_events_batch(ctx.clone(), conclusion_events, max_cached_rows).await?; let cid_string = Arc::new(ScalarUDF::from(CidString::new())); - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? .select(vec![ col("index"), @@ -334,24 +329,25 @@ mod tests { vec![col("stream_cid")], )) .alias("stream_cid"), - col("event_type"), + col("stream_type"), col("controller"), col("dimensions"), Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("event_cid")])) .alias("event_cid"), - col("state"), + col("event_type"), + cast(col("data"), DataType::Utf8).alias("data"), ])? .sort(vec![col("index").sort(true, true)])? .collect() .await?; - Ok(pretty_format_batches(&doc_state)?) + Ok(pretty_format_batches(&event_states)?) 
} async fn do_run_continuous( - conclusion_feed: RecordBatch, + conclusion_events: RecordBatch, expected_batches: Expect, ) -> anyhow::Result<()> { - let ctx = init_ctx_cont(conclusion_feed).await?; + let ctx = init_ctx_cont(conclusion_events).await?; let (shutdown_signal_tx, shutdown_signal_rx) = oneshot::channel::<()>(); let handle = tokio::spawn(run_continuous_stream( ctx.clone(), @@ -366,8 +362,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; retries -= 1; let cid_string = Arc::new(ScalarUDF::from(CidString::new())); - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? .select(vec![ col("index"), @@ -376,20 +372,21 @@ mod tests { vec![col("stream_cid")], )) .alias("stream_cid"), - col("event_type"), + col("stream_type"), col("controller"), col("dimensions"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string, - vec![col("doc_state.event_cid")], + vec![col(EVENT_STATES_TABLE.to_string() + ".event_cid")], )) .alias("event_cid"), - col("state"), + col("event_type"), + cast(col("data"), DataType::Utf8).alias("data"), ])? .sort(vec![col("index").sort(true, true)])? .collect() .await?; - batches = pretty_format_batches(&doc_state)?.to_string(); + batches = pretty_format_batches(&event_states)?.to_string(); if expected_batches.data() == batches { break; } @@ -401,7 +398,7 @@ mod tests { #[tokio::test] async fn single_init_event() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 0, event_cid: Cid::from_str( @@ -419,21 +416,21 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), }), ])?) 
.await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | 0 | 
baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_data_events() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 1, event_cid: Cid::from_str( @@ -451,7 +448,7 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), }), ConclusionEvent::Data(ConclusionData { index: 2, @@ -473,24 +470,24 @@ mod tests { "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", )?], data: - r#"{"metadata":{"foo":2},"data":[{"op":"replace", "path": "/a", "value":1}]}"# + r#"{"metadata":{"foo":2},"content":[{"op":"replace", "path": "/a", "value":1}]}"# .into(), }), ])?) 
.await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":2,"shouldIndex":true},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_data_and_time_events() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 0, event_cid: Cid::from_str( @@ -508,7 +505,7 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), }), ConclusionEvent::Time(ConclusionTime { index: 1, @@ -549,27 +546,27 @@ mod tests { previous: vec![Cid::from_str( "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", )?], - data: r#"{"metadata":{"shouldIndex":false},"data":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), + data: 
r#"{"metadata":{"shouldIndex":false},"content":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), }), ])?) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"content":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_single_event_passes() -> anyhow::Result<()> { // Test multiple passes where a single event for 
the stream is present in the conclusion - // feed for each pass. + // events for each pass. let ctx = init_ctx().await?; - let doc_state = do_pass( + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 0, @@ -588,18 +585,18 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), })])?, 1_000, ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Time(ConclusionTime { index: 1, @@ -625,13 +622,13 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - 
+-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | 
baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx, conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 2, @@ -652,25 +649,25 @@ mod tests { previous: vec![Cid::from_str( "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", )?], - data: r#"{"metadata":{"shouldIndex":false},"data":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), + data: r#"{"metadata":{"shouldIndex":false},"content":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), })])?, 1_000, ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 
6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 1 | 
baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"content":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_passes() -> anyhow::Result<()> { let ctx = init_ctx().await?; - let doc_state = do_pass( + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 0, @@ -689,18 +686,18 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), })])?, 1_000, ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - 
+-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ ConclusionEvent::Time(ConclusionTime { @@ -742,20 +739,20 @@ mod tests { previous: vec![Cid::from_str( "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", )?], - data: r#"{"metadata":{"shouldIndex":false},"data":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), + data: r#"{"metadata":{"shouldIndex":false},"content":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), }), ])?, 1_000, ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 
{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 
6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"content":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] @@ -778,7 +775,7 @@ mod tests { ], }, previous: vec![], - data: r#"{"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}}"#.into(), + data: r#"{"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}}"#.into(), }), ConclusionEvent::Data(ConclusionData { index: 2, @@ -799,7 +796,7 @@ mod tests { previous: vec![Cid::from_str( "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", )?], - data: r#"{"metadata":{"shouldIndex":false},"data":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), + data: r#"{"metadata":{"shouldIndex":false},"content":[{"op":"replace", "path": "/a", "value":1}]}"#.into(), }), ConclusionEvent::Data(ConclusionData { index: 3, @@ -820,17 +817,17 @@ mod tests { previous: vec![Cid::from_str( "baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du", )?], - data: r#"{"metadata":{},"data":[{"op":"replace", "path": "/a", "value":2}]}"#.into(), + data: r#"{"metadata":{},"content":[{"op":"replace", "path": "/a", "value":2}]}"#.into(), }), ])?, expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - 
+-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeic2caccyfigwadnncpyko7hcdbr66dkf4jyzeoh4dbhfebp77hchu | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":2}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]], + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | + 
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"content":{"a":1}} | + | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeic2caccyfigwadnncpyko7hcdbr66dkf4jyzeoh4dbhfebp77hchu | 0 | {"metadata":{"foo":1,"shouldIndex":false},"content":{"a":2}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+"#]], ) .await } diff --git a/pipeline/src/conclusion/event.rs b/pipeline/src/conclusion/event.rs index a0ab96de..3ab2d5f3 100644 --- a/pipeline/src/conclusion/event.rs +++ b/pipeline/src/conclusion/event.rs @@ -70,7 +70,7 @@ pub struct ConclusionData { pub init: ConclusionInit, /// Ordered list of previous events this event references. pub previous: Vec, - /// Raw bytes of the event data encoded as dag-json. + /// Raw bytes of the event payload. 
pub data: Vec, } @@ -198,7 +198,6 @@ impl Default for ConclusionEventBuilder { impl ConclusionEventBuilder { fn append(&mut self, event: &ConclusionEvent) { - self.event_type.append_value(event.event_type_as_int()); let init = match event { ConclusionEvent::Data(data_event) => { self.event_cid.append_value(data_event.event_cid.to_bytes()); @@ -221,6 +220,7 @@ impl ConclusionEventBuilder { &time_event.init } }; + self.event_type.append_value(event.event_type_as_int()); self.stream_cid.append_value(init.stream_cid.to_bytes()); self.controller.append_value(&init.controller); self.stream_type.append_value(init.stream_type); @@ -234,7 +234,6 @@ impl ConclusionEventBuilder { fn finish(&mut self) -> StructArray { StructArray::try_from(vec![ ("index", Arc::new(self.index.finish()) as ArrayRef), - ("event_type", Arc::new(self.event_type.finish()) as ArrayRef), ("stream_cid", Arc::new(self.stream_cid.finish()) as ArrayRef), ( "stream_type", @@ -243,6 +242,7 @@ impl ConclusionEventBuilder { ("controller", Arc::new(self.controller.finish()) as ArrayRef), ("dimensions", Arc::new(self.dimensions.finish()) as ArrayRef), ("event_cid", Arc::new(self.event_cid.finish()) as ArrayRef), + ("event_type", Arc::new(self.event_type.finish()) as ArrayRef), ("data", Arc::new(self.data.finish()) as ArrayRef), ("previous", Arc::new(self.previous.finish()) as ArrayRef), ]) @@ -335,7 +335,7 @@ mod tests { use std::str::FromStr; use arrow::util::pretty::pretty_format_batches; - use ceramic_arrow_test::pretty_feed_from_batch; + use ceramic_arrow_test::pretty_conclusion_events_from_batch; use ceramic_event::StreamIdType; use cid::Cid; use expect_test::expect; @@ -452,18 +452,18 @@ mod tests { ]; // Convert events to RecordBatch let record_batch = conclusion_events_to_record_batch(&events).unwrap(); - let record_batch = pretty_feed_from_batch(record_batch).await; + let record_batch = pretty_conclusion_events_from_batch(record_batch).await; let formatted = 
pretty_format_batches(&record_batch).unwrap().to_string(); // Use expect_test to validate the output expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+ - | 0 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 123 | | - | 1 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q | 456 | [baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu] | - | 2 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di | | [baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | - | 3 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeiewqcj4bwhcssizv5kcyvsvm57bxghjpqshnbzkc6rijmwb4im4yq | 789 | 
[baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di, baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | 123 | | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q | 0 | 456 | [baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu] | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 
6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di | 1 | | [baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | + | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeiewqcj4bwhcssizv5kcyvsvm57bxghjpqshnbzkc6rijmwb4im4yq | 0 | 789 | [baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di, baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); } } diff --git a/pipeline/src/conclusion/table.rs b/pipeline/src/conclusion/table.rs index b29fb49d..c08badd8 100644 --- a/pipeline/src/conclusion/table.rs +++ b/pipeline/src/conclusion/table.rs @@ -16,7 +16,7 @@ use datafusion::{ use futures::TryStreamExt as _; use tracing::{instrument, Level}; -use crate::{conclusion::conclusion_events_to_record_batch, schemas::conclusion_feed}; +use crate::{conclusion::conclusion_events_to_record_batch, schemas::conclusion_events}; use super::ConclusionEvent; @@ -55,7 +55,7 @@ impl FeedTable { pub fn new(feed: Arc) -> Self { Self { feed, - schema: conclusion_feed(), + schema: conclusion_events(), } } fn highwater_mark_from_expr(expr: &Expr) -> Option { diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index 50a31589..05db30ca 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -26,7 +26,7 @@ use datafusion::{ functions_aggregate::first_last::LastValue, logical_expr::{col, AggregateUDF, ScalarUDF}, }; -use schemas::doc_state; +use schemas::event_states; use url::Url; use cid_string::{CidString, 
CidStringList}; @@ -37,10 +37,10 @@ pub use conclusion::{ }; pub use config::{ConclusionFeedSource, Config}; -pub const CONCLUSION_FEED_TABLE: &str = "ceramic.v0.conclusion_feed"; -pub const DOC_STATE_TABLE: &str = "ceramic.v0.doc_state"; -pub const DOC_STATE_MEM_TABLE: &str = "ceramic._internal.doc_state_mem"; -pub const DOC_STATE_PERSISTENT_TABLE: &str = "ceramic._internal.doc_state_persistent"; +pub const CONCLUSION_EVENTS_TABLE: &str = "ceramic.v0.conclusion_events"; +pub const EVENT_STATES_TABLE: &str = "ceramic.v0.event_states"; +pub const EVENT_STATES_MEM_TABLE: &str = "ceramic._internal.event_states_mem"; +pub const EVENT_STATES_PERSISTENT_TABLE: &str = "ceramic._internal.event_states_persistent"; /// Constructs a [`SessionContext`] configured with all tables in the pipeline. pub async fn session_from_config( @@ -56,17 +56,17 @@ pub async fn session_from_config( match config.conclusion_feed { ConclusionFeedSource::Direct(conclusion_feed) => { ctx.register_table( - CONCLUSION_FEED_TABLE, + CONCLUSION_EVENTS_TABLE, Arc::new(conclusion::FeedTable::new(conclusion_feed)), )?; } #[cfg(test)] ConclusionFeedSource::InMemory(table) => { assert_eq!( - schemas::conclusion_feed(), + schemas::conclusion_events(), datafusion::catalog::TableProvider::schema(&table) ); - ctx.register_table("conclusion_feed", Arc::new(table))?; + ctx.register_table(CONCLUSION_EVENTS_TABLE, Arc::new(table))?; } }; // Register the _internal schema @@ -87,39 +87,39 @@ pub async fn session_from_config( url.set_host(Some(&config.object_store_bucket_name))?; ctx.register_object_store(&url, config.object_store); - // Configure doc_state listing table + // Configure event_states listing table let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(".parquet") .with_file_sort_order(vec![vec![col("index").sort(true, true)]]); - // Set the path within the bucket for the doc_state table - let 
doc_state_object_store_path = DOC_STATE_TABLE.replace('.', "/") + "/"; - url.set_path(&doc_state_object_store_path); - // Register doc_state_persistent as a listing table + // Set the path within the bucket for the event_states table + let event_states_object_store_path = EVENT_STATES_TABLE.replace('.', "/") + "/"; + url.set_path(&event_states_object_store_path); + // Register event_states_persistent as a listing table ctx.register_table( - DOC_STATE_PERSISTENT_TABLE, + EVENT_STATES_PERSISTENT_TABLE, Arc::new(ListingTable::try_new( ListingTableConfig::new(ListingTableUrl::parse(url)?) .with_listing_options(listing_options) - .with_schema(schemas::doc_state()), + .with_schema(schemas::event_states()), )?), )?; ctx.register_table( - DOC_STATE_MEM_TABLE, + EVENT_STATES_MEM_TABLE, Arc::new(CacheTable::try_new( - doc_state(), - vec![vec![RecordBatch::new_empty(doc_state())]], + event_states(), + vec![vec![RecordBatch::new_empty(event_states())]], )?), )?; ctx.register_table( - DOC_STATE_TABLE, - ctx.table(DOC_STATE_MEM_TABLE) + EVENT_STATES_TABLE, + ctx.table(EVENT_STATES_MEM_TABLE) .await? - .union(ctx.table(DOC_STATE_PERSISTENT_TABLE).await?)? + .union(ctx.table(EVENT_STATES_PERSISTENT_TABLE).await?)? .into_view(), )?; diff --git a/pipeline/src/schemas.rs b/pipeline/src/schemas.rs index 28fa6453..da23c50d 100644 --- a/pipeline/src/schemas.rs +++ b/pipeline/src/schemas.rs @@ -3,17 +3,16 @@ use std::sync::{Arc, OnceLock}; use datafusion::arrow::datatypes::{DataType, Field, Fields, SchemaBuilder, SchemaRef}; -static CONCLUSION_FEED: OnceLock = OnceLock::new(); -static DOC_STATE: OnceLock = OnceLock::new(); +static CONCLUSION_EVENTS: OnceLock = OnceLock::new(); +static EVENT_STATES: OnceLock = OnceLock::new(); -/// The `conclusion_feed` table contains the raw events annotated with conclcusions about each +/// The `conclusion_events` table contains the raw events annotated with conclusions about each /// event. 
-pub fn conclusion_feed() -> SchemaRef { - Arc::clone(CONCLUSION_FEED.get_or_init(|| { +pub fn conclusion_events() -> SchemaRef { + Arc::clone(CONCLUSION_EVENTS.get_or_init(|| { Arc::new( SchemaBuilder::from(&Fields::from(vec![ Field::new("index", DataType::UInt64, false), - Field::new("event_type", DataType::UInt8, false), Field::new("stream_cid", DataType::Binary, false), Field::new("stream_type", DataType::UInt8, false), Field::new("controller", DataType::Utf8, false), @@ -46,6 +45,7 @@ pub fn conclusion_feed() -> SchemaRef { true, ), Field::new("event_cid", DataType::Binary, false), + Field::new("event_type", DataType::UInt8, false), Field::new("data", DataType::Binary, true), Field::new( "previous", @@ -58,14 +58,14 @@ pub fn conclusion_feed() -> SchemaRef { })) } -/// The `doc_state` table contains the aggregated state for each event for each stream. -pub fn doc_state() -> SchemaRef { - Arc::clone(DOC_STATE.get_or_init(|| { +/// The `event_states` table contains the aggregated state for each event for each stream. +pub fn event_states() -> SchemaRef { + Arc::clone(EVENT_STATES.get_or_init(|| { Arc::new( SchemaBuilder::from(&Fields::from([ Arc::new(Field::new("index", DataType::UInt64, false)), Arc::new(Field::new("stream_cid", DataType::Binary, false)), - Arc::new(Field::new("event_type", DataType::UInt8, false)), + Arc::new(Field::new("stream_type", DataType::UInt8, false)), Arc::new(Field::new("controller", DataType::Utf8, false)), Arc::new(Field::new( "dimensions", @@ -94,7 +94,8 @@ pub fn doc_state() -> SchemaRef { true, )), Arc::new(Field::new("event_cid", DataType::Binary, false)), - Arc::new(Field::new("state", DataType::Utf8, true)), + Arc::new(Field::new("event_type", DataType::UInt8, false)), + Arc::new(Field::new("data", DataType::Binary, true)), ])) .finish(), )