Skip to content

Commit

Permalink
chore: generate rust protobuf objects (#2157)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Oct 16, 2024
1 parent 54d3ce9 commit 5e87391
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 0 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/numaflow-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tonic-build = "0.12.3"
prost = "0.13.2"
prost-types = "0.13.1"
tonic = "0.12.3"
prost-build = "0.13.3"
4 changes: 4 additions & 0 deletions rust/numaflow-grpc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ clean:
.PHONY: generate
generate: clean
rm -rf src/clients/*.rs
rm -rf src/objects/*.rs
cp -r ../../pkg/apis/proto proto
mv src/clients.rs /tmp/clients.rs.bak
mv src/objects.rs /tmp/objects.rs.bak
> src/clients.rs
> src/objects.rs
-./codegen.sh
mv /tmp/clients.rs.bak src/clients.rs
mv /tmp/objects.rs.bak src/objects.rs
$(MAKE) clean
4 changes: 4 additions & 0 deletions rust/numaflow-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
/// gRPC clients and also protobuf objects for gRPC.
pub mod clients;

/// protobuf objects for concrete types
pub mod objects;
18 changes: 18 additions & 0 deletions rust/numaflow-grpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
fn main() {
// gRPC clients for UDF
build_client();

// protobuf objects for serde
build_objects();
}

fn build_client() {
tonic_build::configure()
.build_client(true)
.build_server(false)
Expand All @@ -18,3 +26,13 @@ fn main() {
)
.expect("failed to compile protos");
}

fn build_objects() {
prost_build::Config::new()
.out_dir("src/objects")
.compile_protos(
&["proto/isb/message.proto", "proto/wmb/wmb.proto"],
&["proto"],
)
.expect("failed to compile protos");
}
7 changes: 7 additions & 0 deletions rust/numaflow-grpc/src/objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#[path = "objects/isb.rs"]
#[rustfmt::skip]
pub mod isb;

#[path = "objects/wmb.rs"]
#[rustfmt::skip]
pub mod wmb;
123 changes: 123 additions & 0 deletions rust/numaflow-grpc/src/objects/isb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// This file is @generated by prost-build.
/// MessageInfo is the message information window of the payload.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct MessageInfo {
/// EventTime represents the event time of the message
#[prost(message, optional, tag = "1")]
pub event_time: ::core::option::Option<::prost_types::Timestamp>,
/// IsLate is used to indicate if the message is a late data
#[prost(bool, tag = "2")]
pub is_late: bool,
}
/// MessageMetadata is the metadata of the message
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct MessageMetadata {
/// NumDelivered is the number of times the message has been delivered.
#[prost(uint64, tag = "1")]
pub num_delivered: u64,
}
/// Header is the header of the message
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Header {
/// MessageInfo contains the information window of the payload.
#[prost(message, optional, tag = "1")]
pub message_info: ::core::option::Option<MessageInfo>,
/// Kind indicates the kind of Message
#[prost(enumeration = "MessageKind", tag = "2")]
pub kind: i32,
/// ID is used for exactly-once-semantics. ID is a combination of vertex name, offset and index of the message.
#[prost(message, optional, tag = "3")]
pub id: ::core::option::Option<MessageId>,
/// Keys is (key,value) in the map-reduce paradigm will be used for reduce operation
#[prost(string, repeated, tag = "4")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// Headers is the headers of the message which can be used to store and propagate source headers
#[prost(map = "string, string", tag = "5")]
pub headers: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
}
/// MessageID is the message ID of the message which is used for exactly-once-semantics.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessageId {
/// VertexName is the name of the vertex
#[prost(string, tag = "1")]
pub vertex_name: ::prost::alloc::string::String,
/// Offset is the offset of the message
#[prost(string, tag = "2")]
pub offset: ::prost::alloc::string::String,
/// Index is the index of a flatmap message.
#[prost(int32, tag = "3")]
pub index: i32,
}
/// Body is the body of the message
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Body {
/// Payload is the actual data of the message
#[prost(bytes = "vec", tag = "1")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
/// Message is inter step message
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Message {
#[prost(message, optional, tag = "1")]
pub header: ::core::option::Option<Header>,
#[prost(message, optional, tag = "2")]
pub body: ::core::option::Option<Body>,
}
/// ReadMessage is the message read from the buffer.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadMessage {
/// Message is the actual message read from the buffer
#[prost(message, optional, tag = "1")]
pub message: ::core::option::Option<Message>,
/// ReadOffset is the offset at which the message was read
#[prost(string, tag = "2")]
pub read_offset: ::prost::alloc::string::String,
/// Watermark is the watermark timestamp
#[prost(message, optional, tag = "3")]
pub watermark: ::core::option::Option<::prost_types::Timestamp>,
/// Metadata is the metadata of the message after a message is read from the buffer.
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<MessageMetadata>,
}
/// WriteMessage is a wrapper for an isb message with tag information which will be used for conditional forwarding.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WriteMessage {
/// Message is the actual message to be written
#[prost(message, optional, tag = "1")]
pub message: ::core::option::Option<Message>,
/// Tags are the tags associated with the message
#[prost(string, repeated, tag = "2")]
pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// MessageKind represents the message type of the payload.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum MessageKind {
/// Data payload
Data = 0,
/// Watermark Barrier
Wmb = 1,
}
impl MessageKind {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Data => "DATA",
Self::Wmb => "WMB",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"DATA" => Some(Self::Data),
"WMB" => Some(Self::Wmb),
_ => None,
}
}
}
23 changes: 23 additions & 0 deletions rust/numaflow-grpc/src/objects/wmb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// This file is @generated by prost-build.
/// WMB is used in the KV offset timeline bucket as the value for the given processor entity key.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Wmb {
/// Idle is set to true if the given processor entity hasn't published anything
/// to the offset timeline bucket in a batch processing cycle.
/// Idle is used to signal an idle watermark.
#[prost(bool, tag = "1")]
pub idle: bool,
/// Offset is the monotonically increasing index/offset of the buffer (buffer is the physical representation
/// of the partition of the edge).
#[prost(int64, tag = "2")]
pub offset: i64,
/// Watermark is tightly coupled with the offset and will be monotonically increasing for a given ProcessorEntity
/// as the offset increases.
/// When it is idling (Idle==true), for a given offset, the watermark can monotonically increase without offset
/// increasing.
#[prost(int64, tag = "3")]
pub watermark: i64,
/// Partition to identify the partition to which the watermark belongs.
#[prost(int32, tag = "4")]
pub partition: i32,
}

0 comments on commit 5e87391

Please sign in to comment.