Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address compilation issues with auto generated amqp files #1820

Merged
merged 4 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ Bazel cmds:
```
wget "https://www.rabbitmq.com/resources/specs/amqp0-9-1.xml"
bazel run //src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator:amqp_code_gen_main -- run
cp generated_files/decode.cc ../
cp generated_files/decode.h ../
cp generated_files/types_gen.h ../
cp generated_files/{decode.h,decode.cc,types_gen.h} ../
cp generated_files/amqp.h src/carnot/funcs/protocols/amqp.h
arc lint
```


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def gen_json_builder(self):
The field name is used as json key to be close to the key used in spec
"""
if self.field_type == FieldType.table:
return f"// TODO: support KV for {self.field_name} field table type"
return f"// TODO(vsrivatsa): support KV for {self.field_name} field table type"
return f'builder->WriteKV("{self.c_field_name}", {self.c_field_name});'

def gen_buffer_extract(self):
Expand Down Expand Up @@ -694,7 +694,7 @@ def gen_process_frame_type(self):
AMQPFrameTypes amqp_frame_type = static_cast<AMQPFrameTypes>(req->frame_type);
switch (amqp_frame_type) {
case AMQPFrameTypes::kFrameHeader:
return ProcessContentHeader(&decoder, req);
return ProcessContentHeader(decoder, req);
case AMQPFrameTypes::kFrameBody: {
req->msg = "";
auto status = decoder->ExtractBufIgnore(req->payload_size);
Expand All @@ -707,7 +707,7 @@ def gen_process_frame_type(self):
req->msg = "";
break; // Heartbeat frames have no body or length
case AMQPFrameTypes::kFrameMethod:
return ProcessFrameMethod(&decoder, req);
return ProcessFrameMethod(decoder, req);
default:
VLOG(1) << absl::Substitute("Unparsed frame $0", req->frame_type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ namespace amqp {

{{ struct_declr }}

// TODO combine with kafka ToString function
template <typename T>
std::string ToString(T obj) {
utils::JSONObjectBuilder json_object_builder;
obj.ToJSON(&json_object_builder);
return json_object_builder.GetString();
}
// TODO(vsrivatsa) combine with kafka ToString function
Status ProcessPayload(Frame* req, BinaryDecoder* decoder);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ enum class AMQPFrameTypes : uint8_t {
kFrameBody = 3,
kFrameHeartbeat = 8,
};
const char kFrameEnd = 0xCE;
const uint8_t kFrameEnd = 0xCE;
const uint8_t kMinFrameLength = 8;

// Represents a generic AMQP message.
Expand Down Expand Up @@ -83,6 +83,11 @@ struct Frame : public FrameBase {
uint16_t method_id = 0;

size_t ByteSize() const override { return sizeof(Frame) + msg.size(); }

std::string ToString() const override {
return absl::Substitute("frame_type=[$0] channel=[$1] payload_size=[$2] msg=[$3]", frame_type,
channel, payload_size, msg);
}
};

struct Record {
Expand All @@ -96,14 +101,16 @@ struct Record {
std::string px_info = "";

std::string ToString() const {
return absl::Substitute("req=[$0] resp=[$1]", req->ToString(), resp->ToString());
return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString());
}
};

using channel_id = uint16_t;
struct ProtocolTraits : public BaseProtocolTraits<Record> {
using frame_type = Frame;
using record_type = Record;
using state_type = NoState;
using key_type = channel_id;
};

} // namespace amqp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
// Code generated by AMQP protocol generator. DO NOT EDIT.
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/decode.h"

#include <map>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "src/common/base/base.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"
Expand All @@ -44,6 +40,7 @@ StatusOr<std::string> ExtractLongString(BinaryDecoder* decoder) {
PX_ASSIGN_OR_RETURN(uint32_t len, decoder->ExtractBEInt<uint32_t>());
return decoder->ExtractString(len);
}

StatusOr<bool> ExtractNthBit(BinaryDecoder* decoder, int n) {
// Extract Value at Nth bit
return decoder->Buf()[0] >> n & 1;
Expand Down Expand Up @@ -726,7 +723,7 @@ Status ProcessContentHeader(BinaryDecoder* decoder, Frame* req) {
return ExtractAMQPTxContentHeader(decoder, req);

default:
VLOG(1) << absl::Substitute("Unparsed frame method class $0", class_id);
VLOG(1) << absl::Substitute("Unparsed content header class $0", class_id);
}
return Status::OK();
}
Expand Down Expand Up @@ -967,6 +964,7 @@ Status ProcessFrameMethod(BinaryDecoder* decoder, Frame* req) {
default:
VLOG(1) << absl::Substitute("Unparsed frame method class $0 method $1", class_id, method_id);
}

return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#pragma once

#include <string>
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"

#include "src/common/base/base.h"
#include "src/common/json/json.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h"
#include "src/stirling/utils/binary_decoder.h"

namespace px {
Expand All @@ -41,7 +41,7 @@ struct AMQPConnectionStart {
void ToJSON(utils::JSONObjectBuilder* builder) const {
builder->WriteKV("version_major", version_major);
builder->WriteKV("version_minor", version_minor);
// TODO(vsrivatsa): support KV for server_properties field table type
// TODO(vsrivatsa): support KV for server-properties field table type
builder->WriteKV("mechanisms", mechanisms);
builder->WriteKV("locales", locales);
}
Expand All @@ -55,7 +55,7 @@ struct AMQPConnectionStartOk {
bool synchronous = 1;

void ToJSON(utils::JSONObjectBuilder* builder) const {
// TODO(vsrivatsa): support KV for client_properties field table type
// TODO(vsrivatsa): support KV for client-properties field table type
builder->WriteKV("mechanism", mechanism);
builder->WriteKV("response", response);
builder->WriteKV("locale", locale);
Expand Down Expand Up @@ -719,6 +719,7 @@ std::string ToString(T obj) {
obj.ToJSON(&json_object_builder);
return json_object_builder.GetString();
}
// TODO(vsrivatsa) combine with kafka ToString function
Status ProcessPayload(Frame* req, BinaryDecoder* decoder);

} // namespace amqp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
// Code generated by AMQP protocol generator. DO NOT EDIT.
#pragma once

#include <string>

#include "src/common/base/base.h"
Expand Down Expand Up @@ -156,30 +157,26 @@ enum class AMQPFrameTypes : uint8_t {
};
const uint8_t kFrameEnd = 0xCE;
const uint8_t kMinFrameLength = 8;
constexpr uint8_t kEndByteSize = 1;
const uint8_t kMinFrameWithoutEnd = 7;

// Represents a generic AMQP message.
struct Frame : public FrameBase {
// Marks end of the frame by hexadecimal value %xCE

uint8_t frame_type = 0;
uint8_t frame_type;

// Communication channel to be used
uint16_t channel = 0;
uint16_t channel;

// Defines the length of message upcoming
uint32_t payload_size = 0;
uint32_t payload_size;

// Actual body content to be used
std::string msg = "";
std::string msg;

// sync value only known after full body parsing
bool synchronous = false;

// `consumed` is used to mark if a request packet has been matched to a
// `consumed` is used to mark if a request frame has been matched to a
// response in StitchFrames. This is an optimization to efficiently remove all
// matched packets from the front of the deque.
// matched frames from the front of the deque.
bool consumed = false;

// if full body parsing already done
Expand All @@ -197,11 +194,15 @@ struct Frame : public FrameBase {
};

struct Record {
// AMQP record can support both sync and async frames.
// async frames have either req/resp set
// sync frames have both req & resp set
Frame req;
Frame resp;

// Debug information.
std::string px_info = "";

std::string ToString() const {
return absl::Substitute("req=[$0] resp=[$1]", req.ToString(), resp.ToString());
}
Expand Down
Loading