Skip to content

Commit

Permalink
Merge branch 'kv-engine' into 'master'
Browse files Browse the repository at this point in the history
Kv engine and handle the message which timeout too much to avoid blocking too long

See merge request paas/nsqserver!43
  • Loading branch information
absolute8511 committed May 19, 2021
2 parents fe5fdc8 + 988a9c9 commit 11665b4
Show file tree
Hide file tree
Showing 49 changed files with 5,077 additions and 243 deletions.
2 changes: 1 addition & 1 deletion apps/nsq_data_tool/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func checkTopicStats() {
log.Printf("error: %v", err.Error())
return
}
producers, _, err := ci.GetNSQDStats(srcnodes, "", "", true)
producers, _, err := ci.GetNSQDStatsWithClients(srcnodes, "", "", true)
if err != nil {
log.Printf("error: %v", err.Error())
return
Expand Down
5 changes: 5 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("queue-write-buffer-size", int(opts.QueueWriteBufferSize), "the write buffer size for topic disk queue file")
flagSet.Int("pub-queue-size", int(opts.PubQueueSize), "the pub queue size for topic")
flagSet.Int("sleepms-between-log-sync-pull", int(opts.SleepMsBetweenLogSyncPull), "the sleep ms between each log sync pull")

flagSet.Bool("kv-enabled", opts.KVEnabled, "enable the kv topic")
flagSet.Int("kv-block-cache", int(opts.KVBlockCache), "kv engine block cache")
flagSet.Int("kv-write-buffer-size", int(opts.KVWriteBufferSize), "kv engine write buffer size")
flagSet.Int("kv-max-write-buffer-number", int(opts.KVMaxWriteBufferNumber), "kv max write buffer number")
return flagSet
}

Expand Down
2 changes: 1 addition & 1 deletion bench/multi_bench/multi_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"time"

"github.com/absolute8511/glog"
"github.com/spaolacci/murmur3"
"github.com/twmb/murmur3"
"github.com/youzan/go-nsq"
"github.com/youzan/nsq/internal/app"
"github.com/youzan/nsq/internal/clusterinfo"
Expand Down
36 changes: 36 additions & 0 deletions consistence/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ type CommitLogData struct {
MsgNum int32
}

func (cl *CommitLogData) dqSeekCnt() int64 {
if cl.MsgCnt > 0 {
return cl.MsgCnt - 1
}
return 0
}

func GetLogDataSize() int {
return logDataSize
}
Expand Down Expand Up @@ -758,6 +765,35 @@ func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64) {
return self.currentStart, int64(self.currentCount) * int64(GetLogDataSize())
}

func (self *TopicCommitLogMgr) GetMaxAvailableCleanOffset(fileIndex int64, fileOffset int64) (int64, error) {
self.Lock()
defer self.Unlock()
if fileIndex == self.currentStart {
if fileOffset/int64(GetLogDataSize())+int64(MIN_KEEP_LOG_ITEM) >= int64(self.currentCount) {
if int64(self.currentCount) <= int64(MIN_KEEP_LOG_ITEM) {
return fileOffset, ErrCommitLogCleanKeepMin
} else {
return (int64(self.currentCount-1) - int64(MIN_KEEP_LOG_ITEM)) * int64(GetLogDataSize()), nil
}
}
} else if fileIndex == self.currentStart-1 {
fName := getSegmentFilename(self.path, fileIndex)
stat, err := os.Stat(fName)
if err != nil {
return fileOffset, err
}
leftSize := stat.Size() - fileOffset
if leftSize < 0 {
return fileOffset, ErrCommitLogOffsetInvalid
}
if leftSize/int64(GetLogDataSize())+int64(self.currentCount) <= int64(MIN_KEEP_LOG_ITEM) {
noffset := stat.Size() - (int64(MIN_KEEP_LOG_ITEM)-int64(self.currentCount)+1)*int64(GetLogDataSize())
return noffset, nil
}
}
return fileOffset, nil
}

func (self *TopicCommitLogMgr) prepareCleanOldData(fileIndex int64, fileOffset int64) (LogStartInfo, int64, error) {
self.Lock()
defer self.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion consistence/data_placement_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/spaolacci/murmur3"
"github.com/twmb/murmur3"

"github.com/youzan/nsq/nsqd"
)
Expand Down
4 changes: 2 additions & 2 deletions consistence/etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type EtcdClient struct {
var etcdTransport client.CancelableTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 15 * time.Second,
Timeout: 10 * time.Second,
KeepAlive: 10 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
WriteBufferSize: 1024,
Expand Down
2 changes: 1 addition & 1 deletion consistence/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type NsqLookupdNodeInfo struct {
Epoch EpochType
}

func (self *NsqLookupdNodeInfo) GetID() string {
func (self NsqLookupdNodeInfo) GetID() string {
return self.ID
}

Expand Down
Loading

0 comments on commit 11665b4

Please sign in to comment.