From 074de01bbd5fc2020da339b650e19399e809f61c Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Tue, 16 Jan 2024 16:38:14 +0000 Subject: [PATCH 1/4] Fix amqp code generation templates to ensure resulting code compiles Signed-off-by: Dom Del Nano --- .../amqp/amqp_code_generator/amqp_code_gen.py | 4 ++-- .../gen_templates/decode.h.jinja_template | 6 ++++++ .../gen_templates/types_gen.h.jinja_template | 11 +++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py index 01c23b15aff..409843bdc31 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py @@ -694,7 +694,7 @@ def gen_process_frame_type(self): AMQPFrameTypes amqp_frame_type = static_cast(req->frame_type); switch (amqp_frame_type) { case AMQPFrameTypes::kFrameHeader: - return ProcessContentHeader(&decoder, req); + return ProcessContentHeader(decoder, req); case AMQPFrameTypes::kFrameBody: { req->msg = ""; auto status = decoder->ExtractBufIgnore(req->payload_size); @@ -707,7 +707,7 @@ def gen_process_frame_type(self): req->msg = ""; break; // Heartbeat frames have no body or length case AMQPFrameTypes::kFrameMethod: - return ProcessFrameMethod(&decoder, req); + return ProcessFrameMethod(decoder, req); default: VLOG(1) << absl::Substitute("Unparsed frame $0", req->frame_type); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template index 18ccd06bb56..09ecf540de8 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template @@ -33,6 +33,12 @@ namespace amqp { {{ struct_declr }} +template +std::string ToString(T obj) { + utils::JSONObjectBuilder json_object_builder; + obj.ToJSON(&json_object_builder); + return json_object_builder.GetString(); +} // TODO combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template index b9bdc571aee..a9e041564ff 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template @@ -52,7 +52,7 @@ enum class AMQPFrameTypes : uint8_t { kFrameBody = 3, kFrameHeartbeat = 8, }; -const char kFrameEnd = 0xCE; +const uint8_t kFrameEnd = 0xCE; const uint8_t kMinFrameLength = 8; // Represents a generic AMQP message. @@ -83,6 +83,11 @@ struct Frame : public FrameBase { uint16_t method_id = 0; size_t ByteSize() const override { return sizeof(Frame) + msg.size(); } + + std::string ToString() const override { + return absl::Substitute("frame_type=[$0] channel=[$1] payload_size=[$2] msg=[$3]", frame_type, + channel, payload_size, msg); + } }; struct Record { @@ -96,14 +101,16 @@ struct Record { std::string px_info = ""; std::string ToString() const { - return absl::Substitute("req=[$0] resp=[$1]", req->ToString(), resp->ToString()); + return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString()); } }; +using channel_id = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Frame; using record_type = Record; using state_type = NoState; + using key_type = channel_id; }; } // namespace amqp From bb1d4e57df00494335f4ce1617f4961b2404457a Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Tue, 16 Jan 2024 16:38:37 +0000 Subject: [PATCH 2/4] Update the templated files after fixing compilation issues Signed-off-by: Dom Del Nano --- .../socket_tracer/protocols/amqp/decode.cc | 8 +++---- .../socket_tracer/protocols/amqp/decode.h | 19 +++++++++-------- .../socket_tracer/protocols/amqp/types_gen.h | 21 ++++++++++--------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc index 6f8cd672fd4..a16c2a605ea 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc @@ -18,11 +18,7 @@ // Code generated by AMQP protocol generator. DO NOT EDIT. #include "src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h" -#include -#include #include -#include -#include #include "src/common/base/base.h" #include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" @@ -44,6 +40,7 @@ StatusOr ExtractLongString(BinaryDecoder* decoder) { PX_ASSIGN_OR_RETURN(uint32_t len, decoder->ExtractBEInt()); return decoder->ExtractString(len); } + StatusOr ExtractNthBit(BinaryDecoder* decoder, int n) { // Extract Value at Nth bit return decoder->Buf()[0] >> n & 1; @@ -726,7 +723,7 @@ Status ProcessContentHeader(BinaryDecoder* decoder, Frame* req) { return ExtractAMQPTxContentHeader(decoder, req); default: - VLOG(1) << absl::Substitute("Unparsed frame method class $0", class_id); + VLOG(1) << absl::Substitute("Unparsed content header class $0", class_id); } return Status::OK(); } @@ -967,6 +964,7 @@ Status ProcessFrameMethod(BinaryDecoder* decoder, Frame* req) { default: VLOG(1) << absl::Substitute("Unparsed frame method class $0 method $1", class_id, method_id); } + return Status::OK(); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h index 91e59cc1986..dbb59188aaf 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h @@ -19,10 +19,10 @@ #pragma once #include -#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" #include "src/common/base/base.h" #include "src/common/json/json.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" #include "src/stirling/utils/binary_decoder.h" namespace px { @@ -41,7 +41,7 @@ struct AMQPConnectionStart { void ToJSON(utils::JSONObjectBuilder* builder) const { builder->WriteKV("version_major", version_major); builder->WriteKV("version_minor", version_minor); - // TODO(vsrivatsa): support KV for server_properties field table type + // TODO: support KV for server-properties field table type builder->WriteKV("mechanisms", mechanisms); builder->WriteKV("locales", locales); } @@ -55,7 +55,7 @@ struct AMQPConnectionStartOk { bool synchronous = 1; void ToJSON(utils::JSONObjectBuilder* builder) const { - // TODO(vsrivatsa): support KV for client_properties field table type + // TODO: support KV for client-properties field table type builder->WriteKV("mechanism", mechanism); builder->WriteKV("response", response); builder->WriteKV("locale", locale); @@ -241,7 +241,7 @@ struct AMQPExchangeDeclare { builder->WriteKV("reserved_2", reserved_2); builder->WriteKV("reserved_3", reserved_3); builder->WriteKV("no_wait", no_wait); - // TODO(vsrivatsa): support KV for arguments field table type + // TODO: support KV for arguments field table type } }; @@ -302,7 +302,7 @@ struct AMQPQueueDeclare { builder->WriteKV("exclusive", exclusive); builder->WriteKV("auto_delete", auto_delete); builder->WriteKV("no_wait", no_wait); - // TODO(vsrivatsa): support KV for arguments field table type + // TODO: support KV for arguments field table type } }; @@ -334,7 +334,7 @@ struct AMQPQueueBind { builder->WriteKV("exchange", exchange); builder->WriteKV("routing_key", routing_key); builder->WriteKV("no_wait", no_wait); - // TODO(vsrivatsa): support KV for arguments field table type + // TODO: support KV for arguments field table type } }; @@ -357,7 +357,7 @@ struct AMQPQueueUnbind { builder->WriteKV("queue", queue); builder->WriteKV("exchange", exchange); builder->WriteKV("routing_key", routing_key); - // TODO(vsrivatsa): support KV for arguments field table type + // TODO: support KV for arguments field table type } }; @@ -464,7 +464,7 @@ struct AMQPBasicConsume { builder->WriteKV("no_ack", no_ack); builder->WriteKV("exclusive", exclusive); builder->WriteKV("no_wait", no_wait); - // TODO(vsrivatsa): support KV for arguments field table type + // TODO: support KV for arguments field table type } }; @@ -651,7 +651,7 @@ struct AMQPBasicContentHeader { builder->WriteKV("property_flags", property_flags); builder->WriteKV("content_type", content_type); builder->WriteKV("content_encoding", content_encoding); - // TODO(vsrivatsa): support KV for headers field table type + // TODO: support KV for headers field table type builder->WriteKV("delivery_mode", delivery_mode); builder->WriteKV("priority", priority); builder->WriteKV("correlation_id", correlation_id); @@ -719,6 +719,7 @@ std::string ToString(T obj) { obj.ToJSON(&json_object_builder); return json_object_builder.GetString(); } +// TODO combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); } // namespace amqp diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h index b475b80de93..4830ee2299d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h @@ -17,6 +17,7 @@ */ // Code generated by AMQP protocol generator. DO NOT EDIT. #pragma once + #include #include "src/common/base/base.h" @@ -156,30 +157,26 @@ enum class AMQPFrameTypes : uint8_t { }; const uint8_t kFrameEnd = 0xCE; const uint8_t kMinFrameLength = 8; -constexpr uint8_t kEndByteSize = 1; -const uint8_t kMinFrameWithoutEnd = 7; // Represents a generic AMQP message. struct Frame : public FrameBase { - // Marks end of the frame by hexadecimal value %xCE - - uint8_t frame_type = 0; + uint8_t frame_type; // Communication channel to be used - uint16_t channel = 0; + uint16_t channel; // Defines the length of message upcoming - uint32_t payload_size = 0; + uint32_t payload_size; // Actual body content to be used - std::string msg = ""; + std::string msg; // sync value only known after full body parsing bool synchronous = false; - // `consumed` is used to mark if a request packet has been matched to a + // `consumed` is used to mark if a request frame has been matched to a // response in StitchFrames. This is an optimization to efficiently remove all - // matched packets from the front of the deque. + // matched frames from the front of the deque. bool consumed = false; // if full body parsing already done @@ -197,11 +194,15 @@ struct Frame : public FrameBase { }; struct Record { + // AMQP record can support both sync and async frames. + // async frames have either req/resp set + // sync frames have both req & resp set Frame req; Frame resp; // Debug information. std::string px_info = ""; + std::string ToString() const { return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString()); } From f4f68f9b4e834bcaecdfcbde74ac33a28ef3c8a3 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Tue, 16 Jan 2024 16:40:32 +0000 Subject: [PATCH 3/4] Update README to simplify updating the auto generated files Signed-off-by: Dom Del Nano --- .../protocols/amqp/amqp_code_generator/README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md index d9916280908..30437396917 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md @@ -7,10 +7,9 @@ Bazel cmds: ``` wget "https://www.rabbitmq.com/resources/specs/amqp0-9-1.xml" bazel run //src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator:amqp_code_gen_main -- run - cp generated_files/decode.cc ../ - cp generated_files/decode.h ../ - cp generated_files/types_gen.h ../ + cp generated_files/{decode.h,decode.cc,types_gen.h} ../ cp generated_files/amqp.h src/carnot/funcs/protocols/amqp.h + arc lint ``` From 8a4b0dbd4ead36b9fd6a654073966ce9ebe6d44f Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Tue, 16 Jan 2024 17:21:18 +0000 Subject: [PATCH 4/4] Fix linting issues Signed-off-by: Dom Del Nano --- .../amqp/amqp_code_generator/amqp_code_gen.py | 2 +- .../gen_templates/decode.h.jinja_template | 2 +- .../socket_tracer/protocols/amqp/decode.h | 18 +++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py index 409843bdc31..43a4ba5d940 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py @@ -125,7 +125,7 @@ def gen_json_builder(self): The field name is used as json key to be close to the key used in spec """ if self.field_type == FieldType.table: - return f"// TODO: support KV for {self.field_name} field table type" + return f"// TODO(vsrivatsa): support KV for {self.field_name} field table type" return f'builder->WriteKV("{self.c_field_name}", {self.c_field_name});' def gen_buffer_extract(self): diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template index 09ecf540de8..b16d5ad8620 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template @@ -39,7 +39,7 @@ std::string ToString(T obj) { obj.ToJSON(&json_object_builder); return json_object_builder.GetString(); } -// TODO combine with kafka ToString function +// TODO(vsrivatsa) combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h index dbb59188aaf..fd7c315ce71 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h @@ -41,7 +41,7 @@ struct AMQPConnectionStart { void ToJSON(utils::JSONObjectBuilder* builder) const { builder->WriteKV("version_major", version_major); builder->WriteKV("version_minor", version_minor); - // TODO: support KV for server-properties field table type + // TODO(vsrivatsa): support KV for server-properties field table type builder->WriteKV("mechanisms", mechanisms); builder->WriteKV("locales", locales); } @@ -55,7 +55,7 @@ struct AMQPConnectionStartOk { bool synchronous = 1; void ToJSON(utils::JSONObjectBuilder* builder) const { - // TODO: support KV for client-properties field table type + // TODO(vsrivatsa): support KV for client-properties field table type builder->WriteKV("mechanism", mechanism); builder->WriteKV("response", response); builder->WriteKV("locale", locale); @@ -241,7 +241,7 @@ struct AMQPExchangeDeclare { builder->WriteKV("reserved_2", reserved_2); builder->WriteKV("reserved_3", reserved_3); builder->WriteKV("no_wait", no_wait); - // TODO: support KV for arguments field table type + // TODO(vsrivatsa): support KV for arguments field table type } }; @@ -302,7 +302,7 @@ struct AMQPQueueDeclare { builder->WriteKV("exclusive", exclusive); builder->WriteKV("auto_delete", auto_delete); builder->WriteKV("no_wait", no_wait); - // TODO: support KV for arguments field table type + // TODO(vsrivatsa): support KV for arguments field table type } }; @@ -334,7 +334,7 @@ struct AMQPQueueBind { builder->WriteKV("exchange", exchange); builder->WriteKV("routing_key", routing_key); builder->WriteKV("no_wait", no_wait); - // TODO: support KV for arguments field table type + // TODO(vsrivatsa): support KV for arguments field table type } }; @@ -357,7 +357,7 @@ struct AMQPQueueUnbind { builder->WriteKV("queue", queue); builder->WriteKV("exchange", exchange); builder->WriteKV("routing_key", routing_key); - // TODO: support KV for arguments field table type + // TODO(vsrivatsa): support KV for arguments field table type } }; @@ -464,7 +464,7 @@ struct AMQPBasicConsume { builder->WriteKV("no_ack", no_ack); builder->WriteKV("exclusive", exclusive); builder->WriteKV("no_wait", no_wait); - // TODO: support KV for arguments field table type + // TODO(vsrivatsa): support KV for arguments field table type } }; @@ -651,7 +651,7 @@ struct AMQPBasicContentHeader { builder->WriteKV("property_flags", property_flags); builder->WriteKV("content_type", content_type); builder->WriteKV("content_encoding", content_encoding); - // TODO: support KV for headers field table type + // TODO(vsrivatsa): support KV for headers field table type builder->WriteKV("delivery_mode", delivery_mode); builder->WriteKV("priority", priority); builder->WriteKV("correlation_id", correlation_id); @@ -719,7 +719,7 @@ std::string ToString(T obj) { obj.ToJSON(&json_object_builder); return json_object_builder.GetString(); } -// TODO combine with kafka ToString function +// TODO(vsrivatsa) combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); } // namespace amqp