Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 18, 2024
1 parent f0db157 commit 666132f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
17 changes: 9 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2899,15 +2899,15 @@ static uint32_t getPublishMessageLength(clusterMsgDataPublishMessage *cursor) {
return msg_length;
}

int pubsubProcessLightPacket(clusterLink *link, uint16_t type) {
int processLightPacket(clusterLink *link, uint16_t type) {
clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;
robj *channel, *message;
uint64_t data_count;

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT && serverPubsubShardSubscriptionCount() > 0)) {
robj *channel, *message;
uint64_t data_count;
data_count = ntohu64(hdr->data.publish.msg.data_count);
clusterMsgDataPublishMessage *cursor = getFirstPublishMessage(&hdr->data.publish.msg);
channel = readPublishMessageFromCursor(cursor);
Expand Down Expand Up @@ -2986,7 +2986,7 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
} else if (IS_PUBSUB_LIGHT_MESSAGE(type)) {
} else if (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) {
clusterMsgLight *hdr_pubsub = (clusterMsgLight *)link->rcvbuf;
explen = sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight);
explen += sizeof(clusterMsgDataPublishMulti);
Expand Down Expand Up @@ -3048,7 +3048,7 @@ int clusterProcessPacket(clusterLink *link) {
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();

if (IS_PUBSUB_LIGHT_MESSAGE(type)) {
if (IS_LIGHT_MESSAGE(type)) {
if (!link->node || nodeInHandshake(link->node)) {
freeClusterLink(link);
serverLog(
Expand All @@ -3059,7 +3059,7 @@ int clusterProcessPacket(clusterLink *link) {
}
clusterNode *sender = link->node;
sender->data_received = now;
return pubsubProcessLightPacket(link, type);
return processLightPacket(link, type);
}

uint16_t flags = ntohs(hdr->flags);
Expand Down Expand Up @@ -3612,7 +3612,7 @@ static inline int isSigAndMsgLenValid(clusterMsg *hdr) {
if (memcmp(hdr->sig, "RCmb", 4) != 0) return 0;
uint16_t type = ntohs(hdr->type);
uint32_t totlen = ntohl(hdr->totlen);
uint32_t minlen = IS_PUBSUB_LIGHT_MESSAGE(type) ? CLUSTERMSG_LIGHT_MIN_LEN : CLUSTERMSG_MIN_LEN;
uint32_t minlen = IS_LIGHT_MESSAGE(type) ? CLUSTERMSG_LIGHT_MIN_LEN : CLUSTERMSG_MIN_LEN;
if (totlen < minlen) return 0;
return 1;
}
Expand Down Expand Up @@ -3757,9 +3757,10 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
hdr->type = htons(type);
hdr->totlen = htonl(msglen);

if (IS_PUBSUB_LIGHT_MESSAGE(type)) {
if (IS_LIGHT_MESSAGE(type)) {
clusterMsgLight *hdr_light = (clusterMsgLight *)hdr;
hdr_light->notused1 = 0;
hdr_light->notused2 = 0;
return;
}

Expand Down
5 changes: 3 additions & 2 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ typedef struct clusterNodeFailReport {
#define CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT 12 /* Pub/Sub Publish shard propagation using light header*/
#define CLUSTERMSG_TYPE_COUNT 13 /* Total number of message types. */

#define IS_PUBSUB_LIGHT_MESSAGE(type) \
(type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT)
#define IS_LIGHT_MESSAGE(type) (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT)

/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
Expand Down Expand Up @@ -320,6 +319,7 @@ typedef struct {
uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */
uint16_t notused1;
uint16_t type; /* Message type */
uint16_t notused2;
union clusterMsgDataLight data;
} clusterMsgLight;

Expand All @@ -328,6 +328,7 @@ static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen),
static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, notused1) == offsetof(clusterMsg, port), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, notused2) == offsetof(clusterMsg, count), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, data) == 16, "unexpected field offset");

#define CLUSTERMSG_LIGHT_MIN_LEN (sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight))
Expand Down

0 comments on commit 666132f

Please sign in to comment.