Skip to content

Commit

Permalink
Add initial MQTT support
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed May 12, 2024
1 parent 6751907 commit 5e0dc43
Show file tree
Hide file tree
Showing 21 changed files with 1,300 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased
### Added
- Introduced MQTT protocol support (see [FetchStepMqtt](https://docs.kamu.dev/odf/reference/#fetchstepmqtt) and the new [`mqtt` example](/examples/mqtt))
- The `kamu system compact` command now accepts the `--keep-metadata-only` flag, which performs hard
compaction of dataset(root or derived) without retaining `AddData` or `ExecuteTransform` blocks
### Fixed
Expand Down
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions examples/mqtt/pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import datetime
import json
import paho.mqtt.publish as publish

for i in range(10):
payload = json.dumps({
"event_time": datetime.datetime.now().isoformat(),
"value": i,
})
print("Publishing:", payload)
publish.single(
hostname="test.mosquitto.org",
port=1883,
topic="dev.kamu.example.mqtt.temp",
payload=payload,
retain=True,
qos=1,
)
1 change: 1 addition & 0 deletions examples/mqtt/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
paho-mqtt
23 changes: 23 additions & 0 deletions examples/mqtt/temp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
kind: DatasetSnapshot
version: 1
content:
name: temp
kind: Root
metadata:
- kind: SetPollingSource
fetch:
kind: mqtt
host: test.mosquitto.org
port: 1883
topics:
- path: dev.kamu.example.mqtt.temp
qos: AtMostOnce
read:
kind: NdJson
schema:
- event_time TIMESTAMP
- value FLOAT
merge:
kind: Ledger
primaryKey:
- event_time
21 changes: 20 additions & 1 deletion resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ type ExecuteTransformInput {
newOffset: Int
}

union FetchStep = FetchStepUrl | FetchStepFilesGlob | FetchStepContainer
union FetchStep = FetchStepUrl | FetchStepFilesGlob | FetchStepContainer | FetchStepMqtt

type FetchStepContainer {
image: String!
Expand All @@ -712,6 +712,14 @@ type FetchStepFilesGlob {
order: SourceOrdering
}

type FetchStepMqtt {
host: String!
port: Int!
username: String
password: String
topics: [MqttTopicSubscription!]!
}

type FetchStepUrl {
url: String!
eventTime: EventTimeSource
Expand Down Expand Up @@ -1129,6 +1137,17 @@ type MetadataManifestUnsupportedVersion implements CommitResult & CreateDatasetF
message: String!
}

enum MqttQos {
AT_MOST_ONCE
AT_LEAST_ONCE
EXACTLY_ONCE
}

type MqttTopicSubscription {
path: String!
qos: MqttQos
}

scalar Multihash

type Mutation {
Expand Down
70 changes: 70 additions & 0 deletions src/adapter/graphql/src/scalars/odf_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ pub enum FetchStep {
Url(FetchStepUrl),
FilesGlob(FetchStepFilesGlob),
Container(FetchStepContainer),
Mqtt(FetchStepMqtt),
}

impl From<odf::FetchStep> for FetchStep {
Expand All @@ -429,6 +430,7 @@ impl From<odf::FetchStep> for FetchStep {
odf::FetchStep::Url(v) => Self::Url(v.into()),
odf::FetchStep::FilesGlob(v) => Self::FilesGlob(v.into()),
odf::FetchStep::Container(v) => Self::Container(v.into()),
odf::FetchStep::Mqtt(v) => Self::Mqtt(v.into()),
}
}
}
Expand Down Expand Up @@ -490,6 +492,27 @@ impl From<odf::FetchStepContainer> for FetchStepContainer {
}
}

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct FetchStepMqtt {
pub host: String,
pub port: i32,
pub username: Option<String>,
pub password: Option<String>,
pub topics: Vec<MqttTopicSubscription>,
}

impl From<odf::FetchStepMqtt> for FetchStepMqtt {
fn from(v: odf::FetchStepMqtt) -> Self {
Self {
host: v.host.into(),
port: v.port.into(),
username: v.username.map(Into::into),
password: v.password.map(Into::into),
topics: v.topics.into_iter().map(Into::into).collect(),
}
}
}

#[derive(Enum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceOrdering {
ByEventTime,
Expand Down Expand Up @@ -643,6 +666,53 @@ impl From<odf::MetadataEvent> for MetadataEvent {
}
}

////////////////////////////////////////////////////////////////////////////////
// MqttTopicSubscription
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#mqtttopicsubscription-schema
////////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct MqttTopicSubscription {
pub path: String,
pub qos: Option<MqttQos>,
}

