diff --git a/Cargo.lock b/Cargo.lock index 0f475fa..8642ade 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -707,6 +707,7 @@ dependencies = [ "redis", "reqwest 0.12.3", "serde", + "serde-pickle", "serde_json", "thiserror", "tokio", @@ -1395,6 +1396,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "iter-read" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c397ca3ea05ad509c4ec451fea28b4771236a376ca1c69fd5143aae0cf8f93c4" + [[package]] name = "itertools" version = "0.12.1" @@ -1680,6 +1687,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1705,11 +1722,10 @@ checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] @@ -1726,9 +1742,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", "libm", @@ -2533,6 +2549,19 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-pickle" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c762ad136a26407c6a80825813600ceeab5e613660d93d79a41f0ec877171e71" +dependencies = [ + "byteorder", + "iter-read", + "num-bigint", + "num-traits", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.196" diff --git a/feature-flags/Cargo.toml b/feature-flags/Cargo.toml index ddfe070..1e0c111 100644 --- a/feature-flags/Cargo.toml +++ b/feature-flags/Cargo.toml @@ -24,6 +24,7 @@ redis = { version = "0.23.3", features = [ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +serde-pickle = { version = "1.1.1"} [lints] workspace = true diff --git a/feature-flags/src/api.rs b/feature-flags/src/api.rs index c94eed6..ebad1f5 100644 --- a/feature-flags/src/api.rs +++ b/feature-flags/src/api.rs @@ -25,6 +25,9 @@ pub enum FlagError { #[error("failed to parse request: {0}")] RequestParsingError(#[from] serde_json::Error), + #[error("failed to parse redis data: {0}")] + DataParsingError(#[from] serde_pickle::Error), + #[error("Empty distinct_id in request")] EmptyDistinctId, #[error("No distinct_id in request")] @@ -44,6 +47,7 @@ impl IntoResponse for FlagError { match self { FlagError::RequestDecodingError(_) | FlagError::RequestParsingError(_) + | FlagError::DataParsingError(_) | FlagError::EmptyDistinctId | FlagError::MissingDistinctId => (StatusCode::BAD_REQUEST, self.to_string()), diff --git a/feature-flags/src/config.rs b/feature-flags/src/config.rs index 3fa6f50..cc7ad37 100644 --- a/feature-flags/src/config.rs +++ b/feature-flags/src/config.rs @@ -4,7 +4,7 @@ use envconfig::Envconfig; #[derive(Envconfig, Clone)] pub struct Config { - #[envconfig(default = "127.0.0.1:0")] + #[envconfig(default = "127.0.0.1:3001")] pub address: SocketAddr, #[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] diff --git a/feature-flags/src/lib.rs b/feature-flags/src/lib.rs index 9175b5c..71a5e69 100644 --- a/feature-flags/src/lib.rs +++ b/feature-flags/src/lib.rs @@ -5,3 +5,4 @@ pub mod router; pub mod server; pub mod v0_endpoint; pub mod v0_request; +pub mod team; diff --git a/feature-flags/src/redis.rs b/feature-flags/src/redis.rs index 8c03820..70b7146 100644 --- a/feature-flags/src/redis.rs +++ b/feature-flags/src/redis.rs @@ -2,7 +2,7 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use redis::AsyncCommands; +use redis::{AsyncCommands, RedisError}; use tokio::time::timeout; // average for all commands is <10ms, check grafana @@ -10,12 +10,15 @@ const REDIS_TIMEOUT_MILLISECS: u64 = 10; /// A simple redis wrapper /// Copied from capture/src/redis.rs. -/// TODO: Modify this to support hincrby, get, and set commands. +/// TODO: Modify this to support hincrby #[async_trait] pub trait Client { // A very simplified wrapper, but works for our usage async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result>; + + async fn get(&self, k: String) -> Result; + async fn set(&self, k: String, v: String) -> Result<()>; } pub struct RedisClient { @@ -40,38 +43,31 @@ impl Client for RedisClient { Ok(fut?) } -} -// TODO: Find if there's a better way around this. -#[derive(Clone)] -pub struct MockRedisClient { - zrangebyscore_ret: Vec, -} + async fn get(&self, k: String) -> Result { + let mut conn = self.client.get_async_connection().await?; -impl MockRedisClient { - pub fn new() -> MockRedisClient { - MockRedisClient { - zrangebyscore_ret: Vec::new(), - } - } + let results = conn.get(k.clone()); + // TODO: Is this safe? Should we be doing something else for error handling here? + let fut: Result, RedisError> = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; - pub fn zrangebyscore_ret(&mut self, ret: Vec) -> Self { - self.zrangebyscore_ret = ret; + // TRICKY: We serialise data to json, then django pickles it. + // Here we deserialize the bytes using serde_pickle, to get the json string. + let string_response: String = serde_pickle::from_slice(&fut?, Default::default())?; - self.clone() + Ok(string_response) } -} -impl Default for MockRedisClient { - fn default() -> Self { - Self::new() - } -} + async fn set(&self, k: String, v: String) -> Result<()> { + // TRICKY: We serialise data to json, then django pickles it. + // Here we serialize the json string to bytes using serde_pickle. + let bytes = serde_pickle::to_vec(&v, Default::default())?; -#[async_trait] -impl Client for MockRedisClient { - // A very simplified wrapper, but works for our usage - async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result> { - Ok(self.zrangebyscore_ret.clone()) + let mut conn = self.client.get_async_connection().await?; + + let results = conn.set(k, bytes); + let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + Ok(fut?) } -} +} \ No newline at end of file diff --git a/feature-flags/src/v0_endpoint.rs b/feature-flags/src/v0_endpoint.rs index 8f77611..4a46d45 100644 --- a/feature-flags/src/v0_endpoint.rs +++ b/feature-flags/src/v0_endpoint.rs @@ -33,7 +33,7 @@ use crate::{ )] #[debug_handler] pub async fn flags( - _state: State, + state: State, InsecureClientIp(ip): InsecureClientIp, meta: Query, headers: HeaderMap, @@ -59,19 +59,19 @@ pub async fn flags( .get("content-type") .map_or("", |v| v.to_str().unwrap_or("")) { - "application/x-www-form-urlencoded" => { - return Err(FlagError::RequestDecodingError(String::from( - "invalid form data", - ))); + "application/json" => { + tracing::Span::current().record("content_type", "application/json"); + FlagRequest::from_bytes(body) } ct => { - tracing::Span::current().record("content_type", ct); - - FlagRequest::from_bytes(body) + return Err(FlagError::RequestDecodingError(format!( + "unsupported content type: {}", + ct + ))); } }?; - let token = request.extract_and_verify_token()?; + let token = request.extract_and_verify_token(state.redis.clone()).await?; tracing::Span::current().record("token", &token); diff --git a/feature-flags/src/v0_request.rs b/feature-flags/src/v0_request.rs index f2269df..2954b2e 100644 --- a/feature-flags/src/v0_request.rs +++ b/feature-flags/src/v0_request.rs @@ -1,11 +1,11 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::instrument; -use crate::api::FlagError; +use crate::{api::FlagError, redis::Client, team::Team}; #[derive(Deserialize, Default)] pub struct FlagsQueryParams { @@ -54,15 +54,18 @@ impl FlagRequest { Ok(serde_json::from_str::(&payload)?) } - pub fn extract_and_verify_token(&self) -> Result { + pub async fn extract_and_verify_token(&self, redis_client: Arc) -> Result { let token = match self { FlagRequest { token: Some(token), .. } => token.to_string(), _ => return Err(FlagError::NoTokenError), }; - // TODO: Get tokens from redis, confirm this one is valid - // validate_token(&token)?; + + let team = Team::from_redis(redis_client, token.clone()).await?; + + // TODO: Remove this, is useless, doing just for now because + tracing::Span::current().record("team_id", &team.id); Ok(token) } }