Skip to content

Commit 79239da

Browse files
authored
Merge pull request #502 from splitgraph/http-health
Add a http health endpoint
2 parents 00d3a43 + 91d72a9 commit 79239da

File tree

19 files changed

+419
-187
lines changed

19 files changed

+419
-187
lines changed

.github/workflows/ci.yml

+3-18
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,6 @@ jobs:
1111
# set debuginfo to 1 (line tables only, for panic tracebacks)
1212
RUSTFLAGS: "-C debuginfo=1"
1313

14-
services:
15-
postgres:
16-
image: postgres:14
17-
env:
18-
POSTGRES_PASSWORD: postgres
19-
POSTGRES_DB: db_test
20-
ports:
21-
- 5432/tcp
22-
options: >-
23-
--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
24-
2514
steps:
2615
- uses: actions/checkout@v3
2716

@@ -68,19 +57,15 @@ jobs:
6857
run: |
6958
cargo build
7059
71-
- name: Spin up the test object stores
72-
run: docker compose up -d
60+
- name: Spin up the test object stores and postgres catalog
61+
run: docker compose up -d --wait || true
7362

7463
# TODO split tests into unit and integration (one requires postgres?)
7564
- name: Run tests
7665
run: |
7766
cargo test --workspace
7867
env:
79-
# Database URL for end-to-end + postgres repository tests; the host is `localhost` since we don't specify
80-
# a child container to run the job in (and instead run everything in the root specified by `runs-on`).
81-
# The port has been randomly assigned so we need to fetch it.
82-
DATABASE_URL:
83-
"postgres://postgres:postgres@localhost:${{ job.services.postgres.ports[5432] }}/db_test"
68+
DATABASE_URL: "postgres://postgres:test@localhost:5432/db_test"
8469

8570
# TODO recompiles the whole thing with different flags (busting the cache?,
8671
# also codecov needs a token for private repos; also this breaks in Docker

Cargo.lock

+10-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ hex = ">=0.4.0"
116116
itertools = { workspace = true }
117117
lazy_static = ">=1.4.0"
118118
# Pick up hashbrown >0.13.1 to resolve conflicts with wasm crates
119-
metrics = { git = "https://github.com/metrics-rs/metrics", rev = "bc29c4f984605ccadb0daede451b28523c1aff86", optional = true }
120-
metrics-exporter-prometheus = { git = "https://github.com/metrics-rs/metrics", rev = "bc29c4f984605ccadb0daede451b28523c1aff86", optional = true }
119+
metrics = { version = "0.22.1", optional = true }
120+
metrics-exporter-prometheus = { version = "0.13.1", optional = true }
121121
moka = { version = "0.12.2", default_features = false, features = ["future", "atomic64", "quanta"] }
122122
object_store = "0.9"
123123
parking_lot = "0.12.1"

docker-compose.yml

+9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
version: "3.9"
22
services:
3+
postgres:
4+
image: postgres:16
5+
environment:
6+
POSTGRES_USER: postgres
7+
POSTGRES_PASSWORD: test
8+
ports:
9+
- "5432:5432"
10+
restart: unless-stopped
11+
312
minio:
413
image: minio/minio:latest
514
ports:

src/config/context.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ pub fn setup_metrics(metrics: &schema::Metrics) {
189189
describe_counter!(GRPC_REQUESTS, "Counter tracking gRPC request statistics");
190190
}
191191

