Skip to content

Commit

Permalink
local nostrdb subscriptions working
Browse files Browse the repository at this point in the history
Signed-off-by: William Casarin <[email protected]>
  • Loading branch information
jb55 committed Feb 7, 2024
1 parent 499f10c commit 74ce870
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 156 deletions.
16 changes: 8 additions & 8 deletions enostr/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub struct Filter {
#[serde(skip_serializing_if = "Option::is_none")]
ids: Option<Vec<EventId>>,
pub ids: Option<Vec<EventId>>,
#[serde(skip_serializing_if = "Option::is_none")]
authors: Option<Vec<Pubkey>>,
pub authors: Option<Vec<Pubkey>>,
#[serde(skip_serializing_if = "Option::is_none")]
kinds: Option<Vec<u64>>,
pub kinds: Option<Vec<u64>>,
#[serde(rename = "#e")]
#[serde(skip_serializing_if = "Option::is_none")]
events: Option<Vec<EventId>>,
pub events: Option<Vec<EventId>>,
#[serde(rename = "#p")]
#[serde(skip_serializing_if = "Option::is_none")]
pubkeys: Option<Vec<Pubkey>>,
pub pubkeys: Option<Vec<Pubkey>>,
#[serde(skip_serializing_if = "Option::is_none")]
since: Option<u64>, // unix timestamp seconds
pub since: Option<u64>, // unix timestamp seconds
#[serde(skip_serializing_if = "Option::is_none")]
until: Option<u64>, // unix timestamp seconds
pub until: Option<u64>, // unix timestamp seconds
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<u16>,
pub limit: Option<u16>,
}

