From 65c88ef0c88d5595bde64f09bc24318d70375277 Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Mon, 9 May 2022 17:36:43 +0200 Subject: [PATCH] Move peer discovery from "services" to "peers-clear" This is necessary to implement TLS as service only report the clear service port. It will be easy to expand the "services_name" to return peers-tls --- Cargo.toml | 1 + src/cluster/mod.rs | 1 + src/cluster/node.rs | 36 ++---------- src/cluster/peers.rs | 128 +++++++++++++++++++++++++++++++++++++++++++ src/net/host.rs | 12 +++- 5 files changed, 146 insertions(+), 32 deletions(-) create mode 100644 src/cluster/peers.rs diff --git a/Cargo.toml b/Cargo.toml index 17809619..f99e5844 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ error-chain = "0.12" parking_lot = "0.9" pwhash = "0.3" serde = { version = "1.0", features = ["derive"], optional = true } +logos = "0.12.0" [features] serialization = ["serde"] diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index 83b4cedc..b68b3df2 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -17,6 +17,7 @@ pub mod node; pub mod node_validator; pub mod partition; pub mod partition_tokenizer; +pub mod peers; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; diff --git a/src/cluster/node.rs b/src/cluster/node.rs index 1a207cc4..f7b89649 100644 --- a/src/cluster/node.rs +++ b/src/cluster/node.rs @@ -30,6 +30,8 @@ use crate::errors::{ErrorKind, Result, ResultExt}; use crate::net::{ConnectionPool, Host, PooledConnection}; use crate::policy::ClientPolicy; +use super::peers::parse_peers_info; + pub const PARTITIONS: usize = 4096; #[derive(Debug)] @@ -138,9 +140,9 @@ impl Node { const fn services_name(&self) -> &'static str { if self.client_policy.use_services_alternate { - "services-alternate" + "peers-clear-alt" } else { - "services" + "peers-clear-std" } } @@ -197,35 +199,7 @@ impl Node { Some(friend_string) => friend_string, }; - let friend_names = friend_string.split(';'); - for friend in friend_names { - let mut friend_info = friend.split(':'); - if friend_info.clone().count() != 2 { - error!( - "Node info from asinfo:services is malformed. Expected HOST:PORT, but got \ - '{}'", - friend - ); - continue; - } - - let host = friend_info.next().unwrap(); - let port = u16::from_str(friend_info.next().unwrap())?; - let alias = match self.client_policy.ip_map { - Some(ref ip_map) if ip_map.contains_key(host) => { - Host::new(ip_map.get(host).unwrap(), port) - } - _ => Host::new(host, port), - }; - - if current_aliases.contains_key(&alias) { - self.reference_count.fetch_add(1, Ordering::Relaxed); - } else if !friends.contains(&alias) { - friends.push(alias); - } - } - - Ok(friends) + parse_peers_info(friend_string) } fn update_partitions(&self, info_map: &HashMap) -> Result<()> { diff --git a/src/cluster/peers.rs b/src/cluster/peers.rs new file mode 100644 index 00000000..03b41ddb --- /dev/null +++ b/src/cluster/peers.rs @@ -0,0 +1,128 @@ +use crate::errors::{ErrorKind, Result}; +use crate::net::{Host, ToHosts}; +use logos::{Lexer, Logos}; + +#[derive(Logos, Debug, PartialEq)] +enum Token { + #[token("[")] + OpenBracket, + + #[token("]")] + CloseBracket, + + #[regex("[0-9a-zA-Z-./_: ]+")] + Text, + + #[error] + #[regex(r"[,]+", logos::skip)] + Error, +} + +fn parse_error(lex: &Lexer, source: &str) -> String { + format!( + "Failed to parse peers: {}, at {:?} ({})", + source, + lex.span(), + lex.slice() + ) +} + +pub fn parse_peers_info(info_peers: &str) -> Result> { + let mut lex = Token::lexer(info_peers); + + let _peer_gen = match lex.next() { + Some(Token::Text) => lex.slice(), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + }; + let default_port_str = match lex.next() { + Some(Token::Text) => lex.slice(), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + }; + + let default_port = match default_port_str.parse::() { + Ok(port) => port, + Err(_) => bail!(ErrorKind::BadResponse(format!( + "Invalid default port: {}", + default_port_str + ))), + }; + + match lex.next() { + Some(Token::OpenBracket) => parse_peers(info_peers, &mut lex, default_port), + _ => Ok(Vec::new()), + } +} + +fn parse_peers(info_peers: &str, lex: &mut Lexer, default_port: u16) -> Result> { + let mut peers = Vec::new(); + loop { + match lex.next() { + Some(Token::OpenBracket) => peers.extend(parse_peer(info_peers, lex, default_port)?), + Some(Token::CloseBracket) => return Ok(peers), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + } + lex.next(); // Close brackets + } +} + +fn parse_peer(info_peers: &str, lex: &mut Lexer, default_port: u16) -> Result> { + let _id = match lex.next() { + Some(Token::Text) => lex.slice(), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + }; + + let mut token = lex.next(); + if Some(Token::Text) == token { + let _tls_hostname = lex.slice(); + token = lex.next(); + } + + match token { + Some(Token::OpenBracket) => (), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + }; + + let hosts = match lex.next() { + Some(Token::Text) => lex.slice(), + _ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))), + } + .to_hosts_with_default_port(default_port)?; + + lex.next(); // Close brackets + Ok(hosts) +} + +#[cfg(test)] +mod tests { + use std::vec; + + use super::*; + + #[test] + fn parse_peers_works() { + let work = "6,3000,[[12A0,aerospike.com,[1.2.3.4:4333]],[BB9040011AC4202,,[10.11.12.13]],[11A1,,[localhost]]]"; + let fail = "6,3foobar,[[12A0,aerospike.com,[1.2.3.4:4333]],[11A1,,[10.11.12.13:4333]]]"; + let empty = "6,3000,[]"; + assert!(parse_peers_info(fail).is_err()); + let work = parse_peers_info(work).unwrap(); + println!("{:?}", work); + assert!( + work == vec![ + Host { + name: "1.2.3.4".to_string(), + port: 4333 + }, + Host { + name: "10.11.12.13".to_string(), + port: 3000 + }, + Host { + name: "localhost".to_string(), + port: 3000 + } + ] + ); + let empty = parse_peers_info(empty).unwrap(); + assert!(empty == vec![]); + } +} diff --git a/src/net/host.rs b/src/net/host.rs index 8bd52a33..3f66df88 100644 --- a/src/net/host.rs +++ b/src/net/host.rs @@ -67,17 +67,24 @@ pub trait ToHosts { /// /// Any errors encountered during conversion will be returned as an `Err`. fn to_hosts(&self) -> Result>; + fn to_hosts_with_default_port(&self, default_port: u16) -> Result>; } impl ToHosts for Vec { fn to_hosts(&self) -> Result> { Ok(self.clone()) } + fn to_hosts_with_default_port(&self, default_port: u16) -> Result> { + Ok(self.clone()) + } } impl ToHosts for String { fn to_hosts(&self) -> Result> { - let mut parser = Parser::new(self, 3000); + self.to_hosts_with_default_port(3000) + } + fn to_hosts_with_default_port(&self, default_port: u16) -> Result> { + let mut parser = Parser::new(self, default_port); parser .read_hosts() .chain_err(|| ErrorKind::InvalidArgument(format!("Invalid hosts list: '{}'", self))) @@ -88,6 +95,9 @@ impl<'a> ToHosts for &'a str { fn to_hosts(&self) -> Result> { (*self).to_string().to_hosts() } + fn to_hosts_with_default_port(&self, default_port: u16) -> Result> { + (*self).to_string().to_hosts_with_default_port(default_port) + } } #[cfg(test)]