Skip to content

Commit

Permalink
Remove broken transaction funcs from Organizer gRPC API. Improve DB c…
Browse files Browse the repository at this point in the history
…oncurrency.
  • Loading branch information
elonen committed May 8, 2024
1 parent 7209fa3 commit 69c5b4f
Show file tree
Hide file tree
Showing 15 changed files with 251 additions and 311 deletions.
6 changes: 0 additions & 6 deletions protobuf/proto/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,6 @@ message DbDeleteResponse {

// ----------------------------------------

message DbBeginTransactionRequest {}
message DbCommitTransactionRequest {}
message DbRollbackTransactionRequest {}

// ----------------------------------------

message DbPaging {
uint32 page_num = 1; // Page number (0 = first page)
uint32 page_size = 2; // Number of items per page
Expand Down
8 changes: 1 addition & 7 deletions protobuf/proto/organizer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,12 @@ service OrganizerOutbound {

rpc delete_video(DeleteVideoRequest) returns (Empty); // Delete (trash) video cleanly from both database and filesystem

// Database access (note: these may each happen in a separate DB connection / transaction)
rpc DbGetVideos(DbGetVideosRequest) returns (DbVideoList);
rpc DbGetComments(DbGetCommentsRequest) returns (DbCommentList);
rpc DbGetUserMessages(DbGetUserMessagesRequest) returns (DbUserMessageList);

rpc DbUpsert(DbUpsertRequest) returns (DbUpsertResponse);
rpc DbDelete(DbDeleteRequest) returns (DbDeleteResponse);

// Explicit transaction calls. You should probably wrap these in RAII guards in your program code.
// Don't expect nested transactions to work reliably.
rpc DbBeginTransaction(DbBeginTransactionRequest) returns (Empty);
rpc DbCommitTransaction(DbCommitTransactionRequest) returns (Empty);
rpc DbRollbackTransaction(DbRollbackTransactionRequest) returns (Empty);
}

// Calls that Clapshot server makes to Organizer
Expand Down
6 changes: 3 additions & 3 deletions server/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ debian-docker: docker

test-local:
@if [ "${CI}" ]; then \
cargo test | grep --line-buffered -vE "(TRACE)|((h2|tower|hyper)\:\:)"; \
else \
echo "!! Running in CI: No-default-features = skip expensive tests"; \
cargo test --no-default-features | grep --line-buffered -vE "(TRACE)|((h2|tower|hyper)\:\:)"; \
else \
cargo test | grep --line-buffered -vE "(TRACE)|((h2|tower|hyper)\:\:)"; \
fi
# (cd ../organizer; make)
# TEST_ORG_CMD="${PLUGIN_BIN}" cargo test test_organizer | grep --line-buffered -vE "(TRACE)|((h2|tower|hyper)\:\:)"

test-docker: docker
docker run --rm ${DOCKER_IMG_NAME}:latest make test-local
docker run --rm -e CI="${CI}" ${DOCKER_IMG_NAME}:latest make test-local

test:
@echo "Please run either 'test-docker' or 'test-local'"
Expand Down
9 changes: 5 additions & 4 deletions server/src/api_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,15 @@ async fn run_api_server_async(
Ok(session_cnt) => { user_was_online = session_cnt>0 },
Err(e) => tracing::error!(user=user_id, details=%e, "Failed to send user notification."),
}
if !(matches!(m.topic, UserMessageTopic::Progress | UserMessageTopic::VideoAdded | UserMessageTopic::VideoUpdated)) {
if !(matches!(m.topic, UserMessageTopic::Progress | UserMessageTopic::VideoAdded | UserMessageTopic::VideoUpdated)) {
let msg = models::MessageInsert {
seen: msg.seen || user_was_online,
..msg
};
if let Err(e) = models::Message::insert(&server_state.db, &msg) {
tracing::error!(details=%e, "Failed to save user notification in DB.");
}
server_state.db.conn()
.and_then(|mut conn| models::Message::insert(&mut conn, &msg))
.map_err(|e| tracing::error!(details=%e, "Failed to save user notification in DB."))
.ok();
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/api_server/server_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl ServerState {
let send_res = self.emit_cmd(cmd, send_to);
if let Ok(sent_count) = send_res {
if persist {
models::Message::insert(&self.db, &models::MessageInsert {
models::Message::insert(&mut self.db.conn()?, &models::MessageInsert {
seen: msg.seen || sent_count > 0,
..msg.clone()
})?;
Expand Down
1 change: 0 additions & 1 deletion server/src/api_server/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::database::{DB, models};



// write(&mut ws, &server_cmd_json!(ListMyVideos, ListMyVideos{})).await;
#[macro_export]
macro_rules! send_server_cmd {
($ws:expr, $cmd_name:ident, $options:expr) => {{
Expand Down
20 changes: 11 additions & 9 deletions server/src/api_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,17 @@ async fn test_api_list_user_videos()
async fn test_api_del_video()
{
api_test! {[ws, ts]
let conn = &mut ts.db.conn().unwrap();

// Delete one successfully
{
assert!(models::Video::get(&ts.db, &ts.videos[0].id).is_ok());
assert!(models::Video::get(conn, &ts.videos[0].id).is_ok());

send_server_cmd!(ws, DelVideo, DelVideo{video_id: ts.videos[0].id.clone()});
expect_user_msg(&mut ws, proto::user_message::Type::Ok).await;

// Make sure the dir is gone
assert!(matches!(models::Video::get(&ts.db, &ts.videos[0].id).unwrap_err(), DBError::NotFound()));
assert!(matches!(models::Video::get(conn, &ts.videos[0].id).unwrap_err(), DBError::NotFound()));

// Make sure it's in trash, and DB row was backed up on disk
let trash_dir = ts.videos_dir.join("trash");
Expand All @@ -106,10 +107,10 @@ async fn test_api_del_video()
expect_user_msg(&mut ws, proto::user_message::Type::Error).await;

// Fail to delete someones else's video
assert!(models::Video::get(&ts.db, &ts.videos[1].id).is_ok());
assert!(models::Video::get(conn, &ts.videos[1].id).is_ok());
send_server_cmd!(ws, DelVideo, DelVideo{video_id: ts.videos[1].id.clone()});
expect_user_msg(&mut ws, proto::user_message::Type::Error).await;
assert!(models::Video::get(&ts.db, &ts.videos[1].id).is_ok());
assert!(models::Video::get(conn, &ts.videos[1].id).is_ok());

// Break the database
ts.db.break_db();
Expand Down Expand Up @@ -171,21 +172,22 @@ async fn test_api_rename_video()
api_test! {[ws, ts]
let video = &ts.videos[0];
open_video(&mut ws, &video.id).await;
let conn = &mut ts.db.conn().unwrap();

// Rename the video (with leading/trailing whitespace that will be trimmed)
send_server_cmd!(ws, RenameVideo, RenameVideo{video_id: video.id.clone(), new_name: " New name ".into()});
expect_user_msg(&mut ws, proto::user_message::Type::Ok).await;

// Make sure the video was renamed in the DB
let v = models::Video::get(&ts.db, &video.id).unwrap();
let v = models::Video::get(conn, &video.id).unwrap();
assert_eq!(v.title, Some("New name".to_string()));

// Try to enter an invalid name
send_server_cmd!(ws, RenameVideo, RenameVideo{video_id: video.id.clone(), new_name: " /._ ".into()});
expect_user_msg(&mut ws, proto::user_message::Type::Error).await;

// Make sure name wasn't changed
let v = models::Video::get(&ts.db, &video.id).unwrap();
let v = models::Video::get(conn, &video.id).unwrap();
assert_eq!(v.title, Some("New name".to_string()));
}
}
Expand Down Expand Up @@ -215,7 +217,7 @@ async fn test_api_add_plain_comment()

// Stored in database, the the image must be path to a file, not the actual image data as data URI
let cid = i32::from_str(&c.comments[0].id).unwrap();
assert!(!models::Comment::get(&ts.db, &cid).unwrap().drawing.unwrap().contains("data:image"));
assert!(!models::Comment::get(&mut ts.db.conn().unwrap(), &cid).unwrap().drawing.unwrap().contains("data:image"));
assert!(c.comments[0].clone().drawing.unwrap().starts_with("data:image/webp"));

// Add a comment to a nonexisting video
Expand Down Expand Up @@ -309,7 +311,7 @@ async fn test_api_del_comment()
assert!(m.details.unwrap().contains("repl"));

// Delete the last remaining reply comment[5]
models::Comment::delete(&ts.db, &ts.comments[5].id).unwrap(); // Delete from db directly, to avoid user permission check
models::Comment::delete(&mut ts.db.conn().unwrap(), &ts.comments[5].id).unwrap(); // Delete from db directly, to avoid user permission check

// Try again to delete comment id 1 that should now have no replies
send_server_cmd!(ws, DelComment, DelComment{comment_id: ts.comments[0].id.to_string()});
Expand All @@ -333,7 +335,7 @@ async fn test_api_list_my_messages()
models::MessageInsert { user_id: "user.num1".into(), message: "message2".into(), event_name: "error".into(), video_id: Some("HASH0".into()), details: "STACKTRACE".into(), ..Default::default() },
models::MessageInsert { user_id: "user.num2".into(), message: "message3".into(), event_name: "ok".into(), ..Default::default() },
];
let msgs = msgs.iter().map(|m| models::Message::insert(&ts.db, &m).unwrap()).collect::<Vec<_>>();
let msgs = msgs.iter().map(|m| models::Message::insert(&mut ts.db.conn().unwrap(), &m).unwrap()).collect::<Vec<_>>();

send_server_cmd!(ws, ListMyMessages, ListMyMessages{});
let sm = expect_client_cmd!(&mut ws, ShowMessages);
Expand Down
32 changes: 17 additions & 15 deletions server/src/api_server/ws_handers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use proto::org::authz_user_action_request as authz_req;
async fn get_video_or_send_error(video_id: Option<&str>, ses: &Option<&mut UserSession>, server: &ServerState) -> Res<Option<models::Video>> {
let video_id = video_id.ok_or(anyhow!("video id missing"))?;

match models::Video::get(&server.db, &video_id.into()) {
match models::Video::get(&mut server.db.conn()?, &video_id.into()) {
Err(DBError::NotFound()) => {
if let Some(ses) = ses {
send_user_error!(ses.user_id, server, Topic::Video(video_id), "No such video.");
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn msg_list_my_videos(data: &ListMyVideos , ses: &mut UserSession, ser
}

// Organizer didn't handle this, so return a default listing.
let videos = models::Video::get_by_user(&server.db, &ses.user_id, DBPaging::default())?;
let videos = models::Video::get_by_user(&mut server.db.conn()?, &ses.user_id, DBPaging::default())?;
let h_txt = if videos.is_empty() {
"<h2>You have no videos yet.</h2>"
} else {
Expand Down Expand Up @@ -124,15 +124,15 @@ pub async fn msg_open_video(data: &OpenVideo, ses: &mut UserSession, server: &Se

pub async fn send_open_video_cmd(server: &ServerState, session_id: &str, video_id: &str) -> Res<()> {
server.link_session_to_video(session_id, video_id)?;
let v = models::Video::get(&server.db, &video_id.into())?.to_proto3(&server.url_base);
let v = models::Video::get(&mut server.db.conn()?, &video_id.into())?.to_proto3(&server.url_base);
if v.playback_url.is_none() {
return Err(anyhow!("No video file"));
}
server.emit_cmd(
client_cmd!(OpenVideo, {video: Some(v)}),
super::SendTo::UserSession(session_id))?;
let mut cmts = vec![];
for mut c in models::Comment::get_by_video(&server.db, video_id, DBPaging::default())? {
for mut c in models::Comment::get_by_video(&mut server.db.conn()?, video_id, DBPaging::default())? {
server.fetch_drawing_data_into_comment(&mut c).await?;
cmts.push(c.to_proto3());
}
Expand All @@ -153,7 +153,7 @@ pub async fn del_video_and_cleanup(video_id: &str, ses: Option<&mut UserSession>
default_perm, AuthzTopic::Video(&v, authz_req::video_op::Op::Delete)).await?;
}

models::Video::delete(&server.db, &v.id)?;
models::Video::delete(&mut server.db.conn()?, &v.id)?;
let mut details = format!("Added by '{}' ({}) on {}. Filename was {}.",
v.user_name.clone().unwrap_or_default(),
v.user_id.clone().unwrap_or_default(),
Expand Down Expand Up @@ -224,7 +224,7 @@ pub async fn msg_rename_video(data: &RenameVideo, ses: &mut UserSession, server:
send_user_error!(&ses.user_id, server, Topic::Video(&v.id), "Video name too long (max 160)");
return Ok(());
}
models::Video::rename(&server.db, &v.id, new_name)?;
models::Video::rename(&mut server.db.conn()?, &v.id, new_name)?;
send_user_ok!(&ses.user_id, server, Topic::Video(&v.id), "Video renamed.",
format!("New name: '{}'", new_name), true);
}
Expand Down Expand Up @@ -294,7 +294,7 @@ pub async fn msg_add_comment(data: &proto::client::client_to_server_cmd::AddComm
timecode: data.timecode.clone(),
drawing: drwn.clone(),
};
let c = models::Comment::insert(&server.db, &c)
let c = models::Comment::insert(&mut server.db.conn()?, &c)
.map_err(|e| anyhow!("Failed to add comment: {:?}", e))?;
// Send to all clients watching this video
ses.emit_new_comment(server, c, super::SendTo::VideoId(&video_id)).await?;
Expand All @@ -304,21 +304,22 @@ pub async fn msg_add_comment(data: &proto::client::client_to_server_cmd::AddComm

pub async fn msg_edit_comment(data: &EditComment, ses: &mut UserSession, server: &ServerState) -> Res<()> {
let id = i32::from_str(&data.comment_id)?;
let conn = &mut server.db.conn()?;

match models::Comment::get(&server.db, &id) {
match models::Comment::get(conn, &id) {
Ok(old) => {
let default_perm = ses.user_id == old.user_id || ses.user_id == "admin";
org_authz_with_default(&ses.org_session, "edit comment", true, server, &ses.organizer,
default_perm, AuthzTopic::Comment(&old, authz_req::comment_op::Op::Edit)).await?;

let vid = &old.video_id;
models::Comment::edit(&server.db, id, &data.new_comment)?;
models::Comment::edit(conn, id, &data.new_comment)?;

server.emit_cmd(
client_cmd!(DelComment, {comment_id: id.to_string()}),
super::SendTo::VideoId(&vid))?;

let c = models::Comment::get(&server.db, &id)?;
let c = models::Comment::get(conn, &id)?;
ses.emit_new_comment(server, c, super::SendTo::VideoId(&vid)).await?;
}
Err(DBError::NotFound()) => {
Expand All @@ -332,7 +333,7 @@ pub async fn msg_edit_comment(data: &EditComment, ses: &mut UserSession, server:

pub async fn msg_del_comment(data: &DelComment, ses: &mut UserSession, server: &ServerState) -> Res<()> {
let id = i32::from_str(&data.comment_id)?;
match models::Comment::get(&server.db, &id) {
match models::Comment::get(&mut server.db.conn()?, &id) {
Ok(cmt) => {
let default_perm = ses.user_id == cmt.user_id || ses.user_id == "admin";
org_authz_with_default(&ses.org_session, "delete comment", true, server, &ses.organizer,
Expand All @@ -343,12 +344,12 @@ pub async fn msg_del_comment(data: &DelComment, ses: &mut UserSession, server: &
send_user_error!(&ses.user_id, server, Topic::Video(&vid), "Failed to delete comment.", "You can only delete your own comments", true);
return Ok(());
}
let all_comm = models::Comment::get_by_video(&server.db, &vid, DBPaging::default())?;
let all_comm = models::Comment::get_by_video(&mut server.db.conn()?, &vid, DBPaging::default())?;
if all_comm.iter().any(|c| c.parent_id.map(|i| i.to_string()) == Some(id.to_string())) {
send_user_error!(&ses.user_id, server, Topic::Video(&vid), "Failed to delete comment.", "Comment has replies. Cannot delete.", true);
return Ok(());
}
models::Comment::delete(&server.db, &id)?;
models::Comment::delete(&mut server.db.conn()?, &id)?;
server.emit_cmd(
client_cmd!(DelComment, {comment_id: id.to_string()}),
super::SendTo::VideoId(&vid))?;
Expand All @@ -363,13 +364,14 @@ pub async fn msg_del_comment(data: &DelComment, ses: &mut UserSession, server: &


pub async fn msg_list_my_messages(data: &proto::client::client_to_server_cmd::ListMyMessages, ses: &mut UserSession, server: &ServerState) -> Res<()> {
let msgs = models::Message::get_by_user(&server.db, &ses.user_id, DBPaging::default())?;
let conn = &mut server.db.conn()?;
let msgs = models::Message::get_by_user(conn, &ses.user_id, DBPaging::default())?;
server.emit_cmd(
client_cmd!(ShowMessages, { msgs: (&msgs).into_iter().map(|m| m.to_proto3()).collect() }),
super::SendTo::UserSession(&ses.sid)
)?;
for m in msgs {
if !m.seen { models::Message::set_seen(&server.db, m.id, true)?; }
if !m.seen { models::Message::set_seen(conn, m.id, true)?; }
}
Ok(())
}
Expand Down
Loading

0 comments on commit 69c5b4f

Please sign in to comment.