Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement readiness probe at /readyz #153

Draft
wants to merge 5 commits into
base: beta
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 67 additions & 7 deletions agent_api_rest/postman/ssi-agent.postman_collection.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"info": {
"_postman_id": "3b1e6396-3bf7-43aa-bc3b-21056fb21dfa",
"_postman_id": "d910f16f-e653-4b7d-949e-255b2a7d4834",
"name": "ssi-agent",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"_exporter_id": "24972330"
"_exporter_id": "30650915"
},
"item": [
{
Expand Down Expand Up @@ -97,7 +97,7 @@
"header": [],
"body": {
"mode": "raw",
"raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"first_name\": \"Ferris\",\n \"last_name\": \"Crabman\",\n \"dob\": \"1982-01-01\"\n }\n }\n}",
"raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"first_name\": \"Ferris\",\n \"last_name\": \"Crabman\",\n \"dob\": \"1982-01-01\"\n }\n },\n \"expires\": \"P2D\"\n}",
"options": {
"raw": {
"language": "json"
Expand Down Expand Up @@ -151,7 +151,7 @@
"header": [],
"body": {
"mode": "raw",
"raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"credentialSubject\": {\n \"id\": \"https://ecommerce.impierce.com/\",\n \"image\": \"https://static.wikia.nocookie.net/fictionalcompanies/images/c/c2/ACME_Corporation.png\",\n \"name\": \"VirtualVendors\",\n \"certificaat\": {\n \"type\": \"ACMECorpCredential\",\n \"certificeringsDatum\": \"2024-06-26\",\n \"geldigheidsPeriode\": \"1 jaar\",\n \"garanties\": [\n \"Het bedrijf is echt en bereikbaar.\",\n \"Voldoet aan de Thuiswinkel Algemene Voorwaarden.\",\n \"14 dagen bedenktijd.\",\n \"Veilige betaalmethoden.\",\n \"Duidelijke product/servicebeschrijvingen.\",\n \"Transparant bestelproces.\",\n \"Duidelijke prijzen.\",\n \"Veilige betaalomgeving.\",\n \"Veilige omgang met persoonlijke gegevens.\",\n \"Effectieve klachtenafhandeling en onafhankelijke geschillenbemiddeling.\"\n ]\n }\n }\n }\n}",
"raw": "{\n \"offerId\":\"{{OFFER_ID}}\",\n \"credentialConfigurationId\": \"w3c_vc_credential\",\n \"credential\": {\n \"id\": \"https://acme.example.org/1a2b3c4d5e6f\",\n \"credentialSubject\": {\n \"id\": \"https://ecommerce.impierce.com/\",\n \"image\": \"https://static.wikia.nocookie.net/fictionalcompanies/images/c/c2/ACME_Corporation.png\",\n \"name\": \"VirtualVendors\",\n \"certificaat\": {\n \"type\": \"ACMECorpCredential\",\n \"certificeringsDatum\": \"2024-06-26\",\n \"geldigheidsPeriode\": \"1 jaar\",\n \"garanties\": [\n \"Het bedrijf is echt en bereikbaar.\",\n \"Voldoet aan de Thuiswinkel Algemene Voorwaarden.\",\n \"14 dagen bedenktijd.\",\n \"Veilige betaalmethoden.\",\n \"Duidelijke product/servicebeschrijvingen.\",\n \"Transparant bestelproces.\",\n \"Duidelijke prijzen.\",\n \"Veilige betaalomgeving.\",\n \"Veilige omgang met persoonlijke gegevens.\",\n \"Effectieve klachtenafhandeling en onafhankelijke geschillenbemiddeling.\"\n ]\n }\n }\n }\n}",
"options": {
"raw": {
"language": "json"
Expand Down Expand Up @@ -629,21 +629,40 @@
}
],
"request": {
"auth": {
"type": "apikey",
"apikey": [
{
"key": "value",
"value": "E8C5AEF8F5954FCA9F543D4913569C98",
"type": "string"
},
{
"key": "key",
"value": "X-API-KEY",
"type": "string"
}
]
},
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\n \"nonce\": \"this is a nonce\",\n \"presentation_definition\": {\n \"id\":\"Verifiable Presentation request for sign-on\",\n \"input_descriptors\":[\n {\n \"id\":\"Request for Verifiable Credential\",\n \"constraints\":{\n \"fields\":[\n {\n \"path\":[\n \"$.vc.type\"\n ],\n \"filter\":{\n \"type\":\"array\",\n \"contains\":{\n \"const\":\"VerifiableCredential\"\n }\n }\n }\n ]\n }\n }\n ]\n }\n}",
"raw": "{\n \"nonce\": \"this is a nonce\",\n \"presentation_definition\": {\n \"id\":\"Verifiable Presentation request for sign-on\",\n \"input_descriptors\":[\n {\n \"id\":\"Request for Verifiable Credential\",\n \"constraints\":{\n \"fields\":[\n {\n \"path\":[\n \"$.vc.type\"\n ],\n \"filter\":{\n \"type\":\"array\",\n \"contains\":{\n \"const\":\"IdentificationCardCredential\"\n }\n }\n }\n ]\n }\n }\n ]\n }\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "{{HOST}}/v0/authorization_requests",
"raw": "https://zkteco.dev.impierce.com/v0/authorization_requests",
"protocol": "https",
"host": [
"{{HOST}}"
"zkteco",
"dev",
"impierce",
"com"
],
"path": [
"v0",
Expand Down Expand Up @@ -1521,8 +1540,45 @@
"response": []
}
]
},
{
"name": "_monitoring",
"item": [
{
"name": "Liveness probe",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{HOST}}/healthz",
"host": [
"{{HOST}}"
],
"path": [
"healthz"
]
}
},
"response": []
}
]
}
],
"auth": {
"type": "apikey",
"apikey": [
{
"key": "value",
"value": "{{API_KEY}}",
"type": "string"
},
{
"key": "key",
"value": "X-API-KEY",
"type": "string"
}
]
},
"event": [
{
"listen": "prerequest",
Expand Down Expand Up @@ -1603,6 +1659,10 @@
"key": "CONNECTION_ID",
"value": "INITIAL_VALUE",
"type": "string"
},
{
"key": "SERVICE_ID",
"value": ""
}
]
}
1 change: 1 addition & 0 deletions agent_application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ agent_store = { path = "../agent_store" }
agent_verification = { path = "../agent_verification" }

