Skip to content

Commit

Permalink
feature: support notify server
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrchen committed May 4, 2024
1 parent 4e54204 commit 40086b7
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 42 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chat_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct ChatUser {

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, sqlx::Type)]
#[sqlx(type_name = "chat_type", rename_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum ChatType {
Single,
Group,
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions chat_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ impl AppConfig {
pub fn load() -> Result<Self> {
// read from ./app.yml, or /etc/config/app.yml, or from env CHAT_CONFIG
let ret = match (
File::open("app.yml"),
File::open("/etc/config/app.yml"),
File::open("chat.yml"),
File::open("/etc/config/chat.yml"),
env::var("CHAT_CONFIG"),
) {
(Ok(reader), _, _) => serde_yaml::from_reader(reader),
Expand Down
11 changes: 10 additions & 1 deletion migrations/20240504032504_triggers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ CREATE TRIGGER add_to_chat_trigger
CREATE OR REPLACE FUNCTION add_to_message()
RETURNS TRIGGER
AS $$
DECLARE
USERS bigint[];
BEGIN
IF TG_OP = 'INSERT' THEN
RAISE NOTICE 'add_to_message: %', NEW;
-- select chat with chat_id in NEW
SELECT
members INTO USERS
FROM
chats
WHERE
id = NEW.chat_id;
PERFORM
pg_notify('chat_message_created', row_to_json(NEW)::text);
pg_notify('chat_message_created', json_build_object('message', NEW, 'members', USERS)::text);
END IF;
RETURN NEW;
END;
Expand Down
5 changes: 4 additions & 1 deletion notify_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ anyhow = { workspace = true }
axum = { workspace = true }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
chat-core = { workspace = true }
dashmap = "5.5.3"
futures = "0.3.30"
jwt-simple = { workspace = true }
serde = { workspace = true }
serde_json = "1.0.116"
serde_yaml = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = "0.1.15"
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
15 changes: 15 additions & 0 deletions notify_server/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ <h1>Server Sent Events</h1>
source.onmessage = function(event) {
console.log("Got:", event.data);
};
source.addEventListener("NewChat", function(event) {
console.log("NewChat:", event.data);
});

source.addEventListener("AddToChat", function(event) {
console.log("AddToChat:", event.data);
});

source.addEventListener("RemoveFromChat", function(event) {
console.log("RemoveFromChat:", event.data);
});

source.addEventListener("NewMessage", function(event) {
console.log("NewMessage:", event.data);
});
</script>
</body>
</html>
8 changes: 8 additions & 0 deletions notify_server/notify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
server:
port: 6687
db_url: postgres://postgres:postgres@localhost:5432/chat
auth:
pk: |
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEAfM+lwNHj6TRJ3EGP38lIJcOo9Dlt2u2JzcwWMbu7jQY=
-----END PUBLIC KEY-----
37 changes: 37 additions & 0 deletions notify_server/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::{env, fs::File};

#[derive(Debug, Serialize, Deserialize)]
pub struct AppConfig {
pub server: ServerConfig,
pub auth: AuthConfig,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AuthConfig {
pub pk: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ServerConfig {
pub port: u16,
pub db_url: String,
}

impl AppConfig {
pub fn load() -> Result<Self> {
// read from ./app.yml, or /etc/config/app.yml, or from env CHAT_CONFIG
let ret = match (
File::open("notify.yml"),
File::open("/etc/config/notify.yml"),
env::var("NOTIFY_CONFIG"),
) {
(Ok(reader), _, _) => serde_yaml::from_reader(reader),
(_, Ok(reader), _) => serde_yaml::from_reader(reader),
(_, _, Ok(path)) => serde_yaml::from_reader(File::open(path)?),
_ => bail!("Config file not found"),
};
Ok(ret?)
}
}
38 changes: 38 additions & 0 deletions notify_server/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use axum::http::StatusCode;
use axum::response::Json;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorOutput {
pub error: String,
}

#[derive(Error, Debug)]
pub enum AppError {
#[error("io error: {0}")]
IoError(#[from] std::io::Error),

#[error("jwt error: {0}")]
JwtError(#[from] jwt_simple::Error),
}

impl ErrorOutput {
pub fn new(error: impl Into<String>) -> Self {
Self {
error: error.into(),
}
}
}

impl IntoResponse for AppError {
fn into_response(self) -> Response<axum::body::Body> {
let status = match &self {
Self::JwtError(_) => StatusCode::FORBIDDEN,
Self::IoError(_) => StatusCode::INTERNAL_SERVER_ERROR,
};

(status, Json(ErrorOutput::new(self.to_string()))).into_response()
}
}
80 changes: 54 additions & 26 deletions notify_server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,76 @@
mod config;
mod error;
mod notif;
mod sse;

use axum::{
middleware::from_fn_with_state,
response::{Html, IntoResponse},
routing::get,
Router,
};
use chat_core::{Chat, Message};
use futures::StreamExt;
use sqlx::postgres::PgListener;
use chat_core::{
middlewares::{verify_token, TokenVerify},
DecodingKey, User,
};
use dashmap::DashMap;
use sse::sse_handler;
use tracing::info;
use std::{ops::Deref, sync::Arc};
use tokio::sync::broadcast;

pub use config::AppConfig;
pub use error::AppError;
pub use notif::{setup_pg_listener, AppEvent};

pub type UserMap = Arc<DashMap<u64, broadcast::Sender<Arc<AppEvent>>>>;

#[derive(Clone)]
pub struct AppState(Arc<AppStateInner>);

pub enum Event {
NewChat(Chat),
AddToChat(Chat),
RemoveFromChat(Chat),
NewMessage(Message),
pub struct AppStateInner {
pub config: AppConfig,
users: UserMap,
dk: DecodingKey,
}

const INDEX_HTML: &str = include_str!("../index.html");

pub fn get_router() -> Router {
Router::new()
.route("/", get(index_handler))
pub fn get_router() -> (Router, AppState) {
let config = AppConfig::load().expect("Failed to load config");
let state = AppState::new(config);
let app = Router::new()
.route("/events", get(sse_handler))
.layer(from_fn_with_state(state.clone(), verify_token::<AppState>))
.route("/", get(index_handler))
.with_state(state.clone());

(app, state)
}

pub async fn setup_pg_listener() -> anyhow::Result<()> {
let mut listener =
PgListener::connect("postgresql://postgres:postgres@localhost:5432/chat").await?;
listener.listen("chat_updated").await?;
listener.listen("chat_message_created").await?;
async fn index_handler() -> impl IntoResponse {
Html(INDEX_HTML)
}

let mut stream = listener.into_stream();
impl TokenVerify for AppState {
type Error = AppError;

tokio::spawn(async move {
while let Some(Ok(notif)) = stream.next().await {
info!("Received notification: {:?}", notif);
}
});
fn verify(&self, token: &str) -> Result<User, Self::Error> {
Ok(self.dk.verify(token)?)
}
}

Ok(())
impl Deref for AppState {
type Target = AppStateInner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

async fn index_handler() -> impl IntoResponse {
Html(INDEX_HTML)
impl AppState {
pub fn new(config: AppConfig) -> Self {
let dk = DecodingKey::load(&config.auth.pk).expect("Failed to load public key");
let users = Arc::new(DashMap::new());
Self(Arc::new(AppStateInner { config, dk, users }))
}
}
5 changes: 3 additions & 2 deletions notify_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ async fn main() -> Result<()> {

let addr = "0.0.0.0:6687";

setup_pg_listener().await?;
let (app, state) = get_router();

setup_pg_listener(state).await?;

let app = get_router();
let listener = TcpListener::bind(&addr).await?;
info!("Listening on: {}", addr);

Expand Down
Loading

0 comments on commit 40086b7

Please sign in to comment.