impl From<odf::MqttTopicSubscription> for MqttTopicSubscription {
fn from(v: odf::MqttTopicSubscription) -> Self {
Self {
path: v.path.into(),
qos: v.qos.map(Into::into),
}
}
}

#[derive(Enum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum MqttQos {
AtMostOnce,
AtLeastOnce,
ExactlyOnce,
}

impl From<odf::MqttQos> for MqttQos {
fn from(v: odf::MqttQos) -> Self {
match v {
odf::MqttQos::AtMostOnce => Self::AtMostOnce,
odf::MqttQos::AtLeastOnce => Self::AtLeastOnce,
odf::MqttQos::ExactlyOnce => Self::ExactlyOnce,
}
}
}

impl Into<odf::MqttQos> for MqttQos {
fn into(self) -> odf::MqttQos {
match self {
Self::AtMostOnce => odf::MqttQos::AtMostOnce,
Self::AtLeastOnce => odf::MqttQos::AtLeastOnce,
Self::ExactlyOnce => odf::MqttQos::ExactlyOnce,
}
}
}

////////////////////////////////////////////////////////////////////////////////
// OffsetInterval
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#offsetinterval-schema
Expand Down
25 changes: 25 additions & 0 deletions src/domain/opendatafabric/schemas/odf.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,22 @@ table EnvVar {
value: string;
}

////////////////////////////////////////////////////////////////////////////////
// MqttTopicSubscription
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#mqtttopicsubscription-schema
////////////////////////////////////////////////////////////////////////////////

enum MqttQos: int32 {
AtMostOnce,
AtLeastOnce,
ExactlyOnce,
}

table MqttTopicSubscription {
path: string;
qos: MqttQos = null;
}

////////////////////////////////////////////////////////////////////////////////
// FetchStep
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#fetchstep-schema
Expand Down Expand Up @@ -354,10 +370,19 @@ table FetchStepContainer {
env: [EnvVar];
}

table FetchStepMqtt {
host: string;
port: int32;
username: string;
password: string;
topics: [MqttTopicSubscription];
}

union FetchStep {
FetchStepUrl,
FetchStepFilesGlob,
FetchStepContainer,
FetchStepMqtt,
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
75 changes: 74 additions & 1 deletion src/domain/opendatafabric/src/dtos/dtos_dyntraits_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::path::Path;
use chrono::{DateTime, Utc};

use crate::dtos;
use crate::dtos::{CompressionFormat, DatasetKind, SourceOrdering};
use crate::dtos::{CompressionFormat, DatasetKind, MqttQos, SourceOrdering};
use crate::formats::*;
use crate::identity::*;

Expand Down Expand Up @@ -575,6 +575,7 @@ pub enum FetchStep<'a> {
Url(&'a dyn FetchStepUrl),
FilesGlob(&'a dyn FetchStepFilesGlob),
Container(&'a dyn FetchStepContainer),
Mqtt(&'a dyn FetchStepMqtt),
}

impl<'a> From<&'a dtos::FetchStep> for FetchStep<'a> {
Expand All @@ -583,6 +584,7 @@ impl<'a> From<&'a dtos::FetchStep> for FetchStep<'a> {
dtos::FetchStep::Url(v) => FetchStep::Url(v),
dtos::FetchStep::FilesGlob(v) => FetchStep::FilesGlob(v),
dtos::FetchStep::Container(v) => FetchStep::Container(v),
dtos::FetchStep::Mqtt(v) => FetchStep::Mqtt(v),
}
}
}
Expand All @@ -593,6 +595,7 @@ impl Into<dtos::FetchStep> for FetchStep<'_> {
FetchStep::Url(v) => dtos::FetchStep::Url(v.into()),
FetchStep::FilesGlob(v) => dtos::FetchStep::FilesGlob(v.into()),
FetchStep::Container(v) => dtos::FetchStep::Container(v.into()),
FetchStep::Mqtt(v) => dtos::FetchStep::Mqtt(v.into()),
}
}
}
Expand All @@ -618,6 +621,14 @@ pub trait FetchStepContainer {
fn env(&self) -> Option<Box<dyn Iterator<Item = &dyn EnvVar> + '_>>;
}

