Skip to content

Commit

Permalink
geyser: remove startup_status
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 1, 2023
1 parent 7bd3596 commit 534e0ef
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 92 deletions.
17 changes: 15 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,30 @@ The minor version will be incremented upon a breaking change and the patch versi

### Fixes

### Features

### Breaking

## 2023-11-01

- @triton-one/yellowstone-grpc:0.3.0
- yellowstone-grpc-client-1.12.0+solana.1.17.1
- yellowstone-grpc-geyser-1.11.0+solana.1.17.1
- yellowstone-grpc-proto-1.11.0+solana.1.17.1
- yellowstone-grpc-tools-1.0.0-rc.6+solana.1.17.1

### Fixes

- geyser: trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207))
- tools: correctly handle SIGINT in kafka ([#219](https://github.com/rpcpool/yellowstone-grpc/pull/219))
- geyser: use Ordering::Relaxed instead of SeqCst ([#221](https://github.com/rpcpool/yellowstone-grpc/pull/221))
- proto: add optional field `ping` to `SubscribeRequest` ([#227](https://github.com/rpcpool/yellowstone-grpc/pull/227))
- geyser: remove startup_status (allow reload plugin) ([#229](https://github.com/rpcpool/yellowstone-grpc/pull/229))

### Features

- proto: add optional field `filter_by_commitment` to Slots filter ([#223](https://github.com/rpcpool/yellowstone-grpc/pull/223))

### Breaking

## 2023-10-19

- yellowstone-grpc-tools-1.0.0-rc.5+solana.1.17.1
Expand Down
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[workspace]
members = [
"examples/rust", # 1.10.0+solana.1.17.1
"yellowstone-grpc-client", # 1.11.1+solana.1.17.1
"yellowstone-grpc-geyser", # 1.10.0+solana.1.17.1
"yellowstone-grpc-proto", # 1.10.0+solana.1.17.1
"yellowstone-grpc-tools", # 1.0.0-rc.5+solana.1.17.1
"examples/rust", # 1.11.0+solana.1.17.1
"yellowstone-grpc-client", # 1.12.0+solana.1.17.1
"yellowstone-grpc-geyser", # 1.11.0+solana.1.17.1
"yellowstone-grpc-proto", # 1.11.0+solana.1.17.1
"yellowstone-grpc-tools", # 1.0.0-rc.6+solana.1.17.1
]

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client-simple"
version = "1.10.0+solana.1.17.1"
version = "1.11.0+solana.1.17.1"
authors = ["Triton One"]
edition = "2021"
publish = false
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-client-nodejs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@triton-one/yellowstone-grpc",
"version": "0.2.1",
"version": "0.3.0",
"license": "Apache-2.0",
"author": "Triton One",
"description": "Yellowstone gRPC Geyser Node.js Client",
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.11.1+solana.1.17.1"
version = "1.12.0+solana.1.17.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Simple Client"
Expand All @@ -16,7 +16,7 @@ http = "0.2.8"
thiserror = "1.0"
tonic = { version = "0.10.2", features = ["gzip", "tls", "tls-roots"] }
tonic-health = "0.10.2"
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.10.0+solana.1.17.1" }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.11.0+solana.1.17.1" }

[dev-dependencies]
tokio = { version = "1.32.0", features = ["macros"] }
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.10.0+solana.1.17.1"
version = "1.11.0+solana.1.17.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
104 changes: 31 additions & 73 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,16 @@ use {
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
std::{
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
},
std::{sync::Arc, time::Duration},
tokio::{
runtime::Runtime,
sync::{mpsc, Notify},
},
};

const STARTUP_END_OF_RECEIVED: u8 = 1 << 0;
const STARTUP_PROCESSED_RECEIVED: u8 = 1 << 1;

#[derive(Debug)]
pub struct PluginInner {
runtime: Runtime,
startup_status: AtomicU8,
snapshot_channel: Option<crossbeam_channel::Sender<Option<Message>>>,
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown: Arc<Notify>,
Expand All @@ -53,15 +43,8 @@ impl Plugin {
where
F: FnOnce(&PluginInner) -> PluginResult<()>,
{
// Full block reconstruction will fail before first processed slot received
let inner = self.inner.as_ref().expect("initialized");
if inner.startup_status.load(Ordering::Relaxed)
== STARTUP_END_OF_RECEIVED | STARTUP_PROCESSED_RECEIVED
{
f(inner)
} else {
Ok(())
}
f(inner)
}
}

Expand Down Expand Up @@ -97,7 +80,6 @@ impl GeyserPlugin for Plugin {

self.inner = Some(PluginInner {
runtime,
startup_status: AtomicU8::new(0),
snapshot_channel,
grpc_channel,
grpc_shutdown,
Expand All @@ -122,49 +104,43 @@ impl GeyserPlugin for Plugin {
slot: u64,
is_startup: bool,
) -> PluginResult<()> {
let account = match account {
ReplicaAccountInfoVersions::V0_0_1(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_1 is not supported")
}
ReplicaAccountInfoVersions::V0_0_2(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_2 is not supported")
}
ReplicaAccountInfoVersions::V0_0_3(info) => info,
};
let message = Message::Account((account, slot, is_startup).into());
self.with_inner(|inner| {
let account = match account {
ReplicaAccountInfoVersions::V0_0_1(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_1 is not supported")
}
ReplicaAccountInfoVersions::V0_0_2(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_2 is not supported")
}
ReplicaAccountInfoVersions::V0_0_3(info) => info,
};

if is_startup {
let inner = self.inner.as_ref().expect("initialized");
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
let message = Message::Account((account, slot, is_startup).into());
if is_startup {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
}
}
} else {
inner.send_message(message);
}

Ok(())
} else {
self.with_inner(|inner| {
inner.send_message(message);
Ok(())
})
}
})
}

fn notify_end_of_startup(&self) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");

inner
.startup_status
.fetch_or(STARTUP_END_OF_RECEIVED, Ordering::Relaxed);

if let Some(channel) = &inner.snapshot_channel {
match channel.send(None) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
self.with_inner(|inner| {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(None) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
}
}
}

Ok(())
Ok(())
})
}

fn update_slot_status(
Expand All @@ -173,28 +149,10 @@ impl GeyserPlugin for Plugin {
parent: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
// if plugin run on genesis of network, notify_end_of_startup will not be triggered, and slot 0 will not
// come through this function.
if parent == Some(0) {
inner
.startup_status
.fetch_or(STARTUP_END_OF_RECEIVED, Ordering::Relaxed);
}
if status == SlotStatus::Processed {
let _ = inner.startup_status.compare_exchange(
STARTUP_END_OF_RECEIVED,
STARTUP_END_OF_RECEIVED | STARTUP_PROCESSED_RECEIVED,
Ordering::Relaxed,
Ordering::Relaxed,
);
}

self.with_inner(|inner| {
let message = Message::Slot((slot, parent, status).into());
inner.send_message(message);
prom::update_slot_status(status, slot);

Ok(())
})
}
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-proto/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-proto"
version = "1.10.0+solana.1.17.1"
version = "1.11.0+solana.1.17.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Protobuf Definitions"
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-tools"
version = "1.0.0-rc.5+solana.1.17.1"
version = "1.0.0-rc.6+solana.1.17.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Tools"
Expand Down

0 comments on commit 534e0ef

Please sign in to comment.