From d3a3efed39e497327dedeabbafb7f4d6976cc4a3 Mon Sep 17 00:00:00 2001 From: steden <1470804@qq.com> Date: Sun, 31 Dec 2023 21:26:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E5=88=B0FOPS=E4=B8=AD=E5=BF=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configure/getFopsServer.go | 14 ++++++ farseer.yaml | 4 +- flog/config.go | 5 +- flog/fileProvider.go | 3 +- flog/fopsProvider.go | 96 ++++++++++++++++++++++++++++++++++++++ flog/init.go | 5 ++ 6 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 configure/getFopsServer.go create mode 100644 flog/fopsProvider.go diff --git a/configure/getFopsServer.go b/configure/getFopsServer.go new file mode 100644 index 000000000..6b10d29d1 --- /dev/null +++ b/configure/getFopsServer.go @@ -0,0 +1,14 @@ +package configure + +import "strings" + +func GetFopsServer() string { + fopsServer := strings.ToLower(GetString("Fops.Server")) + if !strings.HasPrefix(fopsServer, "http") { + panic("[farseer.yaml]Fops.Server配置不正确,示例:https://fops.fsgit.com") + } + if !strings.HasSuffix(fopsServer, "/") { + fopsServer += "/" + } + return fopsServer +} diff --git a/farseer.yaml b/farseer.yaml index 0ebe36828..8990cb78f 100644 --- a/farseer.yaml +++ b/farseer.yaml @@ -62,8 +62,8 @@ Log: FileCountLimit: 20 # 文件数量限制 RefreshInterval: 1 # 写入到文件的时间间隔,秒单位,最少为1 Disable: true - Database: - ElasticSearch: + Fops: + Disable: false Component: task: true # 打印task组件的日志 cacheManage: true # 打印cacheManage组件的日志 diff --git a/flog/config.go b/flog/config.go index a783f59a7..9f7b40a24 100644 --- a/flog/config.go +++ b/flog/config.go @@ -3,8 +3,9 @@ package flog type Config struct { Component componentConfig Default levelFormat - Console levelFormat - File fileConfig + Console levelFormat // 输出到控制台 + File fileConfig // 写到文件 + Fops levelFormat // 上传到FOPS } // 组件日志 diff --git a/flog/fileProvider.go b/flog/fileProvider.go index 3b7abe378..d071493e1 100644 --- a/flog/fileProvider.go +++ b/flog/fileProvider.go @@ -69,8 +69,7 @@ func (r *fileLoggerPersistent) Log(LogLevel eumLogLevel.Enum, log *logData, exce // 将缓冲区的日志,每隔1秒,写入文件 func (r *fileLoggerPersistent) writeFile() { - ticker := time.NewTicker(time.Second * time.Duration(r.config.RefreshInterval)) - for range ticker.C { + for range time.NewTicker(time.Second * time.Duration(r.config.RefreshInterval)).C { // 组装要写入的日志内容 var logs []string for len(r.logsBuffer) > 0 { diff --git a/flog/fopsProvider.go b/flog/fopsProvider.go new file mode 100644 index 000000000..8b106045f --- /dev/null +++ b/flog/fopsProvider.go @@ -0,0 +1,96 @@ +package flog + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/farseer-go/fs" + "github.com/farseer-go/fs/configure" + "github.com/farseer-go/fs/container" + "github.com/farseer-go/fs/core" + "github.com/farseer-go/fs/core/eumLogLevel" + "github.com/farseer-go/fs/dateTime" + "github.com/farseer-go/fs/parse" + "github.com/farseer-go/fs/trace" + "net/http" + "time" +) + +// FopsProvider 上传到FOPS +type FopsProvider struct { +} + +func (r *FopsProvider) CreateLogger(categoryName string, formatter IFormatter, logLevel eumLogLevel.Enum) ILoggerPersistent { + persistent := &fopsLoggerPersistent{formatter: formatter, fopsServer: configure.GetFopsServer(), queue: make(chan *logData, 10000)} + // 异步开启上传 + go persistent.enableUpload() + return persistent +} + +type fopsLoggerPersistent struct { + formatter IFormatter + fopsServer string // fops服务端 + queue chan *logData // 待上传的列表 +} + +func (r *fopsLoggerPersistent) IsEnabled(logLevel eumLogLevel.Enum) bool { + return true +} + +func (r *fopsLoggerPersistent) Log(LogLevel eumLogLevel.Enum, log *logData, exception error) { + r.queue <- log +} + +// 开启上传 +func (r *fopsLoggerPersistent) enableUpload() { + for range time.NewTicker(3 * time.Second).C { + var lst []*logData + // 当队列中有数据 且 取出的数量<1000时,则继续取出 + for len(r.queue) > 0 && len(lst) < 1000 { + lst = append(lst, <-r.queue) + } + + // 没有取到数据 + if len(lst) == 0 { + continue + } + + // 上传 + if err := r.upload(lst); err != nil { + // 重新放回队列 + for i := 0; i < len(lst); i++ { + r.queue <- lst[i] + } + // 不能使用flog.Error,如果此处执行了,会一直产生无用的错误信息 + fmt.Println(r.formatter.Formatter(&logData{CreateAt: dateTime.Now(), LogLevel: eumLogLevel.Warning, Component: "", Content: err.Error(), newLine: true})) + } + } +} + +type UploadRequest struct { + List []*logData +} + +func (r *fopsLoggerPersistent) upload(lstLog []*logData) error { + bodyByte, _ := json.Marshal(UploadRequest{List: lstLog}) + url := r.fopsServer + "flog/upload" + newRequest, _ := http.NewRequest("POST", url, bytes.NewReader(bodyByte)) + newRequest.Header.Set("Content-Type", "application/json") + // 链路追踪 + if traceContext := container.Resolve[trace.IManager]().GetCurTrace(); traceContext != nil { + newRequest.Header.Set("Trace-Id", parse.ToString(traceContext.GetTraceId())) + newRequest.Header.Set("Trace-App-Name", fs.AppName) + } + client := &http.Client{} + rsp, err := client.Do(newRequest) + if err != nil { + return err + } + + apiRsp := core.NewApiResponseByReader[any](rsp.Body) + if apiRsp.StatusCode != 200 { + return fmt.Errorf("上传日志到%s失败(%v):%s", url, rsp.StatusCode, apiRsp.StatusMessage) + } + + return err +} diff --git a/flog/init.go b/flog/init.go index 5d3353458..d456549a8 100644 --- a/flog/init.go +++ b/flog/init.go @@ -43,6 +43,11 @@ func InitLog() core.ILog { factory.AddProviderFormatter(&FileProvider{config: logConfig.File}, formatter, logLevel) } + // 上传到FOPS + if !logConfig.Fops.Disable && configure.GetString("Fops.Server") != "" { + formatter, logLevel := loadLevelFormat(logConfig.Console, defaultLevel, defaultFormat, logConfig.Default) + factory.AddProviderFormatter(&FopsProvider{}, formatter, logLevel) + } log = factory.CreateLogger("") return log }