axum.workspace = true
axum-macros = "0.4"
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
15 changes: 9 additions & 6 deletions agent_application/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::await_holding_lock)]

mod probes;

use agent_api_rest::{app, ApplicationState};
use agent_event_publisher_http::EventPublisherHttp;
use agent_holder::services::HolderServices;
Expand All @@ -9,8 +11,9 @@ use agent_secret_manager::{secret_manager, service::Service as _, subject::Subje
use agent_shared::config::{config, LogFormat};
use agent_store::{in_memory, postgres, EventPublisher};
use agent_verification::services::VerificationServices;
use probes::{liveness::healthz, readiness::readyz_handler};
use std::sync::Arc;
use tokio::{fs, io};
use tokio::io;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -68,18 +71,18 @@ async fn main() -> io::Result<()> {
agent_identity::state::initialize(&identity_state).await;
agent_issuance::state::initialize(&issuance_state, startup_commands(url.clone())).await;

let health_router = axum::Router::new()
.route("/healthz", axum::routing::get(healthz))
.route("/readyz", axum::routing::get(readyz_handler));

let app = app(ApplicationState {
identity_state: Some(identity_state),
issuance_state: Some(issuance_state),
holder_state: Some(holder_state),
verification_state: Some(verification_state),
});

// This is used to indicate that the server accepts requests.
// In a docker container this file can be searched to see if its ready.
// A better solution can be made later (needed for impierce-demo)
fs::create_dir_all("/tmp/unicore/").await?;
fs::write("/tmp/unicore/accept_requests", []).await?;
let app = health_router.merge(app);

let listener = tokio::net::TcpListener::bind("0.0.0.0:3033").await?;
info!("listening on {}", listener.local_addr()?);
Expand Down
7 changes: 7 additions & 0 deletions agent_application/src/probes/liveness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;

/// A simple liveness probe following application monitoring conventions.
pub async fn healthz() -> impl IntoResponse {
StatusCode::OK
}
2 changes: 2 additions & 0 deletions agent_application/src/probes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod liveness;
pub mod readiness;
46 changes: 46 additions & 0 deletions agent_application/src/probes/readiness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use agent_shared::config::config;
use agent_shared::config::EventStoreType;
use agent_store::postgres::check_connection;
use axum::http::StatusCode;
use axum::response::IntoResponse;

#[axum_macros::debug_handler]
pub async fn readyz_handler() -> impl IntoResponse {
// check database connection
// check message queue connection

let event_store_type = config().event_store.type_.clone();

// write code: if config is postgres, then call the postgres check_connection function
let status_code = match event_store_type {
EventStoreType::InMemory => {
println!("Checking Postgres connection ...");

// check_connection().await;

if check_connection().await {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
}
// StatusCode::OK

// if postgres::check_connection().await {
// return axum::http::StatusCode::OK;
// } else {
// return axum::http::StatusCode::SERVICE_UNAVAILABLE;
// }
// if let Err(e) = postgres::check_connection().await {
// return axum::http::StatusCode::SERVICE_UNAVAILABLE;
// }
}
EventStoreType::Postgres => {
// do nothing
StatusCode::OK
}
};

// Response::new("foobar".into())

status_code
}
1 change: 1 addition & 0 deletions agent_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ sqlx = { version = "0.7", features = [
"json",
] }
tokio.workspace = true
tracing.workspace = true
39 changes: 36 additions & 3 deletions agent_store/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{partition_event_publishers, EventPublisher, Partitions};
use agent_holder::{services::HolderServices, state::HolderState};
use agent_identity::{services::IdentityServices, state::IdentityState};
use agent_identity::{connection, services::IdentityServices, state::IdentityState};
use agent_issuance::{
offer::queries::{access_token::AccessTokenQuery, pre_authorized_code::PreAuthorizedCodeQuery},
services::IssuanceServices,
Expand All @@ -13,8 +13,8 @@ use agent_shared::{
use agent_verification::{services::VerificationServices, state::VerificationState};
use async_trait::async_trait;
use cqrs_es::{Aggregate, Query};
use postgres_es::{default_postgress_pool, PostgresCqrs, PostgresViewRepository};
use sqlx::{Pool, Postgres};
use postgres_es::{PostgresCqrs, PostgresViewRepository};
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::{collections::HashMap, sync::Arc};

struct AggregateHandler<A>
Expand Down Expand Up @@ -323,3 +323,36 @@ pub async fn verification_state(
},
}
}

/// Replacement for `postgres_es::default_postgress_pool`, but returns an error instead of panicking.
pub async fn default_postgress_pool(connection_string: &str) -> Result<Pool<Postgres>, sqlx::Error> {
PgPoolOptions::new()
.max_connections(10)
.connect(connection_string)
.await
// .expect("unable to connect to database")
}

pub async fn check_connection() -> bool {
let connection_string = config().event_store.connection_string.clone().expect(
"Missing config parameter `event_store.connection_string` or `UNICORE__EVENT_STORE__CONNECTION_STRING`",
);
let connection_string = "foobar".to_string();
let pool = if let Ok(pool) = default_postgress_pool(&connection_string)
.await
.inspect_err(|e| tracing::debug!("Database connectivity check failed: {}", e))
{
pool
} else {
return false;
};

// sqlx::query("SELECT 1").fetch_one(&pool).await.is_ok()
match sqlx::query("SELECT 1").execute(&pool).await {
Ok(_) => true,
Err(err) => {
tracing::debug!("Database connectivity check failed: {}", err);
false
}
}
}
Loading