refactor: Add throttle as a global config #1547

Merged · 13 commits · Dec 14, 2024
18 changes: 18 additions & 0 deletions Makefile
@@ -0,0 +1,18 @@
# Run the backend without worker mode, i.e. only enabling single-shot
# verifications via the /v1/check_email endpoint.
.PHONY: run
run:
cd backend && cargo run --bin reacher_backend


# Run the backend with worker mode on. This enables the /v1/bulk endpoints.
# Make sure to have a Postgres DB and a RabbitMQ instance running.
.PHONY: run-with-worker
run-with-worker: export RCH__WORKER__ENABLE=true
run-with-worker: export RCH__WORKER__RABBITMQ__URL=amqp://guest:guest@localhost:5672
run-with-worker: export RCH__STORAGE__POSTGRES__DB_URL=postgresql://localhost/reacherdb
run-with-worker: run

.PHONY: run-with-commercial-license-trial
run-with-commercial-license-trial: export RCH__COMMERCIAL_LICENSE_TRIAL__URL=http://localhost:3000/api/v1/commercial_license_trial
run-with-commercial-license-trial: run
56 changes: 36 additions & 20 deletions backend/backend_config.toml
@@ -88,12 +88,47 @@ hotmailb2c = "headless"
# recommended.
yahoo = "headless"

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
# All fields are optional; comment them out to disable the limit.
#
# However, we recommend setting at least the per-minute and per-day limits to
# prevent your IPs from being blocked by the email providers.
# The default values are set to 60 requests per minute and 10,000 requests per
# day.
#
# Important: these throttle configurations only apply to /v1/* endpoints, and
# not to the previous /v0/check_email endpoint. The latter endpoint always
# executes the verification immediately, regardless of the throttle settings.
#
# Env variables:
# - RCH__THROTTLE__MAX_REQUESTS_PER_SECOND
# - RCH__THROTTLE__MAX_REQUESTS_PER_MINUTE
# - RCH__THROTTLE__MAX_REQUESTS_PER_HOUR
# - RCH__THROTTLE__MAX_REQUESTS_PER_DAY
[throttle]
# max_requests_per_second = 20
max_requests_per_minute = 60
# max_requests_per_hour = 1000
max_requests_per_day = 10000
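
For reference (this is not part of backend_config.toml), the keys above map onto the optional fields of the `ThrottleConfig` struct touched later in this PR in backend/src/config.rs. A minimal, self-contained sketch of the same defaults in Rust; the struct is redeclared here purely for illustration:

```rust
// Illustration only: the shape the [throttle] keys deserialize into, assuming
// the four optional limits shown in backend/src/config.rs.
#[derive(Debug, Default, Clone)]
struct ThrottleConfig {
    max_requests_per_second: Option<u32>,
    max_requests_per_minute: Option<u32>,
    max_requests_per_hour: Option<u32>,
    max_requests_per_day: Option<u32>,
}

fn main() {
    // Same values as the defaults above: 60 requests/minute, 10,000 requests/day.
    let throttle = ThrottleConfig {
        max_requests_per_minute: Some(60),
        max_requests_per_day: Some(10_000),
        ..Default::default()
    };
    println!("{:?}", throttle);
}
```

Leaving a field as `None` disables that particular limit, matching the commented-out keys above.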

# Configuration for a queue-based architecture for Reacher. This feature is
# currently in **beta**. The queue-based architecture allows Reacher to scale
# horizontally by running multiple workers that consume emails from a RabbitMQ
# queue.
#
# To enable the queue-based architecture, set the "enable" field to "true" and
# configure the RabbitMQ connection below. The "concurrency" field specifies
# the number of concurrent emails to verify for this worker.
#
# For more information, see the documentation at:
# https://docs.reacher.email/self-hosting/scaling-for-production
[worker]
# Enable the worker to consume emails from the RabbitMQ queues. If set, the
# RabbitMQ configuration below must be set as well.
#
# Env variable: RCH__WORKER__ENABLE
enable = true
enable = false

# RabbitMQ configuration.
[worker.rabbitmq]
@@ -105,25 +140,6 @@ url = "amqp://guest:guest@localhost:5672"
# Env variable: RCH__WORKER__RABBITMQ__CONCURRENCY
concurrency = 5

# Throttle the maximum number of requests per second, per minute, per hour, and
# per day for this worker.
# All fields are optional; comment them out to disable the limit.
#
# Important: these throttle configurations only apply to /v1/* endpoints, and
# not to the previous /v0/check_email endpoint. The latter endpoint always
# executes the verification immediately, regardless of the throttle settings.
#
# Env variables:
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_SECOND
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_MINUTE
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_HOUR
# - RCH__WORKER__THROTTLE__MAX_REQUESTS_PER_DAY
[worker.throttle]
# max_requests_per_second = 20
# max_requests_per_minute = 100
# max_requests_per_hour = 1000
# max_requests_per_day = 20000

# Below are the configurations for the storage of the email verification
# results. We currently support the following storage backends:
# - Postgres
4 changes: 2 additions & 2 deletions backend/openapi.json
@@ -702,7 +702,7 @@
"duration": {
"$ref": "#/components/schemas/Duration"
},
"server_name": {
"backend_name": {
"type": "string",
"x-stoplight": {
"id": "2jrbdecvqh4t5"
@@ -717,7 +717,7 @@
"start_time",
"end_time",
"duration",
"server_name",
"backend_name",
"smtp"
]
},
71 changes: 52 additions & 19 deletions backend/src/config.rs
@@ -15,6 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::storage::{postgres::PostgresStorage, StorageAdapter};
use crate::throttle::ThrottleManager;
use crate::worker::do_work::TaskWebhook;
use crate::worker::setup_rabbit_mq;
use anyhow::{bail, Context};
@@ -29,7 +30,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use tracing::warn;

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BackendConfig {
/// Name of the backend.
pub backend_name: String,
@@ -65,36 +66,61 @@ pub struct BackendConfig {
/// Whether to enable the Commercial License Trial. Setting this to true
pub commercial_license_trial: Option<CommercialLicenseTrialConfig>,

/// Throttle configuration for all requests
pub throttle: ThrottleConfig,

// Internal fields, not part of the configuration.
#[serde(skip)]
channel: Option<Arc<Channel>>,

#[serde(skip)]
storage_adapter: Arc<StorageAdapter>,

#[serde(skip)]
throttle_manager: Arc<ThrottleManager>,
}

impl BackendConfig {
/// Create an empty BackendConfig. This is useful for testing purposes.
pub fn empty() -> Self {
Self {
backend_name: "".to_string(),
from_email: "".to_string(),
hello_name: "".to_string(),
webdriver_addr: "".to_string(),
proxy: None,
verif_method: VerifMethodConfig::default(),
http_host: "127.0.0.1".to_string(),
http_port: 8080,
header_secret: None,
smtp_timeout: None,
sentry_dsn: None,
worker: WorkerConfig::default(),
storage: Some(StorageConfig::Noop),
commercial_license_trial: None,
throttle: ThrottleConfig::new_without_throttle(),
channel: None,
storage_adapter: Arc::new(StorageAdapter::Noop),
throttle_manager: Arc::new(
ThrottleManager::new(ThrottleConfig::new_without_throttle()),
),
}
}

/// Get the worker configuration, or return an error if the worker is not
/// enabled or its configuration is incomplete.
pub fn must_worker_config(&self) -> Result<MustWorkerConfig, anyhow::Error> {
match (
self.worker.enable,
&self.worker.throttle,
&self.worker.rabbitmq,
&self.channel,
) {
(true, Some(throttle), Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
match (self.worker.enable, &self.worker.rabbitmq, &self.channel) {
(true, Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
channel: channel.clone(),
throttle: throttle.clone(),
rabbitmq: rabbitmq.clone(),

webhook: self.worker.webhook.clone(),
}),

(true, _, _, _) => bail!("Worker configuration is missing"),
(true, _, _) => bail!("Worker configuration is missing"),
_ => bail!("Calling must_worker_config on a non-worker backend"),
}
}
@@ -126,6 +152,9 @@ impl BackendConfig {
};
self.channel = channel;

// Initialize throttle manager
self.throttle_manager = Arc::new(ThrottleManager::new(self.throttle.clone()));

Ok(())
}

@@ -142,6 +171,10 @@ impl BackendConfig {
StorageAdapter::Noop => None,
}
}

pub fn get_throttle_manager(&self) -> Arc<ThrottleManager> {
self.throttle_manager.clone()
}
}
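
The manager is built once during backend configuration initialization (see the hunk above) and handed out behind an `Arc`, so every HTTP route and worker task consults the same counters. A small sketch of that sharing pattern; the `ThrottleManager` below is a stand-in type, since the real one lives in backend/src/throttle.rs and is not part of this diff:

```rust
use std::sync::Arc;

// Stand-in for the real ThrottleManager in backend/src/throttle.rs.
#[derive(Debug)]
struct ThrottleManager;

struct Config {
    throttle_manager: Arc<ThrottleManager>,
}

impl Config {
    // Mirrors BackendConfig::get_throttle_manager: cloning the Arc is cheap,
    // and every caller observes the same shared rate-limit state.
    fn get_throttle_manager(&self) -> Arc<ThrottleManager> {
        self.throttle_manager.clone()
    }
}

fn main() {
    let config = Config {
        throttle_manager: Arc::new(ThrottleManager),
    };
    let a = config.get_throttle_manager();
    let b = config.get_throttle_manager();
    // Both handles point at the same manager instance.
    assert!(Arc::ptr_eq(&a, &b));
    println!("shared manager: {:?}", a);
}
```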

#[derive(Debug, Default, Deserialize, Clone, Serialize)]
@@ -159,9 +192,6 @@ pub struct VerifMethodConfig {
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct WorkerConfig {
pub enable: bool,

/// Throttle configuration for the worker.
pub throttle: Option<ThrottleConfig>,
pub rabbitmq: Option<RabbitMQConfig>,
/// Optional webhook configuration to send email verification results.
pub webhook: Option<TaskWebhook>,
@@ -172,8 +202,6 @@ pub struct WorkerConfig {
#[derive(Debug, Clone)]
pub struct MustWorkerConfig {
pub channel: Arc<Channel>,

pub throttle: ThrottleConfig,
pub rabbitmq: RabbitMQConfig,
pub webhook: Option<TaskWebhook>,
}
@@ -185,7 +213,7 @@ pub struct RabbitMQConfig {
pub concurrency: u16,
}

#[derive(Debug, Deserialize, Clone, Serialize)]
#[derive(Debug, Default, Deserialize, Clone, Serialize)]
pub struct ThrottleConfig {
pub max_requests_per_second: Option<u32>,
pub max_requests_per_minute: Option<u32>,
@@ -236,8 +264,13 @@ pub async fn load_config() -> Result<BackendConfig, anyhow::Error> {

let cfg = cfg.build()?.try_deserialize::<BackendConfig>()?;

if !cfg.worker.enable && (cfg.worker.rabbitmq.is_some() || cfg.worker.throttle.is_some()) {
warn!(target: LOG_TARGET, "worker.enable is set to false, ignoring throttling and concurrency settings.")
if cfg.worker.enable {
warn!(target: LOG_TARGET, "The worker feature is currently in beta. Please send any feedback to [email protected].");

match &cfg.storage {
Some(StorageConfig::Postgres(_)) => {}
_ => bail!("When worker mode is enabled, a Postgres database must be configured."),
}
}

Ok(cfg)
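
With the reworked startup check above, worker mode now requires a Postgres storage backend and config loading fails fast otherwise. A self-contained sketch of that validation rule, using simplified stand-in types rather than the real `StorageConfig`:

```rust
use anyhow::bail;

// Simplified stand-in; the real StorageConfig carries Postgres settings.
#[derive(Debug)]
enum StorageConfig {
    Postgres,
    Noop,
}

// Sketch of the check added in load_config: worker mode requires Postgres.
fn validate(worker_enabled: bool, storage: Option<&StorageConfig>) -> anyhow::Result<()> {
    if worker_enabled {
        match storage {
            Some(StorageConfig::Postgres) => {}
            _ => bail!("When worker mode is enabled, a Postgres database must be configured."),
        }
    }
    Ok(())
}

fn main() {
    assert!(validate(true, Some(&StorageConfig::Postgres)).is_ok());
    assert!(validate(true, Some(&StorageConfig::Noop)).is_err());
    assert!(validate(false, None).is_ok());
}
```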
11 changes: 9 additions & 2 deletions backend/src/http/error.rs
@@ -28,7 +28,7 @@ pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {}
impl<T: fmt::Display + Debug + Sync + Send> DisplayDebug for T {}

/// Struct describing an error response.
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub struct ReacherResponseError {
pub code: StatusCode,
pub error: Box<dyn DisplayDebug>,
@@ -121,7 +121,14 @@ impl From<StorageError> for ReacherResponseError {

impl From<reqwest::Error> for ReacherResponseError {
fn from(e: reqwest::Error) -> Self {
ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
ReacherResponseError::new(
e.status()
.map(|s| s.as_u16())
.map(StatusCode::from_u16)
.and_then(Result::ok)
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
e,
)
}
}
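
The updated `From<reqwest::Error>` conversion forwards the upstream HTTP status when the error carries one, falling back to 500 otherwise. A self-contained sketch of just that mapping, assuming warp's re-exported http `StatusCode` as used elsewhere in this backend:

```rust
use warp::http::StatusCode;

// Prefer the status carried by the upstream error, when present and valid;
// otherwise fall back to 500 Internal Server Error.
fn map_status(upstream: Option<u16>) -> StatusCode {
    upstream
        .map(StatusCode::from_u16)
        .and_then(Result::ok)
        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}

fn main() {
    assert_eq!(map_status(Some(429)), StatusCode::TOO_MANY_REQUESTS);
    // 42 is not a valid HTTP status code, so the fallback applies.
    assert_eq!(map_status(Some(42)), StatusCode::INTERNAL_SERVER_ERROR);
    assert_eq!(map_status(None), StatusCode::INTERNAL_SERVER_ERROR);
}
```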

20 changes: 0 additions & 20 deletions backend/src/http/mod.rs
@@ -23,14 +23,12 @@ use crate::config::BackendConfig;
use check_if_email_exists::LOG_TARGET;
use error::handle_rejection;
pub use error::ReacherResponseError;
use sqlx::PgPool;
use sqlxmq::JobRunnerHandle;
use std::env;
use std::net::IpAddr;
use std::sync::Arc;
use tracing::info;
pub use v0::check_email::post::CheckEmailRequest;
use warp::http::StatusCode;
use warp::Filter;

pub fn create_routes(
@@ -101,24 +99,6 @@ pub async fn run_warp_server(
Ok(runner)
}

/// Warp filter to add the database pool to the handler. If the pool is not
/// configured, it will return an error.
pub fn with_db(
pg_pool: Option<PgPool>,
) -> impl Filter<Extract = (PgPool,), Error = warp::Rejection> + Clone {
warp::any().and_then(move || {
let pool = pg_pool.clone();
async move {
pool.ok_or_else(|| {
warp::reject::custom(ReacherResponseError::new(
StatusCode::SERVICE_UNAVAILABLE,
"Please configure a Postgres database on Reacher before calling this endpoint",
))
})
}
})
}
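
The bulk routes below now rely on a `with_worker_db` filter defined in the v1/bulk module, whose implementation is not included in this diff. A hypothetical sketch of what such a filter could look like, following the same warp pattern as the removed `with_db` above; the `WorkerSettings` type and the plain `not_found` rejection are illustrative assumptions, not the actual implementation:

```rust
use sqlx::PgPool;
use std::sync::Arc;
use warp::Filter;

// Illustrative stand-in; the real filter takes the backend's config type.
struct WorkerSettings {
    worker_enabled: bool,
    pg_pool: Option<PgPool>,
}

// Hypothetical sketch: extract a PgPool for /v1/bulk handlers, rejecting the
// request when worker mode is disabled or no database is configured.
fn with_worker_db(
    settings: Arc<WorkerSettings>,
) -> impl Filter<Extract = (PgPool,), Error = warp::Rejection> + Clone {
    warp::any().and_then(move || {
        let settings = Arc::clone(&settings);
        async move {
            if !settings.worker_enabled {
                return Err(warp::reject::not_found());
            }
            settings.pg_pool.clone().ok_or_else(warp::reject::not_found)
        }
    })
}
```

In the real code the filter presumably takes the `BackendConfig` and produces a more descriptive `ReacherResponseError`; the sketch only shows the extract-or-reject shape.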

/// The header which holds the Reacher backend secret.
pub const REACHER_SECRET_HEADER: &str = "x-reacher-secret";

5 changes: 3 additions & 2 deletions backend/src/http/v1/bulk/get_progress.rs
@@ -25,8 +25,9 @@ use sqlx::PgPool;
use warp::http::StatusCode;
use warp::Filter;

use super::with_worker_db;
use crate::config::BackendConfig;
use crate::http::{with_db, ReacherResponseError};
use crate::http::ReacherResponseError;

/// NOTE: Type conversions from postgres to rust types
/// are according to the table given by
@@ -149,7 +150,7 @@ pub fn v1_get_bulk_job_progress(
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v1" / "bulk" / i32)
.and(warp::get())
.and(with_db(config.get_pg_pool()))
.and(with_worker_db(config))
.and_then(http_handler)
// View access logs by setting `RUST_LOG=reacher`.
.with(warp::log(LOG_TARGET))
5 changes: 3 additions & 2 deletions backend/src/http/v1/bulk/get_results/mod.rs
@@ -25,8 +25,9 @@ use std::{convert::TryInto, sync::Arc};
use warp::http::StatusCode;
use warp::Filter;

use super::with_worker_db;
use crate::config::BackendConfig;
use crate::http::{with_db, ReacherResponseError};
use crate::http::ReacherResponseError;
use csv_helper::{CsvResponse, CsvWrapper};

mod csv_helper;
@@ -180,7 +181,7 @@ pub fn v1_get_bulk_job_results(
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("v1" / "bulk" / i32 / "results")
.and(warp::get())
.and(with_db(config.get_pg_pool()))
.and(with_worker_db(config))
.and(warp::query::<Request>())
.and_then(http_handler)
// View access logs by setting `RUST_LOG=reacher_backend`.