diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..af0cdb7 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,20 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Build + run: cargo build --verbose diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6985cf1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2f0d783 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "logsqlite" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +axum = { version= "0.6.20", features= ["macros"]} +futures = "0.3.28" +serde_json = "1.0.105" +serde = {version = "1.0.183", features = ["derive"]} +hyperlocal = "0.8" +hyper = "0.14.27" +tokio = {version = "1.29.1", features = ["fs", "io-util", "rt-multi-thread", "macros", "time", "sync"] } +chrono = "0.4.26" +bytes = "1" +futures-util = { version = "0.3", default-features = false, features = ["alloc"] } +prost = { version= "0.12" } +http = "0.2" +log = "0.4" +simple_logger = "4.2" +configparser = "3.0" +sqlx = { version = "0.7.3", features = ["runtime-tokio", "macros", "sqlite"]} +bincode = "1.3.3" + +[build-dependencies] +prost-build = "0.5" + +[profile.release] +lto=true +strip=true diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..45c1082 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +# syntax=docker/dockerfile:1.7-labs + +FROM rust:bookworm as prefetch +WORKDIR /src/logsqlite +RUN apt update && apt install -y build-essential libsqlite3-0 libsqlite3-dev protobuf-compiler && apt clean +COPY Cargo.toml /src/logsqlite/ +RUN mkdir /src/logsqlite/src +RUN echo 'fn main() {}' >/src/logsqlite/src/main.rs +RUN cargo b --release + +FROM prefetch as build +WORKDIR /src/logsqlite +COPY Cargo.toml build.rs /src/logsqlite/ +COPY src /src/logsqlite/src +RUN cargo b --release && cp target/*/logsqlite . 
+ +FROM debian:bookworm as logsqlite_rootfs +RUN apt update && apt install -y libsqlite3-0 && apt clean +RUN mkdir -p /var/spool/logsqlite /etc/logsqlite /run/docker/plugins +COPY --from=build /src/logsqlite/logsqlite /bin/logsqlite +COPY conf.ini /etc/logsqlite/ +VOLUME /var/spool/logsqlite +WORKDIR /bin/ +ENTRYPOINT [ "/bin/logsqlite", "/etc/logsqlite/conf.ini" ] diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..79839f7 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 David Schramm + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..536ce22 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# logsqlite +Moby/Docker logging driver plugin which uses sqlite3 databases. + +Allows **faster** querying of logs (f.e using `docker logs --since/--until`) than the default JSON File logging driver + +# Building +* `cargo b --release` + +# Installation (systemd) +1. `cp logsqlite /usr/local/bin/` +2. `mkdir /etc/logsqlite && cp conf.ini /etc/logsqlite/` +3. `cp logsqlite.service /etc/systemd/system/ && systemctl daemon-reload` +4. `mkdir /var/spool/logsqlite/` +5. `systemctl enable logsqlite && systemctl start logsqlite` + +# Configuration +See `conf.ini` + +# Using the driver +- as the default logging driver: + - add `"log-driver": "logsqlite"` in the `daemon.json` +- per container + - `docker run --log-driver logsqlite` + - or in the docker-compose: https://docs.docker.com/compose/compose-file/compose-file-v3/#logging diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..e9f551a --- /dev/null +++ b/build.rs @@ -0,0 +1,6 @@ +extern crate prost_build; + +fn main() { + println!("cargo:rerun-if-changed=src/entry.proto"); + prost_build::compile_protos(&["src/entry.proto"], &["src/"]).unwrap(); +} diff --git a/build_plugin.sh b/build_plugin.sh new file mode 100755 index 0000000..8a59c81 --- /dev/null +++ b/build_plugin.sh @@ -0,0 +1,13 @@ +#!/bin/bash +set -e +docker build -t logsqlite_rootfs . 
+ +id=$(docker create logsqlite_rootfs true) + +mkdir -p plugin/rootfs + +docker export "$id" | sudo tar -x -C plugin/rootfs + +docker rm -vf "$id" + +docker plugin create logsqlite ./plugin/ diff --git a/conf.ini b/conf.ini new file mode 100644 index 0000000..190196d --- /dev/null +++ b/conf.ini @@ -0,0 +1,16 @@ +[general] +# the directory where docker plugins sockets are placed in +# (usually you can leave this to the default path) +# don't forget to also adapt your systemd-service if you change this one +plugins_dir=/run/docker/plugins/ + +# the directory where logsqlite puts the databases into +databases_dir=/var/spool/logsqlite/ + +# this database is used for keeping the state of to-be-logged containers +# across restarts of logsqlite +state_database=/etc/logsqlite/state.db + +[cleanup] +# in which interval the log cleaner runs +interval=600 # in seconds diff --git a/images/LogSQlite Main Logo.svg b/images/LogSQlite Main Logo.svg deleted file mode 100644 index a573901..0000000 --- a/images/LogSQlite Main Logo.svg +++ /dev/null @@ -1,34 +0,0 @@ - - - - - - - - - - diff --git a/logsqlite.service b/logsqlite.service new file mode 100644 index 0000000..722ceb0 --- /dev/null +++ b/logsqlite.service @@ -0,0 +1,17 @@ +[Unit] +Description=Moby/Docker logdriver for logging into sqlite databases + +[Service] +Type=simple +User=root +Group=root +Restart=always +#RemainAfterExit=no +#Environment="DEBUG=true" +WorkingDirectory=/var/spool/logsqlite/ +ExecStartPre=-rm /var/run/docker/plugins/logsqlite.sock +ExecStart=/usr/local/bin/logsqlite /etc/logsqlite/conf.ini +#StandardOutput=null + +[Install] +WantedBy=multi-user.target diff --git a/plugin/config.json b/plugin/config.json new file mode 100644 index 0000000..4964946 --- /dev/null +++ b/plugin/config.json @@ -0,0 +1,27 @@ +{ + "description": "Sqlite Logging Driver", + "documentation": "https://github.com/daschr/logsqlite", + "entrypoint": [ + "/bin/logsqlite", + "/etc/logsqlite/conf.ini" + ], + "network": { + "type": "host" + }, + "interface": { + "types": [ + "docker.logdriver/1.0" + ], + "socket": "logsqlite.sock" + }, + "env": [ + { + "name": "DEBUG", + "description": "Log debug messages", + "value": "false", + "settable": [ + "value" + ] + } + ] +} diff --git a/src/cleaner.rs b/src/cleaner.rs new file mode 100644 index 0000000..68e0708 --- /dev/null +++ b/src/cleaner.rs @@ -0,0 +1,159 @@ +use log::{debug, error, info}; +use sqlx::{Connection, SqliteConnection}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; +use tokio::{sync::RwLock, time}; + +use crate::config::LogConfig; + +#[derive(Clone)] +pub struct LogCleaner { + fifos: Arc>>, + containers: Arc>>, + dbs_path: String, +} + +impl LogCleaner { + pub fn new(dbs_path: String) -> Self { + LogCleaner { + fifos: Arc::new(RwLock::new(HashMap::new())), + containers: Arc::new(RwLock::new(HashMap::new())), + dbs_path, + } + } + + pub async fn add(&self, container_id: &str, fifo: PathBuf, log_conf: LogConfig) { + self.fifos + .write() + .await + .insert(fifo, container_id.to_string()); + + let mut map = self.containers.write().await; + map.insert(container_id.to_string(), log_conf); + } + + pub async fn remove(&self, fifo: &Path) -> Option { + let container_id: String = match self.fifos.write().await.remove(fifo) { + Some(v) => v, + None => return None, + }; + + self.containers.write().await.remove(&container_id) + } + + async fn get_first_tail_rowid( + con: &mut SqliteConnection, + tail: u64, + ) -> Result { + let count = match 
sqlx::query_as::("SELECT count(*) FROM logs") + .fetch_one(&mut *con) + .await + { + Ok(v) => v.0 as u64, + Err(sqlx::Error::RowNotFound) => { + return Ok(0); + } + Err(e) => { + return Err(e); + } + }; + + let rowid = + sqlx::query_as::("SELECT ROWID FROM logs LIMIT 1 OFFSET ?1") + .bind(if count > tail { + (count - tail) as i64 + } else { + 0 + }) + .fetch_one(&mut *con) + .await? + .0 as u64; + + Ok(rowid) + } + + async fn cleanup_db( + &self, + log_conf: &LogConfig, + con: &mut SqliteConnection, + ) -> Result<(), sqlx::Error> { + match (log_conf.cleanup_age, log_conf.cleanup_max_lines) { + (Some(cleanup_age), Some(max_lines)) => { + let rowid = Self::get_first_tail_rowid(con, max_lines as u64).await?; + + let mut max_time = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap(); + + max_time -= cleanup_age; + + debug!( + "cleanup: DELETE FROM logs WHERE ts < {} OR ROWID < {}", + max_time.as_nanos(), + rowid + ); + sqlx::query("DELETE FROM logs WHERE ts < ?1 OR ROWID < ?2") + .bind(max_time.as_nanos() as i64) + .bind(rowid as i64) + .execute(con) + .await?; + } + (Some(cleanup_age), None) => { + let mut max_time = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap(); + + max_time -= cleanup_age; + + debug!( + "cleanup: DELETE FROM logs WHERE ts < {}", + max_time.as_nanos() + ); + sqlx::query("DELETE FROM logs WHERE ts < ?1") + .bind(max_time.as_nanos() as i64) + .execute(con) + .await?; + } + (None, Some(max_lines)) => { + let rowid = Self::get_first_tail_rowid(con, max_lines as u64).await?; + + debug!("cleanup: DELETE FROM logs WHERE ROWID < {}", rowid); + sqlx::query("DELETE FROM logs WHERE ROWID < ?1") + .bind(rowid as i64) + .execute(con) + .await?; + } + (None, None) => (), // never happens + } + + Ok(()) + } + + pub async fn run(&self, cleanup_interval: Duration) -> Result<(), sqlx::Error> { + loop { + info!("starting cleanup"); + for (container, log_conf) in self.containers.read().await.iter() { + debug!( + "[cleanup] cleaning up container: {}, max_age: {:?} max_lines: {:?}", + container, log_conf.cleanup_age, log_conf.cleanup_max_lines + ); + let db_url = format!("sqlite://{}/{}", self.dbs_path, container); + match SqliteConnection::connect(&db_url).await { + Err(e) => { + error!("[cleanup] failed to open connection: {:?}", e); + } + Ok(mut con) => { + if let Err(e) = self.cleanup_db(&log_conf, &mut con).await { + error!("[cleanup] could not cleanup {}: {:?}", container, e); + } + } + } + } + info!("cleanup done"); + + time::sleep(cleanup_interval).await; + } + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..fb033ec --- /dev/null +++ b/src/config.rs @@ -0,0 +1,231 @@ +use configparser::ini::Ini; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::num::ParseIntError; +use std::path::PathBuf; +use std::time::Duration; + +#[derive(Debug)] +pub struct Config { + pub unix_socket_path: PathBuf, + pub databases_dir: PathBuf, + pub state_database: PathBuf, + pub cleanup_interval: Duration, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogConfig { + pub max_lines_per_tx: u64, + pub max_size_per_tx: usize, // bytes + pub message_read_timeout: Duration, + pub cleanup_age: Option, + pub cleanup_max_lines: Option, + pub cleanup_interval: Duration, + pub delete_when_stopped: bool, +} + +#[allow(unused)] +pub enum ConfigSource { + File(T), + Text(T), +} + +#[derive(Debug, Serialize)] +pub enum ParsingError { + IniError(String), + ConfError(String), +} + 
+impl From for ParsingError { + fn from(value: String) -> Self { + ParsingError::IniError(value) + } +} + +impl From for ParsingError { + fn from(value: ParseIntError) -> Self { + ParsingError::ConfError(value.to_string()) + } +} + +fn get_dir>( + conf: &Ini, + section: &str, + name: &str, + default: T, +) -> Result { + match conf.get(section, name) { + Some(s) => { + let p = PathBuf::from(s.as_str()); + if !p.exists() { + return Err(ParsingError::ConfError(format!("{s} does not exist"))); + } + + if !p.is_dir() { + return Err(ParsingError::ConfError(format!("{s} is not a directory"))); + } + + Ok(p) + } + None => Ok(default.into()), + } +} + +fn parse_as_duration(v: &str) -> Result { + let pos = { + let mut r = 0; + for c in v.chars() { + if !c.is_ascii_digit() { + break; + } + r += 1; + } + r + }; + + if pos == 0 { + return Err(String::from("Cannot parse time: no number")); + } + + let mut num: u64 = v[0..pos].parse::().unwrap(); + match &v[pos..] { + "w" | "W" => num *= 7 * 24 * 60 * 60, + "d" | "D" => num *= 24 * 60 * 60, + "h" | "H" => num *= 60 * 60, + "m" | "M" => num *= 60, + "s" | "S" => (), + s => { + return Err(format!("Unknown time specifier \"{}\"", s)); + } + } + + Ok(Duration::from_secs(num)) +} + +fn parse_si_prefixed_size(v: &str) -> Result { + let pos = { + let mut r = 0; + for c in v.chars() { + if !c.is_ascii_digit() { + break; + } + r += 1; + } + r + }; + + if pos == 0 { + return Err(String::from("Cannot parse size: no number")); + } + + let mut num: u64 = v[0..pos].parse::().unwrap(); + match &v[pos..] { + "g" | "G" => num *= 1024 * 1024 * 1024, + "m" | "M" => num *= 1024 * 1024, + "k" | "K" => num *= 1024, + "b" | "B" => (), + s => { + return Err(format!("Unknown time specifier \"{}\"", s)); + } + } + + Ok(num as usize) +} + +impl TryFrom> for Config { + type Error = ParsingError; + + fn try_from(v: ConfigSource) -> Result { + let mut config = Ini::new(); + match v { + ConfigSource::File(f) => config.load(f)?, + ConfigSource::Text(t) => config.read(t)?, + }; + + let c = Config { + unix_socket_path: get_dir( + &config, + "general", + "plugins_dir", + "/var/run/docker/plugins/", + )? + .join("logsqlite.sock"), + databases_dir: get_dir(&config, "general", "databases_dir", "/var/spool/logsqlite/")?, + state_database: match config.get("general", "state_database") { + Some(p) => PathBuf::from(p), + None => { + return Err(ParsingError::ConfError(String::from( + "Missing \"state_database\"", + ))) + } + }, + cleanup_interval: config + .getuint("cleanup", "interval")? 
+ .map(Duration::from_secs) + .unwrap_or(Duration::from_secs(30 * 60)), + }; + + Ok(c) + } +} + +impl Default for LogConfig { + fn default() -> Self { + LogConfig { + max_lines_per_tx: 10_000, + max_size_per_tx: 10 * 1024 * 1024, + message_read_timeout: Duration::from_millis(100), + cleanup_age: None, + cleanup_max_lines: None, + cleanup_interval: Duration::from_secs(10 * 60), + delete_when_stopped: true, + } + } +} + +impl TryFrom<&Option>> for LogConfig { + type Error = ParsingError; + + fn try_from(cust_conf: &Option>) -> Result { + let mut conf = LogConfig::default(); + + if cust_conf.is_none() { + return Ok(conf); + } + + let cust_conf = cust_conf.as_ref().unwrap(); + for (opt, val) in cust_conf.iter() { + match opt.as_str() { + "message_read_timeout" => { + conf.message_read_timeout = Duration::from_millis(val.parse::()?); + } + "max_lines_per_tx" => { + conf.max_lines_per_tx = val.parse::()?; + } + "max_size_per_tx" => { + conf.max_size_per_tx = parse_si_prefixed_size(val)?; + } + "cleanup_age" => { + conf.cleanup_age = Some(parse_as_duration(val)?); + } + "cleanup_max_lines" => { + conf.cleanup_max_lines = Some(val.parse::()?); + } + "delete_when_stopped" => { + conf.delete_when_stopped = match val.to_lowercase().as_str() { + "true" => true, + "false" => false, + _ => { + return Err(ParsingError::ConfError(String::from( + "delete_when_stopped is neither \"true\" or \"false\"", + ))); + } + } + } + _ => (), + } + } + + Ok(conf) + } +} diff --git a/src/docker.rs b/src/docker.rs new file mode 100644 index 0000000..0f60b6d --- /dev/null +++ b/src/docker.rs @@ -0,0 +1,198 @@ +use crate::config::{Config, LogConfig}; +use crate::logger::SqliteLogStream; +use crate::statehandler::StateHandlerMessage; + +use axum::{ + body::StreamBody, + debug_handler, + extract::{Json, State}, + http::Uri, + response::IntoResponse, +}; +use log::{debug, error, info}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use tokio::sync::mpsc::Sender; + +pub struct ApiState { + pub state_handler_tx: Sender, + pub config: Arc, +} + +impl ApiState { + pub async fn new( + state_handler_tx: Sender, + config: Arc, + ) -> Result { + Ok(ApiState { + state_handler_tx, + config, + }) + } +} + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, Deserialize)] +pub struct Info { + Config: Option>, + ContainerID: String, + ContainerName: Option, + ContainerEntrypoint: Option, + ContainerArgs: Option>, + ContainerImageID: Option, + ContainerImageName: Option, + ContainerCreated: Option, + ContainerEnv: Option>, + ContainerLabels: Option>, + LogPath: Option, + DaemonName: Option, +} + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, Deserialize)] +pub struct StartLoggingConf { + File: String, + Info: Info, +} + +pub async fn start_logging( + State(state): State>, + Json(conf): Json, +) -> Json { + info!("[start_logging] conf: {:?}", conf); + + state + .state_handler_tx + .send(StateHandlerMessage::StartLogging { + container_id: conf.Info.ContainerID.clone(), + fifo: PathBuf::from(conf.File.as_str()), + log_conf: match LogConfig::try_from(&conf.Info.Config) { + Ok(c) => c, + Err(e) => { + error!("Failed to parse config: {:?}: {:?}", conf.Info.Config, e); + return json!({"Err": e}).into(); + } + }, + }) + .await + .expect("failed to enqueue StateHandlerMessage"); + + info!("[docker::start_logging] send StartLogging message to StateHandler"); + + json!({"Err": ""}).into() +} + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, 
Deserialize)] +pub struct StopLoggingConf { + File: String, +} + +#[debug_handler] +pub async fn stop_logging( + State(state): State>, + Json(conf): Json, +) -> Json { + info!("[stop_logging] conf: {:?}", conf); + + state + .state_handler_tx + .send(StateHandlerMessage::StopLogging { + fifo: PathBuf::from(conf.File), + }) + .await + .expect("Failed to enqueue StateHandlerMessage"); + info!("[docker::stop_logging] send StopLogging message to StateHandler"); + + json!({"Err": ""}).into() +} + +pub async fn capabilities() -> Json { + debug!("[capabilities] called"); + json!({"Cap": {"ReadLogs": true}}).into() +} + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadConfig { + Since: Option, + Until: Option, + Tail: Option, + Follow: Option, +} + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, Deserialize)] +pub struct ReadLogsConf { + Config: ReadConfig, + Info: Info, +} + +/* +Object {"Config": Object {"Follow": Bool(false), "Since": String("0001-01-01T00:00:00Z"), +"Tail": Number(-1), "Until": String("0001-01-01T00:00:00Z")}, +"Info": Object {"Config": Object {}, "ContainerArgs": Array [], "ContainerCreated": String("2023-08-17T14:15:45.983205858Z"), "ContainerEntrypoint": String("bash"), "ContainerEnv": Array [String("PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")], "ContainerID": String("71a33f78bc9b362c91de1746d54d8fea51a17bfddde1745c2f9fbfdb1e4d2b90"), "ContainerImageID": String("sha256:01f29b872827fa6f9aed0ea0b2ede53aea4ad9d66c7920e81a8db6d1fd9ab7f9"), "ContainerImageName": String("ubuntu"), "ContainerLabels": Object {"org.opencontainers.image.ref.name": String("ubuntu"), "org.opencontainers.image.version": String("22.04")}, "ContainerName": String("/wonderful_raman"), "DaemonName": String("docker"), "LogPath": String("")}} +*/ + +pub async fn read_logs( + State(state): State>, + Json(mut conf): Json, +) -> Result> { + info!("[read_logs] conf: {:?}", conf); + + let tail = match conf.Config.Tail { + Some(v) if v < 1 => None, + Some(v) => Some(v as u64), + None => None, + }; + + if conf + .Config + .Since + .as_mut() + .is_some_and(|x| x == "0001-01-01T00:00:00Z") + { + conf.Config.Since = None; + } + + if conf + .Config + .Until + .as_mut() + .is_some_and(|x| x == "0001-01-01T00:00:00Z") + { + conf.Config.Until = None; + } + + let logstream = match SqliteLogStream::new( + &state.config.databases_dir.as_path(), + &conf.Info.ContainerID, + conf.Config.Since, + conf.Config.Until, + tail, + conf.Config.Follow.unwrap_or(false), + ) + .await + { + Ok(l) => l, + Err(e) => { + error!("Error creating logstream: {:?}", &e); + return Err( + json!({ "Err": format!("[logsqlite] Could not read logs: {:?}", e) }).into(), + ); + } + }; + + Ok(StreamBody::new(logstream)) +} + +pub async fn activate() -> Json { + debug!("activate called"); + json!({"Implements": ["LogDriver"]}).into() +} + +pub async fn fallback(uri: Uri) -> &'static str { + debug!("[fallback] uri: {:?}", uri); + "not found" +} diff --git a/src/entry.proto b/src/entry.proto new file mode 100644 index 0000000..a74787f --- /dev/null +++ b/src/entry.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package docker.logentry; + +message LogEntry { + string source = 1; + int64 time_nano = 2; + bytes line = 3; + bool partial = 4; + PartialLogEntryMetadata partial_log_metadata = 5; +} + +message PartialLogEntryMetadata { + bool last = 1; + string id = 2; + int32 ordinal = 3; +} diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..ed574ac --- /dev/null +++ 
b/src/logger.rs @@ -0,0 +1,465 @@ +use crate::config::{Config, LogConfig}; +use chrono::DateTime; +use log::{debug, error, info, warn}; + +use crate::statehandler::StateHandlerMessage; +use tokio::sync::mpsc::Sender; + +use core::pin::Pin; +use futures::{ + executor::block_on, + stream::Stream, + task::{Context, Poll}, +}; +use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqliteConnection}; +use std::path::Path; +use std::{cell::UnsafeCell, collections::HashMap, path::PathBuf, str::FromStr, sync::Arc}; + +use prost::Message; +use tokio::{ + self, + fs::File, + io::{AsyncRead, AsyncReadExt}, + sync::RwLock, + task::JoinHandle, + time::{sleep, timeout, Duration, Instant}, +}; + +// Include the `items` module, which is generated from items.proto. +pub mod logentry { + include!(concat!(env!("OUT_DIR"), "/docker.logentry.rs")); +} + +pub struct Logger { + exit: RwLock, +} + +#[derive(Debug)] +pub enum LoggerError { + IoError(std::io::Error), + SqlError(sqlx::Error), + JoinError(tokio::task::JoinError), + DecodeError(prost::DecodeError), + EncodeError(prost::EncodeError), + Exited, +} + +impl From for LoggerError { + fn from(e: std::io::Error) -> Self { + LoggerError::IoError(e) + } +} + +impl From for LoggerError { + fn from(e: sqlx::Error) -> Self { + LoggerError::SqlError(e) + } +} + +impl From for LoggerError { + fn from(e: tokio::task::JoinError) -> Self { + LoggerError::JoinError(e) + } +} + +impl From for LoggerError { + fn from(e: prost::DecodeError) -> Self { + LoggerError::DecodeError(e) + } +} + +impl From for LoggerError { + fn from(e: prost::EncodeError) -> Self { + LoggerError::EncodeError(e) + } +} + +impl Logger { + fn new() -> Self { + Logger { + exit: RwLock::new(false), + } + } + + async fn read_protobuf( + &self, + reader: &mut R, + msg: &mut Vec, + read_timeout: Duration, + ) -> Result, LoggerError> { + #[allow(unused_assignments)] + let mut msg_size = 0usize; + + match timeout(read_timeout, reader.read_u32()).await { + Ok(Ok(v)) => { + msg_size = v as usize; + } + Ok(Err(e)) => { + // this is some serious error and we cannot continue, the fifo may be closed + return Err(e.into()); + } + Err(_) => { + return Ok(None); + } + } + + let mut read = 0; + let mut bf = [0u8; 1024]; + msg.clear(); + + while read < msg_size { + let tbr = if msg_size - read >= bf.len() { + bf.len() + } else { + msg_size - read + }; + let read_bytes = reader.read(&mut bf[0..tbr]).await?; + + read += read_bytes; + msg.extend_from_slice(&bf[0..read_bytes]); + } + + let mut dec_msg = logentry::LogEntry::decode(msg.as_slice())?; + dec_msg.line.push(b'\n'); + + msg.clear(); + msg.extend_from_slice(&(dec_msg.encoded_len() as u32).to_be_bytes()); + msg.reserve(dec_msg.encoded_len()); + dec_msg.encode(msg)?; + + Ok(Some(dec_msg.time_nano as u64)) + } + + async fn log( + &self, + fifo: PathBuf, + db_path: PathBuf, + log_conf: LogConfig, + ) -> Result<(), LoggerError> { + let mut dbcon = + SqliteConnectOptions::from_str(&format!("sqlite://{}", &db_path.display()))? 
+ .create_if_missing(true) + .connect() + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS logs (ts NUMBER, message BLOB); + CREATE INDEX IF NOT EXISTS idx_ts ON logs(ts);", + ) + .execute(&mut dbcon) + .await?; + + let mut fd = File::open(&fifo).await?; + let mut message: Vec = Vec::new(); + + sqlx::query("BEGIN TRANSACTION;") + .execute(&mut dbcon) + .await?; + + let mut no_entries_ts = Instant::now(); + let mut nb_entries = 0; + let mut acc_entries_size = 0; + + while !*self.exit.read().await { + debug!("read_protobuf from {}...", fifo.display()); + match self + .read_protobuf(&mut fd, &mut message, log_conf.message_read_timeout) + .await + { + Ok(Some(ts)) => { + sqlx::query("INSERT INTO logs(ts, message) VALUES(?1, ?2)") + .bind(ts as i64) + .bind(&message) + .persistent(true) + .execute(&mut dbcon) + .await?; + + nb_entries += 1; + acc_entries_size += message.len(); + + if nb_entries >= log_conf.max_lines_per_tx + || acc_entries_size >= log_conf.max_size_per_tx + { + sqlx::query("END TRANSACTION;BEGIN TRANSACTION;") + .execute(&mut dbcon) + .await?; + + info!( + "reached {} entries with {} bytes size, ending current transacion ({:.2} lines/s) for {}", + nb_entries, + acc_entries_size, + nb_entries as f64 + * (1_000_000f64 + / Instant::now().duration_since(no_entries_ts).as_micros() + as f64), + fifo.display() + ); + + no_entries_ts = Instant::now(); + nb_entries = 0; + acc_entries_size = 0; + } + } + Ok(None) => { + sqlx::query("END TRANSACTION;BEGIN TRANSACTION;") + .execute(&mut dbcon) + .await?; + + no_entries_ts = Instant::now(); + nb_entries = 0; + acc_entries_size = 0; + } + Err(e) => { + warn!("Failed to read a protobuf message: {:?}", e); + return Err(e); + } + } + } + + sqlx::query("END TRANSACTION;").execute(&mut dbcon).await?; + + Err(LoggerError::Exited) + } + + async fn exit(&self) { + *self.exit.write().await = true; + } +} + +pub struct LoggerPool { + workers: RwLock, tokio::task::JoinHandle<()>)>>, + config: Arc, +} + +impl LoggerPool { + pub fn new(config: Arc) -> Self { + LoggerPool { + workers: RwLock::new(HashMap::new()), + config, + } + } + + pub async fn start_logging( + &self, + container_id: &str, + fifo_path: PathBuf, + log_conf: LogConfig, + tx: Sender, + ) { + let logger = Arc::new(Logger::new()); + let mut db_path = self.config.databases_dir.clone(); + db_path.push(container_id); + + let f_path = fifo_path.clone(); + let c_l = logger.clone(); + + let c_container_id = container_id.to_string(); + + let handle = tokio::spawn(async move { + tx.send(StateHandlerMessage::LoggingStopped { + container_id: c_container_id, + fifo: f_path.clone(), + result: c_l.log(f_path, db_path, log_conf).await, + }) + .await + .expect("Could not enqueue StateHandlerMessage"); + }); + + self.workers + .write() + .await + .insert(fifo_path, (logger.clone(), handle)); + } + + pub async fn stop_logging(&self, fifo_path: &Path) { + let res = self.workers.write().await.remove(fifo_path); + + if let Some((logger, handle)) = res { + logger.exit().await; + handle.await.ok(); + } + } +} + +// DIRTY WORKAROUND, since we cannot know if the client is disconnected +// so that we can retun Poll::Ready(None) instant of Poll::Pending + +const FOLLOW_WAKETIME: u64 = 1; +const FOLLOW_COUNTER_MAX: usize = 60 * 60; + +#[derive(Debug)] +pub struct SqliteLogStream { + stmt_s: String, + parameters: Vec, + next_rowid: u64, + counter: u64, + tail: Option, + follow: bool, + follow_counter: usize, + con: UnsafeCell, + waker: Option>, +} + +impl SqliteLogStream { + pub async fn new( + dbs_path: &Path, 
+ container_id: &str, + since: Option, + until: Option, + tail: Option, + follow: bool, + ) -> Result { + let db_path = format!("{}/{}", dbs_path.display(), container_id); + debug!("[SqliteLogStream] db_path: {}", &db_path); + let mut con = SqliteConnectOptions::from_str(&format!("sqlite://{}", &db_path))? + .immutable(true) + .read_only(true) + .serialized(false) + .connect() + .await?; + + let mut cond = String::from("WHERE ROWID >= ?1"); + let mut parameters: Vec = vec![0]; + + if since.is_some() { + if let Ok(time) = DateTime::parse_from_str(since.as_ref().unwrap().as_str(), "%+") { + let since = time + .timestamp_nanos_opt() + .expect("timestamp in nanosecs does not fit in i64 anymore"); + + cond.push_str(&format!(" AND ts>=?{}", parameters.len() + 1)); + parameters.push(since as u64); + } + }; + + if until.is_some() { + if let Ok(time) = DateTime::parse_from_str(until.as_ref().unwrap().as_str(), "%+") { + let until = time + .timestamp_nanos_opt() + .expect("timestamp in nanosecs does not fit in i64 anymore"); + + cond.push_str(&format!(" AND ts<=?{}", parameters.len() + 1)); + parameters.push(until as u64); + } + }; + + let mut first_rowid = 1u64; + if let Some(tail) = tail { + let stmt_s = format!("SELECT count(*) FROM logs {}", cond); + debug!("stmt_s: {} params {:?}", stmt_s, ¶meters); + + let mut stmt = sqlx::query_as::<_, (i64,)>(&stmt_s); + for param in ¶meters { + stmt = stmt.bind(*param as i64); + } + + let nrows: u64 = stmt.fetch_one(&mut con).await?.0 as u64; + + let stmt_s = format!( + "SELECT ROWID FROM logs {} LIMIT 1 OFFSET ?{}", + cond, + parameters.len() + 1 + ); + + debug!("stmt_s: {}", &stmt_s); + + stmt = sqlx::query_as::<_, (i64,)>(&stmt_s); + + parameters.push(if nrows > tail { nrows - tail } else { 0 }); + + for param in ¶meters { + stmt = stmt.bind(*param as i64); + } + + first_rowid = stmt.fetch_one(&mut con).await?.0 as u64; + + parameters.pop(); + debug!("first_rowid: {}", first_rowid); + } + + let stmt_s = format!("SELECT ROWID,message FROM logs {} LIMIT 1", cond); + + Ok(SqliteLogStream { + stmt_s, + parameters, + next_rowid: first_rowid, + counter: 0, + tail: if follow { None } else { tail }, + follow, + follow_counter: 0, + con: UnsafeCell::new(con), + waker: None, + }) + } +} + +impl Stream for SqliteLogStream { + type Item = Result, &'static str>; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + if self.tail.is_some() && self.counter >= self.tail.unwrap() { + return Poll::Ready(None); + } + + self.parameters[0] = self.next_rowid; + let res = { + let stmt_s = self.stmt_s.as_str(); + let mut stmt = sqlx::query_as::<_, (i64, Vec)>(stmt_s); + for param in &self.parameters { + stmt = stmt.bind(*param as i64); + } + + let res: Option<(i64, Vec)> = + match block_on(stmt.fetch_one(unsafe { &mut *self.con.get() })) { + Ok(s) => Some(s), + Err(sqlx::Error::RowNotFound) => None, + Err(e) => { + error!("[SqliteLogStream] Got SQL error: {:?}", e); + return Poll::Ready(None); + } + }; + + debug!( + "[stream] {:?} [{}]", + res, + if res.is_some() { + res.as_ref().unwrap().1.len() + } else { + 0 + } + ); + + res + }; + + if res.is_some() { + self.counter += 1; + self.follow_counter = 0; + self.next_rowid = res.as_ref().unwrap().0 as u64 + 1; + } + + match res { + Some((_, r)) => Poll::Ready(Some(Ok(r))), + None if self.follow && self.follow_counter < FOLLOW_COUNTER_MAX => { + self.follow_counter += 1; + let waker = ctx.waker().clone(); + self.waker = Some(tokio::spawn(async move { + sleep(Duration::from_secs(FOLLOW_WAKETIME)).await; + 
waker.wake(); + })); + + Poll::Pending + } + None => Poll::Ready(None), + } + } +} + +impl Drop for SqliteLogStream { + fn drop(&mut self) { + if let Some(waker) = self.waker.as_mut() { + debug!("aborting new_entry_waker"); + waker.abort(); + } + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..407bc5e --- /dev/null +++ b/src/main.rs @@ -0,0 +1,122 @@ +mod cleaner; +mod config; +mod docker; +mod logger; +mod statehandler; + +use axum::http::Request; +use axum::middleware::map_request; +use axum::{routing::post, Router, Server}; +use docker::ApiState; +use hyperlocal::UnixServerExt; +use log::{self, debug, error}; +use statehandler::{State, StateHandler}; +use std::{env, process::exit, sync::Arc}; +use tokio::task; + +async fn normalize_dockerjson(mut req: Request) -> Request { + let headers = req.headers_mut(); + match headers.get("content-type") { + Some(ct) => { + debug!("[normalize_dockerjson] {:?}", ct); + } + None => { + headers.insert("content-type", "application/json".parse().unwrap()); + } + } + + req +} + +#[tokio::main] +async fn main() -> Result<(), config::ParsingError> { + simple_logger::init_with_level(env::var("DEBUG").map_or_else( + |_| log::Level::Info, + |s| { + if matches!(s.to_lowercase().as_str(), "true" | "1") { + log::Level::Debug + } else { + log::Level::Info + } + }, + )) + .expect("could not set loglevel"); + + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} [path to config]", args[0]); + return Ok(()); + } + + let conf: Arc = Arc::new(config::Config::try_from( + config::ConfigSource::File(args[1].clone()), + )?); + + debug!("config: {:?}", &conf); + + let state = Arc::new(State::new(conf.clone()).await); + + let (mut state_handler, state_handler_tx) = match StateHandler::new(state.clone()).await { + Ok(s) => s, + Err(e) => { + eprintln!("failed to initialize StateHandler: {:?}", e); + exit(1); + } + }; + + let api_state = Arc::new( + ApiState::new(state_handler_tx, conf.clone()) + .await + .expect("Failed to create ApiState"), + ); + + let cleaner = state.cleaner.clone(); + let cleanup_interval = conf.cleanup_interval.clone(); + task::spawn(async move { + match cleaner.run(cleanup_interval).await { + Ok(()) => { + error!("Cleaner exited!"); + } + Err(e) => { + error!("Error running cleaner: {:?}", e); + } + } + }); + + if let Err(e) = state_handler.replay_state().await { + eprintln!("Failed to replay state: {:?}", e); + exit(1); + } + + let state_handler_handle = tokio::spawn(async move { + if let Err(e) = state_handler.handle().await { + error!("Error at StateHandler: {:?}", e); + } + }); + + let router = Router::new() + .route("/LogDriver.StartLogging", post(docker::start_logging)) + .route("/LogDriver.StopLogging", post(docker::stop_logging)) + .route("/LogDriver.Capabilities", post(docker::capabilities)) + .route("/LogDriver.ReadLogs", post(docker::read_logs)) + .route("/Plugin.Activate", post(docker::activate)) + .layer(map_request(normalize_dockerjson)) + .fallback(docker::fallback) + .with_state(api_state); + let builder = Server::bind_unix(conf.unix_socket_path.as_path()) + .expect("could not listen on unix socket"); + + let api_server_handle = + tokio::spawn(async move { builder.serve(router.into_make_service()).await.unwrap() }); + + tokio::select! 
{ + _ = state_handler_handle => { + error!("StateHandler exited!"); + } + _ = api_server_handle => { + error!("DockerAPI server exited!"); + } + }; + + Ok(()) +} diff --git a/src/statehandler.rs b/src/statehandler.rs new file mode 100644 index 0000000..de3e13a --- /dev/null +++ b/src/statehandler.rs @@ -0,0 +1,220 @@ +use bincode::{deserialize, serialize}; +use futures_util::StreamExt; +use sqlx::Sqlite; +use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, Error as SqlxError, SqliteConnection}; +use std::fs; +use std::sync::Arc; +use std::{ + path::{Path, PathBuf}, + str::FromStr, +}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +use crate::cleaner::LogCleaner; +use crate::config::{Config, LogConfig}; +use crate::log::{info, warn}; +use crate::logger::{LoggerError, LoggerPool}; + +pub struct State { + logger_pool: LoggerPool, + config: Arc, + pub cleaner: LogCleaner, +} + +impl State { + pub async fn new(config: Arc) -> Self { + State { + logger_pool: LoggerPool::new(config.clone()), + cleaner: LogCleaner::new(config.databases_dir.display().to_string()), + config, + } + } + + async fn start_logging( + &self, + container_id: &str, + fifo: &Path, + log_conf: LogConfig, + tx: Sender, + ) { + self.cleaner + .add(&container_id, fifo.to_path_buf(), log_conf.clone()) + .await; + + self.logger_pool + .start_logging(&container_id, fifo.to_path_buf(), log_conf, tx) + .await; + } +} + +pub enum StateHandlerMessage { + StartLogging { + container_id: String, + fifo: PathBuf, + log_conf: LogConfig, + }, + StopLogging { + fifo: PathBuf, + }, + LoggingStopped { + container_id: String, + fifo: PathBuf, + result: Result<(), LoggerError>, + }, +} + +pub struct StateHandler { + dbcon: SqliteConnection, + rx: Receiver, + tx: Sender, + state: Arc, +} + +impl StateHandler { + pub async fn new(state: Arc) -> Result<(Self, Sender), SqlxError> { + let mut dbcon = SqliteConnectOptions::from_str(&format!( + "sqlite://{}", + state.config.state_database.display() + ))? 
+ .create_if_missing(true) + .connect() + .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS active_fetches(container_id text PRIMARY KEY, fifo text, log_conf blob)", + ) + .execute(&mut dbcon) + .await?; + + let (tx, rx) = channel(1024); + + Ok(( + StateHandler { + dbcon, + rx, + tx: tx.clone(), + state, + }, + tx, + )) + } + + pub async fn handle(&mut self) -> Result<(), SqlxError> { + while let Some(msg) = self.rx.recv().await { + match msg { + StateHandlerMessage::StartLogging { + container_id, + fifo, + log_conf, + } => { + info!("[StateHandler] starting logging of {}", container_id); + + let blob_log_conf: Vec = serialize(&log_conf).unwrap(); + sqlx::query( + "INSERT OR REPLACE INTO active_fetches(container_id, fifo, log_conf) VALUES (?1, ?2, ?3)", + ) + .bind(&container_id) + .bind(fifo.display().to_string()) + .bind(blob_log_conf) + .execute(&mut self.dbcon) + .await?; + + self.state + .start_logging(&container_id, &fifo, log_conf, self.tx.clone()) + .await; + } + StateHandlerMessage::StopLogging { fifo } => { + info!("[StateHandler] stopping logging of fifo {}", fifo.display()); + + sqlx::query("DELETE FROM active_fetches where fifo = ?1") + .bind(fifo.as_path().display().to_string()) + .execute(&mut self.dbcon) + .await?; + + self.state.logger_pool.stop_logging(&fifo).await; + } + StateHandlerMessage::LoggingStopped { + container_id, + fifo, + result, + } => { + if let Err(e) = result { + warn!( + "[StateHandler] logger of container {} returned: {:?}", + container_id, e + ); + match e { + LoggerError::DecodeError(_) => { + info!( + "[StateHandler] restarting logging of {} using fifo {}, since last error was a decode error", + &container_id, + fifo.display() + ); + + if let Some(log_conf) = self.state.cleaner.remove(&fifo).await { + self.state + .start_logging( + &container_id, + &fifo, + log_conf, + self.tx.clone(), + ) + .await; + } else { + warn!("Container {} stopped but it had no LogConfig in the cleaner!", container_id); + } + } + _ => { + sqlx::query("DELETE FROM active_fetches where fifo = ?1") + .bind(fifo.as_path().display().to_string()) + .execute(&mut self.dbcon) + .await?; + + if let Some(log_conf) = self.state.cleaner.remove(&fifo).await { + info!("log_conf: '{:?}'", &log_conf); + if log_conf.delete_when_stopped { + let mut dbpath = self.state.config.databases_dir.clone(); + dbpath.push(container_id); + + info!("removing {}", dbpath.display()); + fs::remove_file(dbpath).ok(); + } + } + } + } + } + } + } + } + + Ok(()) + } + + pub async fn replay_state(&mut self) -> Result<(), sqlx::Error> { + let mut stream = sqlx::query_as::)>( + "SELECT container_id, fifo FROM active_fetches", + ) + .fetch(&mut (self.dbcon)); + + while let Some(r) = stream.next().await { + let (container_id, fifo, blob_log_conf) = r?; + + let log_conf: LogConfig = deserialize(&blob_log_conf).unwrap(); + info!( + "[StateHandler] replaying logging of {} using fifo {}", + container_id, fifo + ); + + self.state + .start_logging( + &container_id, + PathBuf::from(fifo).as_path(), + log_conf, + self.tx.clone(), + ) + .await; + } + + Ok(()) + } +}
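
The per-container options that `LogConfig::try_from` parses in `src/config.rs` (`max_lines_per_tx`, `max_size_per_tx`, `message_read_timeout`, `cleanup_age`, `cleanup_max_lines`, `delete_when_stopped`) arrive through Docker's `Info.Config` map, i.e. as `--log-opt` key/value pairs. A hypothetical invocation sketch, assuming the plugin has been registered under the name `logsqlite` as in the README and `build_plugin.sh`:

```sh
# Hypothetical example: run a container with the logsqlite driver and
# per-container limits. Option names follow src/config.rs; the value
# suffixes are those accepted by parse_as_duration (s/m/h/d/w) and
# parse_si_prefixed_size (b/k/m/g).
docker run -d \
    --log-driver logsqlite \
    --log-opt cleanup_age=7d \
    --log-opt cleanup_max_lines=100000 \
    --log-opt max_size_per_tx=16M \
    --log-opt delete_when_stopped=false \
    ubuntu sleep infinity
```

Unrecognized option names fall through the `_ => ()` arm of the parser and are silently ignored, so a misspelled key leaves the corresponding field at the value from `LogConfig::default()`.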