Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 减小io压力 #50

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bkunifylogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ bkunifylogbeat.buffertimeout: 2
bkunifylogbeat.eventdataid: -1
bkunifylogbeat.max_cpu_limit: -1
bkunifylogbeat.cpu_check_times: 10
bkunifylogbeat.registry.operation_log_path: /var/lib/gse/operation.log

bkunifylogbeat.multi_config:
- path: "/usr/local/gse/plugins/etc/bkunifylogbeat"
file_pattern: "*.conf"
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ type SecConfigItem struct {

// 采集状态
type Registry struct {
FlushTimeout time.Duration `config:"flush"`
GcFrequency time.Duration `config:"gc_frequency"`
FlushTimeout time.Duration `config:"flush"`
GcFrequency time.Duration `config:"gc_frequency"`
OperationLogPath string `config:"operation_log_path"`
}

//默认配置
Expand Down
124 changes: 110 additions & 14 deletions registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package registrar

import (
"bufio"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -50,18 +51,27 @@ var (
registrarFlushed = bkmonitoring.NewInt("registrar_flushed")
registrarMarshalError = bkmonitoring.NewInt("registrar_marshal_error")
registrarFiles = bkmonitoring.NewInt("registrar_files", monitoring.Gauge)
operationLogFileObj *os.File
operationLogWriter *bufio.Writer
)

// operation is one record of the operation log: a file state paired with the
// timestamp that was applied to it. logOperation marshals it to JSON and
// appends it to the log as a single line; loadOperation decodes it back.
type operation struct {
// State is the file state that was handed to the registrar.
State file.State
// Time is the timestamp recorded together with State.
Time time.Time
}

// Registrar: 采集进度管理
type Registrar struct {
Channel chan []file.State
done chan struct{}
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
flushTimeout time.Duration
gcFrequency time.Duration
Channel chan []file.State
done chan struct{}
wg sync.WaitGroup
operationLogWG sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
flushTimeout time.Duration
gcFrequency time.Duration
operationLogPath string
}

// New creates a new Registrar instance, updating the registry file on
Expand All @@ -71,17 +81,25 @@ func New(config cfg.Registry) (*Registrar, error) {
done: make(chan struct{}),
wg: sync.WaitGroup{},

states: file.NewStates(),
Channel: make(chan []file.State, 1),
flushTimeout: config.FlushTimeout,
gcFrequency: config.GcFrequency,
states: file.NewStates(),
Channel: make(chan []file.State, 1),
flushTimeout: config.FlushTimeout,
gcFrequency: config.GcFrequency,
operationLogPath: config.OperationLogPath,
}
return r, r.Init()
}

// Init: 采集器启动时调用,同时对原采集器采集进度迁移
func (r *Registrar) Init() error {
var states []file.State
var err error

operationLogFileObj, err = os.OpenFile(r.operationLogPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("open opeartion Log file returned error: %v. Continuing", err)
}
operationLogWriter = bufio.NewWriter(operationLogFileObj)

// get time
str, err := bkStorage.Get(timeKey)
Expand Down Expand Up @@ -112,11 +130,14 @@ func (r *Registrar) Init() error {
logp.L.Errorf("json unmarshal error, %s", str)
return fmt.Errorf("error decoding states: %s", err)
}

states = r.migrate(states)
logp.L.Infof("load states: time=>%s, count=>%d, flush=>%s, gcFrequency=>%s",
t, len(states), r.flushTimeout, r.gcFrequency)

r.states.SetStates(ResetStates(states))
operations := r.loadOperation()
r.migrateOperation(operations)
return nil
}

Expand Down Expand Up @@ -157,10 +178,13 @@ func (r *Registrar) run() {
// Writes registry on shutdown
flushTicker := time.NewTicker(r.flushTimeout)
gcTicker := time.NewTicker(r.gcFrequency)
operationLogSizeTimer := time.NewTicker(1 * time.Second)

defer func() {
flushTicker.Stop()
gcTicker.Stop()
operationLogSizeTimer.Stop()
operationLogFileObj.Close()
r.flushRegistry()
r.wg.Done()
}()
Expand All @@ -170,6 +194,8 @@ func (r *Registrar) run() {
case <-r.done:
logp.L.Info("Ending Registrar")
return
case <-operationLogSizeTimer.C:
r.operationLogFileSizeHandler()
case <-flushTicker.C:
r.flushRegistry()
case <-gcTicker.C:
Expand All @@ -187,8 +213,11 @@ func (r *Registrar) onEvents(states []file.State) {
ts := time.Now()
for _, s := range states {
if s.Type == wineventlog.WinLogFileStateType {
r.states.UpdateWithTs(s, s.Timestamp)
stateTS := s.Timestamp
r.logOperation(s, stateTS)
r.states.UpdateWithTs(s, stateTS)
} else {
r.logOperation(s, ts)
r.states.UpdateWithTs(s, ts)
}
}
Expand All @@ -197,7 +226,7 @@ func (r *Registrar) onEvents(states []file.State) {
// flushRegistry writes the registry to disk.
func (r *Registrar) flushRegistry() {
registrarFlushed.Add(1)

r.operationLogWG.Add(1)
// First clean up states
r.gcStates()
states := r.GetStates()
Expand All @@ -213,6 +242,10 @@ func (r *Registrar) flushRegistry() {

bkStorage.Set(registrarKey, string(bytes), 0)
bkStorage.Set(timeKey, time.Now().Format(time.UnixDate), 0)

operationLogFileObj.Truncate(0)
operationLogFileObj.Seek(0, 0)
r.operationLogWG.Done()
}

// migrate file state
Expand Down Expand Up @@ -256,6 +289,7 @@ func (r *Registrar) gcStates() {
for _, state := range states {
if state.TTL == stateNotManage {
state.TTL = stateNanosecond
r.logOperation(state, time.Now())
r.states.Update(state)
}
}
Expand All @@ -269,3 +303,65 @@ func (r *Registrar) gcStates() {

r.gcRequired = false
}

// logOperation appends one state change to the operation log as a single JSON
// line and flushes it to the underlying file immediately.
//
// s is the state being recorded and ts the timestamp stored with it. Failures
// are logged and the record is dropped; nothing is returned to the caller.
//
// The call first waits on operationLogWG, which flushRegistry holds while it
// persists the registry and truncates the log.
// NOTE(review): flushRegistry calls gcStates between its operationLogWG.Add
// and Done, and gcStates calls logOperation; in that path this Wait would
// block on the flushing goroutine itself — confirm and restructure if so.
func (r *Registrar) logOperation(s file.State, ts time.Time) {
	r.operationLogWG.Wait()

	operationLog, err := json.Marshal(operation{
		Time:  ts,
		State: s,
	})
	if err != nil {
		logp.L.Errorf("Marshal operationLog returned error: %v. Continuing...", err)
		// There is no payload to write; do not emit an empty line.
		return
	}

	// Write the record and its line terminator, then push it to the file.
	_, err = operationLogWriter.Write(append(operationLog, '\n'))
	if err == nil {
		err = operationLogWriter.Flush()
	}
	if err != nil {
		logp.L.Errorf("Write operationLog returned error: %v. Continuing...", err)
		// bufio.Writer errors are sticky: without a Reset every later write
		// would fail with the same error even after the cause has gone away.
		operationLogWriter.Reset(operationLogFileObj)
	}
}

// loadOperation reads the whole operation log from the beginning and returns
// the records it could decode, in file order.
//
// Blank lines are skipped. Lines that are not valid JSON for an operation are
// logged and skipped instead of being returned as zero-valued records. Read
// errors are logged; the records decoded up to that point are still returned.
// The result is never nil.
//
// The receiver is a pointer so that the Registrar, which embeds
// sync.WaitGroup values, is not copied on every call.
func (r *Registrar) loadOperation() []operation {
	operations := make([]operation, 0)

	if _, err := operationLogFileObj.Seek(0, 0); err != nil {
		logp.L.Errorf("Seek operationLog returned error: %v. Continuing...", err)
		return operations
	}

	scanner := bufio.NewScanner(operationLogFileObj)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var operationItem operation
		if err := json.Unmarshal(line, &operationItem); err != nil {
			logp.L.Errorf("Unmarshal operationLog returned error: %v. Continuing...", err)
			continue
		}
		operations = append(operations, operationItem)
	}
	// Scan returns false both at EOF and on failure; only Err tells them apart.
	if err := scanner.Err(); err != nil {
		logp.L.Errorf("Scan operationLog returned error: %v. Continuing...", err)
	}
	return operations
}

// migrateOperation replays operations loaded from the operation log onto the
// in-memory states, runs a state GC pass, and then empties the log.
//
// Records whose state is empty are ignored. Windows event log states are
// applied with their own Timestamp; every other state is applied with the
// time stored in the record.
//
// NOTE(review): the log is truncated here although the replayed states are
// only held in memory at this point; they reach bkStorage on the next
// flushRegistry. Confirm that losing them on a crash in between is acceptable.
func (r *Registrar) migrateOperation(operations []operation) {
	for _, op := range operations {
		s := op.State
		if s.IsEmpty() {
			continue
		}

		ts := op.Time
		if s.Type == wineventlog.WinLogFileStateType {
			ts = s.Timestamp
		}
		r.states.UpdateWithTs(s, ts)
	}

	r.gcRequired = true
	r.gcStates()

	// A failed truncate means the same records are replayed on the next
	// start, so make the failure visible instead of dropping it.
	if err := operationLogFileObj.Truncate(0); err != nil {
		logp.L.Errorf("Truncate operationLog returned error: %v. Continuing...", err)
	}
	if _, err := operationLogFileObj.Seek(0, 0); err != nil {
		logp.L.Errorf("Seek operationLog returned error: %v. Continuing...", err)
	}
}

// operationLogFileSizeHandler checks the size of the operation log file and,
// once it has reached 10 MiB, calls flushRegistry, which persists the
// registry and truncates the log.
//
// If the file cannot be stat'ed the error is logged and the check is skipped.
func (r *Registrar) operationLogFileSizeHandler() {
	const maxOperationLogSize = 10 * 1024 * 1024

	fileStat, err := os.Stat(r.operationLogPath)
	if err != nil {
		logp.L.Errorf("Get operation Log Stat returned error: %v. Continuing...", err)
		// fileStat is nil on error; using it would panic.
		return
	}

	// The size is sampled periodically, so it will practically never be seen
	// at exactly the limit; trigger on "at least" instead of "equal".
	if fileStat.Size() >= maxOperationLogSize {
		r.flushRegistry()
	}
}
94 changes: 92 additions & 2 deletions registrar/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package registrar

import (
"encoding/json"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -57,15 +58,30 @@ func TestRegistrar(t *testing.T) {
}
}

testOperationLogPath, err := filepath.Abs("../tests/operation.log")
if err != nil {
panic(err)
}
_, err = os.Stat(testOperationLogPath)
if err != nil {
if os.IsExist(err) {
err = os.Remove(testOperationLogPath)
if err != nil {
panic(err)
}
}
}

//Step 2: 初始化registrar
err = bkStorage.Init(testRegPath, nil)
if err != nil {
panic(err)
}

registrar, err := New(cfg.Registry{
FlushTimeout: 1 * time.Second,
GcFrequency: 1 * time.Second,
FlushTimeout: 1 * time.Second,
GcFrequency: 1 * time.Second,
OperationLogPath: testOperationLogPath,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -96,3 +112,77 @@ func TestRegistrar(t *testing.T) {
bkStorage.Close()
os.Remove(testRegPath)
}

// TestRegistrarIO sends 5 batches of 10 states through the registrar with
// flush and GC intervals long enough not to fire during the test, and checks
// that all 50 state changes are present in the operation log.
func TestRegistrarIO(t *testing.T) {
	testRegPath, err := filepath.Abs("../tests/registrar.bkpipe.db")
	if err != nil {
		t.Fatalf("resolve registry path: %v", err)
	}
	testOperationLogPath, err := filepath.Abs("../tests/operation.log")
	if err != nil {
		t.Fatalf("resolve operation log path: %v", err)
	}

	// Step 1: remove leftovers from earlier runs. A missing file is fine;
	// a stale operation log would be replayed and skew the count below.
	for _, p := range []string{testRegPath, testOperationLogPath} {
		if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
			t.Fatalf("remove %s: %v", p, err)
		}
	}

	// Step 2: initialise the storage and the registrar.
	if err := bkStorage.Init(testRegPath, nil); err != nil {
		t.Fatalf("init storage: %v", err)
	}
	// Deferred calls run in reverse order: close the storage first, then
	// delete the files.
	defer os.Remove(testOperationLogPath)
	defer os.Remove(testRegPath)
	defer bkStorage.Close()

	bkStorage.Set(timeKey, time.Now().Format(time.UnixDate), 0)
	str, err := json.Marshal(make([]file.State, 0))
	if err != nil {
		t.Fatalf("marshal empty states: %v", err)
	}
	bkStorage.Set(registrarKey, string(str), 0)

	// New already calls Init; calling it again would reopen the operation
	// log and leak the first file handle.
	registrar, err := New(cfg.Registry{
		FlushTimeout:     1 * time.Minute,
		GcFrequency:      1 * time.Minute,
		OperationLogPath: testOperationLogPath,
	})
	if err != nil {
		t.Fatalf("create registrar: %v", err)
	}
	registrar.Start()
	// Registered last so the registrar is stopped before the storage closes.
	defer registrar.Stop()

	// Step 3: write the events.
	source := "/data/logs/test.log"
	for i := 0; i < 5; i++ {
		states := make([]file.State, 0, 10)
		for j := 0; j < 10; j++ {
			data := tests.MockLogEvent(source, "test")
			states = append(states, data.GetState())
		}
		registrar.Channel <- states
	}

	// Step 4: give the registrar goroutine time to drain the channel, then
	// verify every state change was logged.
	time.Sleep(10 * time.Second)
	operations := registrar.loadOperation()
	assert.Equal(t, 50, len(operations))
}