From e39a3a79d9321a9e27fec3bf5d0aeb13c9e952ce Mon Sep 17 00:00:00 2001 From: hcj116 Date: Wed, 11 May 2022 10:59:03 +0800 Subject: [PATCH 1/4] add cms plugin --- go.mod | 1 + go.sum | 2 + sinks/cms/cms.go | 436 ++++++++++++++++++ sinks/cms/cms_test.go | 268 +++++++++++ sinks/cms/metrichub/v20211001/client.go | 238 ++++++++++ sinks/cms/metrichub/v20211001/client_test.go | 51 ++ sinks/cms/metrichub/v20211001/constants.go | 21 + sinks/cms/metrichub/v20211001/signature.go | 79 ++++ .../cms/metrichub/v20211001/signature_test.go | 82 ++++ .../v20211001/struct_system_event.go | 24 + .../v20211001/struct_system_event_test.go | 43 ++ sinks/factory.go | 3 + 12 files changed, 1248 insertions(+) create mode 100644 sinks/cms/cms.go create mode 100644 sinks/cms/cms_test.go create mode 100644 sinks/cms/metrichub/v20211001/client.go create mode 100644 sinks/cms/metrichub/v20211001/client_test.go create mode 100644 sinks/cms/metrichub/v20211001/constants.go create mode 100644 sinks/cms/metrichub/v20211001/signature.go create mode 100644 sinks/cms/metrichub/v20211001/signature_test.go create mode 100644 sinks/cms/metrichub/v20211001/struct_system_event.go create mode 100644 sinks/cms/metrichub/v20211001/struct_system_event_test.go diff --git a/go.mod b/go.mod index a09e2af1..b60bcaba 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/smartystreets/gunit v1.0.0 // indirect github.com/stretchr/testify v1.6.1 go.mongodb.org/mongo-driver v1.5.1 + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect gopkg.in/olivere/elastic.v3 v3.0.75 gopkg.in/olivere/elastic.v5 v5.0.81 gopkg.in/olivere/elastic.v6 v6.2.23 diff --git a/go.sum b/go.sum index b329ea8a..6ad6c676 100644 --- a/go.sum +++ b/go.sum @@ -362,6 +362,8 @@ golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f h1:mOhmO9WsBaJCNmaZHPtHs9wOcdqdKCjF6OPJlmDM3KI= golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/sinks/cms/cms.go b/sinks/cms/cms.go new file mode 100644 index 00000000..cd334e64 --- /dev/null +++ b/sinks/cms/cms.go @@ -0,0 +1,436 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cms + +import ( + "encoding/json" + "errors" + "github.com/AliyunContainerService/kube-eventer/core" + . "github.com/AliyunContainerService/kube-eventer/sinks/cms/metrichub/v20211001" + "github.com/AliyunContainerService/kube-eventer/sinks/utils" + k8s "k8s.io/api/core/v1" + "k8s.io/klog" + "math" + "net/url" + "os" + "strings" + "sync" + "time" +) + +const ( + defaultBufferSize = 512 +) + +// SysEventRing 环形队列,循环写入不阻塞,读阻塞。堆积时最老的数据被替换 +type SysEventRing struct { + cond *sync.Cond + popCount int // 累计发送的数量(含discard的数量) + count int + buf []*SystemEvent // 最大缓存4K,再多就丢掉旧数据 + close bool +} + +func (p *SysEventRing) initialize(capacity int) { + p.cond = sync.NewCond(new(sync.Mutex)) + p.buf = make([]*SystemEvent, capacity) + p.count = 0 + p.popCount = 0 + p.close = false +} + +func (p *SysEventRing) phyIndex(n int) int { + return n % len(p.buf) +} + +func (p *SysEventRing) push(events []*SystemEvent) (r int) { + tryPush := func(events []*SystemEvent) (r []*SystemEvent, ok bool) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if ok = !p.close; ok { + empty := p.popCount == p.count // 队列是否为空 + + begin := p.phyIndex(p.count) + count := copy(p.buf[begin:], events) + p.count += count + r = events[count:] + + if empty { + p.cond.Signal() + } + } + return + } + + for ok := true; ok && len(events) > 0; r++ { + events, ok = tryPush(events) + } + return +} + +func (p *SysEventRing) front(maxCount int) (r []*SystemEvent, discard int, ok bool) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + for p.popCount == p.count { + if p.close { + return + } else { + p.cond.Wait() // 列表为空,则等待 + } + } + ok = true + + begin := p.popCount + if p.count > p.popCount+len(p.buf) { + begin = p.count - len(p.buf) + } + discard = begin - p.popCount + p.popCount = begin + + count := p.count - begin + if count > maxCount { + count = maxCount + } + + for count > 0 { + offset := p.phyIndex(begin) + end := offset + count + if end > len(p.buf) { + end = len(p.buf) + } + r = append(r, p.buf[offset:end]...) + count -= end - offset + begin = end + } + + return +} + +func (p *SysEventRing) pop(count int) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + p.popCount += count + + // 防溢出 + if p.count >= math.MaxUint16 { + diff := p.popCount - p.phyIndex(p.popCount) + p.popCount -= diff + p.count -= diff + } +} + +func (p *SysEventRing) Stop() { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + p.close = true + p.cond.Broadcast() +} + +type IClient interface { + PutSystemEvent(events []*SystemEvent) (response PutSystemEventResponse, err error) +} + +// CmsSink aliyun cloud monitor +type tagCmsSink struct { + // 环式缓存 + SysEventRing + + regionId string + level string + client IClient +} + +func (*tagCmsSink) Name() string { + return "CmsSink" +} + +func (d *tagCmsSink) ExportEvents(batch *core.EventBatch) { + if batch != nil && len(batch.Events) > 0 { + events := make([]*SystemEvent, 0, len(batch.Events)) + for _, coreEvent := range batch.Events { + if event := d.ConvertToSysEvent(coreEvent); event != nil { + events = append(events, event) + } + } + d.push(events) + } +} + +func (d *tagCmsSink) loopConsume(chanClose chan struct{}, batchCount int) { + defer close(chanClose) + + totalDiscard := 0 + for { + if events, discard, ok := d.front(batchCount); !ok { + break + } else { + totalDiscard += discard + if totalDiscard > 0 { + var content map[string]interface{} + if err := json.Unmarshal([]byte(events[0].Content), &content); err == nil { + content["discard"] = totalDiscard + if jsonBytes, err := json.Marshal(content); err == nil { + events[0].Content = string(jsonBytes) + } + } + } + if _, err := d.client.PutSystemEvent(events); err == nil { + totalDiscard = 0 + d.pop(len(events)) + } // else // 此次作废,下次重发 + } + } +} + +func getEventTime(event *k8s.Event, now func() time.Time) string { + var eventTime time.Time + + switch { + case !event.LastTimestamp.IsZero(): + eventTime = event.LastTimestamp.Time + case !event.EventTime.IsZero(): + eventTime = event.EventTime.Time + default: + eventTime = now() + } + + return eventTime.Format(EventTimeLayout) +} + +/* +{ + "message": "MountVolume.SetUp failed for volume \"eventer-token\" : secret \"addon.log.token\" not found", + "reportingInstance": "", + "count": 82, + "source": { + "host": "cn-qingdao.172.28.103.239", + "component": "kubelet" + }, + "reason": "FailedMount", + "type": "Warning", // 只有Normal和Warning + "reportingComponent": "", + "lastTimestamp": "2022-04-14T10:00:48Z", + "firstTimestamp": "2022-04-14T07:30:10Z", + "involvedObject": { + "apiVersion": "v1", + "uid": "8988b623-00cc-4f2b-be36-3286240ab95b", + "resourceVersion": "3405", + "name": "ack-node-problem-detector-eventer-598b6bf66b-m2n7p", + "kind": "Pod", + "namespace": "kube-system" + }, + "metadata": { + "uid": "ea76e1cd-9849-4587-9226-968b729512d6", + "resourceVersion": "48452", + "name": "ack-node-problem-detector-eventer-598b6bf66b-m2n7p.16e5b2c809e990cf", + "managedFields": [ + { + "apiVersion": "v1", + "operation": "Update", + "time": "2022-04-14T07:30:10Z", + "manager": "kubelet", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:source": { + "f:host": {}, + "f:component": {} + }, + "f:firstTimestamp": {}, + "f:count": {}, + "f:involvedObject": { + "f:kind": {}, + "f:uid": {}, + "f:name": {}, + "f:apiVersion": {}, + "f:namespace": {}, + "f:resourceVersion": {} + }, + "f:type": {}, + "f:reason": {}, + "f:message": {}, + "f:lastTimestamp": {} + } + } + ], + "creationTimestamp": "2022-04-14T07:30:10Z", + "namespace": "kube-system" + } +} +*/ + +func (d *tagCmsSink) ConvertToSysEvent(event *k8s.Event) (r *SystemEvent) { + r = &SystemEvent{ + Product: Product, + EventType: event.Reason, + Name: event.GetName(), + EventTime: getEventTime(event, time.Now), + GroupId: "0", // 跟昱杰沟通,此处先填0,以后如果需要groupId,再升级插件。 + Resource: string(event.InvolvedObject.UID), + // ResourceId: "acs:" + Product + ":" + d.regionId + "::uuid/" + string(event.InvolvedObject.UID), + Level: d.level, + Status: event.Type, + // UserId: "", + // Tags: "", + RegionId: d.regionId, + Time: time.Now().Format(EventTimeLayout), + } + + if jsonBytes, err := json.Marshal(event); err == nil { + r.Content = string(jsonBytes) + } + + return +} + +type Config struct { + endPoint string + accessKeyId string + accessKeySecret string + regionId string + level string +} + +// ParseConfig create config from uri +func ParseConfig(uri *url.URL) *Config { + c := &Config{} + + opts := uri.Query() + if uri.Host != "" && !strings.EqualFold(uri.Host, "unknown") { + c.endPoint = uri.Scheme + "://" + uri.Host + } + + doGet := func(optKey, envKey string, def string) string { + if len(opts[optKey]) >= 1 { + return opts[optKey][0] + } + if envKey != "" { + return os.Getenv(envKey) + } + return def + } + + c.regionId = doGet("regionId", "RegionId", "") + c.accessKeyId = doGet("accessKeyId", "AccessKeyId", "") + c.accessKeySecret = doGet("accessKeySecret", "AccessKeySecret", "") + c.level = doGet("level", "", "INFO") + + if c.endPoint == "" { + endPoint := GetEndPoint(c.regionId) + c.endPoint = endPoint.EndPoints[0] + if c.regionId == "" { + c.regionId = endPoint.RegionId + } + } + if c.regionId == "" { + c.regionId = DefaultRegionId + } + + return c +} + +func GetRegion(c *Config, fnParseRegionFromMeta func() (string, error)) (regionId string, err error) { + if c != nil && c.regionId != "" { + // region from client + regionId = c.regionId + } else if regionId, err = fnParseRegionFromMeta(); err != nil { + // region from meta data + klog.Errorf("failed to get Region,because of %v", err) + } + return +} + +func GetAKInfo(c *Config) (*utils.AKInfo, error) { + // 1. first get ak/sk from env + if c != nil && c.accessKeyId != "" && c.accessKeySecret != "" { + return &utils.AKInfo{ + AccessKeyId: c.accessKeyId, + AccessKeySecret: c.accessKeySecret, + }, nil + } + + // 2. aliyun akInfo fetch + if akInfo := fnGetAkInfo(utils.CMSConfigPath); akInfo != nil { + return akInfo, nil + } + + klog.Errorf("get sls akInfo error.") + return nil, errors.New("get cms akInfo error") +} + +var ( + fnGetAkInfo = utils.GetAkInfo +) + +func newClient(c *Config) (client *Client, err error) { + // // get region from env + // region, err := utils.GetRegionFromEnv() + // if err != nil { + // if c.regionId != "" { + // // region from client + // region = c.regionId + // } else { + // // region from meta data + // region, err = utils.ParseRegionFromMeta() + // if err != nil { + // klog.Errorf("failed to get Region,because of %v", err) + // return + // } + // } + // err = nil + // } + + var akInfo *utils.AKInfo + if akInfo, err = GetAKInfo(c); err == nil { + client = CreateMetricHubClient(c.endPoint, akInfo.AccessKeyId, akInfo.AccessKeySecret, akInfo.SecurityToken) + } + return +} + +type tagCmsSinkOpt struct { + bufferSize int + startLoop bool +} + +// NewCmsSink Usage: +// --sink=cms:http://metrichub-[your_region_id].aliyun-inc.com?regionId=[your_region_id]&accessKeyId=[your_access_key]&accessKeySecret=[you_access_secret]&level=[alert_level] +func NewCmsSink(uri *url.URL, opt *tagCmsSinkOpt) (r core.EventSink, err error) { + c := ParseConfig(uri) + + sink := &tagCmsSink{level: c.level} + sink.regionId, err = GetRegion(c, utils.ParseRegionFromMeta) + if err == nil { + var client *Client + if client, err = newClient(c); err == nil { + sink.client = client + } + } + if err == nil { + if opt == nil { + opt = &tagCmsSinkOpt{bufferSize: defaultBufferSize, startLoop: true} + } + sink.SysEventRing.initialize(opt.bufferSize) + if opt.startLoop { + const batchCount = 10 // 一次最多发送10个事件,避免过多时超限 + go sink.loopConsume(make(chan struct{}), batchCount) + } + r = sink + } + + return +} diff --git a/sinks/cms/cms_test.go b/sinks/cms/cms_test.go new file mode 100644 index 00000000..9df8b6a5 --- /dev/null +++ b/sinks/cms/cms_test.go @@ -0,0 +1,268 @@ +package cms + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/AliyunContainerService/kube-eventer/core" + . "github.com/AliyunContainerService/kube-eventer/sinks/cms/metrichub/v20211001" + "github.com/AliyunContainerService/kube-eventer/sinks/utils" + "github.com/stretchr/testify/assert" + k8s "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "math" + "net/url" + "os" + "sync" + "testing" + "time" +) + +const bufferCapacity = defaultBufferSize + +func TestSysEventRing(t *testing.T) { + ring := SysEventRing{} + ring.initialize(bufferCapacity) + assert.Equal(t, bufferCapacity, len(ring.buf)) + assert.Zero(t, ring.popCount) + assert.Zero(t, ring.count) + assert.NotNil(t, ring.count) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ring.push([]*SystemEvent{{Product: "acs_container"}}) + }() + + events, discard, ok := ring.front(BatchCount) + assert.True(t, ok) + assert.Zero(t, discard) + assert.Equal(t, 1, len(events)) + assert.Equal(t, "acs_container", events[0].Product) + + assert.Zero(t, ring.popCount) + assert.Equal(t, 1, ring.count) + + ring.pop(len(events)) + assert.Equal(t, ring.popCount, ring.count) + + wg.Wait() +} + +func TestSysEventRing_Overflow(t *testing.T) { + ring := SysEventRing{} + ring.initialize(bufferCapacity) + assert.Equal(t, bufferCapacity, len(ring.buf)) + assert.Zero(t, ring.popCount) + assert.Zero(t, ring.count) + assert.False(t, ring.close) + + const overflow = 32 + expectCount := math.MaxUint16 + overflow + for i := 0; i < expectCount; i++ { + count := ring.push([]*SystemEvent{ + { + Product: "acs_container", + Name: fmt.Sprintf("test.%v", i), + }, + }) + assert.Equal(t, 1, count) + } + assert.Zero(t, ring.popCount) + assert.Equal(t, expectCount, ring.count) + + for offset := 0; offset < bufferCapacity; offset += BatchCount { + events, discard, ok := ring.front(BatchCount) + assert.True(t, ok) + if offset == 0 { + assert.Equal(t, expectCount-bufferCapacity, discard) + } else { + assert.Zero(t, discard) + } + expectEventCount := BatchCount + if offset+BatchCount > bufferCapacity { + expectEventCount = bufferCapacity - offset + } + assert.Equal(t, expectEventCount, len(events)) + for i, event := range events { + assert.Equal(t, "acs_container", event.Product) + expectName := fmt.Sprintf("test.%v", expectCount-bufferCapacity+offset+i) + if expectName != event.Name { + fmt.Printf("event[%v].Name ==> %v, expect: %v\n", i+offset, event.Name, expectName) + } + assert.Equal(t, expectName, event.Name) + } + + if offset == 0 { + assert.Equal(t, expectCount-bufferCapacity, ring.popCount) + assert.Equal(t, expectCount, ring.count) + } + + ring.pop(len(events)) + assert.Equal(t, ring.popCount, ring.count-bufferCapacity+offset+len(events)) + assert.Less(t, ring.count, 2*bufferCapacity) + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + ring.Stop() + }() + events, discard, ok := ring.front(BatchCount) // ring关闭,返回无效数据 + assert.Nil(t, events) + assert.Zero(t, discard) + assert.False(t, ok) + + wg.Wait() +} + +func TestNewCmsSing(t *testing.T) { + defer func(val string) { _ = os.Setenv("RegionId", val) }(os.Getenv("RegionId")) + _ = os.Setenv("RegionId", "cn-beijing") + params, err := url.Parse(GetEndPoint("").EndPoints[0] + `?accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + sink, err := NewCmsSink(params, &tagCmsSinkOpt{bufferSize: bufferCapacity, startLoop: false}) + assert.NoError(t, err) + defer sink.Stop() + assert.Equal(t, "CmsSink", sink.Name()) + cmsSink := sink.(*tagCmsSink) + assert.NotNil(t, cmsSink.buf) + assert.Equal(t, "cn-beijing", cmsSink.regionId) +} + +func TestGetEnvTime(t *testing.T) { + now := time.Now() + expectTime := now.Format(EventTimeLayout) + + assert.Equal(t, expectTime, getEventTime(&k8s.Event{LastTimestamp: metav1.NewTime(now)}, nil)) + assert.Equal(t, expectTime, getEventTime(&k8s.Event{EventTime: metav1.NewMicroTime(now)}, nil)) + assert.Equal(t, expectTime, getEventTime(&k8s.Event{}, func() time.Time { return now })) +} + +type tagFakeClient struct { + ch chan []*SystemEvent +} + +func (d *tagFakeClient) PutSystemEvent(events []*SystemEvent) (response PutSystemEventResponse, err error) { + select { + case d.ch <- events: + default: + } + return +} + +func TestTagCmsSink_ExportEvents(t *testing.T) { + params, err := url.Parse(GetEndPoint("").EndPoints[0] + `?regionId=cn-zhangjiakou&accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + sink, err := NewCmsSink(params, nil) + assert.NoError(t, err) + defer sink.Stop() + assert.Equal(t, "CmsSink", sink.Name()) + cmsSink := sink.(*tagCmsSink) + fakeClient := tagFakeClient{ch: make(chan []*SystemEvent, 1)} + cmsSink.client = &fakeClient + + now := time.Now() + event := k8s.Event{ + LastTimestamp: metav1.NewTime(now), + } + cmsSink.ExportEvents(&core.EventBatch{Events: []*k8s.Event{&event}}) + events := <-fakeClient.ch + assert.Equal(t, 1, len(events)) + assert.Equal(t, Product, events[0].Product) + assert.NotEmpty(t, events[0].Content) +} + +func TestTagCmsSink_ExportEvents_DiscardNotZero(t *testing.T) { + params, err := url.Parse(GetEndPoint("").EndPoints[0] + `?regionId=cn-zhangjiakou&accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + sink, err := NewCmsSink(params, &tagCmsSinkOpt{bufferSize: bufferCapacity, startLoop: false}) + assert.NoError(t, err) + assert.Equal(t, "CmsSink", sink.Name()) + cmsSink := sink.(*tagCmsSink) + fakeClient := tagFakeClient{ch: make(chan []*SystemEvent, 2)} + cmsSink.client = &fakeClient + + now := time.Now() + event := k8s.Event{ + LastTimestamp: metav1.NewTime(now), + } + const expectDiscard = 32 + for i := 0; i < bufferCapacity+expectDiscard; i++ { + cmsSink.ExportEvents(&core.EventBatch{Events: []*k8s.Event{&event}}) + } + chanClose := make(chan struct{}) + go cmsSink.loopConsume(chanClose, BatchCount) + + events := <-fakeClient.ch + assert.Equal(t, BatchCount, len(events)) + assert.Equal(t, Product, events[0].Product) + fmt.Println("Content:", events[0].Content) + assert.NotEmpty(t, events[0].Content) + assert.NotEmpty(t, events[1].Content) + content := struct { + Discard int `json:"discard"` + }{} + assert.NoError(t, json.Unmarshal([]byte(events[0].Content), &content)) + assert.Equal(t, expectDiscard, content.Discard) + + cmsSink.Stop() + <-chanClose // 确保loopConsumer已关闭 +} + +func TestGetRegion(t *testing.T) { + err := errors.New("ByDesign") + regionId, actualErr := GetRegion(nil, func() (string, error) { + return "", err + }) + assert.Empty(t, regionId) + assert.Same(t, err, actualErr) +} + +func TestGetAkInfo(t *testing.T) { + defer func(old func(configPath string) *utils.AKInfo) { fnGetAkInfo = old }(fnGetAkInfo) + fakeAkInfo := &utils.AKInfo{} + fnGetAkInfo = func(string) *utils.AKInfo { + return fakeAkInfo + } + akInfo, err := GetAKInfo(nil) + assert.NoError(t, err) + assert.Same(t, fakeAkInfo, akInfo) +} + +// 这个会阻塞 +func TestNewClient_Error(t *testing.T) { + defer func(old func(configPath string) *utils.AKInfo) { fnGetAkInfo = old }(fnGetAkInfo) + fnGetAkInfo = func(string) *utils.AKInfo { return nil } + client, err := newClient(nil) + assert.Nil(t, client) + assert.Error(t, err) +} + +func TestParseConfig(t *testing.T) { + params, err := url.Parse(`https://unknown?accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + c := ParseConfig(params) + assert.Contains(t, c.endPoint, "://") + assert.Contains(t, c.endPoint, "cn-hangzhou") + assert.Equal(t, "cn-hangzhou", c.regionId) +} + +func TestParseConfig2(t *testing.T) { + params, err := url.Parse(`https://host.com/?accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + c := ParseConfig(params) + assert.Equal(t, c.endPoint, `https://host.com`) + assert.Equal(t, "cn-hangzhou", c.regionId) +} + +func TestParseConfig3(t *testing.T) { + params, err := url.Parse(`?accessKeyId=1&accessKeySecret=2`) + assert.NoError(t, err) + c := ParseConfig(params) + assert.Contains(t, c.endPoint, "cn-hangzhou") + assert.Equal(t, DefaultRegionId, c.regionId) +} diff --git a/sinks/cms/metrichub/v20211001/client.go b/sinks/cms/metrichub/v20211001/client.go new file mode 100644 index 00000000..1fe02292 --- /dev/null +++ b/sinks/cms/metrichub/v20211001/client.go @@ -0,0 +1,238 @@ +package metrichub + +import ( + "bytes" + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + log "k8s.io/klog" + "net" + "net/http" + "reflect" + "strings" + "time" +) + +// DefaultMetricHubEndPoint 云监控系统事件缺省上报地址 +const ( + urlPath = "/event/system/upload" + + BatchCount = 100 // 单次最大上报100条 + maxBodyBytes = 512 * 1024 // 单次最大不超过512K(http body最大值) + + DefaultRegionId = "cn-hangzhou" +) + +type EndPoint struct { + Name string + RegionId string + EndPoints []string +} + +var ( + timeout10s = http.Client{Timeout: 10 * time.Second} + + // 数据来源:https://ata.alibaba-inc.com/articles/97388 + knownRegion = map[string]EndPoint{ + "cn-hangzhou": {Name: "华东 1 (杭州)", RegionId: "cn-hangzhou", EndPoints: []string{"http://metrichub-cn-hangzhou.aliyun.com"}}, // (杭州、新加坡都可以访问) + "cn-zhangjiakou": {Name: "华北 3(张家口)", RegionId: "cn-zhangjiakou", EndPoints: []string{"http://metrichub-cn-zhangjiakou.aliyun.com"}}, + "cn-shanghai": {Name: "华东 2 (上海)", RegionId: "cn-shanghai", EndPoints: []string{"http://metrichub-cn-shanghai.aliyun.com"}}, + "cn-shanghai-finance-1": {Name: "上海金融云", RegionId: "cn-shanghai-finance-1", EndPoints: []string{"http://metrichub-cn-shanghai-finance-1.aliyun.com"}}, + "cn-beijing": {Name: "华北 2 (北京)", RegionId: "cn-beijing", EndPoints: []string{"http://metrichub-cn-beijing.aliyun.com"}}, + "cn-qingdao": {Name: "华北 1 (青岛)", RegionId: "cn-qingdao", EndPoints: []string{"http://metrichub-cn-qingdao.aliyun.com"}}, + "cn-shenzhen": {Name: "华南 1 (深圳)", RegionId: "cn-shenzhen", EndPoints: []string{"http://metrichub-cn-shenzhen.aliyun.com"}}, + "cn-north-2-gov-1": {Name: "政务云", RegionId: "cn-north-2-gov-1", EndPoints: []string{"http://metrichub-cn-north-2-gov-1.aliyun.com"}}, + "cn-shenzhen-finance-1": {Name: "深圳金融云", RegionId: "cn-shenzhen-finance-1", EndPoints: []string{"http://metrichub-cn-shenzhen-finance-1.aliyun.com"}}, + "cn-hongkong": {Name: "香港", RegionId: "cn-hongkong", EndPoints: []string{"http://metrichub-cn-hongkong.aliyun.com"}}, + "cn-huhehaote": {Name: "华北 5 (呼和浩特)", RegionId: "cn-huhehaote", EndPoints: []string{"http://metrichub-cn-huhehaote.aliyun.com"}}, + "me-east-1": {Name: "中东东部 1(迪拜)", RegionId: "me-east-1", EndPoints: []string{"http://metrichub-me-east-1.aliyun.com"}}, + "us-west-1": {Name: "美国西部 1(硅谷 )", RegionId: "us-west-1", EndPoints: []string{"http://metrichub-us-west-1.aliyun.com"}}, + "us-east-1": {Name: "美国东部 1(弗吉尼亚)", RegionId: "us-east-1", EndPoints: []string{"http://metrichub-us-east-1.aliyun.com"}}, + "ap-northeast-1": {Name: "亚太东北 1 (日本 )", RegionId: "ap-northeast-1", EndPoints: []string{"http://metrichub-ap-northeast-1.aliyun.com"}}, + "eu-central-1": {Name: "欧洲中部 1(法兰克福)", RegionId: "eu-central-1", EndPoints: []string{"http://metrichub-eu-central-1.aliyun.com"}}, + "ap-southeast-2": {Name: "亚太东南 2(悉尼)", RegionId: "ap-southeast-2", EndPoints: []string{"http://metrichub-ap-southeast-2.aliyun.com"}}, + "ap-southeast-1": {Name: "亚太东南 1(新加坡)", RegionId: "ap-southeast-1", EndPoints: []string{"http://metrichub-ap-southeast-1.aliyun.com"}}, + "ap-southeast-3": {Name: "亚太东南3(吉隆坡)", RegionId: "ap-southeast-3", EndPoints: []string{"http://metrichub-ap-southeast-3.aliyun.com"}}, + "cn-heyuan": {Name: "河源", RegionId: "cn-heyuan", EndPoints: []string{"http://metrichub-cn-heyuan.aliyun.com"}}, + "ap-south-1": {Name: "印度-孟买", RegionId: "ap-south-1", EndPoints: []string{"http://metrichub-ap-south-1.aliyun.com", "http://metrichub-ap-south-1.aliyuncs.com"}}, + "ap-southeast-5": {Name: "印尼-雅加达", RegionId: "ap-southeast-5", EndPoints: []string{"http://metrichub-ap-southeast-5.aliyun.com"}}, + "cn-chengdu": {Name: "成都", RegionId: "cn-chengdu", EndPoints: []string{"http://metrichub-cn-chengdu.aliyun.com", "http://metrichub-cn-chengdu.aliyuncs.com"}}, + "cn-chengdu-smarthosting-1": {Name: "成都poc", RegionId: "cn-chengdu-smarthosting-1", EndPoints: []string{"http://metrichub-cn-chengdu-smarthosting-1.aliyun.com"}}, + "cn-zhengzhou-nebula-1": {Name: "河南星云", RegionId: "cn-zhengzhou-nebula-1", EndPoints: []string{"http://metrichub-cn-zhengzhou-nebula-1.aliyun.com"}}, + "cn-wulanchabu": {Name: "乌兰察布", RegionId: "cn-wulanchabu", EndPoints: []string{"http://metrichub-cn-wulanchabu.aliyun.com"}}, + "rus-west-1": {Name: "俄罗斯(莫斯科)", RegionId: "rus-west-1", EndPoints: []string{"http://metrichub-rus-west-1.aliyun.com"}}, + "cn-huhehaote-nebula-1": {Name: "内蒙古星云", RegionId: "cn-huhehaote-nebula-1", EndPoints: []string{"http://metrichub-cn-huhehaote-nebula-1.aliyun.com"}}, + "eu-west-1": {Name: "英国-伦敦", RegionId: "eu-west-1", EndPoints: []string{"http://metrichub-eu-west-1.aliyun.com", "http://metrichub-inner.eu-west-1.aliyuncs.com"}}, + "cn-guangzhou": {Name: "广州(华南3)", RegionId: "cn-guangzhou", EndPoints: []string{"http://metrichub-cn-guangzhou.aliyun.com"}}, + "cn-zhangjiakou-spe": {Name: "张北spe", RegionId: "cn-zhangjiakou-spe", EndPoints: []string{"http://metrichub-cn-zhangjiakou-spe.aliyun.com"}}, + "ap-southeast-6": {Name: "菲律宾", RegionId: "ap-southeast-6", EndPoints: []string{"http://metrichub-ap-southeast-6.aliyun.com"}}, + "cn-shanghai-mybk": {Name: "网商云", RegionId: "cn-shanghai-mybk", EndPoints: []string{"http://metrichub-cn-shanghai-mybk.aliyun.com"}}, + "cn-beijing-finance-1": {Name: "北京金融云", RegionId: "cn-beijing-finance-1", EndPoints: []string{"http://metrichub-cn-beijing-finance-1.aliyuncs.com"}}, + "cn-nanjing": {Name: "南京(华南5)", RegionId: "cn-nanjing", EndPoints: []string{"http://metrichub-cn-nanjing.aliyuncs.com"}}, + "ap-hochiminh-ant": {Name: "越南(胡志明)蚂蚁", RegionId: "ap-hochiminh-ant", EndPoints: []string{"http://metrichub-ap-hochiminh-ant.aliyuncs.com"}}, + "ap-northeast-2": {Name: "韩国(首尔) 3.5.6以上版本", RegionId: "ap-northeast-2", EndPoints: []string{"http://metrichub-ap-northeast-2.aliyuncs.com"}}, + "ap-southeast-7": {Name: "泰国(曼谷) 3.5.6以上版本", RegionId: "ap-southeast-7", EndPoints: []string{"http://metrichub-ap-southeast-7.aliyuncs.com"}}, + } +) + +func GetEndPoint(regionId string) EndPoint { + v, ok := knownRegion[regionId] + if !ok { + v = knownRegion[DefaultRegionId] + } + return v +} + +type Client struct { + endPoint string + accessKeyId string + accessSecret string + sourceIp string + stsToken string +} + +func HostIP() (r string) { + addrSlice, err := net.InterfaceAddrs() + if err == nil { + ips := make([]net.IP, 0, len(addrSlice)) + for _, addr := range addrSlice { + if ip, ok := addr.(*net.IPNet); ok && ip.IP.IsGlobalUnicast() { + ips = append(ips, ip.IP) + } + } + ipStr, _ := json.Marshal(ips) + log.Info("ips: ", string(ipStr)) + if len(ips) > 0 { + r = ips[0].String() + } + } + return +} + +func newClient(endPoint string, accessKeyId, accessSecret, stsToken string) *Client { + r := &Client{ + endPoint: strings.TrimSuffix(endPoint, "/"), + accessKeyId: accessKeyId, + accessSecret: accessSecret, + stsToken: stsToken, + sourceIp: HostIP(), + } + + return r +} + +func CreateMetricHubClient(endPoint, accessKeyId, accessSecret, stsToken string) (r *Client) { + log.Info("msg: ", "metric hub config", ", endPoint: ", endPoint, ", accessKeyId: ", accessKeyId) + if endPoint == "" { + endPoint = GetEndPoint("").EndPoints[0] + } + r = newClient(endPoint, accessKeyId, accessSecret, stsToken) + return +} + +func SafeClose(closer io.Closer) { + if closer != nil { + _ = closer.Close() + } +} +func (p *Client) DoAction(request *http.Request, body []byte) (byte []byte, err error) { + response, err := timeout10s.Do(request) + if err == nil { + defer SafeClose(response.Body) + byte, _ = ioutil.ReadAll(response.Body) + } + errorLogIfNotNil(err, "uri", request.URL.String(), "headers", request.Header, "body", string(body)) + return +} + +func (p *Client) appendHeader(request *http.Request) { + request.Header[UserAgent] = []string{UserAgentValue} + request.Header[Date] = []string{time.Now().Format(time.RFC1123)} + request.Header[XCmsApiVersion] = []string{XCmsApiVersionValue} + request.Header[XCmsSignature] = []string{XCmsSignatureMethod} + request.Header[XCmsIp] = []string{p.sourceIp} + + if len(p.stsToken) > 0 { + request.Header[XCmsCallerType] = []string{XCmsCallerToken} + request.Header[XCmsSecurityToken] = []string{p.stsToken} + } +} + +type PutSystemEventResponse struct { + Code string `json:"code"` + Message string `json:"msg"` +} + +func (p *PutSystemEventResponse) Success() bool { + return "200" == p.Code +} + +func ObjTypeName(obj interface{}, includePkg ...bool) (r string) { + if obj != nil { + typ := reflect.Indirect(reflect.ValueOf(obj)).Type() + r = typ.Name() + if r == "" || len(includePkg) == 0 || includePkg[0] { + r = typ.String() + } + } + return +} +func errorLogIfNotNil(err error, kvs ...interface{}) { + for i := 0; i < len(kvs); i += 2 { + if key, _ := kvs[i].(string); key != "" { + sep := ", " + if i == 0 { + sep = "" + } + kvs[i] = sep + key + ": " + } + } + kvs = append(kvs, ", errType: ", ObjTypeName(err), ", error: ", fmt.Sprintf("%+v", err)) + log.Error(kvs...) +} + +// ErrExceed 大小超限 +type ErrExceed struct { + Size int + MaxSize int +} + +func (e ErrExceed) Error() string { + return fmt.Sprintf("size exceeds limit, max allowed is %v, actual %v", e.MaxSize, e.Size) +} + +func (p *Client) PutSystemEvent(events []*SystemEvent) (response PutSystemEventResponse, err error) { + if len(events) <= 0 { + return + } + + jsonBytes, _ := json.Marshal(events) + if len(jsonBytes) > maxBodyBytes { + err = ErrExceed{MaxSize: maxBodyBytes, Size: len(jsonBytes)} + return + } + request, _ := http.NewRequest(http.MethodPost, p.endPoint+urlPath, bytes.NewReader(jsonBytes)) + request.Header[ContentMd5] = []string{fmt.Sprintf("%X", md5.Sum(jsonBytes))} + request.Header[ContentType] = []string{ContentJson} // must be application/json,without utf-8 + p.appendHeader(request) + + // 处理签名 + var digest string + if digest, _, err = p.SignatureRequest(request); err == nil { + request.Header[Authorization] = []string{fmt.Sprintf("%v:%v", p.accessKeyId, digest)} + + // 发送请求 + var respJsonBytes []byte + respJsonBytes, err = p.DoAction(request, jsonBytes) + if err == nil { + if err = json.Unmarshal(respJsonBytes, &response); err == nil && !response.Success() { + err = errors.New(response.Message) + } + } + } + errorLogIfNotNil(err, "uri", request.URL.String(), "headers", request.Header, "body", string(jsonBytes)) + return response, err +} diff --git a/sinks/cms/metrichub/v20211001/client_test.go b/sinks/cms/metrichub/v20211001/client_test.go new file mode 100644 index 00000000..0ad45cdd --- /dev/null +++ b/sinks/cms/metrichub/v20211001/client_test.go @@ -0,0 +1,51 @@ +package metrichub + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +func TestClient_AppendHeader(t *testing.T) { + + c := &Client{ + stsToken: "not null", + } + request, err := http.NewRequest(http.MethodPost, "/", nil) + assert.NoError(t, err) + c.appendHeader(request) + assert.NotNil(t, request.Header[XCmsCallerType]) + assert.NotNil(t, request.Header[XCmsSecurityToken]) +} + +// func makePanic(t *testing.T) func(error) { +// return func(err error) { +// fmt.Println(err) +// assert.Nil(t, err) +// } +// } + +// func TestClient_SendZero(t *testing.T) { +// defer PanicCall(makePanic(t)) +// +// _, err := client.PutSystemEvent(nil) +// assert.NoError(t, err) +// } +// +// func TestCreateMetricHubClient(t *testing.T) { +// defer PanicCall(makePanic(t)) +// +// cfg := conf.GetConfigForTest(func(m map[string]interface{}) { +// delete(m["cloudMonitor"].(map[string]interface{}), "metricHubEndPoint") +// }) +// assert.NotNil(t, cfg) +// +// myClient := CreateMetricHubClient(cfg) +// assert.NotNil(t, myClient) +// assert.NotEmpty(t, myClient.endPoint) +// } + +func TestGetEndPoint(t *testing.T) { + assert.Equal(t, "河源", GetEndPoint("cn-heyuan").Name) + assert.Equal(t, "cn-hangzhou", GetEndPoint("xxxx").RegionId) +} diff --git a/sinks/cms/metrichub/v20211001/constants.go b/sinks/cms/metrichub/v20211001/constants.go new file mode 100644 index 00000000..8dba67e6 --- /dev/null +++ b/sinks/cms/metrichub/v20211001/constants.go @@ -0,0 +1,21 @@ +package metrichub + +const ( + ContentMd5 = "Content-MD5" + UserAgent = "User-Agent" + UserAgentValue = "cms-go-sdk-v-1.0" + ContentType = "Content-Type" + ContentJson = "application/json" + Date = "Date" + XCms = "x-cms-" + XAcs = "x-acs-" + XCmsApiVersion = XCms + "api-version" + XCmsApiVersionValue = "1.0" + XCmsSignature = XCms + "signature" + XCmsSignatureMethod = "hmac-sha1" + XCmsIp = XCms + "ip" + XCmsCallerType = XCms + "caller-type" + XCmsCallerToken = "token" + XCmsSecurityToken = XCms + "security-token" + Authorization = "Authorization" +) diff --git a/sinks/cms/metrichub/v20211001/signature.go b/sinks/cms/metrichub/v20211001/signature.go new file mode 100644 index 00000000..d3a6a5d1 --- /dev/null +++ b/sinks/cms/metrichub/v20211001/signature.go @@ -0,0 +1,79 @@ +package metrichub + +import ( + "crypto/hmac" + "crypto/sha1" + "encoding/hex" + "fmt" + "net/http" + "net/url" + "sort" + "strings" +) + +func map2QueryParams(m http.Header) string { + const capacity = 3 + keySlice := make([]string, 0, capacity) + for k, v := range m { + if len(v) > 0 && (strings.HasPrefix(k, XAcs) || strings.HasPrefix(k, XCms)) { + keySlice = append(keySlice, k+":"+strings.TrimSpace(v[0])) + } + } + + sort.Strings(keySlice) + + return strings.Join(keySlice, "\n") +} + +func sortQuery(query url.Values) (r string) { + if len(query) > 0 { + keys := make([]string, 0, len(query)) + for k := range query { + keys = append(keys, k) + } + sort.Strings(keys) + + items := make([]string, 0, len(keys)) + for _, k := range keys { + for _, v := range query[k] { + items = append(items, k+"="+v) + } + } + r = "?" + strings.Join(items, "&") + } + return +} + +// Signature calculates a request's signature digest. +func (p *Client) Signature(method, uri string, headers http.Header) (digest, signStr string, err error) { + date := headers.Get(Date) + if date == "" { + return "", "", fmt.Errorf("can't find 'Date' header") + } + + var u *url.URL + if u, err = url.Parse(uri); err == nil { + get := func(key string) (r string) { + if v := headers[key]; len(v) > 0 { + r = v[0] + } + return + } + signStr = method + "\n" + + get(ContentMd5) + "\n" + + get(ContentType) + "\n" + + date + "\n" + + map2QueryParams(headers) + "\n" + + u.EscapedPath() + sortQuery(u.Query()) + + mac := hmac.New(sha1.New, []byte(p.accessSecret)) + if _, err = mac.Write([]byte(signStr)); err == nil { + digest = strings.ToUpper(hex.EncodeToString(mac.Sum(nil))) + } + } + return +} + +func (p *Client) SignatureRequest(request *http.Request) (digest, signStr string, err error) { + return p.Signature(request.Method, request.URL.RequestURI(), request.Header) +} diff --git a/sinks/cms/metrichub/v20211001/signature_test.go b/sinks/cms/metrichub/v20211001/signature_test.go new file mode 100644 index 00000000..31c30d34 --- /dev/null +++ b/sinks/cms/metrichub/v20211001/signature_test.go @@ -0,0 +1,82 @@ +package metrichub + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "net/http" + "testing" +) + +func TestSignature_OK(t *testing.T) { + // 该用例参考: https://ata.alibaba-inc.com/articles/87434 + p := &Client{ + accessSecret: "testsecret", + } + request, err := http.NewRequest(http.MethodPost, "http://metrichub.aliyun-inc.com/metric/custom/upload", nil) + require.NoError(t, err) + request.Header[ContentMd5] = []string{"0B9BE351E56C90FED853B32524253E8B"} + request.Header[ContentType] = []string{"application/json"} + request.Header[Date] = []string{"Tue, 11 Dec 2018 21:05:51 +0800"} + request.Header[XCmsApiVersion] = []string{"1.0"} + request.Header[XCmsIp] = []string{"127.0.0.1"} + request.Header[XCmsSignature] = []string{XCmsSignatureMethod} + digest, signStr, err := p.SignatureRequest(request) + require.NoError(t, err) + fmt.Println(signStr) + const expectSignStr = `POST +0B9BE351E56C90FED853B32524253E8B +application/json +Tue, 11 Dec 2018 21:05:51 +0800 +x-cms-api-version:1.0 +x-cms-ip:127.0.0.1 +x-cms-signature:hmac-sha1 +/metric/custom/upload` + require.Equal(t, expectSignStr, signStr) + fmt.Println("Signature: " + digest) + require.Equal(t, "1DC19ED63F755ACDE203614C8A1157EB1097E922", digest) +} + +func TestSignature_WithoutDate(t *testing.T) { + p := &Client{ + accessSecret: "testsecret", + } + digest, signStr, err := p.Signature(http.MethodPost, "/metric/custom/upload", http.Header{ + ContentMd5: {"0B9BE351E56C90FED853B32524253E8B"}, + ContentType: {"application/json"}, + XCmsApiVersion: {"1.0"}, + XCmsIp: {"127.0.0.1"}, + XCmsSignature: {XCmsSignatureMethod}, + //Date: "Tue, 11 Dec 2018 21:05:51 +0800", + }) + require.Error(t, err) + assert.Empty(t, digest) + assert.Empty(t, signStr) +} + +func TestSignature_QueryParams(t *testing.T) { + p := &Client{ + accessSecret: "testsecret", + } + digest, signStr, err := p.Signature(http.MethodPost, "/metric/custom/upload?name=hcj&employeeId=11", http.Header{ + ContentMd5: {"0B9BE351E56C90FED853B32524253E8B"}, + ContentType: {"application/json"}, + Date: {"Tue, 11 Dec 2018 21:05:51 +0800"}, + XCmsApiVersion: {"1.0"}, + XCmsIp: {"127.0.0.1"}, + XCmsSignature: {XCmsSignatureMethod}, + }) + require.NoError(t, err) + fmt.Println(signStr) + const expectSignStr = `POST +0B9BE351E56C90FED853B32524253E8B +application/json +Tue, 11 Dec 2018 21:05:51 +0800 +x-cms-api-version:1.0 +x-cms-ip:127.0.0.1 +x-cms-signature:hmac-sha1 +/metric/custom/upload?employeeId=11&name=hcj` + assert.Equal(t, expectSignStr, signStr) + fmt.Println("Signature: " + digest) + assert.Equal(t, "D9F6DDC52C035D7F28418226DA91B3E9C4380556", digest) +} diff --git a/sinks/cms/metrichub/v20211001/struct_system_event.go b/sinks/cms/metrichub/v20211001/struct_system_event.go new file mode 100644 index 00000000..d689e2e2 --- /dev/null +++ b/sinks/cms/metrichub/v20211001/struct_system_event.go @@ -0,0 +1,24 @@ +package metrichub + +const ( + // EventTimeLayout .000 固定给三位毫秒计数,.999 会去掉尾部的0 + EventTimeLayout = "20060102T150405.000-0700" // Example of timeStr: 20200304T000252.190+0800 + Product = "k8s" +) + +type SystemEvent struct { + Product string `json:"product"` + EventType string `json:"eventType"` + Name string `json:"name"` + EventTime string `json:"eventTime"` // 本次报警时间,记录时间, 20211108T163717.313+0800 + GroupId string `json:"groupId,omitempty"` // 应用分组Id + Resource string `json:"resource,omitempty"` // + ResourceId string `json:"resourceId,omitempty"` // dimensions + Level string `json:"level"` // CRITICAL\WARN\INFO,如果不知道填什么就填INFO + Status string `json:"status"` + UserId string `json:"userId,omitempty"` // userId + Tags string `json:"tags,omitempty"` // metric=acs_ecs/cpu_usage,metric=acs_ecs/mem_usage + Content string `json:"content"` // NewSystemEventContent + RegionId string `json:"regionId"` + Time string `json:"time,omitempty"` +} diff --git a/sinks/cms/metrichub/v20211001/struct_system_event_test.go b/sinks/cms/metrichub/v20211001/struct_system_event_test.go new file mode 100644 index 00000000..e09bbb8a --- /dev/null +++ b/sinks/cms/metrichub/v20211001/struct_system_event_test.go @@ -0,0 +1,43 @@ +package metrichub + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPutSystemEvent(t *testing.T) { + sysEvent := &SystemEvent{ + Product: Product, + // EventType: "MetricAlert:" + EventType, // ! - 不存在 + Name: "P1", + EventTime: time.Now().Format(EventTimeLayout), + GroupId: "0", + ResourceId: `{"userId":"4","instanceId":"i-123abcxf"}`, + Level: "INFO", + Status: "AlertAlarm", + UserId: "04", + Time: time.Now().Format(EventTimeLayout), // ! - 不存在 + } + // require.NoError(t, PutSystemEvent([]*SystemEvent{sysEvent})) + assert.NotNil(t, sysEvent) +} + +func TestPutSystemEvent_Without_RegionId(t *testing.T) { + sysEvent := &SystemEvent{ + Product: Product, + // EventType: EventType, + Name: "P1", + EventTime: time.Now().Format(EventTimeLayout), + GroupId: "0", + ResourceId: `{"userId":"4","instanceId":"i-123abcxf"}`, + Level: "INFO", + Status: "AlertOk", + UserId: "04", + Content: `{"__batchId__":"1234[1]@133455"}`, + RegionId: "", + Time: time.Now().Format(EventTimeLayout), + } + // require.Error(t, PutSystemEvent([]*SystemEvent{sysEvent})) + assert.NotNil(t, sysEvent) +} diff --git a/sinks/factory.go b/sinks/factory.go index 503dd01e..20aedb4d 100755 --- a/sinks/factory.go +++ b/sinks/factory.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/AliyunContainerService/kube-eventer/common/flags" "github.com/AliyunContainerService/kube-eventer/core" + "github.com/AliyunContainerService/kube-eventer/sinks/cms" "github.com/AliyunContainerService/kube-eventer/sinks/dingtalk" "github.com/AliyunContainerService/kube-eventer/sinks/elasticsearch" "github.com/AliyunContainerService/kube-eventer/sinks/eventbridge" @@ -65,6 +66,8 @@ func (this *SinkFactory) Build(uri flags.Uri) (core.EventSink, error) { return eventbridge.NewEventBridgeSink(&uri.Val) case "mongo": return mongo.CreateMongoSink(&uri.Val) + case "cms": + return cms.NewCmsSink(&uri.Val, nil) default: return nil, fmt.Errorf("Sink not recognized: %s", uri.Key) } From 422c455849f179babe202ed000ae6c8f3a839d52 Mon Sep 17 00:00:00 2001 From: hcj116 Date: Mon, 25 Jul 2022 11:53:33 +0800 Subject: [PATCH 2/4] bug fix --- sinks/eventbridge/driver.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/sinks/eventbridge/driver.go b/sinks/eventbridge/driver.go index 7ecd35e5..1de49c1c 100644 --- a/sinks/eventbridge/driver.go +++ b/sinks/eventbridge/driver.go @@ -208,20 +208,16 @@ func (ebSink *eventBridgeSink) newClient() (*eventbridge.Client, error) { endpoint := ebSink.parseEventBridgeEndpoint(region) - akInfo, err := utils.ParseAKInfoFromConfigPath() - if err != nil { - akInfo = &utils.AKInfo{} - if ebSink.accessKeyId != "" && ebSink.accessKeySecret != "" { - akInfo.AccessKeyId = ebSink.accessKeyId - akInfo.AccessKeySecret = ebSink.accessKeySecret - } else { - akInfoInMeta, err := utils.ParseAKInfoFromMeta() - if err != nil { - klog.Errorf("failed to get RamRoleToken,because of %v", err) - return nil, err - } - akInfo = akInfoInMeta - } + akInfo := &utils.AKInfo{} + if ebSink.accessKeyId != "" && ebSink.accessKeySecret != "" { + akInfo.AccessKeyId = ebSink.accessKeyId + akInfo.AccessKeySecret = ebSink.accessKeySecret + } else { + akInfo = utils.GetAkInfo(utils.SLSConfigPath) + } + if akInfo == nil { + klog.Errorf("get eb akInfo error.") + return nil, errors.New("get eb akInfo error") } config := &eventbridge.Config{} From a8c121df2481a22ad8d8b33a0a65ee6dca19e08e Mon Sep 17 00:00:00 2001 From: hcj116 Date: Mon, 8 Aug 2022 17:48:58 +0800 Subject: [PATCH 3/4] add resourceId --- sinks/cms/cms.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sinks/cms/cms.go b/sinks/cms/cms.go index cd334e64..46132946 100644 --- a/sinks/cms/cms.go +++ b/sinks/cms/cms.go @@ -276,15 +276,15 @@ func getEventTime(event *k8s.Event, now func() time.Time) string { func (d *tagCmsSink) ConvertToSysEvent(event *k8s.Event) (r *SystemEvent) { r = &SystemEvent{ - Product: Product, - EventType: event.Reason, - Name: event.GetName(), - EventTime: getEventTime(event, time.Now), - GroupId: "0", // 跟昱杰沟通,此处先填0,以后如果需要groupId,再升级插件。 - Resource: string(event.InvolvedObject.UID), - // ResourceId: "acs:" + Product + ":" + d.regionId + "::uuid/" + string(event.InvolvedObject.UID), - Level: d.level, - Status: event.Type, + Product: Product, + EventType: event.Reason, + Name: event.GetName(), + EventTime: getEventTime(event, time.Now), + GroupId: "0", // 跟昱杰沟通,此处先填0,以后如果需要groupId,再升级插件。 + Resource: string(event.InvolvedObject.UID), + ResourceId: "acs:" + Product + ":" + d.regionId + "::uuid/" + string(event.InvolvedObject.UID), + Level: d.level, + Status: event.Type, // UserId: "", // Tags: "", RegionId: d.regionId, From e839d30d0c7a39a1be8c78d7f3216f7bd98ffb0e Mon Sep 17 00:00:00 2001 From: "shichun.fsc" Date: Mon, 15 Aug 2022 15:13:21 +0800 Subject: [PATCH 4/4] cms sink init with userId config. --- sinks/cms/cms.go | 53 ++++++++++++++++++++++++++++--------------- sinks/cms/cms_test.go | 14 ++++++------ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/sinks/cms/cms.go b/sinks/cms/cms.go index 46132946..90375f16 100644 --- a/sinks/cms/cms.go +++ b/sinks/cms/cms.go @@ -151,9 +151,8 @@ type tagCmsSink struct { // 环式缓存 SysEventRing - regionId string - level string - client IClient + config Config + client IClient } func (*tagCmsSink) Name() string { @@ -282,13 +281,13 @@ func (d *tagCmsSink) ConvertToSysEvent(event *k8s.Event) (r *SystemEvent) { EventTime: getEventTime(event, time.Now), GroupId: "0", // 跟昱杰沟通,此处先填0,以后如果需要groupId,再升级插件。 Resource: string(event.InvolvedObject.UID), - ResourceId: "acs:" + Product + ":" + d.regionId + "::uuid/" + string(event.InvolvedObject.UID), - Level: d.level, + ResourceId: "acs:" + Product + ":" + d.config.regionId + "::uuid/" + string(event.InvolvedObject.UID), + Level: d.config.level, Status: event.Type, - // UserId: "", + UserId: d.config.userId, + RegionId: d.config.regionId, + Time: time.Now().Format(EventTimeLayout), // Tags: "", - RegionId: d.regionId, - Time: time.Now().Format(EventTimeLayout), } if jsonBytes, err := json.Marshal(event); err == nil { @@ -302,12 +301,15 @@ type Config struct { endPoint string accessKeyId string accessKeySecret string - regionId string level string + + regionId string + clusterId string + userId string } // ParseConfig create config from uri -func ParseConfig(uri *url.URL) *Config { +func ParseConfig(uri *url.URL) (*Config, error) { c := &Config{} opts := uri.Query() @@ -325,7 +327,25 @@ func ParseConfig(uri *url.URL) *Config { return def } + c.clusterId = doGet("clusterId", "ClusterId", "") c.regionId = doGet("regionId", "RegionId", "") + if c.regionId == "" { + metaRegionId, err := utils.ParseRegionFromMeta() + if err != nil { + // region from meta data + klog.Errorf("cms init. failed to get Region from metadata, use the default regionId, because of %v", err) + c.regionId = DefaultRegionId + } else { + c.regionId = metaRegionId + } + } + c.userId = doGet("userId", "UserId", "") + if c.userId == "" { + // no userId. exist process. + klog.Errorf("cms init. failed to get UserId.") + return nil, errors.New("failed to get userId when cms init") + } + c.accessKeyId = doGet("accessKeyId", "AccessKeyId", "") c.accessKeySecret = doGet("accessKeySecret", "AccessKeySecret", "") c.level = doGet("level", "", "INFO") @@ -337,11 +357,10 @@ func ParseConfig(uri *url.URL) *Config { c.regionId = endPoint.RegionId } } - if c.regionId == "" { - c.regionId = DefaultRegionId - } - return c + klog.Infof("init cms sink. config: %v", *c) + + return c, nil } func GetRegion(c *Config, fnParseRegionFromMeta func() (string, error)) (regionId string, err error) { @@ -410,10 +429,8 @@ type tagCmsSinkOpt struct { // NewCmsSink Usage: // --sink=cms:http://metrichub-[your_region_id].aliyun-inc.com?regionId=[your_region_id]&accessKeyId=[your_access_key]&accessKeySecret=[you_access_secret]&level=[alert_level] func NewCmsSink(uri *url.URL, opt *tagCmsSinkOpt) (r core.EventSink, err error) { - c := ParseConfig(uri) - - sink := &tagCmsSink{level: c.level} - sink.regionId, err = GetRegion(c, utils.ParseRegionFromMeta) + c, err := ParseConfig(uri) + sink := &tagCmsSink{config: *c} if err == nil { var client *Client if client, err = newClient(c); err == nil { diff --git a/sinks/cms/cms_test.go b/sinks/cms/cms_test.go index 9df8b6a5..927d36b9 100644 --- a/sinks/cms/cms_test.go +++ b/sinks/cms/cms_test.go @@ -130,7 +130,7 @@ func TestNewCmsSing(t *testing.T) { assert.Equal(t, "CmsSink", sink.Name()) cmsSink := sink.(*tagCmsSink) assert.NotNil(t, cmsSink.buf) - assert.Equal(t, "cn-beijing", cmsSink.regionId) + assert.Equal(t, "cn-beijing", cmsSink.config.regionId) } func TestGetEnvTime(t *testing.T) { @@ -243,26 +243,26 @@ func TestNewClient_Error(t *testing.T) { } func TestParseConfig(t *testing.T) { - params, err := url.Parse(`https://unknown?accessKeyId=1&accessKeySecret=2`) + params, err := url.Parse(`https://unknown?regionId=cn-hangzhou&userId=123&accessKeyId=1&accessKeySecret=2`) assert.NoError(t, err) - c := ParseConfig(params) + c, _ := ParseConfig(params) assert.Contains(t, c.endPoint, "://") assert.Contains(t, c.endPoint, "cn-hangzhou") assert.Equal(t, "cn-hangzhou", c.regionId) } func TestParseConfig2(t *testing.T) { - params, err := url.Parse(`https://host.com/?accessKeyId=1&accessKeySecret=2`) + params, err := url.Parse(`https://host.com/?regionId=cn-hangzhou&userId=123accessKeyId=1&accessKeySecret=2`) assert.NoError(t, err) - c := ParseConfig(params) + c, _ := ParseConfig(params) assert.Equal(t, c.endPoint, `https://host.com`) assert.Equal(t, "cn-hangzhou", c.regionId) } func TestParseConfig3(t *testing.T) { - params, err := url.Parse(`?accessKeyId=1&accessKeySecret=2`) + params, err := url.Parse(`?regionId=cn-hangzhou&userId=123accessKeyId=1&accessKeySecret=2`) assert.NoError(t, err) - c := ParseConfig(params) + c, _ := ParseConfig(params) assert.Contains(t, c.endPoint, "cn-hangzhou") assert.Equal(t, DefaultRegionId, c.regionId) }