From d9efbdc3114b9d1fe7ca6588c372b33e57279394 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Wed, 29 Dec 2021 15:01:01 +0300 Subject: [PATCH] Introduce the `buffer-pool` feature --- src/buffer_pool/disabled.rs | 25 +++++++++ src/buffer_pool/enabled.rs | 103 ++++++++++++++++++++++++++++++++++++ src/buffer_pool/mod.rs | 16 ++++++ 3 files changed, 144 insertions(+) create mode 100644 src/buffer_pool/disabled.rs create mode 100644 src/buffer_pool/enabled.rs create mode 100644 src/buffer_pool/mod.rs diff --git a/src/buffer_pool/disabled.rs b/src/buffer_pool/disabled.rs new file mode 100644 index 0000000..16f61fe --- /dev/null +++ b/src/buffer_pool/disabled.rs @@ -0,0 +1,25 @@ +#![cfg(not(feature = "buffer-pool"))] + +use std::ops::Deref; + +#[derive(Debug)] +#[repr(transparent)] +pub struct Buffer(Vec); + +impl AsMut> for Buffer { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +pub const fn get_buffer() -> Buffer { + Buffer(Vec::new()) +} diff --git a/src/buffer_pool/enabled.rs b/src/buffer_pool/enabled.rs new file mode 100644 index 0000000..5d08d21 --- /dev/null +++ b/src/buffer_pool/enabled.rs @@ -0,0 +1,103 @@ +#![cfg(feature = "buffer-pool")] + +use crossbeam::queue::ArrayQueue; +use once_cell::sync::Lazy; + +use std::{mem::replace, ops::Deref, sync::Arc}; + +const DEFAULT_MYSQL_BUFFER_POOL_CAP: usize = 128; +const DEFAULT_MYSQL_BUFFER_SIZE_CAP: usize = 4 * 1024 * 1024; + +static BUFFER_POOL: Lazy> = Lazy::new(|| Default::default()); + +#[inline(always)] +pub fn get_buffer() -> Buffer { + BUFFER_POOL.get() +} + +#[derive(Debug)] +struct Inner { + buffer_cap: usize, + pool: ArrayQueue>, +} + +impl Inner { + fn get(self: &Arc) -> Buffer { + let mut buf = self.pool.pop().unwrap_or_default(); + + // SAFETY: + // 1. OK – 0 is always within capacity + // 2. OK - nothing to initialize + unsafe { buf.set_len(0) } + + Buffer(buf, Some(self.clone())) + } + + fn put(&self, mut buf: Vec) { + buf.shrink_to(self.buffer_cap); + let _ = self.pool.push(buf); + } +} + +/// Smart pointer to a buffer pool. +#[derive(Debug, Clone)] +pub struct BufferPool(Option>); + +impl BufferPool { + pub fn new() -> Self { + let pool_cap = std::env::var("RUST_MYSQL_BUFFER_POOL_CAP") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(DEFAULT_MYSQL_BUFFER_POOL_CAP); + + let buffer_cap = std::env::var("RUST_MYSQL_BUFFER_SIZE_CAP") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(DEFAULT_MYSQL_BUFFER_SIZE_CAP); + + Self((pool_cap > 0).then(|| { + Arc::new(Inner { + buffer_cap, + pool: ArrayQueue::new(pool_cap), + }) + })) + } + + pub fn get(self: &Arc) -> Buffer { + match self.0 { + Some(ref inner) => inner.get(), + None => Buffer(Vec::new(), None), + } + } +} + +impl Default for BufferPool { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug)] +pub struct Buffer(Vec, Option>); + +impl AsMut> for Buffer { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +impl Drop for Buffer { + fn drop(&mut self) { + if let Some(ref inner) = self.1 { + inner.put(replace(&mut self.0, vec![])); + } + } +} diff --git a/src/buffer_pool/mod.rs b/src/buffer_pool/mod.rs new file mode 100644 index 0000000..95565e2 --- /dev/null +++ b/src/buffer_pool/mod.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2021 Anatoly Ikorsky +// +// Licensed under the Apache License, Version 2.0 +// or the MIT +// license , at your +// option. All files in the project carrying such notice may not be copied, +// modified, or distributed except according to those terms. + +mod disabled; +mod enabled; + +#[cfg(feature = "buffer-pool")] +pub use enabled::{get_buffer, Buffer}; + +#[cfg(not(feature = "buffer-pool"))] +pub use disabled::{get_buffer, Buffer};