Skip to content

Commit

Permalink
event-broker: Implement systemd socket activation
Browse files Browse the repository at this point in the history
Signed-off-by: Daiki Ueno <[email protected]>
  • Loading branch information
ueno committed Jun 22, 2023
1 parent fb3c6fc commit 6dc3022
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 8 deletions.
109 changes: 109 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions dist/systemd/system/crypto-auditing-event-broker.service
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ After=crypto-auditing-agent.service

[Service]
ExecStart=/usr/bin/crypto-auditing-event-broker
ExecStopPost=rm -f /var/lib/crypto-auditing/audit.sock
KillSignal=SIGINT
TimeoutSec=60s
Restart=on-failure
Expand All @@ -15,4 +14,4 @@ RestartSec=120s
# Group=crypto-auditing

[Install]
WantedBy=default.target
Also=crypto-auditing-event-broker.socket
11 changes: 11 additions & 0 deletions dist/systemd/system/crypto-auditing-event-broker.socket
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[Unit]
Description=Event broker socket for crypto-auditing project

[Socket]
ListenStream=/var/lib/crypto-auditing/audit.sock
# SocketUser=crypto-auditing
# SocketGroup=crypto-auditing
# SocketMode=0660

[Install]
WantedBy=sockets.target
4 changes: 4 additions & 0 deletions event-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ authors = ["The crypto-auditing developers"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["libsystemd"]

[dependencies]
anyhow = "1.0"
clap = { version = "4", features = ["derive"] }
crypto-auditing = { path = "../crypto-auditing" }
futures = "0.3"
inotify = "0.10"
libsystemd = { version = "0.6", optional = true }
serde_cbor = "0.10"
tarpc = { version = "0.33", features = ["serde-transport", "unix"] }
tokio = "1.23"
Expand Down
39 changes: 33 additions & 6 deletions event-broker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// Copyright (C) 2022-2023 The crypto-auditing developers.

#[cfg(feature = "libsystemd")]
use anyhow::bail;
use anyhow::{Context as _, Result};
use crypto_auditing::types::EventGroup;
use futures::{future, stream::StreamExt, try_join};
use inotify::{EventMask, Inotify, WatchMask};
#[cfg(feature = "libsystemd")]
use libsystemd::activation::receive_descriptors;
use serde_cbor::de::Deserializer;
use std::collections::HashMap;
use std::os::fd::{AsRawFd, RawFd};
#[cfg(feature = "libsystemd")]
use std::os::fd::{FromRawFd, IntoRawFd};
use std::os::unix::net::UnixListener as StdUnixListener;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tarpc::{client, context, tokio_serde::formats::Cbor};
use tokio::net::UnixListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info};
Expand Down Expand Up @@ -82,16 +90,35 @@ impl Publisher {
}
}

async fn publish(&self, receiver: Receiver<EventGroup>) -> Result<()> {
let mut connecting_subscribers =
tarpc::serde_transport::unix::listen(&self.socket_path, Cbor::default)
.await?
.filter_map(|r| future::ready(r.ok()));
#[cfg(feature = "libsystemd")]
fn get_std_listener(&self) -> Result<StdUnixListener> {
if let Ok(mut descriptors) = receive_descriptors(false) {
if descriptors.len() > 1 {
bail!("too many file descriptors");
} else if descriptors.len() == 0 {
bail!("no file descriptors received");
}
let fd = descriptors.pop().unwrap().into_raw_fd();
Ok(unsafe { StdUnixListener::from_raw_fd(fd) })
} else {
Ok(StdUnixListener::bind(&self.socket_path)?)
}
}

#[cfg(not(feature = "libsystemd"))]
fn get_std_listener(&self) -> Result<StdUnixListener> {
Ok(StdUnixListener::bind(&self.socket_path)?)
}

async fn publish(&self, receiver: Receiver<EventGroup>) -> Result<()> {
let std_listener = self.get_std_listener()?;
std_listener.set_nonblocking(true)?;
let listener = UnixListener::from_std(std_listener)?;
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
while let Some(conn) = connecting_subscribers.next().await {
while let Ok((stream, _sock_addr)) = listener.accept().await {
let conn = tarpc::serde_transport::Transport::from((stream, Cbor::default()));
let subscriber_fd = conn.get_ref().as_raw_fd();

let tarpc::client::NewClient {
Expand Down

0 comments on commit 6dc3022

Please sign in to comment.