impl Filter {
Expand Down
158 changes: 74 additions & 84 deletions enostr/src/relay/message.rs
Original file line number Diff line number Diff line change
@@ -1,143 +1,133 @@
use crate::Error;
use crate::Event;

Check warning on line 2 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `crate::Event`

Check warning on line 2 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `crate::Event`

Check warning on line 2 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `crate::Event`
use crate::Result;
use log::debug;
use serde_json::Value;

Check warning on line 4 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `serde_json::Value`

Check warning on line 4 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `serde_json::Value`

Check warning on line 4 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `serde_json::Value`
use tracing::{error, info};

Check warning on line 5 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `info`

Check warning on line 5 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `info`

Check warning on line 5 in enostr/src/relay/message.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `info`

use ewebsock::{WsEvent, WsMessage};

#[derive(Debug, Eq, PartialEq)]
pub struct CommandResult {
event_id: String,
pub struct CommandResult<'a> {
event_id: &'a str,
status: bool,
message: String,
message: &'a str,
}

#[derive(Debug, Eq, PartialEq)]
pub enum RelayMessage {
OK(CommandResult),
Eose(String),
Event(String, Event),
Notice(String),
pub enum RelayMessage<'a> {
OK(CommandResult<'a>),
Eose(&'a str),
Event(&'a str, &'a str),
Notice(&'a str),
}

#[derive(Debug)]
pub enum RelayEvent {
pub enum RelayEvent<'a> {
Opened,
Closed,
Other(WsMessage),
Message(RelayMessage),
Other(&'a WsMessage),
Error(Error),
Message(RelayMessage<'a>),
}

impl TryFrom<WsEvent> for RelayEvent {
type Error = Error;

fn try_from(message: WsEvent) -> Result<Self> {
match message {
WsEvent::Opened => Ok(RelayEvent::Opened),
WsEvent::Closed => Ok(RelayEvent::Closed),
WsEvent::Message(ws_msg) => ws_msg.try_into(),
WsEvent::Error(s) => Err(s.into()),
impl<'a> From<&'a WsEvent> for RelayEvent<'a> {
fn from(event: &'a WsEvent) -> RelayEvent<'a> {
match event {
WsEvent::Opened => RelayEvent::Opened,
WsEvent::Closed => RelayEvent::Closed,
WsEvent::Message(ref ws_msg) => ws_msg.into(),
WsEvent::Error(s) => RelayEvent::Error(Error::Generic(s.to_owned())),
}
}
}

impl TryFrom<WsMessage> for RelayEvent {
type Error = Error;

fn try_from(wsmsg: WsMessage) -> Result<Self> {
impl<'a> From<&'a WsMessage> for RelayEvent<'a> {
fn from(wsmsg: &'a WsMessage) -> RelayEvent<'a> {
match wsmsg {
WsMessage::Text(s) => RelayMessage::from_json(&s).map(RelayEvent::Message),
wsmsg => Ok(RelayEvent::Other(wsmsg)),
WsMessage::Text(ref s) => match RelayMessage::from_json(&s).map(RelayEvent::Message) {
Ok(msg) => msg,
Err(err) => RelayEvent::Error(err),
},
wsmsg => {
error!("got {:?} instead of WsMessage::Text", wsmsg);
RelayEvent::Error(Error::DecodeFailed)
}
}
}
}

impl RelayMessage {
pub fn eose(subid: String) -> Self {
impl<'a> RelayMessage<'a> {
pub fn eose(subid: &'a str) -> Self {
RelayMessage::Eose(subid)
}

pub fn notice(msg: String) -> Self {
pub fn notice(msg: &'a str) -> Self {
RelayMessage::Notice(msg)
}

pub fn ok(event_id: String, status: bool, message: String) -> Self {
pub fn ok(event_id: &'a str, status: bool, message: &'a str) -> Self {
RelayMessage::OK(CommandResult {
event_id: event_id,
status,
message: message,
})
}

pub fn event(ev: Event, sub_id: String) -> Self {
pub fn event(ev: &'a str, sub_id: &'a str) -> Self {
RelayMessage::Event(sub_id, ev)
}

// I was lazy and took this from the nostr crate. thx yuki!
pub fn from_json(msg: &str) -> Result<Self> {
pub fn from_json(msg: &'a str) -> Result<RelayMessage<'a>> {
if msg.is_empty() {
return Err(Error::Empty);
}

let v: Vec<Value> = serde_json::from_str(msg).map_err(|_| Error::DecodeFailed)?;

// Notice
// Relay response format: ["NOTICE", <message>]
if v[0] == "NOTICE" {
if v.len() != 2 {
return Err(Error::DecodeFailed);
}
let v_notice: String =
serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?;
return Ok(Self::notice(v_notice));
if &msg[0..=9] == "[\"NOTICE\"," {
// TODO: there could be more than one space, whatever
let start = if msg.bytes().nth(10) == Some(b' ') {
12
} else {
11
};
let end = msg.len() - 2;
return Ok(Self::notice(&msg[start..end]));
}

// Event
// Relay response format: ["EVENT", <subscription id>, <event JSON>]
if v[0] == "EVENT" {
if v.len() != 3 {
return Err(Error::DecodeFailed);
}

let event = Event::from_json(&v[2].to_string()).map_err(|_| Error::DecodeFailed)?;

let subscription_id: String =
serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?;

return Ok(Self::event(event, subscription_id));
if &msg[0..=7] == "[\"EVENT\"" {
return Ok(Self::event(msg, "fixme"));
}

// EOSE (NIP-15)
// Relay response format: ["EOSE", <subscription_id>]
if v[0] == "EOSE" {
if v.len() != 2 {
return Err(Error::DecodeFailed);
}

let subscription_id: String =
serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?;

return Ok(Self::eose(subscription_id));
if &msg[0..=7] == "[\"EOSE\"," {
let start = if msg.bytes().nth(8) == Some(b' ') {
10
} else {
9
};
let end = msg.len() - 2;
return Ok(Self::eose(&msg[start..end]));
}

// OK (NIP-20)
// Relay response format: ["OK", <event_id>, <true|false>, <message>]
if v[0] == "OK" {
if v.len() != 4 {
// Relay response format: ["OK",<event_id>, <true|false>, <message>]
if &msg[0..=4] == "[\"OK\"," {
// TODO: fix this
let event_id = &msg[7..71];
let booly = &msg[73..77];
let status: bool = if booly == "true" {
true
} else if booly == "false" {
false
} else {
return Err(Error::DecodeFailed);
}

let event_id: String =
serde_json::from_value(v[1].clone()).map_err(|_| Error::DecodeFailed)?;

let status: bool =
serde_json::from_value(v[2].clone()).map_err(|_| Error::DecodeFailed)?;

let message: String =
serde_json::from_value(v[3].clone()).map_err(|_| Error::DecodeFailed)?;
};

return Ok(Self::ok(event_id, status, message));
return Ok(Self::ok(event_id, status, "fixme"));
}

Err(Error::DecodeFailed)
Expand Down Expand Up @@ -209,9 +199,9 @@ mod tests {
#[test]
fn test_handle_invalid_event() {
//Mising Event field
let invalid_event_msg = r#"["EVENT", "random_string"]"#;
let invalid_event_msg = r#"["EVENT","random_string"]"#;
//Event JSON with incomplete content
let invalid_event_msg_content = r#"["EVENT", "random_string", {"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"}]"#;
let invalid_event_msg_content = r#"["EVENT","random_string",{"id":"70b10f70c1318967eddf12527799411b1a9780ad9c43858f5e5fcd45486a13a5","pubkey":"379e863e8357163b5bce5d2688dc4f1dcc2d505222fb8d74db600f30535dfdfe"}]"#;

assert_eq!(
RelayMessage::from_json(invalid_event_msg).unwrap_err(),
Expand Down Expand Up @@ -246,14 +236,14 @@ mod tests {

// The subscription ID is not string
assert_eq!(
RelayMessage::from_json(r#"["EOSE", 404]"#).unwrap_err(),
RelayMessage::from_json(r#"["EOSE",404]"#).unwrap_err(),
Error::DecodeFailed
);
}

#[test]
fn test_handle_valid_ok() -> Result<()> {
let valid_ok_msg = r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", true, "pow: difficulty 25>=24"]"#;
let valid_ok_msg = r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",true,"pow: difficulty 25>=24"]"#;
let handled_valid_ok_msg = RelayMessage::ok(
"b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30".to_string(),
true,
Expand All @@ -269,21 +259,21 @@ mod tests {
// Missing params
assert_eq!(
RelayMessage::from_json(
r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30"]"#
r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30"]"#
)
.unwrap_err(),
Error::DecodeFailed
);

// Invalid status
assert_eq!(
RelayMessage::from_json(r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", hello, ""]"#).unwrap_err(),
RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,""]"#).unwrap_err(),
Error::DecodeFailed
);

// Invalid message
assert_eq!(
RelayMessage::from_json(r#"["OK", "b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30", hello, 404]"#).unwrap_err(),
RelayMessage::from_json(r#"["OK","b1a649ebe8b435ec71d3784793f3bbf4b93e64e17568a741aecd4c7ddeafce30",hello,404]"#).unwrap_err(),
Error::DecodeFailed
);
}
Expand Down
47 changes: 23 additions & 24 deletions enostr/src/relay/pool.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use crate::relay::message::RelayEvent;
use crate::relay::message::{RelayEvent, RelayMessage};

Check warning on line 1 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `RelayEvent`, `RelayMessage`

Check warning on line 1 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `RelayEvent`, `RelayMessage`

Check warning on line 1 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `RelayEvent`, `RelayMessage`
use crate::relay::{Relay, RelayStatus};
use crate::{ClientMessage, Result};

use std::time::{Duration, Instant};

#[cfg(not(target_arch = "wasm32"))]
use ewebsock::WsMessage;
use ewebsock::{WsEvent, WsMessage};

#[cfg(not(target_arch = "wasm32"))]
use tracing::debug;

use tracing::error;
use tracing::{debug, error, info};

Check warning on line 11 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `info`

Check warning on line 11 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `info`

Check warning on line 11 in enostr/src/relay/pool.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `info`

#[derive(Debug)]
pub struct PoolEvent<'a> {
pub relay: &'a str,
pub event: RelayEvent,
pub event: ewebsock::WsEvent,
}

pub struct PoolRelay {
Expand Down Expand Up @@ -145,37 +143,38 @@ impl RelayPool {
/// function searches each relay in the list in order, attempting to
/// receive a message from each. If a message is received, return it.
/// If no message is received from any relays, None is returned.
pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
pub fn try_recv<'a>(&'a mut self) -> Option<PoolEvent<'a>> {
for relay in &mut self.relays {
let relay = &mut relay.relay;
if let Some(msg) = relay.receiver.try_recv() {
match msg.try_into() {
Ok(event) => {
if let Some(event) = relay.receiver.try_recv() {
match &event {
WsEvent::Opened => {
relay.status = RelayStatus::Connected;

}
WsEvent::Closed => {
relay.status = RelayStatus::Disconnected;
}
WsEvent::Error(err) => {
error!("{:?}", err);
relay.status = RelayStatus::Disconnected;
}
WsEvent::Message(ev) => {
// let's just handle pongs here.
// We only need to do this natively.
#[cfg(not(target_arch = "wasm32"))]
match event {
RelayEvent::Other(WsMessage::Ping(ref bs)) => {
match &ev {
WsMessage::Ping(ref bs) => {
debug!("pong {}", &relay.url);
relay.sender.send(WsMessage::Pong(bs.to_owned()));
}
_ => {}
}

return Some(PoolEvent {
event,
relay: &relay.url,
});
}

Err(e) => {
relay.status = RelayStatus::Disconnected;
error!("try_recv {:?}", e);
continue;
}
}
return Some(PoolEvent {
event,
relay: &relay.url,
});
}
}

Expand Down
Loading

0 comments on commit 74ce870

Please sign in to comment.