diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md index d9916280908..30437396917 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/README.md @@ -7,10 +7,9 @@ Bazel cmds: ``` wget "https://www.rabbitmq.com/resources/specs/amqp0-9-1.xml" bazel run //src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator:amqp_code_gen_main -- run - cp generated_files/decode.cc ../ - cp generated_files/decode.h ../ - cp generated_files/types_gen.h ../ + cp generated_files/{decode.h,decode.cc,types_gen.h} ../ cp generated_files/amqp.h src/carnot/funcs/protocols/amqp.h + arc lint ``` diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py index 01c23b15aff..43a4ba5d940 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/amqp_code_gen.py @@ -125,7 +125,7 @@ def gen_json_builder(self): The field name is used as json key to be close to the key used in spec """ if self.field_type == FieldType.table: - return f"// TODO: support KV for {self.field_name} field table type" + return f"// TODO(vsrivatsa): support KV for {self.field_name} field table type" return f'builder->WriteKV("{self.c_field_name}", {self.c_field_name});' def gen_buffer_extract(self): @@ -694,7 +694,7 @@ def gen_process_frame_type(self): AMQPFrameTypes amqp_frame_type = static_cast(req->frame_type); switch (amqp_frame_type) { case AMQPFrameTypes::kFrameHeader: - return ProcessContentHeader(&decoder, req); + return ProcessContentHeader(decoder, req); case AMQPFrameTypes::kFrameBody: { req->msg = ""; auto status = decoder->ExtractBufIgnore(req->payload_size); @@ -707,7 +707,7 @@ def gen_process_frame_type(self): req->msg = ""; break; // Heartbeat frames have no body or length case AMQPFrameTypes::kFrameMethod: - return ProcessFrameMethod(&decoder, req); + return ProcessFrameMethod(decoder, req); default: VLOG(1) << absl::Substitute("Unparsed frame $0", req->frame_type); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template index 18ccd06bb56..b16d5ad8620 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/decode.h.jinja_template @@ -33,7 +33,13 @@ namespace amqp { {{ struct_declr }} -// TODO combine with kafka ToString function +template +std::string ToString(T obj) { + utils::JSONObjectBuilder json_object_builder; + obj.ToJSON(&json_object_builder); + return json_object_builder.GetString(); +} +// TODO(vsrivatsa) combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template index b9bdc571aee..a9e041564ff 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator/gen_templates/types_gen.h.jinja_template @@ -52,7 +52,7 @@ enum class AMQPFrameTypes : uint8_t { kFrameBody = 3, kFrameHeartbeat = 8, }; -const char kFrameEnd = 0xCE; +const uint8_t kFrameEnd = 0xCE; const uint8_t kMinFrameLength = 8; // Represents a generic AMQP message. @@ -83,6 +83,11 @@ struct Frame : public FrameBase { uint16_t method_id = 0; size_t ByteSize() const override { return sizeof(Frame) + msg.size(); } + + std::string ToString() const override { + return absl::Substitute("frame_type=[$0] channel=[$1] payload_size=[$2] msg=[$3]", frame_type, + channel, payload_size, msg); + } }; struct Record { @@ -96,14 +101,16 @@ struct Record { std::string px_info = ""; std::string ToString() const { - return absl::Substitute("req=[$0] resp=[$1]", req->ToString(), resp->ToString()); + return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString()); } }; +using channel_id = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Frame; using record_type = Record; using state_type = NoState; + using key_type = channel_id; }; } // namespace amqp diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc index 6f8cd672fd4..a16c2a605ea 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.cc @@ -18,11 +18,7 @@ // Code generated by AMQP protocol generator. DO NOT EDIT. #include "src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h" -#include -#include #include -#include -#include #include "src/common/base/base.h" #include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" @@ -44,6 +40,7 @@ StatusOr ExtractLongString(BinaryDecoder* decoder) { PX_ASSIGN_OR_RETURN(uint32_t len, decoder->ExtractBEInt()); return decoder->ExtractString(len); } + StatusOr ExtractNthBit(BinaryDecoder* decoder, int n) { // Extract Value at Nth bit return decoder->Buf()[0] >> n & 1; @@ -726,7 +723,7 @@ Status ProcessContentHeader(BinaryDecoder* decoder, Frame* req) { return ExtractAMQPTxContentHeader(decoder, req); default: - VLOG(1) << absl::Substitute("Unparsed frame method class $0", class_id); + VLOG(1) << absl::Substitute("Unparsed content header class $0", class_id); } return Status::OK(); } @@ -967,6 +964,7 @@ Status ProcessFrameMethod(BinaryDecoder* decoder, Frame* req) { default: VLOG(1) << absl::Substitute("Unparsed frame method class $0 method $1", class_id, method_id); } + return Status::OK(); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h index 91e59cc1986..fd7c315ce71 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h @@ -19,10 +19,10 @@ #pragma once #include -#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" #include "src/common/base/base.h" #include "src/common/json/json.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h" #include "src/stirling/utils/binary_decoder.h" namespace px { @@ -41,7 +41,7 @@ struct AMQPConnectionStart { void ToJSON(utils::JSONObjectBuilder* builder) const { builder->WriteKV("version_major", version_major); builder->WriteKV("version_minor", version_minor); - // TODO(vsrivatsa): support KV for server_properties field table type + // TODO(vsrivatsa): support KV for server-properties field table type builder->WriteKV("mechanisms", mechanisms); builder->WriteKV("locales", locales); } @@ -55,7 +55,7 @@ struct AMQPConnectionStartOk { bool synchronous = 1; void ToJSON(utils::JSONObjectBuilder* builder) const { - // TODO(vsrivatsa): support KV for client_properties field table type + // TODO(vsrivatsa): support KV for client-properties field table type builder->WriteKV("mechanism", mechanism); builder->WriteKV("response", response); builder->WriteKV("locale", locale); @@ -719,6 +719,7 @@ std::string ToString(T obj) { obj.ToJSON(&json_object_builder); return json_object_builder.GetString(); } +// TODO(vsrivatsa) combine with kafka ToString function Status ProcessPayload(Frame* req, BinaryDecoder* decoder); } // namespace amqp diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h index b475b80de93..4830ee2299d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h @@ -17,6 +17,7 @@ */ // Code generated by AMQP protocol generator. DO NOT EDIT. #pragma once + #include #include "src/common/base/base.h" @@ -156,30 +157,26 @@ enum class AMQPFrameTypes : uint8_t { }; const uint8_t kFrameEnd = 0xCE; const uint8_t kMinFrameLength = 8; -constexpr uint8_t kEndByteSize = 1; -const uint8_t kMinFrameWithoutEnd = 7; // Represents a generic AMQP message. struct Frame : public FrameBase { - // Marks end of the frame by hexadecimal value %xCE - - uint8_t frame_type = 0; + uint8_t frame_type; // Communication channel to be used - uint16_t channel = 0; + uint16_t channel; // Defines the length of message upcoming - uint32_t payload_size = 0; + uint32_t payload_size; // Actual body content to be used - std::string msg = ""; + std::string msg; // sync value only known after full body parsing bool synchronous = false; - // `consumed` is used to mark if a request packet has been matched to a + // `consumed` is used to mark if a request frame has been matched to a // response in StitchFrames. This is an optimization to efficiently remove all - // matched packets from the front of the deque. + // matched frames from the front of the deque. bool consumed = false; // if full body parsing already done @@ -197,11 +194,15 @@ struct Frame : public FrameBase { }; struct Record { + // AMQP record can support both sync and async frames. + // async frames have either req/resp set + // sync frames have both req & resp set Frame req; Frame resp; // Debug information. std::string px_info = ""; + std::string ToString() const { return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString()); }