Skip to content

Commit

Permalink
fix: rename broker_id to node_id for Range and S3WalObject (#240)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 11, 2023
1 parent 9d910cd commit 604ea61
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1284,7 +1284,7 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, i
// Create a new range for the stream
Range range = new Range();
range.setStreamId(streamId);
range.setBrokerId(stream.getDstNodeId());
range.setNodeId(stream.getDstNodeId());
range.setStartOffset(startOffset);
range.setEndOffset(startOffset);
range.setEpoch(epoch + 1);
Expand Down Expand Up @@ -1580,7 +1580,7 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
s3WALObject.setObjectId(objectId);
s3WALObject.setObjectSize(walObject.getObjectSize());
s3WALObject.setBaseDataTimestamp(dataTs);
s3WALObject.setBrokerId(brokerId);
s3WALObject.setNodeId(brokerId);
s3WALObject.setSequenceId(walObject.getSequenceId());
s3WALObject.setSubStreams(gson.toJson(walObject.getSubStreamsMap()));
s3WALObjectMapper.create(s3WALObject);
Expand Down Expand Up @@ -1727,8 +1727,8 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
public CompletableFuture<List<S3WALObject>> listWALObjects() {
CompletableFuture<List<S3WALObject>> future = new CompletableFuture<>();
try (SqlSession session = this.sessionFactory.openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WALObject> walObjects = s3WALObjectMapper.list(this.config.nodeId(), null).stream()
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WALObject> walObjects = s3WalObjectMapper.list(this.config.nodeId(), null).stream()
.map(s3WALObject -> {
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), new TypeToken<Map<Long, SubStream>>() {

Expand All @@ -1747,8 +1747,8 @@ public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId,
long endOffset, int limit) {
CompletableFuture<List<S3WALObject>> future = new CompletableFuture<>();
try (SqlSession session = this.sessionFactory.openSession()) {
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WalObject> s3WalObjects = s3WALObjectMapper.list(this.config.nodeId(), null);
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WalObject> s3WalObjects = s3WalObjectMapper.list(this.config.nodeId(), null);

List<S3WALObject> walObjects = s3WalObjects.stream()
.map(s3WALObject -> {
Expand Down Expand Up @@ -1806,7 +1806,7 @@ private S3WALObject buildS3WALObject(
return S3WALObject.newBuilder()
.setObjectId(originalObject.getObjectId())
.setObjectSize(originalObject.getObjectSize())
.setBrokerId(originalObject.getBrokerId())
.setBrokerId(originalObject.getNodeId())
.setSequenceId(originalObject.getSequenceId())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp())
.setCommittedTimestamp(originalObject.getCommittedTimestamp())
Expand Down Expand Up @@ -1877,7 +1877,7 @@ public CompletableFuture<Pair<List<apache.rocketmq.controller.v1.S3StreamObject>
return CompletableFuture.supplyAsync(() -> {
try (SqlSession session = this.sessionFactory.openSession()) {
S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class);
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<apache.rocketmq.controller.v1.S3StreamObject> s3StreamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit)
.parallelStream()
.map(streamObject -> apache.rocketmq.controller.v1.S3StreamObject.newBuilder()
Expand All @@ -1891,18 +1891,18 @@ public CompletableFuture<Pair<List<apache.rocketmq.controller.v1.S3StreamObject>
.build())
.toList();

List<S3WALObject> walObjects = s3WALObjectMapper.list(config.nodeId(), null)
List<S3WALObject> walObjects = s3WalObjectMapper.list(config.nodeId(), null)
.parallelStream()
.map(s3WALObject -> {
.map(s3WalObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {
};
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType());
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WalObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken.getType());
Map<Long, SubStream> streamsRecords = new HashMap<>();
subStreams.entrySet().stream()
.filter(entry -> !Objects.isNull(entry) && entry.getKey().equals(streamId))
.filter(entry -> entry.getValue().getStartOffset() <= endOffset && entry.getValue().getEndOffset() > startOffset)
.forEach(entry -> streamsRecords.put(entry.getKey(), entry.getValue()));
return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WALObject, streamsRecords);
return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject, streamsRecords);
})
.filter(Objects::nonNull)
.limit(limit - s3StreamObjects.size())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Range {

Long endOffset;

Integer brokerId;
Integer nodeId;

public Long getId() {
return id;
Expand Down Expand Up @@ -83,24 +83,24 @@ public void setEndOffset(Long endOffset) {
this.endOffset = endOffset;
}

public Integer getBrokerId() {
return brokerId;
public Integer getNodeId() {
return nodeId;
}

public void setBrokerId(Integer brokerId) {
this.brokerId = brokerId;
public void setNodeId(Integer nodeId) {
this.nodeId = nodeId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Range range = (Range) o;
return Objects.equals(id, range.id) && Objects.equals(rangeId, range.rangeId) && Objects.equals(streamId, range.streamId) && Objects.equals(epoch, range.epoch) && Objects.equals(startOffset, range.startOffset) && Objects.equals(endOffset, range.endOffset) && Objects.equals(brokerId, range.brokerId);
return Objects.equals(id, range.id) && Objects.equals(rangeId, range.rangeId) && Objects.equals(streamId, range.streamId) && Objects.equals(epoch, range.epoch) && Objects.equals(startOffset, range.startOffset) && Objects.equals(endOffset, range.endOffset) && Objects.equals(nodeId, range.nodeId);
}

@Override
public int hashCode() {
return Objects.hash(id, rangeId, streamId, epoch, startOffset, endOffset, brokerId);
return Objects.hash(id, rangeId, streamId, epoch, startOffset, endOffset, nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,49 @@

public class S3WalObject {

long objectId;
Long objectId;

int brokerId;
Integer nodeId;

long objectSize;
Long objectSize;

long sequenceId;
Long sequenceId;

String subStreams;

long baseDataTimestamp;

long committedTimestamp;

public long getObjectId() {
public Long getObjectId() {
return objectId;
}

public void setObjectId(long objectId) {
public void setObjectId(Long objectId) {
this.objectId = objectId;
}

public long getObjectSize() {
return objectSize;
public Integer getNodeId() {
return nodeId;
}

public void setObjectSize(long objectSize) {
this.objectSize = objectSize;
public void setNodeId(Integer nodeId) {
this.nodeId = nodeId;
}

public int getBrokerId() {
return brokerId;
public Long getObjectSize() {
return objectSize;
}

public void setBrokerId(int brokerId) {
this.brokerId = brokerId;
public void setObjectSize(Long objectSize) {
this.objectSize = objectSize;
}

public long getSequenceId() {
public Long getSequenceId() {
return sequenceId;
}

public void setSequenceId(long sequenceId) {
public void setSequenceId(Long sequenceId) {
this.sequenceId = sequenceId;
}

Expand Down Expand Up @@ -94,14 +94,19 @@ public void setCommittedTimestamp(long committedTimestamp) {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
S3WalObject that = (S3WalObject) o;
return objectId == that.objectId && brokerId == that.brokerId && objectSize == that.objectSize && sequenceId == that.sequenceId && baseDataTimestamp == that.baseDataTimestamp && committedTimestamp == that.committedTimestamp && Objects.equals(subStreams, that.subStreams);
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
S3WalObject object = (S3WalObject) o;
return baseDataTimestamp == object.baseDataTimestamp && committedTimestamp == object.committedTimestamp
&& objectId.equals(object.objectId) && nodeId.equals(object.nodeId)
&& objectSize.equals(object.objectSize) && sequenceId.equals(object.sequenceId)
&& Objects.equals(subStreams, object.subStreams);
}

@Override
public int hashCode() {
return Objects.hash(objectId, brokerId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp);
return Objects.hash(objectId, nodeId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,24 @@ public interface RangeMapper {
*/
int create(Range range);

Range getById(long id);

List<Range> listByRangeId(int rangeId);
/**
*
* @param id Primary key
* @return Range instance
*/
Range getById(@Param("id") long id);

List<Range> listByStreamId(long streamId);
List<Range> listByStreamId(@Param("streamId") long streamId);

void delete(@Param("rangeId") Integer rangeId, @Param("streamId") Long streamId);

List<Range> listByBrokerId(int brokerId);
List<Range> listByNodeId(@Param("nodeId") int nodeId);

Range get(@Param("rangeId") Integer rangeId,
@Param("streamId") Long streamId,
@Param("brokerId") Integer brokerId);
@Param("nodeId") Integer nodeId);

List<Range> list(@Param("brokerId") Integer brokerId,
List<Range> list(@Param("nodeId") Integer nodeId,
@Param("streamId") Long streamId,
@Param("offset") Long offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ public interface S3WalObjectMapper {
S3WalObject getByObjectId(long objectId);

int delete(@Param("objectId") Long objectId,
@Param("brokerId") Integer brokerId,
@Param("nodeId") Integer nodeId,
@Param("sequenceId") Long sequenceId);

List<S3WalObject> list(@Param("brokerId") Integer brokerId,
@Param("sequenceId") Long sequenceId);
List<S3WalObject> list(@Param("nodeId") Integer nodeId, @Param("sequenceId") Long sequenceId);

int commit(S3WalObject s3WALObject);
}
38 changes: 16 additions & 22 deletions controller/src/main/resources/database/mapper/RangeMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
<mapper namespace="com.automq.rocketmq.controller.metadata.database.mapper.RangeMapper">

<insert id="create" parameterType="Range" useGeneratedKeys="true" keyProperty="id" keyColumn="id">
INSERT INTO `range` (range_id, stream_id, epoch, start_offset, end_offset, broker_id)
INSERT INTO `range` (range_id, stream_id, epoch, start_offset, end_offset, node_id)
VALUES (
#{rangeId},
#{streamId},
#{epoch},
#{startOffset},
#{endOffset},
#{brokerId}
#{nodeId}
)
</insert>

Expand All @@ -47,8 +47,8 @@
<if test="null != endOffset">
end_offset = #{endOffset},
</if>
<if test="null != brokerId">
broker_id = #{brokerId},
<if test="null != nodeId">
node_id = #{nodeId},
</if>
</set>
WHERE 1 = 1
Expand All @@ -60,28 +60,22 @@
</if>
</update>

<select id="listByRangeId" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
FROM `range`
WHERE range_id = #{rangeId}
</select>

<select id="getById" resultType="Range" parameterType="long">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
<select id="getById" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, node_id
FROM `range`
WHERE id = #{id}
</select>

<select id="get" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, node_id
FROM `range`
WHERE 1 = 1
<if test="null != streamId and null != rangeId">
AND stream_id = #{streamId}
AND range_id = #{rangeId}
</if>
<if test="null != brokerId">
AND broker_id = #{brokerId}
<if test="null != nodeId">
AND node_id = #{nodeId}
</if>
</select>

Expand All @@ -97,23 +91,23 @@
</delete>

<select id="listByStreamId" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, node_id
FROM `range`
WHERE stream_id = #{streamId}
</select>

<select id="listByBrokerId" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
<select id="listByNodeId" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, node_id
FROM `range`
WHERE broker_id = #{brokerId}
WHERE node_id = #{nodeId}
</select>

<select id="list" resultType="Range">
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, broker_id
SELECT id, range_id, stream_id, epoch, start_offset, end_offset, node_id
FROM `range`
WHERE 1 = 1
<if test="null != brokerId">
AND broker_id = #{brokerId}
<if test="null != nodeId">
AND node_id = #{nodeId}
</if>
<if test="null != streamId">
AND stream_id = #{streamId}
Expand Down
Loading

0 comments on commit 604ea61

Please sign in to comment.