192-
pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext> {
192+
pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext> {
193193
let mut runtime_config = RuntimeConfig::new();
194194
if let Some(max_memory) = cfg.runtime.max_memory {
195195
runtime_config = runtime_config
@@ -217,7 +217,7 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext
217217
// Register the HTTP object store for external tables
218218
add_http_object_store(&context, &cfg.misc.ssl_cert_file);
219219

220-
let metastore = build_metastore(cfg, internal_object_store.clone()).await;
220+
let metastore = build_metastore(&cfg, internal_object_store.clone()).await;
221221

222222
// Create default DB/collection
223223
if let Err(CatalogError::CatalogDoesNotExist { .. }) =
@@ -244,12 +244,12 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext
244244
// (it will reload its schema before running the query)
245245

246246
Ok(SeafowlContext {
247+
config: cfg,
247248
inner: context,
248249
metastore: Arc::new(metastore),
249250
internal_object_store,
250251
default_catalog: DEFAULT_DB.to_string(),
251252
default_schema: DEFAULT_SCHEMA.to_string(),
252-
max_partition_size: cfg.misc.max_partition_size,
253253
})
254254
}
255255

@@ -297,7 +297,7 @@ mod tests {
297297
},
298298
};
299299

300-
let context = build_context(&config).await.unwrap();
300+
let context = build_context(config).await.unwrap();
301301

302302
// Run a query against the context to test it works
303303
let results = context

src/config/schema.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ impl ObjectCacheProperties {
199199
Arc::new(CachingObjectStore::new(
200200
inner,
201201
&path,
202-
self.capacity,
203202
self.min_fetch_size,
203+
self.capacity,
204204
Duration::from_secs(self.ttl_s),
205205
))
206206
}
@@ -473,7 +473,9 @@ If Seafowl is running on GCP a token should be fetched using the GCP metadata en
473473

474474
pub fn load_config(path: &Path) -> Result<SeafowlConfig, ConfigError> {
475475
let config = Config::builder()
476-
.add_source(File::with_name(path.to_str().expect("Error parsing path")))
476+
.add_source(
477+
File::with_name(path.to_str().expect("Error parsing path")).required(false),
478+
)
477479
.add_source(Environment::with_prefix(ENV_PREFIX).separator(ENV_SEPARATOR));
478480

479481
config.build()?.try_deserialize().and_then(validate_config)
@@ -486,7 +488,7 @@ pub fn load_config_from_string(
486488
env_override: Option<Map<String, String>>,
487489
) -> Result<SeafowlConfig, ConfigError> {
488490
let config = Config::builder()
489-
.add_source(File::from_str(config_str, FileFormat::Toml))
491+
.add_source(File::from_str(config_str, FileFormat::Toml).required(false))
490492
.add_source(
491493
Environment::with_prefix(ENV_PREFIX)
492494
.separator(ENV_SEPARATOR)

src/context/delta.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ impl SeafowlContext {
381381
plan,
382382
self.internal_object_store.clone(),
383383
table_prefix,
384-
self.max_partition_size,
384+
self.config.misc.max_partition_size,
385385
)
386386
.await?;
387387

src/context/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::object_store::wrapped::InternalObjectStore;
99
use crate::wasm_udf::data_types::{get_volatility, CreateFunctionDetails};
1010
use crate::wasm_udf::wasm::create_udf_from_wasm;
1111

12+
use crate::config::schema::SeafowlConfig;
1213
use base64::{engine::general_purpose::STANDARD, Engine};
1314
pub use datafusion::error::{DataFusionError as Error, Result};
1415
use datafusion::{error::DataFusionError, prelude::SessionContext, sql::TableReference};
@@ -21,12 +22,12 @@ use uuid::Uuid;
2122
// The core Seafowl object, responsible for parsing, logical and physical planning, as well as
2223
// interacting with the catalog and object store.
2324
pub struct SeafowlContext {
25+
pub config: SeafowlConfig,
2426
pub inner: SessionContext,
2527
pub metastore: Arc<Metastore>,
2628
pub internal_object_store: Arc<InternalObjectStore>,
2729
pub default_catalog: String,
2830
pub default_schema: String,
29-
pub max_partition_size: u32,
3031
}
3132

3233
impl SeafowlContext {
@@ -42,12 +43,12 @@ impl SeafowlContext {
4243
build_state_with_table_factories(session_config, self.inner().runtime_env());
4344

4445
Arc::from(SeafowlContext {
46+
config: self.config.clone(),
4547
inner: SessionContext::new_with_state(state),
4648
metastore: self.metastore.clone(),
4749
internal_object_store: self.internal_object_store.clone(),
4850
default_catalog: catalog,
4951
default_schema: schema,
50-
max_partition_size: self.max_partition_size,
5152
})
5253
}
5354

@@ -226,7 +227,7 @@ pub mod test_utils {
226227
runtime: Default::default(),
227228
misc: Default::default(),
228229
};
229-
build_context(&config).await.unwrap()
230+
build_context(config).await.unwrap()
230231
}
231232

232233
pub async fn in_memory_context_with_test_db() -> Arc<SeafowlContext> {

src/context/physical.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl SeafowlContext {
347347
&update_plan,
348348
self.internal_object_store.clone(),
349349
table_prefix,
350-
self.max_partition_size,
350+
self.config.misc.max_partition_size,
351351
)
352352
.await?;
353353

@@ -460,7 +460,7 @@ impl SeafowlContext {
460460
&filter_plan,
461461
self.internal_object_store.clone(),
462462
table_prefix,
463-
self.max_partition_size,
463+
self.config.misc.max_partition_size,
464464
)
465465
.await?;
466466

src/frontend/http.rs

+61-14
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use warp::{hyper, Rejection};
99

1010
use arrow::json::writer::record_batches_to_json_rows;
1111
use arrow::record_batch::RecordBatch;
12+
#[cfg(feature = "frontend-arrow-flight")]
13+
use arrow_flight::flight_service_client::FlightServiceClient;
1214

1315
use arrow_integration_test::{schema_from_json, schema_to_json};
1416
use arrow_schema::SchemaRef;
@@ -42,9 +44,9 @@ use crate::auth::{token_to_principal, AccessPolicy, Action, UserContext};
4244
use crate::catalog::DEFAULT_DB;
4345
#[cfg(feature = "metrics")]
4446
use crate::config::context::HTTP_REQUESTS;
45-
use crate::config::schema::{AccessSettings, MEBIBYTES};
47+
use crate::config::schema::{AccessSettings, HttpFrontend, MEBIBYTES};
4648
use crate::{
47-
config::schema::{str_to_hex_hash, HttpFrontend},
49+
config::schema::str_to_hex_hash,
4850
context::logical::{is_read_only, is_statement_read_only},
4951
context::SeafowlContext,
5052
};
@@ -482,8 +484,29 @@ async fn load_part(mut part: Part) -> Result<Vec<u8>, ApiError> {
482484
Ok(bytes)
483485
}
484486

485-
// We need the allow to silence the compiler: it asks us to add warp::generic::Tuple to the first
486-
// parameter of the return type, but that struct is not exportable (generic is private).
487+
/// GET /healthz or /readyz
488+
pub async fn health_endpoint(context: Arc<SeafowlContext>) -> Result<Response, ApiError> {
489+
#[cfg(feature = "frontend-arrow-flight")]
490+
if let Some(flight) = &context.config.frontend.flight {
491+
// TODO: run SELECT 1 or something similar?
492+
if let Err(err) = FlightServiceClient::connect(format!(
493+
"http://{}:{}",
494+
flight.bind_host, flight.bind_port
495+
))
496+
.await
497+
{
498+
warn!(%err, "Arrow Flight client can't connect, health check failed");
499+
return Ok(warp::reply::with_status(
500+
"not_ready",
501+
StatusCode::SERVICE_UNAVAILABLE,
502+
)
503+
.into_response());
504+
};
505+
};
506+
507+
Ok(warp::reply::with_status("ready", StatusCode::OK).into_response())
508+
}
509+
487510
pub fn filters(
488511
context: Arc<SeafowlContext>,
489512
config: HttpFrontend,
@@ -501,22 +524,34 @@ pub fn filters(
501524
.max_age(CORS_MAXAGE);
502525

503526
let log = warp::log::custom(|info: Info<'_>| {
527+
let path = info.path();
528+
504529
#[cfg(feature = "metrics")]
505-
counter!(
506-
HTTP_REQUESTS,
507-
"method" => info.method().as_str().to_string(),
508-
// Omit a potential db prefix or url-encoded query from the path
509-
"route" => if info.path().contains("/upload/") { "/upload" } else { "/q" },
510-
"status" => info.status().as_u16().to_string(),
511-
)
512-
.increment(1);
530+
{
531+
let route = if path.contains("/upload/") {
532+
"/upload".to_string()
533+
} else if path.contains("/q") {
534+
"/q".to_string()
535+
} else {
536+
path.to_string()
537+
};
538+
539+
counter!(
540+
HTTP_REQUESTS,
541+
"method" => info.method().as_str().to_string(),
542+
// Omit a potential db prefix or url-encoded query from the path
543+
"route" => route,
544+
"status" => info.status().as_u16().to_string(),
545+
)
546+
.increment(1);
547+
}
513548

514549
info!(
515550
target: module_path!(),
516551
"{} \"{} {} {:?}\" {} \"{}\" \"{}\" {:?}",
517552
info.remote_addr().map(|addr| addr.to_string()).unwrap_or("-".to_string()),
518553
info.method(),
519-
info.path(),
554+
path,
520555
info.version(),
521556
info.status().as_u16(),
522557
info.referer().unwrap_or("-"),
@@ -581,7 +616,7 @@ pub fn filters(
581616
.map(into_response);
582617

583618
// Upload endpoint
584-
let ctx = context;
619+
let ctx = context.clone();
585620
let upload_route = warp::path!(String / "upload" / String / String)
586621
.or(warp::any()
587622
.map(move || DEFAULT_DB.to_string())
@@ -596,9 +631,21 @@ pub fn filters(
596631
.then(upload)
597632
.map(into_response);
598633

634+
// Health-check/readiness probe
635+
let ctx = context;
636+
let health_route = warp::path!("healthz")
637+
.or(warp::path!("readyz"))
638+
.and(warp::path::end())
639+
.and(warp::get())
640+
.unify()
641+
.and(warp::any().map(move || ctx.clone()))
642+
.then(health_endpoint)
643+
.map(into_response);
644+
599645
cached_read_query_route
600646
.or(uncached_read_write_query_route)
601647
.or(upload_route)
648+
.or(health_route)
602649
.with(cors)
603650
.with(log)
604651
.map(|r| with_header(r, header::VARY, VARY))

0 commit comments

Comments
 (0)