diff --git a/casper-server/src/lua/storage.rs b/casper-server/src/lua/storage.rs index 7d05adc..790d6da 100644 --- a/casper-server/src/lua/storage.rs +++ b/casper-server/src/lua/storage.rs @@ -153,10 +153,10 @@ where /// Stores a response in the storage. /// - /// Returns `true` if the response was stored. + /// Returns number of written bytes to the cache if the response was stored. /// In case of errors returns `nil` and a string with error message. #[instrument(skip_all, fields(name = self.0.name(), backend = self.0.backend_type()))] - async fn store_response<'l>(&self, lua: &'l Lua, item: Table<'l>) -> LuaDoubleResult { + async fn store_response<'l>(&self, lua: &'l Lua, item: Table<'l>) -> LuaDoubleResult { let start = Instant::now(); let key: Value = item.raw_get("key").context("invalid `key`")?; @@ -197,21 +197,21 @@ where storage_counter_add!(1, "name" => self.0.name(), "operation" => "store"); storage_histogram_rec!(start, "name" => self.0.name(), "operation" => "store"); - Ok(result.map(|_| true).map_err(|err| err.into().to_string())) + Ok(result.map_err(|err| err.into().to_string())) } /// Stores responses in the storage. /// - /// Returns `true` if all the responses were stored. - /// In case of errors returns `false` and a table of: { string | true } + /// Returns total number of written bytes to the cache if all the responses were stored. + /// In case of errors returns `nil` and a table of: { string | number } /// string - error message - /// `true` - if response was stored + /// number - number of bytes written to the cache #[instrument(skip_all, fields(name = self.0.name(), backend = self.0.backend_type()))] async fn store_responses<'l>( &self, lua: &'l Lua, lua_items: Table<'l>, - ) -> LuaResult<(bool, Option>>)> { + ) -> LuaResult<(Option, Option>>)> { let start = Instant::now(); // Read rest of the fields @@ -273,18 +273,22 @@ where storage_histogram_rec!(start, "name" => self.0.name(), "operation" => "store"); // If all responses were stored then return `true` - if results.iter().all(|r| r.is_ok()) { - return Ok((true, None)); + let mut total_size = 0; + if results.iter().all(|r| { + total_size += r.as_ref().cloned().unwrap_or_default(); + r.is_ok() + }) { + return Ok((Some(total_size), None)); } let results = results .into_iter() .map(|res| match res { - Ok(_) => Ok(Value::Boolean(true)), + Ok(size) => Ok(Value::Integer(size as _)), Err(err) => Ok(Value::String(lua.create_string(&err.into().to_string())?)), }) .collect::>>()?; - Ok((false, Some(results))) + Ok((None, Some(results))) } } @@ -378,13 +382,13 @@ mod tests { headers = { hello = "world" }, body = "test response 1", }) - local ok, err = $storage:store_response({ + local size, err = $storage:store_response({ key = {"a", "bc"}, // key parts should be concatenated response = resp, surrogate_keys = {"skey1", "skey2"}, ttl = 10, }) - assert(ok and err == nil) + assert(size > 0 and err == nil) resp = $storage:get_response("abc") assert(resp.status == 201) assert(resp:header("hello") == "world") @@ -423,7 +427,7 @@ mod tests { assert(responses[2] == false, "response#2 should not exist") // Store few responses with different keys and surrogate keys - local ok, err = $storage:store_responses({ + local size, err = $storage:store_responses({ { key = {"a", "bc"}, // key parts should be concatenated response = Response.new({ @@ -445,7 +449,7 @@ mod tests { ttl = 10, } }) - assert(ok == true and err == nil, "responses should be stored") + assert(size > 0 and err == nil, "responses should be stored") // Fetch them back responses = $storage:get_responses({"abc", "cde", "def"}) diff --git a/casper-server/src/storage/backends/memory.rs b/casper-server/src/storage/backends/memory.rs index 730d625..126cdc8 100644 --- a/casper-server/src/storage/backends/memory.rs +++ b/casper-server/src/storage/backends/memory.rs @@ -210,14 +210,14 @@ impl Storage for MemoryBackend { results } - async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> { + async fn store_response<'a>(&self, item: Item<'a>) -> Result { self.store_responses([item]).await.remove(0) } async fn store_responses( &self, items: impl IntoIterator>, - ) -> Vec> { + ) -> Vec> { let mut memory = self.inner.lock().await; let mut results = Vec::new(); for item in items { @@ -229,8 +229,9 @@ impl Storage for MemoryBackend { expires: SystemTime::now() + item.ttl, surrogate_keys: item.surrogate_keys, }; + let size = value.headers.len() + value.body.len(); memory.insert(item.key, value); - Ok(()) + Ok(size) })(); results.push(result); } diff --git a/casper-server/src/storage/backends/mod.rs b/casper-server/src/storage/backends/mod.rs index 9ce7384..c8636b6 100644 --- a/casper-server/src/storage/backends/mod.rs +++ b/casper-server/src/storage/backends/mod.rs @@ -88,7 +88,7 @@ impl Storage for Backend { } #[inline] - async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> { + async fn store_response<'a>(&self, item: Item<'a>) -> Result { match self { Backend::Memory(inner) => inner.store_response(item).await, Backend::Redis(inner) => inner.store_response(item).await, @@ -121,7 +121,7 @@ impl Storage for Backend { async fn store_responses( &self, items: impl IntoIterator>, - ) -> Vec> { + ) -> Vec> { match self { Backend::Memory(inner) => inner.store_responses(items).await, Backend::Redis(inner) => inner.store_responses(items).await, diff --git a/casper-server/src/storage/backends/redis/client.rs b/casper-server/src/storage/backends/redis/client.rs index 24ef46d..1598f82 100644 --- a/casper-server/src/storage/backends/redis/client.rs +++ b/casper-server/src/storage/backends/redis/client.rs @@ -375,7 +375,8 @@ impl RedisBackend { } } - async fn store_response_inner<'a>(&self, item: Item<'a>) -> Result<()> { + async fn store_response_inner<'a>(&self, item: Item<'a>) -> Result { + let mut stored_bytes = 0; let mut headers = Bytes::from(encode_headers(&item.headers)?); let mut body = item.body; let body_length = body.len(); @@ -437,6 +438,7 @@ impl RedisBackend { false, ) .await?; + stored_bytes += chunk.len(); } } @@ -452,6 +454,7 @@ impl RedisBackend { flags, }; let response_item_enc = flexbuffers::to_vec(&response_item)?; + let response_item_size = response_item_enc.len(); // Store response item self.pool @@ -463,6 +466,7 @@ impl RedisBackend { false, ) .await?; + stored_bytes += response_item_size; // Update surrogate keys let int_cache_ttl = self.config.internal_cache_ttl; @@ -508,7 +512,7 @@ impl RedisBackend { })) .await?; - Ok(()) + Ok(stored_bytes) } fn get_fetch_timeout(&self) -> Duration { @@ -556,7 +560,7 @@ impl Storage for RedisBackend { .with_context(|| format!("Failed to delete Response(s) for key `{}`", key)) } - async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error> { + async fn store_response<'a>(&self, item: Item<'a>) -> Result { self.lazy_connect(); let key = item.key.clone(); let store_timeout = self.get_store_timeout(); diff --git a/casper-server/src/storage/mod.rs b/casper-server/src/storage/mod.rs index 29f90f9..4c1f022 100644 --- a/casper-server/src/storage/mod.rs +++ b/casper-server/src/storage/mod.rs @@ -91,7 +91,7 @@ pub trait Storage { async fn delete_responses(&self, key: ItemKey) -> Result<(), Self::Error>; - async fn store_response<'a>(&self, item: Item<'a>) -> Result<(), Self::Error>; + async fn store_response<'a>(&self, item: Item<'a>) -> Result; // // Provided implementation @@ -122,7 +122,7 @@ pub trait Storage { async fn store_responses( &self, items: impl IntoIterator>, - ) -> Vec> { + ) -> Vec> { // Create list of pending futures to poll them in parallel stream::iter(items.into_iter().map(|it| self.store_response(it))) .buffered(Self::MAX_CONCURRENCY)