Skip to content

Commit

Permalink
Refactor cacheable cursor create time
Browse files Browse the repository at this point in the history
  • Loading branch information
ftkg committed Nov 28, 2022
1 parent f0ead1c commit 26e1b5f
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions server/core_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
userIDOne := stream.Subject.String()
userIDTwo := stream.Subcontext.String()
messages := make([]*api.ChannelMessage, 0, limit)
createTimeNano := make(map[string]int64)
var nextCursor, prevCursor *channelMessageListCursor

var dbID string
Expand All @@ -148,19 +147,23 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
var dbContent string
var dbCreateTime pgtype.Timestamptz
var dbUpdateTime pgtype.Timestamptz
var dbFirstCreateTimeNano int64
var dbLastCreateTimeNano int64
for rows.Next() {
if len(messages) >= limit {
dbLastCreateTimeNano = dbCreateTime.Time.UnixNano()
nextCursor = &channelMessageListCursor{
StreamMode: stream.Mode,
StreamSubject: stream.Subject.String(),
StreamSubcontext: stream.Subcontext.String(),
StreamLabel: stream.Label,
CreateTime: dbCreateTime.Time.UnixNano(),
CreateTime: dbLastCreateTimeNano,
Id: dbID,
Forward: forward,
IsNext: true,
}
break

}

err = rows.Scan(&dbID, &dbCode, &dbSenderID, &dbUsername, &dbContent, &dbCreateTime, &dbUpdateTime)
Expand All @@ -181,7 +184,6 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
UpdateTime: &timestamppb.Timestamp{Seconds: dbUpdateTime.Time.Unix()},
Persistent: &wrapperspb.BoolValue{Value: true},
}
createTimeNano[dbID] = dbCreateTime.Time.UnixNano()
switch stream.Mode {
case StreamModeChannel:
message.RoomName = stream.Label
Expand All @@ -194,14 +196,17 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:

messages = append(messages, message)

if dbFirstCreateTimeNano == 0 {
dbFirstCreateTimeNano = dbCreateTime.Time.UnixNano()
}
// There can only be a previous page if this is a paginated listing.
if incomingCursor != nil && prevCursor == nil {
prevCursor = &channelMessageListCursor{
StreamMode: stream.Mode,
StreamSubject: stream.Subject.String(),
StreamSubcontext: stream.Subcontext.String(),
StreamLabel: stream.Label,
CreateTime: dbCreateTime.Time.UnixNano(),
CreateTime: dbFirstCreateTimeNano,
Id: dbID,
Forward: forward,
IsNext: false,
Expand All @@ -210,6 +215,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
}
_ = rows.Close()

flipped := false
if incomingCursor != nil && !incomingCursor.IsNext {
// If this was a previous page listing, flip the results to their normal order and swap the cursors.
nextCursor, prevCursor = prevCursor, nextCursor
Expand All @@ -223,17 +229,22 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
for i, j := 0, len(messages)-1; i < j; i, j = i+1, j-1 {
messages[i], messages[j] = messages[j], messages[i]
}
flipped = true
}

var cacheableCursor *channelMessageListCursor
if l := len(messages); l > 0 {
createTime := dbLastCreateTimeNano
if flipped {
createTime = dbFirstCreateTimeNano
}
// There is at least 1 message returned by the listing, so use it as the foundation of a new cacheable cursor.
cacheableCursor = &channelMessageListCursor{
StreamMode: stream.Mode,
StreamSubject: stream.Subject.String(),
StreamSubcontext: stream.Subcontext.String(),
StreamLabel: stream.Label,
CreateTime: createTimeNano[messages[l-1].MessageId],
CreateTime: createTime,
Id: messages[l-1].MessageId,
Forward: true,
IsNext: true,
Expand Down Expand Up @@ -348,7 +359,6 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
if start > 0 {
// Check if there might be a prev cursor
setPrevCursor = true
firstRecords = firstRecords[:len(firstRecords)-1]
}

records = records[start:end]
Expand Down

0 comments on commit 26e1b5f

Please sign in to comment.