Skip to content

Commit

Permalink
feat(services/memcached): change to binary protocal
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Feb 23, 2024
1 parent f6a161b commit ae985b9
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 179 deletions.
172 changes: 0 additions & 172 deletions core/src/services/memcached/ascii.rs

This file was deleted.

43 changes: 38 additions & 5 deletions core/src/services/memcached/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::Deserialize;
use tokio::net::TcpStream;
use tokio::sync::OnceCell;

use super::ascii;
use super::binary;
use crate::raw::adapters::kv;
use crate::raw::*;
use crate::*;
Expand All @@ -42,6 +42,10 @@ pub struct MemcachedConfig {
///
/// default is "/"
root: Option<String>,
/// Memcached username, optional.
username: Option<String>,
/// Memcached password, optional.
password: Option<String>,
/// The default ttl for put operations.
default_ttl: Option<Duration>,
}
Expand Down Expand Up @@ -74,6 +78,18 @@ impl MemcachedBuilder {
self
}

/// set the username.
pub fn username(&mut self, username: &str) -> &mut Self {
self.config.username = Some(username.to_string());
self
}

/// set the password.
pub fn password(&mut self, password: &str) -> &mut Self {
self.config.password = Some(password.to_string());
self
}

/// Set the default ttl for memcached services.
pub fn default_ttl(&mut self, ttl: Duration) -> &mut Self {
self.config.default_ttl = Some(ttl);
Expand Down Expand Up @@ -151,6 +167,8 @@ impl Builder for MemcachedBuilder {
let conn = OnceCell::new();
Ok(MemcachedBackend::new(Adapter {
endpoint,
username: self.config.username.clone(),
password: self.config.password.clone(),
conn,
default_ttl: self.config.default_ttl,
})
Expand All @@ -164,6 +182,8 @@ pub type MemcachedBackend = kv::Backend<Adapter>;
#[derive(Clone, Debug)]
pub struct Adapter {
endpoint: String,
username: Option<String>,
password: Option<String>,
default_ttl: Option<Duration>,
conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
}
Expand All @@ -173,7 +193,11 @@ impl Adapter {
let pool = self
.conn
.get_or_try_init(|| async {
let mgr = MemcacheConnectionManager::new(&self.endpoint);
let mgr = MemcacheConnectionManager::new(
&self.endpoint,
self.username.clone(),
self.password.clone(),
);

bb8::Pool::builder().build(mgr).await.map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
Expand Down Expand Up @@ -237,27 +261,36 @@ impl kv::Adapter for Adapter {
#[derive(Clone, Debug)]
struct MemcacheConnectionManager {
address: String,
username: Option<String>,
password: Option<String>,
}

impl MemcacheConnectionManager {
fn new(address: &str) -> Self {
fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
Self {
address: address.to_string(),
username,
password,
}
}
}

#[async_trait]
impl bb8::ManageConnection for MemcacheConnectionManager {
type Connection = ascii::Connection;
type Connection = binary::Connection;
type Error = Error;

/// TODO: Implement unix stream support.
async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {
let conn = TcpStream::connect(&self.address)
.await
.map_err(new_std_io_error)?;
Ok(ascii::Connection::new(conn))
let mut conn = binary::Connection::new(conn);

if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
conn.auth(username, password).await?;
}
Ok(conn)
}

async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {
Expand Down
Loading

0 comments on commit ae985b9

Please sign in to comment.