Skip to content

Commit

Permalink
Implement google pub/sub push route (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
Terkwood authored Nov 26, 2018
1 parent de4b5c2 commit 3f95012
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cloud_images/pond/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pond"
version = "0.2.1"
version = "0.3.0"
authors = ["Terkwood <[email protected]>"]
edition = "2018"

Expand Down
1 change: 1 addition & 0 deletions cloud_images/pond/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Config {
pub redis_namespace: String,
rocket_databases: String,
pub cors_allow_origin: Option<String>,
pub push_secret: String,
}

impl Config {
Expand Down
1 change: 1 addition & 0 deletions cloud_images/pond/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod authorization;
pub mod claims;
pub mod config;
pub mod key_pairs;
mod push;
mod redis_conn;
mod tanks;
pub mod web;
104 changes: 104 additions & 0 deletions cloud_images/pond/src/push.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use base64;
use crate::redis_conn::RedisDbConn;
use redis_delta::RDelta;
use rocket_contrib::databases::redis::Commands;
use std::collections::HashMap;

/// Push data structure which adheres to Google Cloud Pub/Sub
/// specification. Each of these is an individual Redis Delta
/// containing various payloads depending on the Redis type
/// that it represents.
/// See https://cloud.google.com/pubsub/docs/push
#[derive(Debug, Deserialize)]
pub struct PushData {
pub message: Message,
pub subscription: String,
}

impl PushData {
// FIXME deal with out-of-order messaging
pub fn ingest(&self, conn: RedisDbConn) -> Result<(), PushDataError> {
let rdelta = self.message.deserialize()?;
let result = match rdelta {
RDelta::UpdateHash {
key,
mut fields,
time: _,
} => {
let mut name_vals: Vec<(String, String)> = vec![];
for rf in fields.drain(..) {
name_vals.push((rf.name, rf.val));
}
Ok(conn.0.hset_multiple(key, &name_vals)?)
}
RDelta::UpdateSet { key, vals, time: _ } =>
// FIXME this method can leave some members in place
// FIXME who no longer exist in the source database
// FIXME instead of using SADD, we should... ?
// FIXME ...pop everything atomically and re-insert?
// FIXME ...or only signal the addition of members from the source?
// FIXME ...or maybe this is decent behavior after all?
{
Ok(conn.0.sadd(key, vals)?)
}
RDelta::UpdateString { key, val, time: _ } => Ok(conn.0.set(key, val)?),
};

if let Err(e) = &result {
eprintln!("Error on ingest! {:?}", e)
}

result
}
}

#[derive(Debug, Deserialize)]
pub struct Message {
pub attributes: Option<HashMap<String, String>>,
pub data: Base64,
pub message_id: String,
}
impl Message {
pub fn deserialize(&self) -> Result<RDelta, PushDataError> {
let json_r: Result<RDelta, _> = serde_json::from_slice(&self.data.decode()?[..]);
Ok(json_r?)
}
}

#[derive(Debug, Deserialize)]
pub struct Base64(pub String);
impl Base64 {
/// You can consume this with `serde_json::from_slice`
pub fn decode(&self) -> Result<Vec<u8>, base64::DecodeError> {
base64::decode(&self.0)
}
}

#[derive(Debug)]
pub enum PushDataError {
Base64,
Json,
Utf8,
Redis,
}
impl From<rocket_contrib::databases::redis::RedisError> for PushDataError {
fn from(_e: rocket_contrib::databases::redis::RedisError) -> PushDataError {
PushDataError::Redis
}
}
impl From<std::str::Utf8Error> for PushDataError {
fn from(_e: std::str::Utf8Error) -> PushDataError {
PushDataError::Utf8
}
}
impl From<serde_json::Error> for PushDataError {
fn from(_e: serde_json::Error) -> PushDataError {
PushDataError::Json
}
}

impl From<base64::DecodeError> for PushDataError {
fn from(_e: base64::DecodeError) -> PushDataError {
PushDataError::Base64
}
}
4 changes: 2 additions & 2 deletions cloud_images/pond/src/tanks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub fn fetch_all(conn: RedisDbConn, namespace: &str) -> Result<Vec<Tank>, redis:

fn fetch_num_tanks(conn: &RedisDbConn, namespace: &str) -> Result<u16, redis::RedisError> {
let key = Key::AllTanks {
ns: Namespace(namespace),
ns: Namespace(namespace.to_owned()),
}
.to_string();
conn.0.get(key)
Expand All @@ -98,7 +98,7 @@ fn fetch_tank_status(
namespace: &str,
) -> Result<Option<Tank>, redis::RedisError> {
let key = Key::Tank {
ns: Namespace(namespace),
ns: Namespace(namespace.to_owned()),
id,
}
.to_string();
Expand Down
44 changes: 43 additions & 1 deletion cloud_images/pond/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use crate::authentication::{authenticate, AuthenticationResult};
use crate::authorization::authorize;
use crate::config::Config;
use crate::key_pairs;
use crate::push::{PushData, PushDataError};
use crate::redis_conn::*;
use crate::tanks;
use rocket::http::hyper::header::{AccessControlAllowOrigin, AccessControlMaxAge};
use rocket::http::RawStr;
use rocket::http::Status;
use rocket::request::{self, FromRequest, Request};
use rocket::{Outcome, State};
Expand Down Expand Up @@ -129,11 +131,51 @@ fn token_from_bearer_string(bearer_string: &str) -> Result<String, ()> {
}
}

/// An endpoint which receives push messages from Google pub/sub platform.
/// These messages summarize changes to the Redis database hosted in
/// in local proximity to the temp & ph sensors.
/// See https://cloud.google.com/pubsub/docs/push
///
/// Here is an example of sending a base64 encoded payload to the endpoint.
///
/// ```sh
/// curl -k -d '{ "message": { "attributes": { "key": "value" }, "data": "eyJ1cGRhdGVfaGFzaCI6eyJrZXkiOiJwcmF3bmJhYnkvc2Vuc29ycy90ZW1wL2FhYWFhYWFhLWVlZWUtYWFhYS1hYWFhLWFhYWFhYWFhYWFhYSIsImZpZWxkcyI6W3sibmFtZSI6InRlbXBfdXBkYXRlX2NvdW50IiwidmFsIjoiNDEwOTY2In0seyJuYW1lIjoidGVtcF91cGRhdGVfdGltZSIsInZhbCI6IjE1NDI3NTI3MTAifSx7Im5hbWUiOiJ0ZW1wX2MiLCJ2YWwiOiIyNC42MiJ9LHsibmFtZSI6InRlbXBfZiIsInZhbCI6Ijc2LjMyIn1dLCJ0aW1lIjoxNTQyNzUyNzE1fX0=", "message_id": "136969346945" },"subscription": "projects/myproject/subscriptions/mysubscription"}' -H "Content-Type: application/json" -X POST https://localhost:8000/push_redis\?token\=fancy_shared_sekrit
/// ```
///
/// In this case, the base64 "data" attribute decodes as follows:
/// ```json
/// {"update_hash":{"key":"prawnbaby/sensors/temp/aaaaaaaa-eeee-aaaa-aaaa-aaaaaaaaaaaa","fields":[{"name":"temp_update_count","val":"410966"},{"name":"temp_update_time","val":"1542752710"},{"name":"temp_c","val":"24.62"},{"name":"temp_f","val":"76.32"}],"time":1542752715}}
/// ```
#[post(
"/push_redis?<token>",
format = "application/json",
data = "<data>"
)]
pub fn push_redis(
data: Json<PushData>,
token: &RawStr,
conn: RedisDbConn,
config: State<Config>,
) -> Status {
let push_secret: String = config.push_secret.to_string();
// This can be improved.
// See https://github.com/Terkwood/prawnalith/issues/60
if token.as_str() == push_secret {
match data.ingest(conn) {
Ok(_) => Status::NoContent,
Err(PushDataError::Redis) => Status::InternalServerError,
Err(_) => Status::UnprocessableEntity,
}
} else {
Status::Unauthorized
}
}

pub fn startup(config: Config) {
rocket::ignite()
.manage(config)
.attach(RedisDbConn::fairing())
.mount("/", routes![tanks, tanks_options])
.mount("/", routes![tanks, tanks_options, push_redis])
.launch();
}

Expand Down

0 comments on commit 3f95012

Please sign in to comment.