Modify Storage trait to return the number of bytes written to the cache.
This makes it possible to collect metrics and track stored response sizes.
khvzak committed Jun 11, 2024
1 parent 41c7438 commit da33745
Showing 5 changed files with 34 additions and 25 deletions.
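
To make the intent concrete, here is a minimal sketch of what the revised contract enables. The trait below is a simplified synchronous stand-in for the project's real async Storage trait, and record_stored_bytes is a made-up metrics hook; only the shape of the new return value mirrors this commit.

// Minimal sketch, assuming a simplified synchronous stand-in for the real
// async `Storage` trait; `record_stored_bytes` is a made-up metrics hook.
use std::collections::HashMap;

struct Item {
    key: String,
    body: Vec<u8>,
}

trait Storage {
    type Error;
    // After this commit the store operation reports how many bytes it wrote.
    fn store_response(&mut self, item: Item) -> Result<usize, Self::Error>;
}

struct InMemory(HashMap<String, Vec<u8>>);

impl Storage for InMemory {
    type Error = String;
    fn store_response(&mut self, item: Item) -> Result<usize, Self::Error> {
        let size = item.body.len();
        self.0.insert(item.key, item.body);
        Ok(size)
    }
}

// Stand-in for whatever counter/histogram the service actually uses.
fn record_stored_bytes(n: usize) {
    println!("stored {n} bytes");
}

fn main() {
    let mut storage = InMemory(HashMap::new());
    let item = Item {
        key: "abc".into(),
        body: b"test response 1".to_vec(),
    };
    if let Ok(size) = storage.store_response(item) {
        record_stored_bytes(size); // the metric this commit makes possible
    }
}
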
casper-server/src/lua/storage.rs (34 changes: 19 additions & 15 deletions)
@@ -153,10 +153,10 @@ where

/// Stores a response in the storage.
///
-/// Returns `true` if the response was stored.
+/// Returns the number of bytes written to the cache if the response was stored.
/// In case of errors returns `nil` and a string with error message.
#[instrument(skip_all, fields(name = self.0.name(), backend = self.0.backend_type()))]
-async fn store_response<'l>(&self, lua: &'l Lua, item: Table<'l>) -> LuaDoubleResult<bool> {
+async fn store_response<'l>(&self, lua: &'l Lua, item: Table<'l>) -> LuaDoubleResult<usize> {
let start = Instant::now();

let key: Value = item.raw_get("key").context("invalid `key`")?;
@@ -197,21 +197,21 @@ where
storage_counter_add!(1, "name" => self.0.name(), "operation" => "store");
storage_histogram_rec!(start, "name" => self.0.name(), "operation" => "store");

-Ok(result.map(|_| true).map_err(|err| err.into().to_string()))
+Ok(result.map_err(|err| err.into().to_string()))
}

/// Stores responses in the storage.
///
-/// Returns `true` if all the responses were stored.
-/// In case of errors returns `false` and a table of: { string | true }
+/// Returns the total number of bytes written to the cache if all the responses were stored.
+/// In case of errors returns `nil` and a table of: { string | number }
/// string - error message
-/// `true` - if response was stored
+/// number - number of bytes written to the cache
#[instrument(skip_all, fields(name = self.0.name(), backend = self.0.backend_type()))]
async fn store_responses<'l>(
&self,
lua: &'l Lua,
lua_items: Table<'l>,
-) -> LuaResult<(bool, Option<Vec<Value<'l>>>)> {
+) -> LuaResult<(Option<usize>, Option<Vec<Value<'l>>>)> {
let start = Instant::now();

// Read rest of the fields
@@ -273,18 +273,22 @@ where
storage_histogram_rec!(start, "name" => self.0.name(), "operation" => "store");

// If all responses were stored then return `true`
-if results.iter().all(|r| r.is_ok()) {
-return Ok((true, None));
+let mut total_size = 0;
+if results.iter().all(|r| {
+total_size += r.as_ref().cloned().unwrap_or_default();
+r.is_ok()
+}) {
+return Ok((Some(total_size), None));
}

let results = results
.into_iter()
.map(|res| match res {
-Ok(_) => Ok(Value::Boolean(true)),
+Ok(size) => Ok(Value::Integer(size as _)),
Err(err) => Ok(Value::String(lua.create_string(&err.into().to_string())?)),
})
.collect::<LuaResult<Vec<_>>>()?;
-Ok((false, Some(results)))
+Ok((None, Some(results)))
}
}

@@ -378,13 +382,13 @@ mod tests {
headers = { hello = "world" },
body = "test response 1",
})
-local ok, err = $storage:store_response({
+local size, err = $storage:store_response({
key = {"a", "bc"}, // key parts should be concatenated
response = resp,
surrogate_keys = {"skey1", "skey2"},
ttl = 10,
})
-assert(ok and err == nil)
+assert(size > 0 and err == nil)
resp = $storage:get_response("abc")
assert(resp.status == 201)
assert(resp:header("hello") == "world")
@@ -423,7 +427,7 @@ mod tests {
assert(responses[2] == false, "response#2 should not exist")

// Store few responses with different keys and surrogate keys
-local ok, err = $storage:store_responses({
+local size, err = $storage:store_responses({
{
key = {"a", "bc"}, // key parts should be concatenated
response = Response.new({
@@ -445,7 +449,7 @@ mod tests {
ttl = 10,
}
})
-assert(ok == true and err == nil, "responses should be stored")
+assert(size > 0 and err == nil, "responses should be stored")

// Fetch them back
responses = $storage:get_responses({"abc", "cde", "def"})
casper-server/src/storage/backends/memory.rs (7 changes: 4 additions & 3 deletions)
@@ -210,14 +210,14 @@ impl Storage for MemoryBackend {
results
}

-async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> {
+async fn store_response<'a>(&self, item: Item<'a>) -> Result<usize, Self::Error> {
self.store_responses([item]).await.remove(0)
}

async fn store_responses(
&self,
items: impl IntoIterator<Item = Item<'_>>,
-) -> Vec<Result<(), Self::Error>> {
+) -> Vec<Result<usize, Self::Error>> {
let mut memory = self.inner.lock().await;
let mut results = Vec::new();
for item in items {
@@ -229,8 +229,9 @@ impl Storage for MemoryBackend {
expires: SystemTime::now() + item.ttl,
surrogate_keys: item.surrogate_keys,
};
+let size = value.headers.len() + value.body.len();
memory.insert(item.key, value);
-Ok(())
+Ok(size)
})();
results.push(result);
}
casper-server/src/storage/backends/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -88,7 +88,7 @@ impl Storage for Backend {
}

#[inline]
-async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> {
+async fn store_response<'a>(&self, item: Item<'a>) -> Result<usize, Self::Error> {
match self {
Backend::Memory(inner) => inner.store_response(item).await,
Backend::Redis(inner) => inner.store_response(item).await,
@@ -121,7 +121,7 @@ impl Storage for Backend {
async fn store_responses(
&self,
items: impl IntoIterator<Item = Item<'_>>,
-) -> Vec<Result<(), Self::Error>> {
+) -> Vec<Result<usize, Self::Error>> {
match self {
Backend::Memory(inner) => inner.store_responses(items).await,
Backend::Redis(inner) => inner.store_responses(items).await,
casper-server/src/storage/backends/redis/client.rs (10 changes: 7 additions & 3 deletions)
@@ -375,7 +375,8 @@ impl RedisBackend {
}
}

-async fn store_response_inner<'a>(&self, item: Item<'a>) -> Result<()> {
+async fn store_response_inner<'a>(&self, item: Item<'a>) -> Result<usize> {
+let mut stored_bytes = 0;
let mut headers = Bytes::from(encode_headers(&item.headers)?);
let mut body = item.body;
let body_length = body.len();
@@ -437,6 +438,7 @@ impl RedisBackend {
false,
)
.await?;
+stored_bytes += chunk.len();
}
}

@@ -452,6 +454,7 @@ impl RedisBackend {
flags,
};
let response_item_enc = flexbuffers::to_vec(&response_item)?;
+let response_item_size = response_item_enc.len();

// Store response item
self.pool
@@ -463,6 +466,7 @@ impl RedisBackend {
false,
)
.await?;
+stored_bytes += response_item_size;

// Update surrogate keys
let int_cache_ttl = self.config.internal_cache_ttl;
@@ -508,7 +512,7 @@ impl RedisBackend {
}))
.await?;

-Ok(())
+Ok(stored_bytes)
}

fn get_fetch_timeout(&self) -> Duration {
@@ -556,7 +560,7 @@ impl Storage for RedisBackend {
.with_context(|| format!("Failed to delete Response(s) for key `{}`", key))
}

-async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> {
+async fn store_response<'a>(&self, item: Item<'a>) -> Result<usize, Self::Error> {
self.lazy_connect();
let key = item.key.clone();
let store_timeout = self.get_store_timeout();
casper-server/src/storage/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -91,7 +91,7 @@ pub trait Storage {

async fn delete_responses(&self, key: ItemKey) -> Result<(), Self::Error>;

-async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> {
+async fn store_response<'a>(&self, item: Item<'a>) -> Result<usize, Self::Error> {

//
// Provided implementation
@@ -122,7 +122,7 @@ pub trait Storage {
async fn store_responses(
&self,
items: impl IntoIterator<Item = Item<'_>>,
-) -> Vec<Result<(), Self::Error>> {
+) -> Vec<Result<usize, Self::Error>> {
// Create list of pending futures to poll them in parallel
stream::iter(items.into_iter().map(|it| self.store_response(it)))
.buffered(Self::MAX_CONCURRENCY)
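
The provided store_responses implementation above drives the per-item store_response futures through stream::buffered, which caps concurrency at MAX_CONCURRENCY while preserving input order, so each result can be matched back to its item. A minimal, self-contained sketch of that pattern follows; fake_store and the concurrency limit of 4 are made up for illustration, and only the futures crate API is assumed.

// Minimal sketch of the `stream::iter(..).buffered(N)` pattern used by the
// provided `store_responses` implementation: up to N futures run
// concurrently, and results come back in the same order as the inputs.
use futures::{executor::block_on, stream, StreamExt};

// Stand-in for `store_response`: pretend we wrote `n * 10` bytes.
async fn fake_store(n: usize) -> Result<usize, String> {
    Ok(n * 10)
}

fn main() {
    const MAX_CONCURRENCY: usize = 4; // plays the role of Self::MAX_CONCURRENCY

    let results: Vec<Result<usize, String>> = block_on(
        stream::iter((0..8).map(fake_store))
            .buffered(MAX_CONCURRENCY)
            .collect(),
    );

    // Order is preserved, so per-item results can be matched back to items.
    assert_eq!(results[3], Ok(30));
    // Summing the Ok sizes gives the total bytes written, as the Lua wrapper does.
    let total: usize = results.iter().filter_map(|r| r.as_ref().ok()).sum();
    assert_eq!(total, 280);
}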
