diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index bcde3b68dd..6431ab62ef 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -116,6 +116,7 @@ int auxTcpPortPresent(clusterNode *n); int auxTlsPortSetter(clusterNode *n, void *value, size_t length); sds auxTlsPortGetter(clusterNode *n, sds s); int auxTlsPortPresent(clusterNode *n); +static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen); static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); void freeClusterLink(clusterLink *link); int verifyClusterNodeId(const char *name, int length); @@ -421,7 +422,7 @@ typedef struct { int refcount; /* Number of cluster link send msg queues containing the message */ union { clusterMsg msg; - clusterMsgLight light_msg; + clusterMsgLight msg_light; }; } clusterMsgSendBlock; @@ -1268,7 +1269,11 @@ static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) msgblock->refcount = 1; msgblock->totlen = blocklen; server.stat_cluster_links_memory += blocklen; - clusterBuildMessageHdr(&msgblock->msg, type, msglen); + if IS_LIGHT_MESSAGE (type) { + clusterBuildMessageHdrLight(&msgblock->msg_light, type, msglen); + } else { + clusterBuildMessageHdr(&msgblock->msg, type, msglen); + } return msgblock; } @@ -2895,13 +2900,11 @@ void processLightPacket(clusterLink *link, uint16_t type) { } } -inline int messageTypeSupportsLightHdr(uint16_t type) { +static inline int messageTypeSupportsLightHdr(uint16_t type) { switch (type) { case CLUSTERMSG_TYPE_PUBLISH: return 1; case CLUSTERMSG_TYPE_PUBLISHSHARD: return 1; } - serverLog(LL_NOTICE, "--- Packet of type %s does not support light cluster header", - clusterGetMessageTypeString(type)); return 0; } @@ -2909,11 +2912,13 @@ inline int messageTypeSupportsLightHdr(uint16_t type) { int clusterIsValidPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); - uint16_t type = ntohs(hdr->type); - int is_light = IS_LIGHT_MESSAGE(type); - if (is_light) { - type &= ~CLUSTERMSG_LIGHT; - serverAssert(messageTypeSupportsLightHdr(type)); + int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type)); + uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK; + + if (is_light && !messageTypeSupportsLightHdr(type)) { + serverLog(LL_NOTICE, "--- Packet of type %s does not support light cluster header", + clusterGetMessageTypeString(type)); + return 0; } if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++; @@ -3028,12 +3033,11 @@ int clusterProcessPacket(clusterLink *link) { } clusterMsg *hdr = (clusterMsg *)link->rcvbuf; - uint16_t type = ntohs(hdr->type); mstime_t now = mstime(); - int is_light = IS_LIGHT_MESSAGE(type); + int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type)); + uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK; if (is_light) { - type &= ~CLUSTERMSG_LIGHT; if (!link->node || nodeInHandshake(link->node)) { freeClusterLink(link); serverLog( @@ -3717,24 +3721,21 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { dictReleaseIterator(di); } -/* Build the message header. hdr must point to a buffer at least - * sizeof(clusterMsg) in bytes. */ -static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { +static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen) { hdr->ver = htons(CLUSTER_PROTO_VER); hdr->sig[0] = 'R'; hdr->sig[1] = 'C'; hdr->sig[2] = 'm'; hdr->sig[3] = 'b'; hdr->type = htons(type); + hdr->notused1 = 0; + hdr->notused2 = 0; hdr->totlen = htonl(msglen); +} - if (IS_LIGHT_MESSAGE(type)) { - clusterMsgLight *hdr_light = (clusterMsgLight *)hdr; - hdr_light->notused1 = 0; - hdr_light->notused2 = 0; - return; - } - +/* Build the message header. hdr must point to a buffer at least + * sizeof(clusterMsg) in bytes. */ +static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { uint64_t offset; clusterNode *primary; @@ -3744,6 +3745,12 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { * in charge for this slots. */ primary = (nodeIsReplica(myself) && myself->replicaof) ? myself->replicaof : myself; + hdr->ver = htons(CLUSTER_PROTO_VER); + hdr->sig[0] = 'R'; + hdr->sig[1] = 'C'; + hdr->sig[2] = 'm'; + hdr->sig[3] = 'b'; + hdr->type = htons(type); memcpy(hdr->sender, myself->name, CLUSTER_NAMELEN); /* If cluster-announce-ip option is enabled, force the receivers of our @@ -3785,6 +3792,8 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { /* Set the message flags. */ if (clusterNodeIsPrimary(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; + + hdr->totlen = htonl(msglen); } /* Set the i-th entry of the gossip section in the message pointed by 'hdr' @@ -4013,7 +4022,7 @@ clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen); clusterMsgDataPublish *hdr_data_msg; if (is_light) { - clusterMsgLight *hdr_light = &msgblock->light_msg; + clusterMsgLight *hdr_light = &msgblock->msg_light; hdr_data_msg = &hdr_light->data.publish.msg; } else { clusterMsg *hdr = &msgblock->msg; diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 1121091003..c7cc25ba14 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -96,6 +96,8 @@ typedef struct clusterNodeFailReport { #define CLUSTERMSG_LIGHT 0x8000 /* Modifier bit for message types that support light header */ +#define CLUSTERMSG_MODIFIER_MASK (CLUSTERMSG_LIGHT) /* Modifier bit for message types that support light header */ + /* We check for the modifier bit to determine if the message is sent using light header.*/ #define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)