pub trait FetchStepMqtt {
fn host(&self) -> &str;
fn port(&self) -> i32;
fn username(&self) -> Option<&str>;
fn password(&self) -> Option<&str>;
fn topics(&self) -> Box<dyn Iterator<Item = &dyn MqttTopicSubscription> + '_>;
}

impl FetchStepUrl for dtos::FetchStepUrl {
fn url(&self) -> &str {
self.url.as_ref()
Expand Down Expand Up @@ -683,6 +694,28 @@ impl FetchStepContainer for dtos::FetchStepContainer {
}
}

impl FetchStepMqtt for dtos::FetchStepMqtt {
fn host(&self) -> &str {
self.host.as_ref()
}
fn port(&self) -> i32 {
self.port
}
fn username(&self) -> Option<&str> {
self.username.as_ref().map(|v| -> &str { v.as_ref() })
}
fn password(&self) -> Option<&str> {
self.password.as_ref().map(|v| -> &str { v.as_ref() })
}
fn topics(&self) -> Box<dyn Iterator<Item = &dyn MqttTopicSubscription> + '_> {
Box::new(
self.topics
.iter()
.map(|i| -> &dyn MqttTopicSubscription { i }),
)
}
}

impl Into<dtos::FetchStepUrl> for &dyn FetchStepUrl {
fn into(self) -> dtos::FetchStepUrl {
dtos::FetchStepUrl {
Expand Down Expand Up @@ -716,6 +749,18 @@ impl Into<dtos::FetchStepContainer> for &dyn FetchStepContainer {
}
}

impl Into<dtos::FetchStepMqtt> for &dyn FetchStepMqtt {
fn into(self) -> dtos::FetchStepMqtt {
dtos::FetchStepMqtt {
host: self.host().to_owned(),
port: self.port(),
username: self.username().map(|v| v.to_owned()),
password: self.password().map(|v| v.to_owned()),
topics: self.topics().map(|i| i.into()).collect(),
}
}
}

////////////////////////////////////////////////////////////////////////////////
// MergeStrategy
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#mergestrategy-schema
Expand Down Expand Up @@ -905,6 +950,34 @@ impl Into<dtos::MetadataEvent> for MetadataEvent<'_> {
}
}

////////////////////////////////////////////////////////////////////////////////
// MqttTopicSubscription
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#mqtttopicsubscription-schema
////////////////////////////////////////////////////////////////////////////////

pub trait MqttTopicSubscription {
fn path(&self) -> &str;
fn qos(&self) -> Option<MqttQos>;
}

impl MqttTopicSubscription for dtos::MqttTopicSubscription {
fn path(&self) -> &str {
self.path.as_ref()
}
fn qos(&self) -> Option<MqttQos> {
self.qos.as_ref().map(|v| -> MqttQos { *v })
}
}

impl Into<dtos::MqttTopicSubscription> for &dyn MqttTopicSubscription {
fn into(self) -> dtos::MqttTopicSubscription {
dtos::MqttTopicSubscription {
path: self.path().to_owned(),
qos: self.qos().map(|v| v.into()),
}
}
}

////////////////////////////////////////////////////////////////////////////////
// OffsetInterval
// https://github.com/kamu-data/open-data-fabric/blob/master/open-data-fabric.md#offsetinterval-schema
Expand Down
Loading

0 comments on commit 5e0dc43

Please sign in to comment.