From d7db1d238738a991e7fd7ec37783699cec7af951 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 5 Jul 2024 21:19:54 -0300 Subject: [PATCH 1/8] fix: use atomic operation to write cursor (#798) --- src/utils/cursor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/utils/cursor.rs b/src/utils/cursor.rs index ce1b6468..f462033b 100644 --- a/src/utils/cursor.rs +++ b/src/utils/cursor.rs @@ -148,7 +148,11 @@ impl CanStore for FileStorage { } fn write_cursor(&self, point: PointArg) -> Result<(), Error> { - std::fs::write(&self.0.path, point.to_string().as_bytes())?; + // we save to a tmp file and then rename to make it an atomic operation. If the + // write were to fail, the only affected file will be the temporal one. + let tmp_file = format!("{}.tmp", self.0.path); + std::fs::write(&tmp_file, point.to_string().as_bytes())?; + std::fs::rename(&tmp_file, &self.0.path)?; Ok(()) } From d94068562d98f43aeef8e224111fbdaeb2bc186c Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 5 Jul 2024 21:39:46 -0300 Subject: [PATCH 2/8] Release v1.8.6 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c9ce264..01062496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2187,7 +2187,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "oura" -version = "1.8.5" +version = "1.8.6" dependencies = [ "aws-config", "aws-sdk-lambda", diff --git a/Cargo.toml b/Cargo.toml index 561a42bb..632fab7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "oura" description = "The tail of Cardano" -version = "1.8.5" +version = "1.8.6" edition = "2021" repository = "https://github.com/txpipe/oura" homepage = "https://github.com/txpipe/oura" From 5bf6b83e49f7a50ed238e34e0dface02a10ad27a Mon Sep 17 00:00:00 2001 From: "Joaquin Hoyos (Clark)" Date: Sun, 28 Jul 2024 09:26:04 -0300 Subject: [PATCH 3/8] feat: add support for GCP pubsub emulator (#803) --- Cargo.lock | 138 +++++++++++++++------------------- Cargo.toml | 8 +- src/sinks/gcp_pubsub/run.rs | 17 ++++- src/sinks/gcp_pubsub/setup.rs | 12 +++ 4 files changed, 92 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01062496..92ebce5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -738,6 +738,12 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bech32" version = "0.9.1" @@ -1421,8 +1427,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1433,13 +1441,14 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "google-cloud-auth" -version = "0.8.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d86a8cc190da6ab0cff095fcbfe95a057496428ff2a0a711ca110f2fcbb231" +checksum = "3bf7cb7864f08a92e77c26bb230d021ea57691788fb5dd51793f96965d19e7f9" dependencies = [ "async-trait", "base64 0.21.5", "google-cloud-metadata", + "google-cloud-token", "home", "jsonwebtoken", "reqwest", @@ -1454,16 +1463,15 @@ dependencies = [ [[package]] name = "google-cloud-gax" -version = "0.12.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870d9616855e89b37a824f9e35ca670d6bf4f04fab254b2eb01c7818d26f9d0b" +checksum = "8cb60314136e37de9e2a05ddb427b9c5a39c3d188de2e2f026c6af74425eef44" dependencies = [ - "google-cloud-auth", + "google-cloud-token", "http", "thiserror", "tokio", "tokio-retry", - "tokio-util", "tonic", "tower", "tracing", @@ -1471,9 +1479,9 @@ dependencies = [ [[package]] name = "google-cloud-googleapis" -version = "0.7.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bccc14a0bfb89f7b6c955da84992774d94d2f16570e5f7b74c7db11e765f791" +checksum = "db8a478015d079296167e3f08e096dc99cffc2cb50fa203dd38aaa9dd37f8354" dependencies = [ "prost", "prost-types", @@ -1482,9 +1490,9 @@ dependencies = [ [[package]] name = "google-cloud-metadata" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d2c3b00f0a07a1a9efffc1bdd0603ef853d8a6d4ee9de8d73039cd92fdc8f26" +checksum = "cc279bfb50487d7bcd900e8688406475fc750fe474a835b2ab9ade9eb1fc90e2" dependencies = [ "reqwest", "thiserror", @@ -1493,21 +1501,32 @@ dependencies = [ [[package]] name = "google-cloud-pubsub" -version = "0.12.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb0f24ed765ac6da045ea846ffb2f38ede5aae28645a4fa4025b38c070a82d76" +checksum = "1da196da473976944d408a91213bafe078e7223e10694d3f8ed36b6e210fa130" dependencies = [ "async-channel 1.9.0", "async-stream", "google-cloud-auth", "google-cloud-gax", "google-cloud-googleapis", + "google-cloud-token", "prost-types", "thiserror", "tokio", + "tokio-util", "tracing", ] +[[package]] +name = "google-cloud-token" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c12ba8b21d128a2ce8585955246977fbce4415f680ebf9199b6f9d6d725f" +dependencies = [ + "async-trait", +] + [[package]] name = "h2" version = "0.3.22" @@ -1668,7 +1687,7 @@ dependencies = [ "rustls-native-certs 0.5.0", "tokio", "tokio-rustls 0.22.0", - "webpki 0.21.4", + "webpki", ] [[package]] @@ -1828,13 +1847,14 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "8.3.0" +version = "9.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" +checksum = "b9ae10193d25051e74945f1ea2d0b42e03cc3b890f7e4cc5faa44997d808193f" dependencies = [ "base64 0.21.5", + "js-sys", "pem", - "ring 0.16.20", + "ring 0.17.7", "serde", "serde_json", "simple_asn1", @@ -2200,6 +2220,7 @@ dependencies = [ "elasticsearch", "env_logger", "file-rotate", + "google-cloud-gax", "google-cloud-googleapis", "google-cloud-pubsub", "hex", @@ -2389,11 +2410,12 @@ checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" [[package]] name = "pem" -version = "1.1.1" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" +checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae" dependencies = [ - "base64 0.13.1", + "base64 0.22.1", + "serde", ] [[package]] @@ -2566,9 +2588,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.9" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", "prost-derive", @@ -2576,22 +2598,22 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.11.9" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.41", ] [[package]] name = "prost-types" -version = "0.11.9" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" dependencies = [ "prost", ] @@ -2849,19 +2871,7 @@ dependencies = [ "log", "ring 0.16.20", "sct 0.6.1", - "webpki 0.21.4", -] - -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct 0.7.1", - "webpki 0.22.4", + "webpki", ] [[package]] @@ -3482,18 +3492,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki 0.21.4", + "webpki", ] [[package]] name = "tokio-rustls" -version = "0.23.4" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.20.9", + "rustls 0.21.10", "tokio", - "webpki 0.22.4", ] [[package]] @@ -3532,18 +3541,16 @@ dependencies = [ [[package]] name = "tonic" -version = "0.8.3" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.13.1", + "base64 0.21.5", "bytes", "flate2", - "futures-core", - "futures-util", "h2", "http", "http-body", @@ -3552,17 +3559,15 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "prost-derive", + "rustls 0.21.10", "rustls-pemfile", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls 0.24.1", "tokio-stream", - "tokio-util", "tower", "tower-layer", "tower-service", "tracing", - "tracing-futures", "webpki-roots", ] @@ -3630,16 +3635,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.5" @@ -3853,24 +3848,11 @@ dependencies = [ "untrusted 0.7.1", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.7", - "untrusted 0.9.0", -] - [[package]] name = "webpki-roots" -version = "0.22.6" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki 0.22.4", -] +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "winapi" diff --git a/Cargo.toml b/Cargo.toml index 632fab7a..6bbf25d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,8 +62,10 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] } redis = { version = "0.21.6", optional = true, features = ["tokio-comp"] } # features: gcp -google-cloud-pubsub = { version = "0.12.0", optional = true } -google-cloud-googleapis = { version = "0.7.0", optional = true } + +google-cloud-gax = {version ="0.17.0", optional = true } +google-cloud-pubsub = { version = "0.23.0", optional = true } +google-cloud-googleapis = { version = "0.12.0", optional = true } # features: rabbitmqsink lapin = { version = "2.1.1", optional = true } @@ -78,5 +80,5 @@ elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] -gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index b1b52881..b378753a 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::{ client::{Client, ClientConfig}, @@ -32,7 +33,7 @@ async fn send_pubsub_msg( }; publisher - .publish_immediately(vec![msg], None, None) + .publish_immediately(vec![msg], None) .await .map_err(|err| err.message().to_owned())?; @@ -48,6 +49,9 @@ pub fn writer_loop( retry_policy: &retry::Policy, ordering_key: &str, attributes: &GenericKV, + emulator: bool, + emulator_endpoint: &Option, + emulator_project_id: &Option, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -56,7 +60,16 @@ pub fn writer_loop( .build()?; let publisher: Publisher = rt.block_on(async { - let client = Client::new(ClientConfig::default()).await?; + let client_config = if emulator { + ClientConfig { + project_id: Some(emulator_project_id.clone().unwrap_or_default()), + environment: Environment::Emulator(emulator_endpoint.clone().unwrap_or_default()), + ..Default::default() + } + } else { + ClientConfig::default() + }; + let client = Client::new(client_config).await?; let topic = client.topic(topic_name); Result::<_, crate::Error>::Ok(topic.new_publisher(None)) })?; diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index 666de1d0..a1cdd064 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -16,6 +16,9 @@ pub struct Config { pub retry_policy: Option, pub ordering_key: Option, pub attributes: Option, + pub emulator: Option, + pub emulator_endpoint: Option, + pub emulator_project_id: Option, #[warn(deprecated)] pub credentials: Option, @@ -24,6 +27,12 @@ pub struct Config { impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { let topic_name = self.inner.topic.to_owned(); + let mut use_emulator = self.inner.emulator.unwrap_or(false); + let emulator_endpoint = self.inner.emulator_endpoint.to_owned(); + let emulator_project_id = self.inner.emulator_project_id.to_owned(); + if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) { + use_emulator = false; + } let error_policy = self .inner @@ -47,6 +56,9 @@ impl SinkProvider for WithUtils { &retry_policy, &ordering_key, &attributes, + use_emulator, + &emulator_endpoint, + &emulator_project_id, utils, ) .expect("writer loop failed"); From dc6168fb5481063abfde6853cbeaf606d5dfad26 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 4 Aug 2024 09:24:18 -0300 Subject: [PATCH 4/8] feat: make v1 compatible with Conway era (#807) --- Cargo.lock | 116 ++++--- Cargo.toml | 13 +- src/mapper/babbage.rs | 12 +- src/mapper/byron.rs | 66 ++-- src/mapper/cip15.rs | 2 +- src/mapper/cip25.rs | 2 +- src/mapper/collect.rs | 26 +- src/mapper/conway.rs | 588 ++++++++++++++++++++++++++++++++++++ src/mapper/map.rs | 58 ++-- src/mapper/mod.rs | 1 + src/mapper/prelude.rs | 16 +- src/mapper/shelley.rs | 12 +- src/model.rs | 3 +- src/sinks/gcp_pubsub/run.rs | 1 + src/sources/common.rs | 167 +++++----- src/sources/n2c/run.rs | 14 +- src/sources/n2n/run.rs | 12 +- src/utils/mod.rs | 2 +- src/utils/time.rs | 20 ++ 19 files changed, 905 insertions(+), 226 deletions(-) create mode 100644 src/mapper/conway.rs diff --git a/Cargo.lock b/Cargo.lock index 92ebce5a..27ee56c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1830,6 +1830,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -2007,6 +2016,16 @@ dependencies = [ "minicbor-derive", ] +[[package]] +name = "minicbor" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d15f4203d71fdf90903c2696e55426ac97a363c67b218488a73b534ce7aca10" +dependencies = [ + "half", + "minicbor-derive", +] + [[package]] name = "minicbor-derive" version = "0.13.0" @@ -2101,6 +2120,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -2231,7 +2256,13 @@ dependencies = [ "murmur3", "net2", "openssl", - "pallas", + "pallas-addresses", + "pallas-codec 0.29.0", + "pallas-crypto", + "pallas-miniprotocols", + "pallas-multiplexer", + "pallas-primitives", + "pallas-traverse", "prometheus_exporter", "redis", "reqwest", @@ -2239,6 +2270,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", + "time", "tokio", "unicode-truncate", ] @@ -2260,31 +2292,18 @@ dependencies = [ "yasna", ] -[[package]] -name = "pallas" -version = "0.18.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ef55b690eac7ddf43a3e7ce10d4594866c34279c424aa2ce26d757789246da" -dependencies = [ - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-miniprotocols", - "pallas-multiplexer", - "pallas-primitives", - "pallas-traverse", -] - [[package]] name = "pallas-addresses" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db28c4050dea032d497555bc68c269ae8e691486d8ec83f02b090487da0d0be" +checksum = "d628ad58404ddd733e8fe46fe9986489b46258a2ab1bb7b1c4b8e406b91b7cff" dependencies = [ "base58", "bech32", + "crc", + "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "thiserror", ] @@ -2296,19 +2315,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6e03d05d42a663526d78c8b1d4f2554f09bbf4cc846e1a9e839c558bf6103c" dependencies = [ "hex", - "minicbor", + "minicbor 0.19.1", "serde", ] +[[package]] +name = "pallas-codec" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da003a7360fa032b80d38b4a15573f885f412f2b3868772d49fb072197a9d5f9" +dependencies = [ + "hex", + "minicbor 0.20.0", + "serde", + "thiserror", +] + [[package]] name = "pallas-crypto" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a35fc93b3613c0a628d0820f8d5d9a52709d795b59a1754a337aee0fca289dd" +checksum = "c9248ed0e594bcb0f548393264519c7adea88874d8bd7cc86f894e8ba4e918c2" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "rand_core", "serde", "thiserror", @@ -2321,8 +2352,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a4754676d92ae351ad524d98bc32d70835856ee0623a45288bb50a5ee4b161" dependencies = [ "hex", - "itertools", - "pallas-codec", + "itertools 0.10.5", + "pallas-codec 0.18.2", "pallas-multiplexer", "thiserror", "tracing", @@ -2337,7 +2368,7 @@ dependencies = [ "byteorder", "hex", "log", - "pallas-codec", + "pallas-codec 0.18.2", "rand", "thiserror", "tracing", @@ -2345,15 +2376,15 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5fdf328f41971e0b1457e2377abeb09143fa50ab79f1a6a6ab5740bc94dc4b" +checksum = "c0fa55305212f7828651c8db024e1e286198c2fccb028bbb697c68990c044959" dependencies = [ "base58", "bech32", "hex", "log", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "serde", "serde_json", @@ -2361,15 +2392,18 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58c353ecb175a63422386c80301493db9fc448407bc63322534522579e22879" +checksum = "49459bd0d2ba86fd909890a81e6238eaf051952d7e38ad63195301e72e8f458e" dependencies = [ "hex", + "itertools 0.13.0", "pallas-addresses", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "pallas-primitives", + "paste", + "serde", "thiserror", ] @@ -2402,6 +2436,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathdiff" version = "0.2.1" @@ -2603,7 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.41", @@ -3369,12 +3409,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", + "num-conv", "powerfmt", "serde", "time-core", @@ -3389,10 +3430,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 6bbf25d7..ee6b8858 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,13 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.18.2" +pallas-multiplexer = "0.18.2" +pallas-miniprotocols = "0.18.2" +pallas-primitives = "0.29.0" +pallas-traverse = "0.29.0" +pallas-addresses = "0.29.0" +pallas-codec = "0.29.0" +pallas-crypto = "0.29.0" # pallas = { git = "https://github.com/txpipe/pallas" } # pallas = { path = "../pallas/pallas" } hex = "0.4.3" @@ -30,6 +36,7 @@ strum = "0.24" strum_macros = "0.24" prometheus_exporter = { version = "0.8.5", default-features = false } unicode-truncate = "0.2.0" +time = "0.3.36" # feature logs file-rotate = { version = "0.7.1", optional = true } @@ -63,7 +70,7 @@ redis = { version = "0.21.6", optional = true, features = ["tokio-comp"] } # features: gcp -google-cloud-gax = {version ="0.17.0", optional = true } +google-cloud-gax = { version = "0.17.0", optional = true } google-cloud-pubsub = { version = "0.23.0", optional = true } google-cloud-googleapis = { version = "0.12.0", optional = true } @@ -80,5 +87,5 @@ elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] -gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] diff --git a/src/mapper/babbage.rs b/src/mapper/babbage.rs index fda60d46..994e3ad1 100644 --- a/src/mapper/babbage.rs +++ b/src/mapper/babbage.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::babbage::{ +use pallas_primitives::babbage::{ AuxiliaryData, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::model::{BlockRecord, Era, TransactionRecord}; use crate::utils::time::TimeProvider; @@ -199,7 +199,7 @@ impl EventWriter { let record = self.to_post_alonzo_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -389,7 +389,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_babbage_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_babbage_with_cbor(&block, cbor) } } diff --git a/src/mapper/byron.rs b/src/mapper/byron.rs index 2ed223d5..d0f58ac5 100644 --- a/src/mapper/byron.rs +++ b/src/mapper/byron.rs @@ -3,11 +3,12 @@ use std::ops::Deref; use super::map::ToHex; use super::EventWriter; use crate::model::{BlockRecord, Era, EventData, TransactionRecord, TxInputRecord, TxOutputRecord}; +use crate::utils::time::TimeProvider; use crate::{model::EventContext, Error}; -use pallas::crypto::hash::Hash; -use pallas::ledger::primitives::byron; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_primitives::byron; +use pallas_traverse::OriginalHash; impl EventWriter { fn to_byron_input_record(&self, source: &byron::TxIn) -> Option { @@ -41,12 +42,9 @@ impl EventWriter { } fn to_byron_output_record(&self, source: &byron::TxOut) -> Result { - let address: pallas::ledger::addresses::Address = - pallas::ledger::addresses::ByronAddress::new( - &source.address.payload.0, - source.address.crc, - ) - .into(); + let address: pallas_addresses::Address = + pallas_addresses::ByronAddress::new(&source.address.payload.0, source.address.crc) + .into(); Ok(TxOutputRecord { address: address.to_string(), @@ -168,10 +166,12 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.0.epoch, - source.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + source.header.consensus_data.0.epoch, + source.header.consensus_data.0.slot, + ) + }); let mut record = BlockRecord { era: Era::Byron, @@ -181,7 +181,7 @@ impl EventWriter { tx_count: source.body.tx_payload.len(), hash: hash.to_hex(), number: source.header.consensus_data.2[0], - slot: abs_slot, + slot: abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.0.epoch), epoch_slot: Some(source.header.consensus_data.0.slot), previous_hash: source.header.prev_block.to_hex(), @@ -234,10 +234,9 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(source.header.consensus_data.epoch_id, 0) + }); Ok(BlockRecord { era: Era::Byron, @@ -247,7 +246,7 @@ impl EventWriter { vrf_vkey: Default::default(), tx_count: 0, number: source.header.consensus_data.difficulty[0], - slot: abs_slot, + slot: abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.epoch_id), epoch_slot: Some(0), previous_hash: source.header.prev_block.to_hex(), @@ -288,16 +287,18 @@ impl EventWriter { ) -> Result<(), Error> { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.0.epoch, - block.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + block.header.consensus_data.0.epoch, + block.header.consensus_data.0.slot, + ) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.2[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -311,7 +312,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_byron_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_byron_with_cbor(&block, cbor) } @@ -328,16 +329,15 @@ impl EventWriter { if self.config.include_byron_ebb { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(block.header.consensus_data.epoch_id, 0) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.difficulty[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -352,7 +352,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_ebb_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedEbBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedEbBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_ebb_with_cbor(&block, cbor) } } diff --git a/src/mapper/cip15.rs b/src/mapper/cip15.rs index 1d269206..84a548d6 100644 --- a/src/mapper/cip15.rs +++ b/src/mapper/cip15.rs @@ -3,7 +3,7 @@ use crate::model::CIP15AssetRecord; use crate::Error; use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; fn extract_json_property<'a>( json: &'a JsonValue, diff --git a/src/mapper/cip25.rs b/src/mapper/cip25.rs index 9def5b1a..813b7ef7 100644 --- a/src/mapper/cip25.rs +++ b/src/mapper/cip25.rs @@ -1,6 +1,6 @@ use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; use crate::{model::CIP25AssetRecord, Error}; diff --git a/src/mapper/collect.rs b/src/mapper/collect.rs index 515e11b5..1de39df9 100644 --- a/src/mapper/collect.rs +++ b/src/mapper/collect.rs @@ -1,19 +1,15 @@ -use pallas::{ - codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}, - ledger::{ - primitives::{ - alonzo::{ - AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, - PlutusScript, Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, - }, - babbage::{ - LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, - MintedTransactionOutput, PlutusV2Script, - }, - }, - traverse::OriginalHash, +use pallas_codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}; +use pallas_primitives::{ + alonzo::{ + AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, PlutusScript, + Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, + }, + babbage::{ + LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, MintedTransactionOutput, + PlutusV2Script, }, }; +use pallas_traverse::OriginalHash; use crate::{ model::{ @@ -138,7 +134,7 @@ impl EventWriter { pub fn collect_native_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>>, ) -> Result, Error> { match witness_set { Some(all) => all diff --git a/src/mapper/conway.rs b/src/mapper/conway.rs new file mode 100644 index 00000000..8b07fc9e --- /dev/null +++ b/src/mapper/conway.rs @@ -0,0 +1,588 @@ +use pallas_codec::utils::{KeepRaw, NonZeroInt}; + +use pallas_primitives::conway::{ + AuxiliaryData, Certificate, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, + MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, Multiasset, NetworkId, + RedeemerTag, RedeemersKey, RedeemersValue, +}; + +use pallas_crypto::hash::Hash; +use pallas_primitives::ToCanonicalJson as _; +use pallas_traverse::OriginalHash; + +use crate::model::{ + BlockRecord, Era, MintRecord, PlutusRedeemerRecord, TransactionRecord, TxOutputRecord, +}; +use crate::utils::time::TimeProvider; +use crate::{ + model::{EventContext, EventData}, + Error, +}; + +use super::{map::ToHex, EventWriter}; + +impl EventWriter { + pub fn collect_conway_mint_records(&self, mint: &Multiasset) -> Vec { + mint.iter() + .flat_map(|(policy, assets)| { + assets + .iter() + .map(|(asset, amount)| self.to_mint_record(policy, asset, amount.into())) + }) + .collect() + } + + pub fn crawl_conway_mints(&self, mints: &Multiasset) -> Result<(), Error> { + for (policy, assets) in mints.iter() { + for (asset, quantity) in assets.iter() { + self.append_from(self.to_mint_record(policy, asset, quantity.into()))?; + } + } + + Ok(()) + } + + pub fn to_conway_output_record( + &self, + output: &MintedPostAlonzoTransactionOutput, + ) -> Result { + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + Ok(TxOutputRecord { + address: address.to_string(), + amount: super::map::get_tx_output_coin_value(&output.value), + assets: self.collect_asset_records(&output.value).into(), + datum_hash: match &output.datum_option { + Some(MintedDatumOption::Hash(x)) => Some(x.to_string()), + Some(MintedDatumOption::Data(x)) => Some(x.original_hash().to_hex()), + None => None, + }, + inline_datum: match &output.datum_option { + Some(MintedDatumOption::Data(x)) => Some(self.to_plutus_datum_record(x)?), + _ => None, + }, + }) + } + + pub fn to_conway_redeemer_record( + &self, + key: &RedeemersKey, + value: &RedeemersValue, + ) -> Result { + Ok(PlutusRedeemerRecord { + purpose: match key.tag { + RedeemerTag::Spend => "spend".to_string(), + RedeemerTag::Mint => "mint".to_string(), + RedeemerTag::Cert => "cert".to_string(), + RedeemerTag::Reward => "reward".to_string(), + RedeemerTag::Vote => "vote".to_string(), + RedeemerTag::Propose => "propose".to_string(), + }, + ex_units_mem: value.ex_units.mem, + ex_units_steps: value.ex_units.steps, + input_idx: key.index, + plutus_data: value.data.to_json(), + }) + } + + pub fn collect_conway_output_records( + &self, + source: &[MintedTransactionOutput], + ) -> Result, Error> { + source + .iter() + .map(|x| match x { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x), + }) + .collect() + } + + pub fn to_conway_tx_size( + &self, + body: &KeepRaw, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> usize { + body.raw_cbor().len() + + aux_data.map(|ax| ax.raw_cbor().len()).unwrap_or(2) + + witness_set.map(|ws| ws.raw_cbor().len()).unwrap_or(1) + } + + pub fn to_conway_transaction_record( + &self, + body: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result { + let mut record = TransactionRecord { + hash: tx_hash.to_owned(), + size: self.to_conway_tx_size(body, aux_data, witness_set) as u32, + fee: body.fee, + ttl: body.ttl, + validity_interval_start: body.validity_interval_start, + network_id: body.network_id.as_ref().map(|x| match x { + NetworkId::One => 1, + NetworkId::Two => 2, + }), + ..Default::default() + }; + + let outputs = self.collect_conway_output_records(body.outputs.as_slice())?; + record.output_count = outputs.len(); + record.total_output = outputs.iter().map(|o| o.amount).sum(); + + let inputs = self.collect_input_records(&body.inputs); + record.input_count = inputs.len(); + + if let Some(mint) = &body.mint { + let mints = self.collect_conway_mint_records(mint); + record.mint_count = mints.len(); + + if self.config.include_transaction_details { + record.mint = mints.into(); + } + } + + // Add Collateral Stuff + let collateral_inputs = &body.collateral.as_deref(); + record.collateral_input_count = collateral_inputs.iter().count(); + record.has_collateral_output = body.collateral_return.is_some(); + + // TODO + // TransactionBodyComponent::ScriptDataHash(_) + // TransactionBodyComponent::RequiredSigners(_) + // TransactionBodyComponent::AuxiliaryDataHash(_) + + if self.config.include_transaction_details { + record.outputs = outputs.into(); + record.inputs = inputs.into(); + + // transaction_details collateral stuff + record.collateral_inputs = + collateral_inputs.map(|inputs| self.collect_input_records(inputs)); + + record.collateral_output = body.collateral_return.as_ref().map(|output| match output { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x).unwrap(), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x).unwrap(), + }); + + record.metadata = match aux_data { + Some(aux_data) => self.collect_metadata_records(aux_data)?.into(), + None => None, + }; + + if let Some(witnesses) = witness_set { + record.vkey_witnesses = Some( + witnesses + .vkeywitness + .iter() + .flatten() + .map(|i| self.to_vkey_witness_record(i)) + .collect::>()?, + ); + + record.native_witnesses = Some( + witnesses + .native_script + .iter() + .flatten() + .map(|i| self.to_native_witness_record(i)) + .collect::>()?, + ); + + let mut all_plutus = vec![]; + + let plutus_v1: Vec<_> = witnesses + .plutus_v1_script + .iter() + .flatten() + .map(|i| self.to_plutus_v1_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v1); + + let plutus_v2: Vec<_> = witnesses + .plutus_v2_script + .iter() + .flatten() + .map(|i| self.to_plutus_v2_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v2); + + let plutus_v3: Vec<_> = witnesses + .plutus_v3_script + .iter() + .flatten() + .map(|i| self.to_plutus_v3_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v3); + + record.plutus_witnesses = Some(all_plutus); + + record.plutus_redeemers = Some( + witnesses + .redeemer + .iter() + .flat_map(|i| i.iter()) + .map(|(k, v)| self.to_conway_redeemer_record(k, v)) + .collect::>()?, + ); + + record.plutus_data = Some( + witnesses + .plutus_data + .iter() + .flatten() + .map(|i| self.to_plutus_datum_record(i)) + .collect::>()?, + ); + } + + if let Some(withdrawals) = &body.withdrawals { + record.withdrawals = self.collect_withdrawal_records(withdrawals).into(); + } + } + + Ok(record) + } + + pub fn to_conway_block_record( + &self, + source: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result { + let relative_epoch = self + .utils + .time + .as_ref() + .map(|time| time.absolute_slot_to_relative(source.header.header_body.slot)); + + let mut record = BlockRecord { + era: Era::Conway, + body_size: source.header.header_body.block_body_size as usize, + issuer_vkey: source.header.header_body.issuer_vkey.to_hex(), + vrf_vkey: source.header.header_body.vrf_vkey.to_hex(), + tx_count: source.transaction_bodies.len(), + hash: hex::encode(hash), + number: source.header.header_body.block_number, + slot: source.header.header_body.slot, + epoch: relative_epoch.map(|(epoch, _)| epoch), + epoch_slot: relative_epoch.map(|(_, epoch_slot)| epoch_slot), + previous_hash: source + .header + .header_body + .prev_hash + .map(hex::encode) + .unwrap_or_default(), + cbor_hex: match self.config.include_block_cbor { + true => hex::encode(cbor).into(), + false => None, + }, + transactions: None, + }; + + if self.config.include_block_details || self.config.include_transaction_details { + record.transactions = Some(self.collect_conway_tx_records(source)?); + } + + Ok(record) + } + + pub fn collect_conway_tx_records( + &self, + block: &MintedBlock, + ) -> Result, Error> { + block + .transaction_bodies + .iter() + .enumerate() + .map(|(idx, tx)| { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + self.to_conway_transaction_record(tx, &tx_hash, aux_data, witness_set) + }) + .collect() + } + + fn crawl_conway_output(&self, output: &MintedPostAlonzoTransactionOutput) -> Result<(), Error> { + let record = self.to_conway_output_record(output)?; + self.append(record.into())?; + + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + let child = &self.child_writer(EventContext { + output_address: address.to_string().into(), + ..EventContext::default() + }); + + child.crawl_transaction_output_amount(&output.value)?; + + if let Some(MintedDatumOption::Data(datum)) = &output.datum_option { + let record = self.to_plutus_datum_record(datum)?; + child.append(record.into())?; + } + + Ok(()) + } + + fn crawl_conway_transaction_output( + &self, + output: &MintedTransactionOutput, + ) -> Result<(), Error> { + match output { + MintedTransactionOutput::Legacy(x) => self.crawl_legacy_output(x), + MintedTransactionOutput::PostAlonzo(x) => self.crawl_conway_output(x), + } + } + + fn crawl_conway_witness_set( + &self, + witness_set: &KeepRaw, + ) -> Result<(), Error> { + if let Some(native) = &witness_set.native_script { + for script in native.iter() { + self.append_from(self.to_native_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v1_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v1_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v2_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v2_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v3_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v3_witness_record(script)?)?; + } + } + + if let Some(redeemers) = &witness_set.redeemer { + for (key, value) in redeemers.iter() { + self.append_from(self.to_conway_redeemer_record(key, value)?)?; + } + } + + if let Some(datums) = &witness_set.plutus_data { + for datum in datums.iter() { + self.append_from(self.to_plutus_datum_record(datum)?)?; + } + } + + Ok(()) + } + + pub fn to_conway_certificate_event(&self, certificate: &Certificate) -> Option { + match certificate { + Certificate::StakeRegistration(credential) => EventData::StakeRegistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDeregistration(credential) => EventData::StakeDeregistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDelegation(credential, pool) => EventData::StakeDelegation { + credential: credential.into(), + pool_hash: pool.to_hex(), + } + .into(), + Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } => EventData::PoolRegistration { + operator: operator.to_hex(), + vrf_keyhash: vrf_keyhash.to_hex(), + pledge: *pledge, + cost: *cost, + margin: (margin.numerator as f64 / margin.denominator as f64), + reward_account: reward_account.to_hex(), + pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), + relays: relays.iter().map(super::map::relay_to_string).collect(), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), + } + .into(), + Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { + pool: pool.to_hex(), + epoch: *epoch, + } + .into(), + // all new Conway certs are out of scope for Oura lts/v1 + _ => None, + } + } + + fn crawl_conway_certificate(&self, certificate: &Certificate) -> Result<(), Error> { + if let Some(evt) = self.to_conway_certificate_event(certificate) { + self.append(evt)?; + } + + Ok(()) + } + + fn crawl_conway_transaction( + &self, + tx: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result<(), Error> { + let record = self.to_conway_transaction_record(tx, tx_hash, aux_data, witness_set)?; + + self.append_from(record.clone())?; + + for (idx, input) in tx.inputs.iter().enumerate() { + let child = self.child_writer(EventContext { + input_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_transaction_input(input)?; + } + + for (idx, output) in tx.outputs.iter().enumerate() { + let child = self.child_writer(EventContext { + output_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_transaction_output(output)?; + } + + if let Some(certs) = &tx.certificates { + for (idx, cert) in certs.iter().enumerate() { + let child = self.child_writer(EventContext { + certificate_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_certificate(cert)?; + } + } + + if let Some(collateral) = &tx.collateral { + for collateral in collateral.iter() { + // TODO: collateral context? + + self.crawl_collateral(collateral)?; + } + } + + if let Some(mint) = &tx.mint { + self.crawl_conway_mints(mint)?; + } + + if let Some(aux_data) = aux_data { + self.crawl_auxdata(aux_data)?; + } + + if let Some(witness_set) = witness_set { + self.crawl_conway_witness_set(witness_set)?; + } + + if self.config.include_transaction_end_events { + self.append(EventData::TransactionEnd(record))?; + } + + Ok(()) + } + + fn crawl_conway_block( + &self, + block: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result<(), Error> { + let record = self.to_conway_block_record(block, hash, cbor)?; + + self.append(EventData::Block(record.clone()))?; + + for (idx, tx) in block.transaction_bodies.iter().enumerate() { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + let child = self.child_writer(EventContext { + tx_idx: Some(idx), + tx_hash: Some(tx_hash.to_owned()), + ..EventContext::default() + }); + + child.crawl_conway_transaction(tx, &tx_hash, aux_data, witness_set)?; + } + + if self.config.include_block_end_events { + self.append(EventData::BlockEnd(record))?; + } + + Ok(()) + } + + /// Mapper entry-point for decoded Conway blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we already have a decoded block (for example, N2C). The raw CBOR is also + /// passed through in case we need to attach it to outbound events. + pub fn crawl_conway_with_cbor<'b>( + &self, + block: &'b MintedBlock<'b>, + cbor: &'b [u8], + ) -> Result<(), Error> { + let hash = block.header.original_hash(); + + let child = self.child_writer(EventContext { + block_hash: Some(hex::encode(hash)), + block_number: Some(block.header.header_body.block_number), + slot: Some(block.header.header_body.slot), + timestamp: self.compute_timestamp(block.header.header_body.slot), + ..EventContext::default() + }); + + child.crawl_conway_block(block, &hash, cbor) + } + + /// Mapper entry-point for raw Conway cbor blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we haven't decoded the CBOR yet (for example, N2N). + pub fn crawl_from_conway_cbor(&self, cbor: &[u8]) -> Result<(), Error> { + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; + self.crawl_conway_with_cbor(&block, cbor) + } +} diff --git a/src/mapper/map.rs b/src/mapper/map.rs index d05d20a0..79be068b 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -1,20 +1,21 @@ use std::collections::HashMap; -use pallas::ledger::primitives::alonzo::MintedWitnessSet; -use pallas::ledger::primitives::babbage::MintedDatumOption; -use pallas::ledger::traverse::{ComputeHash, OriginalHash}; -use pallas::{codec::utils::KeepRaw, crypto::hash::Hash}; +use pallas_codec::utils::KeepRaw; +use pallas_crypto::hash::Hash; +use pallas_primitives::alonzo::MintedWitnessSet; +use pallas_primitives::babbage::MintedDatumOption; +use pallas_traverse::{ComputeHash, OriginalHash}; -use pallas::ledger::primitives::{ +use pallas_primitives::{ alonzo::{ self as alonzo, AuxiliaryData, Certificate, InstantaneousRewardSource, InstantaneousRewardTarget, Metadatum, MetadatumLabel, MintedBlock, NetworkId, Relay, TransactionBody, TransactionInput, Value, }, - babbage, ToCanonicalJson, + babbage, conway, ToCanonicalJson, }; -use pallas::network::miniprotocols::Point; +use pallas_miniprotocols::Point; use serde_json::{json, Value as JsonValue}; use crate::model::{ @@ -64,23 +65,23 @@ fn ip_string_from_bytes(bytes: &[u8]) -> String { format!("{}.{}.{}.{}", bytes[0], bytes[1], bytes[2], bytes[3]) } -fn relay_to_string(relay: &Relay) -> String { +pub fn relay_to_string(relay: &Relay) -> String { match relay { Relay::SingleHostAddr(port, ipv4, ipv6) => { let ip = match (ipv6, ipv4) { - (None, None) => "".to_string(), - (_, Some(x)) => ip_string_from_bytes(x.as_ref()), - (Some(x), _) => ip_string_from_bytes(x.as_ref()), + (_, pallas_codec::utils::Nullable::Some(x)) => ip_string_from_bytes(x.as_ref()), + (pallas_codec::utils::Nullable::Some(x), _) => ip_string_from_bytes(x.as_ref()), + _ => "".to_string(), }; match port { - Some(port) => format!("{ip}:{port}"), - None => ip, + pallas_codec::utils::Nullable::Some(port) => format!("{ip}:{port}"), + _ => ip, } } Relay::SingleHostName(port, host) => match port { - Some(port) => format!("{host}:{port}"), - None => host.clone(), + pallas_codec::utils::Nullable::Some(port) => format!("{host}:{port}"), + _ => host.clone(), }, Relay::MultiHostName(host) => host.clone(), } @@ -98,7 +99,7 @@ fn metadatum_to_string_key(datum: &Metadatum) -> String { } } -fn get_tx_output_coin_value(amount: &Value) -> u64 { +pub fn get_tx_output_coin_value(amount: &Value) -> u64 { match amount { Value::Coin(x) => *x, Value::Multiasset(x, _) => *x, @@ -169,7 +170,7 @@ impl EventWriter { &self, output: &alonzo::TransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -184,7 +185,7 @@ impl EventWriter { &self, output: &babbage::MintedPostAlonzoTransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -205,7 +206,7 @@ impl EventWriter { pub fn to_transaction_output_asset_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, amount: u64, ) -> OutputAssetRecord { OutputAssetRecord { @@ -219,7 +220,7 @@ impl EventWriter { pub fn to_mint_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, quantity: i64, ) -> MintRecord { MintRecord { @@ -291,6 +292,16 @@ impl EventWriter { }) } + pub fn to_plutus_v3_witness_record( + &self, + script: &conway::PlutusV3Script, + ) -> Result { + Ok(PlutusWitnessRecord { + script_hash: script.compute_hash().to_hex(), + script_hex: script.as_ref().to_hex(), + }) + } + pub fn to_native_witness_record( &self, script: &alonzo::NativeScript, @@ -342,8 +353,11 @@ impl EventWriter { reward_account: reward_account.to_hex(), pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), relays: relays.iter().map(relay_to_string).collect(), - pool_metadata: pool_metadata.as_ref().map(|m| m.url.clone()), - pool_metadata_hash: pool_metadata.as_ref().map(|m| m.hash.clone().to_hex()), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), }, Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { pool: pool.to_hex(), diff --git a/src/mapper/mod.rs b/src/mapper/mod.rs index a176e40f..13e68677 100644 --- a/src/mapper/mod.rs +++ b/src/mapper/mod.rs @@ -3,6 +3,7 @@ mod byron; mod cip15; mod cip25; mod collect; +mod conway; mod map; mod prelude; mod shelley; diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 4564bb92..806ec250 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -106,15 +106,15 @@ impl EventWriter { } } -impl From for Era { - fn from(other: pallas::ledger::traverse::Era) -> Self { +impl From for Era { + fn from(other: pallas_traverse::Era) -> Self { match other { - pallas::ledger::traverse::Era::Byron => Era::Byron, - pallas::ledger::traverse::Era::Shelley => Era::Shelley, - pallas::ledger::traverse::Era::Allegra => Era::Allegra, - pallas::ledger::traverse::Era::Mary => Era::Mary, - pallas::ledger::traverse::Era::Alonzo => Era::Alonzo, - pallas::ledger::traverse::Era::Babbage => Era::Babbage, + pallas_traverse::Era::Byron => Era::Byron, + pallas_traverse::Era::Shelley => Era::Shelley, + pallas_traverse::Era::Allegra => Era::Allegra, + pallas_traverse::Era::Mary => Era::Mary, + pallas_traverse::Era::Alonzo => Era::Alonzo, + pallas_traverse::Era::Babbage => Era::Babbage, _ => Era::Unknown, } } diff --git a/src/mapper/shelley.rs b/src/mapper/shelley.rs index 183ec6e7..c10a930c 100644 --- a/src/mapper/shelley.rs +++ b/src/mapper/shelley.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::alonzo::{ +use pallas_primitives::alonzo::{ AuxiliaryData, Certificate, Metadata, MintedBlock, MintedWitnessSet, Multiasset, TransactionBody, TransactionInput, TransactionOutput, Value, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::{ model::{Era, EventContext, EventData}, @@ -89,7 +89,7 @@ impl EventWriter { let record = self.to_legacy_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -325,7 +325,7 @@ impl EventWriter { /// Shelley. In this way, we can avoid having to fork the crawling procedure /// for each different hard-fork. pub fn crawl_from_shelley_cbor(&self, cbor: &[u8], era: Era) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_shelley_with_cbor(&block, cbor, era) } } diff --git a/src/model.rs b/src/model.rs index ad661bc3..89f894b4 100644 --- a/src/model.rs +++ b/src/model.rs @@ -25,6 +25,7 @@ pub enum Era { Mary, Alonzo, Babbage, + Conway, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -247,7 +248,7 @@ impl From for EventData { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct PlutusRedeemerRecord { pub purpose: String, - pub ex_units_mem: u32, + pub ex_units_mem: u64, pub ex_units_steps: u64, pub input_idx: u32, pub plutus_data: JsonValue, diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index b378753a..c52bad93 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -42,6 +42,7 @@ async fn send_pubsub_msg( Ok(()) } +#[allow(clippy::too_many_arguments)] pub fn writer_loop( input: StageReceiver, topic_name: &str, diff --git a/src/sources/common.rs b/src/sources/common.rs index 4687561e..7731a555 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -1,13 +1,9 @@ use core::fmt; use std::{ops::Deref, str::FromStr, time::Duration}; -use pallas::{ - ledger::traverse::{probe, Era}, - network::{ - miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}, - multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, - }, -}; +use pallas_miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer}; +use pallas_traverse::{probe, Era}; use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; @@ -72,9 +68,9 @@ impl FromStr for PointArg { } } -impl ToString for PointArg { - fn to_string(&self) -> String { - format!("{},{}", self.0, self.1) +impl std::fmt::Display for PointArg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{},{}", self.0, self.1) } } @@ -262,77 +258,91 @@ pub fn should_finalize( false } -pub(crate) fn intersect_starting_point( - client: &mut chainsync::Client, - intersect_arg: &Option, - since_arg: &Option, - utils: &Utils, -) -> Result, Error> -where - chainsync::Message: pallas::codec::Fragment, -{ - let cursor = utils.get_cursor_if_any(); - - match cursor { - Some(cursor) => { - log::info!("found persisted cursor, will use as starting point"); - let desired = cursor.try_into()?; - let (point, _) = client.find_intersect(vec![desired])?; - - Ok(point) - } - None => match intersect_arg { - Some(IntersectArg::Fallbacks(x)) => { - log::info!("found 'fallbacks' intersect argument, will use as starting point"); - let options: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); - - let (point, _) = client.find_intersect(options?)?; - - Ok(point) - } - Some(IntersectArg::Origin) => { - log::info!("found 'origin' intersect argument, will use as starting point"); - - let point = client.intersect_origin()?; - - Ok(Some(point)) - } - Some(IntersectArg::Point(x)) => { - log::info!("found 'point' intersect argument, will use as starting point"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; - - Ok(point) - } - Some(IntersectArg::Tip) => { - log::info!("found 'tip' intersect argument, will use as starting point"); - - let point = client.intersect_tip()?; - - Ok(Some(point)) - } - None => match since_arg { - Some(x) => { - log::info!("explicit 'since' argument, will use as starting point"); - log::warn!("`since` value is deprecated, please use `intersect`"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; +macro_rules! intersect_starting_point { + ($fn:ident, $client:ty) => { + pub(crate) fn $fn( + client: &mut $client, + intersect_arg: &Option, + since_arg: &Option, + utils: &Utils, + ) -> Result, Error> { + let cursor = utils.get_cursor_if_any(); + + match cursor { + Some(cursor) => { + log::info!("found persisted cursor, will use as starting point"); + let desired = cursor.try_into()?; + let (point, _) = client.find_intersect(vec![desired])?; Ok(point) } - None => { - log::info!("no starting point specified, will use tip of chain"); + None => match intersect_arg { + Some(IntersectArg::Fallbacks(x)) => { + log::info!( + "found 'fallbacks' intersect argument, will use as starting point" + ); + let options: Result, _> = + x.iter().map(|x| x.clone().try_into()).collect(); + + let (point, _) = client.find_intersect(options?)?; + + Ok(point) + } + Some(IntersectArg::Origin) => { + log::info!("found 'origin' intersect argument, will use as starting point"); + + let point = client.intersect_origin()?; + + Ok(Some(point)) + } + Some(IntersectArg::Point(x)) => { + log::info!("found 'point' intersect argument, will use as starting point"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + Some(IntersectArg::Tip) => { + log::info!("found 'tip' intersect argument, will use as starting point"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + None => match since_arg { + Some(x) => { + log::info!("explicit 'since' argument, will use as starting point"); + log::warn!("`since` value is deprecated, please use `intersect`"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + None => { + log::info!("no starting point specified, will use tip of chain"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + }, + }, + } + } + }; +} - let point = client.intersect_tip()?; +intersect_starting_point!( + intersect_starting_point_n2n, + chainsync::N2NClient +); - Ok(Some(point)) - } - }, - }, - } -} +intersect_starting_point!( + intersect_starting_point_n2c, + chainsync::N2CClient +); pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<(), Error> { match probe::block_era(body) { @@ -352,6 +362,11 @@ pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<( .crawl_from_babbage_cbor(body) .ok_or_warn("error crawling babbage block for events"); } + Era::Conway => { + writer + .crawl_from_conway_cbor(body) + .ok_or_warn("error crawling conway block for events"); + } x => { return Err(format!("This version of Oura can't handle era: {x}").into()); } diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index a6aef999..2678813f 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,18 +1,14 @@ use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::{ - ledger::traverse::MultiEraBlock, - network::{ - miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, - }, -}; +use pallas_miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; +use pallas_traverse::MultiEraBlock; use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2c, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -217,7 +213,7 @@ fn do_chainsync_attempt( let mut client = chainsync::N2CClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2c( &mut client, &config.intersect, #[allow(deprecated)] diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index f2b83267..75936ecb 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,9 +1,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::network::{ - miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, -}; +use pallas_miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; use std::sync::mpsc::{Receiver, SyncSender}; @@ -11,7 +9,7 @@ use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2n, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -56,7 +54,7 @@ impl ChainObserver { ) -> Result { // parse the header and extract the point of the chain - let header = pallas::ledger::traverse::MultiEraHeader::decode( + let header = pallas_traverse::MultiEraHeader::decode( content.variant, content.byron_prefix.map(|x| x.0), &content.cbor, @@ -224,7 +222,7 @@ fn do_chainsync_attempt( let mut cs_client = chainsync::N2NClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2n( &mut cs_client, &config.intersect, #[allow(deprecated)] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7c8375b..d142c883 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; -use pallas::network::miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; // TODO: move these values to Pallas pub const PREPROD_MAGIC: u64 = 1; diff --git a/src/utils/time.rs b/src/utils/time.rs index da2bef4d..77db083a 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -9,6 +9,7 @@ pub(crate) trait TimeProvider { /// Maps between slots and wallclock fn slot_to_wallclock(&self, slot: u64) -> u64; fn absolute_slot_to_relative(&self, slot: u64) -> (u64, u64); + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64; } /// A naive, standalone implementation of a time provider @@ -66,6 +67,16 @@ fn compute_era_epoch(era_slot: u64, era_slot_length: u64, era_epoch_length: u64) (epoch, reminder) } +#[inline] +fn relative_slot_to_absolute( + epoch: u64, + sub_epoch_slot: u64, + epoch_length: u64, + slot_length: u64, +) -> u64 { + ((epoch * epoch_length) / slot_length) + sub_epoch_slot +} + impl TimeProvider for NaiveProvider { fn slot_to_wallclock(&self, slot: u64) -> u64 { let NaiveProvider { config, .. } = self; @@ -111,6 +122,15 @@ impl TimeProvider for NaiveProvider { (shelley_start_epoch + era_epoch, reminder) } } + + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64 { + relative_slot_to_absolute( + epoch, + slot, + self.config.byron_epoch_length as u64, + self.config.byron_slot_length as u64, + ) + } } #[cfg(test)] From e408e06299901be34b4443dc6ff2c1375fd4b3af Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 4 Aug 2024 09:35:28 -0300 Subject: [PATCH 5/8] Release v1.9.0 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27ee56c9..14b4af02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2232,7 +2232,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "oura" -version = "1.8.6" +version = "1.9.0" dependencies = [ "aws-config", "aws-sdk-lambda", diff --git a/Cargo.toml b/Cargo.toml index ee6b8858..4e4cad33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "oura" description = "The tail of Cardano" -version = "1.8.6" +version = "1.9.0" edition = "2021" repository = "https://github.com/txpipe/oura" homepage = "https://github.com/txpipe/oura" From f11a0c740f19611e22de657fb5b49f43c5754630 Mon Sep 17 00:00:00 2001 From: Paulo Cesar <461084+pocesar@users.noreply.github.com> Date: Mon, 5 Aug 2024 18:07:44 -0300 Subject: [PATCH 6/8] fix: running pubsub on platform --- Cargo.lock | 261 ++++++++++++++++++++++++---------- Cargo.toml | 2 +- src/sinks/gcp_pubsub/run.rs | 5 - src/sinks/gcp_pubsub/setup.rs | 9 -- 4 files changed, 184 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1d25c34..4b3ac4b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,9 +114,9 @@ checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" [[package]] name = "asn1-rs" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" +checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" dependencies = [ "asn1-rs-derive", "asn1-rs-impl", @@ -130,9 +130,9 @@ dependencies = [ [[package]] name = "asn1-rs-derive" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" +checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e89b6941c2d1a7045538884d6e760ccfffdf8e1ffc2613d8efa74305e1f3752" +checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" dependencies = [ "bindgen", "cc", @@ -813,7 +813,7 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" name = "base64" version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "base64" @@ -821,6 +821,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bech32" version = "0.9.1" @@ -907,9 +913,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "bytes-utils" @@ -1322,9 +1328,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "dunce" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" [[package]] name = "dyn-clone" @@ -1465,9 +1471,9 @@ checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" [[package]] name = "flate2" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" dependencies = [ "crc32fast", "miniz_oxide", @@ -1626,7 +1632,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "libc", "wasi", @@ -1647,9 +1653,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "google-cloud-auth" -version = "0.13.2" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bf7cb7864f08a92e77c26bb230d021ea57691788fb5dd51793f96965d19e7f9" +checksum = "1112c453c2e155b3e683204ffff52bcc6d6495d04b68d9e90cd24161270c5058" dependencies = [ "async-trait", "base64 0.21.7", @@ -1669,12 +1675,12 @@ dependencies = [ [[package]] name = "google-cloud-gax" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb60314136e37de9e2a05ddb427b9c5a39c3d188de2e2f026c6af74425eef44" +checksum = "9c3eaaad103912825594d674a4b1e556ccbb05a13a6cac17dcfd871997fb760a" dependencies = [ "google-cloud-token", - "http", + "http 1.1.0", "thiserror", "tokio", "tokio-retry", @@ -1685,9 +1691,9 @@ dependencies = [ [[package]] name = "google-cloud-googleapis" -version = "0.12.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db8a478015d079296167e3f08e096dc99cffc2cb50fa203dd38aaa9dd37f8354" +checksum = "0ae8ab26ef7c7c3f7dfb9cc3982293d031d8e78c85d00ddfb704b5c35aeff7c8" dependencies = [ "prost", "prost-types", @@ -1696,9 +1702,9 @@ dependencies = [ [[package]] name = "google-cloud-metadata" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc279bfb50487d7bcd900e8688406475fc750fe474a835b2ab9ade9eb1fc90e2" +checksum = "04f945a208886a13d07636f38fb978da371d0abc3e34bad338124b9f8c135a8f" dependencies = [ "reqwest 0.12.5", "thiserror", @@ -1707,9 +1713,9 @@ dependencies = [ [[package]] name = "google-cloud-pubsub" -version = "0.23.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1da196da473976944d408a91213bafe078e7223e10694d3f8ed36b6e210fa130" +checksum = "55ef73601dcec5ea144e59969e921d35d66000211603fee8023b7947af09248f" dependencies = [ "async-channel 1.9.0", "async-stream", @@ -1745,7 +1751,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -1764,7 +1770,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -1973,7 +1979,24 @@ dependencies = [ "rustls-native-certs 0.5.0", "tokio", "tokio-rustls 0.22.0", - "webpki", + "webpki 0.21.4", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.12", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", ] [[package]] @@ -2089,9 +2112,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -2153,6 +2176,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -2192,10 +2224,10 @@ version = "9.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9ae10193d25051e74945f1ea2d0b42e03cc3b890f7e4cc5faa44997d808193f" dependencies = [ - "base64 0.21.5", + "base64 0.21.7", "js-sys", "pem", - "ring 0.17.7", + "ring 0.17.8", "serde", "serde_json", "simple_asn1", @@ -2583,7 +2615,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "oura" -version = "1.9.0" +version = "1.10.0" dependencies = [ "aws-config", "aws-sdk-lambda", @@ -2824,6 +2856,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2960,9 +3001,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "prettyplease" @@ -3035,9 +3079,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", "prost-derive", @@ -3045,22 +3089,22 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.72", ] [[package]] name = "prost-types" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ "prost", ] @@ -3162,9 +3206,9 @@ checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -3259,7 +3303,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", "serde", "serde_json", "serde_urlencoded", @@ -3380,18 +3424,21 @@ dependencies = [ "base64 0.13.1", "log", "ring 0.16.20", - "sct 0.6.1", - "webpki", + "sct", + "webpki 0.21.4", ] [[package]] name = "rustls" -version = "0.21.10" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ + "aws-lc-rs", "log", - "ring 0.17.7", + "once_cell", + "ring 0.17.8", + "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", @@ -3429,7 +3476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", "rustls-pki-types", "schannel", "security-framework", @@ -3446,9 +3493,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64 0.22.1", "rustls-pki-types", @@ -3595,9 +3642,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", "memchr", @@ -3893,17 +3940,18 @@ dependencies = [ "cfg-if", "p12-keystore", "rustls-connector", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", ] [[package]] name = "tempfile" -version = "3.10.1" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53" dependencies = [ "cfg-if", "fastrand 2.1.0", + "once_cell", "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -4059,16 +4107,17 @@ checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.1", "tokio", - "webpki", + "webpki 0.21.4", ] [[package]] name = "tokio-rustls" -version = "0.24.1" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.21.10", + "rustls 0.23.12", + "rustls-pki-types", "tokio", ] @@ -4107,29 +4156,30 @@ dependencies = [ [[package]] name = "tonic" -version = "0.10.2" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.5", + "base64 0.22.1", "bytes", "flate2", - "h2", - "http", - "http-body", - "hyper", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", "prost", - "rustls 0.21.10", - "rustls-pemfile", + "rustls-pemfile 2.1.3", + "socket2 0.5.7", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.26.0", "tokio-stream", "tower", "tower-layer", @@ -4423,11 +4473,36 @@ dependencies = [ "untrusted 0.7.1", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring 0.17.8", + "untrusted 0.9.0", +] + [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.34", +] [[package]] name = "winapi" @@ -4447,11 +4522,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4487,6 +4562,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -4671,6 +4755,27 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index 05fe8106..949196e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "oura" description = "The tail of Cardano" -version = "1.9.2" +version = "1.10.0" edition = "2021" repository = "https://github.com/txpipe/oura" homepage = "https://github.com/txpipe/oura" diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index 310379f9..f53af0a6 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, sync::Arc}; -use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::{ client::{Client, ClientConfig}, @@ -42,7 +41,6 @@ async fn send_pubsub_msg( Ok(()) } -#[allow(clippy::too_many_arguments)] pub fn writer_loop( input: StageReceiver, topic_name: &str, @@ -50,9 +48,6 @@ pub fn writer_loop( retry_policy: &retry::Policy, ordering_key: &str, attributes: &GenericKV, - emulator: bool, - emulator_endpoint: &Option, - emulator_project_id: &Option, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index a1cdd064..033bcc19 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -27,12 +27,6 @@ pub struct Config { impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { let topic_name = self.inner.topic.to_owned(); - let mut use_emulator = self.inner.emulator.unwrap_or(false); - let emulator_endpoint = self.inner.emulator_endpoint.to_owned(); - let emulator_project_id = self.inner.emulator_project_id.to_owned(); - if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) { - use_emulator = false; - } let error_policy = self .inner @@ -56,9 +50,6 @@ impl SinkProvider for WithUtils { &retry_policy, &ordering_key, &attributes, - use_emulator, - &emulator_endpoint, - &emulator_project_id, utils, ) .expect("writer loop failed"); From 5a289d615f7540dbbbab7908f4292e3f41499c48 Mon Sep 17 00:00:00 2001 From: Paulo Cesar <461084+pocesar@users.noreply.github.com> Date: Mon, 5 Aug 2024 18:10:29 -0300 Subject: [PATCH 7/8] chore: formatting --- src/sinks/gcp_pubsub/run.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index f53af0a6..6d7364c3 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -56,7 +56,6 @@ pub fn writer_loop( .build()?; let publisher: Publisher = rt.block_on(async { - let client = Client::new(ClientConfig::default().with_auth().await?).await?; let topic = client.topic(topic_name); Result::<_, crate::Error>::Ok(topic.new_publisher(None)) From cb7b1771c44df6950a6865060fbdc27ad7f5cd82 Mon Sep 17 00:00:00 2001 From: Paulo Cesar <461084+pocesar@users.noreply.github.com> Date: Mon, 5 Aug 2024 18:13:07 -0300 Subject: [PATCH 8/8] chore: cargo fmt --- src/sinks/elastic/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/elastic/run.rs b/src/sinks/elastic/run.rs index 0ba6750d..405d2ad8 100644 --- a/src/sinks/elastic/run.rs +++ b/src/sinks/elastic/run.rs @@ -1,4 +1,4 @@ -use elasticsearch::{params::OpType, Elasticsearch, IndexParts, http::StatusCode}; +use elasticsearch::{http::StatusCode, params::OpType, Elasticsearch, IndexParts}; use serde::Serialize; use serde_json::json; use std::sync::Arc;