From 277f0323dbf473ad0bb23865b3ce9723fa07f65a Mon Sep 17 00:00:00 2001 From: lychung83 Date: Fri, 31 Aug 2018 17:30:56 -0700 Subject: [PATCH 1/6] stackdriver exporter implemented --- monitoring/exporter/data_test.go | 126 +++++++++++ monitoring/exporter/exporter.go | 232 ++++++++++++++++++++ monitoring/exporter/exporter_test.go | 265 +++++++++++++++++++++++ monitoring/exporter/mock_check_test.go | 245 +++++++++++++++++++++ monitoring/exporter/project_data.go | 159 ++++++++++++++ monitoring/exporter/row_data_to_point.go | 106 +++++++++ 6 files changed, 1133 insertions(+) create mode 100644 monitoring/exporter/data_test.go create mode 100644 monitoring/exporter/exporter.go create mode 100644 monitoring/exporter/exporter_test.go create mode 100644 monitoring/exporter/mock_check_test.go create mode 100644 monitoring/exporter/project_data.go create mode 100644 monitoring/exporter/row_data_to_point.go diff --git a/monitoring/exporter/data_test.go b/monitoring/exporter/data_test.go new file mode 100644 index 000000000..41b6d0d76 --- /dev/null +++ b/monitoring/exporter/data_test.go @@ -0,0 +1,126 @@ +package exporter + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +// This file defines various data needed for testing. + +func init() { + // For testing convenience, we reduce maximum time series that metric client accepts. 
+ MaxTimeSeriesPerUpload = 3 +} + +const ( + label1name = "key_1" + label2name = "key_2" + label3name = "key_3" + label4name = "key_4" + label5name = "key_5" + + value1 = "value_1" + value2 = "value_2" + value3 = "value_3" + value4 = "value_4" + value5 = "value_5" + value6 = "value_6" + + metric1name = "metric_1" + metric1desc = "this is metric 1" + metric2name = "metric_2" + metric2desc = "this is metric 2" + + project1 = "project-1" + project2 = "project-2" +) + +var ( + ctx = context.Background() + + // This error is used for test to catch some error happpened. + invalidDataError = errors.New("invalid data") + // This error is used for unexpected error. + unrecognizedDataError = errors.New("unrecognized data") + + key1 = getKey(label1name) + key2 = getKey(label2name) + key3 = getKey(label3name) + + view1 = &view.View{ + Name: metric1name, + Description: metric1desc, + TagKeys: nil, + Measure: stats.Int64(metric1name, metric1desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } + view2 = &view.View{ + Name: metric2name, + Description: metric2desc, + TagKeys: []tag.Key{key1, key2, key3}, + Measure: stats.Int64(metric2name, metric2desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } + + // To make verification easy, we require all valid rows should int64 values and all of them + // must be distinct. + view1row1 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 1}, + } + view1row2 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 2}, + } + view1row3 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 3}, + } + view2row1 = &view.Row{ + Tags: []tag.Tag{{key1, value1}, {key2, value2}, {key3, value3}}, + Data: &view.SumData{Value: 4}, + } + view2row2 = &view.Row{ + Tags: []tag.Tag{{key1, value4}, {key2, value5}, {key3, value6}}, + Data: &view.SumData{Value: 5}, + } + // This Row does not have valid Data field, so is invalid. 
+ invalidRow = &view.Row{Data: nil} + + startTime1 = endTime1.Add(-10 * time.Second) + endTime1 = startTime2.Add(-time.Second) + startTime2 = endTime2.Add(-10 * time.Second) + endTime2 = time.Now() + + resource1 = &monitoredrespb.MonitoredResource{ + Type: "cloudsql_database", + Labels: map[string]string{ + "project_id": project1, + "region": "us-central1", + "database_id": "cloud-SQL-instance-1", + }, + } + resource2 = &monitoredrespb.MonitoredResource{ + Type: "gce_instance", + Labels: map[string]string{ + "project_id": project2, + "zone": "us-east1", + "database_id": "GCE-instance-1", + }, + } +) + +func getKey(name string) tag.Key { + key, err := tag.NewKey(name) + if err != nil { + panic(fmt.Errorf("key creation failed for key name: %s", name)) + } + return key +} diff --git a/monitoring/exporter/exporter.go b/monitoring/exporter/exporter.go new file mode 100644 index 000000000..3772d348c --- /dev/null +++ b/monitoring/exporter/exporter.go @@ -0,0 +1,232 @@ +// Package exporter provides a way to export data from opencensus to multiple GCP projects. +// +// General assumptions or requirements when using this exporter. +// 1. The basic unit of data is a view.Data with only a single view.Row. We define it as a separate +// type called RowData. +// 2. We can inspect each RowData to tell whether this RowData is applicable for this exporter. +// 3. For RowData that is applicable to this exporter, we require that +// 3.1. Any view associated to RowData corresponds to a stackdriver metric, and it is already +// defined for all GCP projects. +// 3.2. RowData has correcponding GCP projects, and we can determine its project ID. +// 3.3. 
After trimming labels and tags, configuration of all view data matches that of corresponding +// stackdriver metric +package exporter + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3" + gax "github.com/googleapis/gax-go" + "go.opencensus.io/stats/view" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// StatsExporter is the exporter that can be registered to opencensus. A StatsExporter object must +// be created by NewStatsExporter(). +type StatsExporter struct { + ctx context.Context + client metricClient + opts *Options + + // copy of some option values which may be modified by exporter. + getProjectID func(*RowData) (string, error) + onError func(error, ...*RowData) + makeResource func(*RowData) (*monitoredrespb.MonitoredResource, error) + + // mu protects access to projDataMap + mu sync.Mutex + // per-project data of exporter + projDataMap map[string]*projectData +} + +// Options designates various parameters used by stats exporter. Default value of fields in Options +// are valid for use. +type Options struct { + // ClientOptions designates options for creating metric client, especially credentials for + // RPC calls. + ClientOptions []option.ClientOption + + // options for bundles amortizing export requests. Note that a bundle is created for each + // project. When not provided, default values in bundle package are used. + BundleDelayThreshold time.Duration + BundleCountThreshold int + + // callback functions provided by user. + + // GetProjectID is used to filter whether given row data can be applicable to this exporter + // and if so, it also determines the projectID of given row data. If + // RowDataNotApplicableError is returned, then the row data is not applicable to this + // exporter, and it will be silently ignored. 
Though not recommended, other errors can be + // returned, and in that case the error is reported to callers via OnError and the row data + // will not be uploaded to stackdriver. When GetProjectID is not set, all row data will be + // considered not applicable to this exporter. + GetProjectID func(*RowData) (projectID string, err error) + // OnError is used to report any error happened while exporting view data fails. Whenever + // this function is called, it's guaranteed that at least one row data is also passed to + // OnError. Row data passed to OnError must not be modified. When OnError is not set, all + // errors happened on exporting are ignored. + OnError func(error, ...*RowData) + // MakeResource creates monitored resource from RowData. It is guaranteed that only RowData + // that passes GetProjectID will be given to this function. Though not recommended, error + // can be returned, and in that case the error is reported to callers via OnError and the + // row data will not be uploaded to stackdriver. When MakeResource is not set, global + // resource is used for all RowData objects. + MakeResource func(rd *RowData) (*monitoredrespb.MonitoredResource, error) + + // options concerning labels. + + // DefaultLabels store default value of some labels. Labels in DefaultLabels need not be + // specified in tags of view data. Default labels and tags of view may have overlapping + // label keys. In this case, values in tag are used. Default labels are used for labels + // those are constant throughout export operation, like version number of the calling + // program. + DefaultLabels map[string]string + // UnexportedLabels contains key of labels that will not be exported stackdriver. Typical + // uses of unexported labels will be either that marks project ID, or that's used only for + // constructing resource. 
+ UnexportedLabels []string +} + +// default values for options +func defaultGetProjectID(rd *RowData) (string, error) { + return "", RowDataNotApplicableError +} + +func defaultOnError(err error, rds ...*RowData) {} + +func defaultMakeResource(rd *RowData) (*monitoredrespb.MonitoredResource, error) { + return &monitoredrespb.MonitoredResource{Type: "global"}, nil +} + +// NewStatsExporter creates a StatsExporter object. Once a call to NewStatsExporter is made, any +// fields in opts must not be modified at all. ctx will also be used throughout entire exporter +// operation when making RPC call. +func NewStatsExporter(ctx context.Context, opts *Options) (*StatsExporter, error) { + client, err := newMetricClient(ctx, opts.ClientOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create a metric client: %v", err) + } + + e := &StatsExporter{ + ctx: ctx, + client: client, + opts: opts, + projDataMap: make(map[string]*projectData), + } + + // We don't want to modify user-supplied options, so save default options directly in + // exporter. + if opts.GetProjectID != nil { + e.getProjectID = opts.GetProjectID + } else { + e.getProjectID = defaultGetProjectID + } + if opts.OnError != nil { + e.onError = opts.OnError + } else { + e.onError = defaultOnError + } + if opts.MakeResource != nil { + e.makeResource = opts.MakeResource + } else { + e.makeResource = defaultMakeResource + } + + return e, nil +} + +// We wrap monitoring.MetricClient and it's maker for testing. +type metricClient interface { + CreateTimeSeries(context.Context, *monitoringpb.CreateTimeSeriesRequest, ...gax.CallOption) error + Close() error +} + +var newMetricClient = defaultNewMetricClient + +func defaultNewMetricClient(ctx context.Context, opts ...option.ClientOption) (metricClient, error) { + return monitoring.NewMetricClient(ctx, opts...) +} + +// RowData represents a single row in view data. This is our unit of computation. 
We use a single +// row instead of view data because a view data consists of multiple rows, and each row may belong +// to different projects. +type RowData struct { + View *view.View + Start, End time.Time + Row *view.Row +} + +// ExportView is the method called by opencensus to export view data. It constructs RowData out of +// view.Data objects. +func (e *StatsExporter) ExportView(vd *view.Data) { + for _, row := range vd.Rows { + rd := &RowData{ + View: vd.View, + Start: vd.Start, + End: vd.End, + Row: row, + } + e.exportRowData(rd) + } +} + +// RowDataNotApplicableError is used to tell that given row data is not applicable to the exporter. +// See GetProjectID of Options for more detail. +var RowDataNotApplicableError = errors.New("row data is not applicable to the exporter, so it will be ignored") + +// exportRowData exports a single row data. +func (e *StatsExporter) exportRowData(rd *RowData) { + projID, err := e.getProjectID(rd) + if err != nil { + // We ignore non-applicable RowData. + if err != RowDataNotApplicableError { + newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err) + e.onError(newErr, rd) + } + return + } + pd := e.getProjectData(projID) + switch err := pd.bndler.Add(rd, 1); err { + case nil: + case bundler.ErrOversizedItem: + go pd.uploadRowData(rd) + default: + newErr := fmt.Errorf("failed to add row data with view %s to bundle for project %s: %v", rd.View.Name, projID, err) + e.onError(newErr, rd) + } +} + +func (e *StatsExporter) getProjectData(projectID string) *projectData { + e.mu.Lock() + defer e.mu.Unlock() + if pd, ok := e.projDataMap[projectID]; ok { + return pd + } + + pd := e.newProjectData(projectID) + e.projDataMap[projectID] = pd + return pd +} + +// Close flushes and closes the exporter. Close must be called after the exporter is unregistered +// and no further calls to ExportView() are made. Once Close() is returned no further access to the +// exporter is allowed in any way. 
+func (e *StatsExporter) Close() error { + e.mu.Lock() + for _, pd := range e.projDataMap { + pd.bndler.Flush() + } + e.mu.Unlock() + + if err := e.client.Close(); err != nil { + return fmt.Errorf("failed to close the metric client: %v", err) + } + return nil +} diff --git a/monitoring/exporter/exporter_test.go b/monitoring/exporter/exporter_test.go new file mode 100644 index 000000000..447f134fa --- /dev/null +++ b/monitoring/exporter/exporter_test.go @@ -0,0 +1,265 @@ +package exporter + +import ( + "fmt" + "testing" + + "go.opencensus.io/stats/view" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +// This file contains actual tests. + +// TestProjectClassifyNoError tests that exporter can recognize and distribute incoming data by +// its project. +func TestProjectClassifyNoError(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1, view1row2}, + } + viewData2 := &view.Data{ + View: view2, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view2row1}, + } + + getProjectID := func(rd *RowData) (string, error) { + switch rd.Row { + case view1row1, view2row1: + return project1, nil + case view1row2: + return project2, nil + default: + return "", unrecognizedDataError + } + } + + exp, errStore := newMockExp(t, &Options{GetProjectID: getProjectID}) + exp.ExportView(viewData1) + exp.ExportView(viewData2) + + wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row1}, + }, + project2: []*RowData{ + {view1, startTime1, endTime1, view1row2}, + }, + } + checkErrStorage(t, errStore, nil) + checkExpProjData(t, exp, wantRowData) +} + +// TestProjectClassifyError tests that exporter can properly handle errors while classifying +// incoming data by its project. 
+func TestProjectClassifyError(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1, view1row2}, + } + viewData2 := &view.Data{ + View: view2, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view2row1, view2row2}, + } + + getProjectID := func(rd *RowData) (string, error) { + switch rd.Row { + case view1row1, view2row2: + return project1, nil + case view1row2: + return "", RowDataNotApplicableError + case view2row1: + return "", invalidDataError + default: + return "", unrecognizedDataError + } + } + + exp, errStore := newMockExp(t, &Options{GetProjectID: getProjectID}) + exp.ExportView(viewData1) + exp.ExportView(viewData2) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to get project ID", + errSuffix: invalidDataError.Error(), + rds: []*RowData{{view2, startTime2, endTime2, view2row1}}, + }, + } + wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row2}, + }, + } + checkErrStorage(t, errStore, wantErrRdCheck) + checkExpProjData(t, exp, wantRowData) +} + +// +func TestUploadNoError(t *testing.T) { + pd, cl, errStore := newMockUploader(t, &Options{}) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + checkErrStorage(t, errStore, nil) + wantClData := [][]int64{ + {1, 2, 3}, + {4, 5}, + } + checkMetricClient(t, cl, wantClData) +} + +// TestUploadTimeSeriesMakeError tests that errors while creating time series are properly handled. 
+func TestUploadTimeSeriesMakeError(t *testing.T) { + makeResource := func(rd *RowData) (*monitoredrespb.MonitoredResource, error) { + if rd.Row == view1row2 { + return nil, invalidDataError + } + return defaultMakeResource(rd) + } + pd, cl, errStore := newMockUploader(t, &Options{MakeResource: makeResource}) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + // This row data is invalid, so it will trigger inconsistent data error. + {view2, startTime2, endTime2, invalidRow}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to construct resource", + errSuffix: invalidDataError.Error(), + rds: []*RowData{{view1, startTime1, endTime1, view1row2}}, + }, { + errPrefix: "inconsistent data found in view", + errSuffix: metric2name, + rds: []*RowData{{view2, startTime2, endTime2, invalidRow}}, + }, + } + checkErrStorage(t, errStore, wantErrRdCheck) + + wantClData := [][]int64{ + {1, 3, 4}, + {5}, + } + checkMetricClient(t, cl, wantClData) +} + +// TestUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time +// series create RPC call. 
+func TestUploadWithMetricClientError(t *testing.T) { + pd, cl, errStore := newMockUploader(t, &Options{}) + cl.addReturnErrs(invalidDataError) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "RPC call to create time series failed", + errSuffix: invalidDataError.Error(), + rds: []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + }, + }, + } + checkErrStorage(t, errStore, wantErrRdCheck) + + wantClData := [][]int64{ + {1, 2, 3}, + {4, 5}, + } + checkMetricClient(t, cl, wantClData) +} + +// TestMakeResource tests that exporter can create monitored resource dynamically. +func TestMakeResource(t *testing.T) { + makeResource := func(rd *RowData) (*monitoredrespb.MonitoredResource, error) { + switch rd.Row { + case view1row1: + return resource1, nil + case view1row2: + return resource2, nil + default: + return nil, unrecognizedDataError + } + } + pd, cl, errStore := newMockUploader(t, &Options{MakeResource: makeResource}) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + } + pd.uploadRowData(rd) + checkErrStorage(t, errStore, nil) + checkMetricClient(t, cl, [][]int64{{1, 2}}) + + tsArr := cl.reqs[0].TimeSeries + for i, wantResource := range []*monitoredrespb.MonitoredResource{resource1, resource2} { + if resource := tsArr[i].Resource; resource != wantResource { + t.Errorf("%d-th time series resource got: %#v, want: %#v", i, resource, wantResource) + } + } +} + +// TestMakeLabel tests that exporter can correctly handle label manipulation process, including +// merging default label with tags, and removing unexported labels. 
+func TestMakeLabel(t *testing.T) { + opts := &Options{ + DefaultLabels: map[string]string{ + label1name: value4, + label4name: value5, + }, + UnexportedLabels: []string{label3name, label5name}, + } + pd, cl, errStore := newMockUploader(t, opts) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row1}, + } + pd.uploadRowData(rd) + checkErrStorage(t, errStore, nil) + checkMetricClient(t, cl, [][]int64{{1, 4}}) + + wantLabels1 := map[string]string{ + label1name: value4, + label4name: value5, + } + wantLabels2 := map[string]string{ + // default value for key1 is suppressed, and value defined in tag of view2row1 is + // used. + label1name: value1, + label2name: value2, + label4name: value5, + } + tsArr := cl.reqs[0].TimeSeries + for i, wantLabels := range []map[string]string{wantLabels1, wantLabels2} { + prefix := fmt.Sprintf("%d-th time series labels mismatch", i+1) + checkLabels(t, prefix, tsArr[i].Metric.Labels, wantLabels) + } +} diff --git a/monitoring/exporter/mock_check_test.go b/monitoring/exporter/mock_check_test.go new file mode 100644 index 000000000..300ea2eed --- /dev/null +++ b/monitoring/exporter/mock_check_test.go @@ -0,0 +1,245 @@ +package exporter + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + gax "github.com/googleapis/gax-go" + "google.golang.org/api/option" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// This file defines various mocks for testing, and checking functions for mocked data. We mock +// metric client and bundler because their actions involves RPC calls or non-deterministic behavior. + +func init() { + newMetricClient = mockNewMetricClient + newExpBundler = mockNewExpBundler +} + +// We define mock Client. + +type mockMetricClient struct { + // returnErrs holds predefined error values to return. Each errors in returnErrs are + // returned per each CreateTimeSeries() call. 
If all errors in returnErrs are used, all + // other CreateTimeSeries calls will return nil. + returnErrs []error + // reqs saves all incoming requests. + reqs []*monitoringpb.CreateTimeSeriesRequest +} + +func (cl *mockMetricClient) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error { + cl.reqs = append(cl.reqs, req) + // Check returnErrs and if not empty, return the first error from it. + if len(cl.returnErrs) == 0 { + return nil + } + err := cl.returnErrs[0] + // delete the returning error. + cl.returnErrs = cl.returnErrs[1:] + return err +} + +func (cl *mockMetricClient) Close() error { + return nil +} + +func (cl *mockMetricClient) addReturnErrs(errs ...error) { + cl.returnErrs = append(cl.returnErrs, errs...) +} + +func mockNewMetricClient(_ context.Context, _ ...option.ClientOption) (metricClient, error) { + return &mockMetricClient{}, nil +} + +// checkMetricClient checks all recorded requests in mock metric client. We only compare int64 +// values of the time series. To make this work, we assigned different int64 values for all valid +// rows in the test. +func checkMetricClient(t *testing.T, cl *mockMetricClient, wantReqsValues [][]int64) { + reqs := cl.reqs + reqsLen, wantReqsLen := len(reqs), len(wantReqsValues) + if reqsLen != wantReqsLen { + t.Errorf("number of requests got: %d, want %d", reqsLen, wantReqsLen) + return + } + for i := 0; i < reqsLen; i++ { + prefix := fmt.Sprintf("%d-th request mismatch", i+1) + tsArr := reqs[i].TimeSeries + wantTsValues := wantReqsValues[i] + tsArrLen, wantTsArrLen := len(tsArr), len(wantTsValues) + if tsArrLen != wantTsArrLen { + t.Errorf("%s: number of time series got: %d, want: %d", prefix, tsArrLen, wantTsArrLen) + continue + } + for j := 0; j < tsArrLen; j++ { + // This is how monitoring API stores the int64 value. 
+ tsVal := tsArr[j].Points[0].Value.Value.(*monitoringpb.TypedValue_Int64Value).Int64Value + wantTsVal := wantTsValues[j] + if tsVal != wantTsVal { + t.Errorf("%s: Value got: %d, want: %d", prefix, tsVal, wantTsVal) + } + } + } +} + +// We define mock bundler. + +type mockBundler struct { + // rowDataArr saves all incoming RowData to the bundler. + rowDataArr []*RowData +} + +func (b *mockBundler) Add(rowData interface{}, _ int) error { + b.rowDataArr = append(b.rowDataArr, rowData.(*RowData)) + return nil +} + +func (b *mockBundler) Flush() {} + +func mockNewExpBundler(_ func(interface{}), _ time.Duration, _ int) expBundler { + return &mockBundler{} +} + +// We define a storage for all errors happened in export operation. + +type errStorage struct { + errRds []errRowData +} + +type errRowData struct { + err error + rds []*RowData +} + +// onError records any incoming error and accompanying RowData array. This method is passed to the +// exporter to record errors. +func (e *errStorage) onError(err error, rds ...*RowData) { + e.errRds = append(e.errRds, errRowData{err, rds}) +} + +// errRowDataCheck contains data for checking content of error storage. +type errRowDataCheck struct { + errPrefix, errSuffix string + rds []*RowData +} + +// checkErrStorage checks content of error storage. For returned errors, we check prefix and suffix. 
+func checkErrStorage(t *testing.T, errStore *errStorage, wantErrRdCheck []errRowDataCheck) { + errRds := errStore.errRds + gotLen, wantLen := len(errRds), len(wantErrRdCheck) + if gotLen != wantLen { + t.Errorf("number of reported errors: %d, want: %d", gotLen, wantLen) + return + } + for i := 0; i < gotLen; i++ { + prefix := fmt.Sprintf("%d-th reported error mismatch", i+1) + errRd, wantErrRd := errRds[i], wantErrRdCheck[i] + errStr := errRd.err.Error() + if errPrefix := wantErrRd.errPrefix; !strings.HasPrefix(errStr, errPrefix) { + t.Errorf("%s: error got: %q, want: prefixed by %q", prefix, errStr, errPrefix) + } + if errSuffix := wantErrRd.errSuffix; !strings.HasSuffix(errStr, errSuffix) { + t.Errorf("%s: error got: %q, want: suffiexd by %q", prefix, errStr, errSuffix) + } + if err := checkRowDataArr(errRd.rds, wantErrRd.rds); err != nil { + t.Errorf("%s: RowData array mismatch: %v", prefix, err) + } + } +} + +func checkRowDataArr(rds, wantRds []*RowData) error { + rdLen, wantRdLen := len(rds), len(wantRds) + if rdLen != wantRdLen { + return fmt.Errorf("number row data got: %d, want: %d", rdLen, wantRdLen) + } + for i := 0; i < rdLen; i++ { + if err := checkRowData(rds[i], wantRds[i]); err != nil { + return fmt.Errorf("%d-th row data mismatch: %v", i+1, err) + } + } + return nil +} + +func checkRowData(rd, wantRd *RowData) error { + if rd.View != wantRd.View { + return fmt.Errorf("View got: %s, want: %s", rd.View.Name, wantRd.View.Name) + } + if rd.Start != wantRd.Start { + return fmt.Errorf("Start got: %v, want: %v", rd.Start, wantRd.Start) + } + if rd.End != wantRd.End { + return fmt.Errorf("End got: %v, want: %v", rd.End, wantRd.End) + } + if rd.Row != wantRd.Row { + return fmt.Errorf("Row got: %v, want: %v", rd.Row, wantRd.Row) + } + return nil +} + +// newMockExp creates mock expoter and error storage storing all errors. Caller need not set +// opts.OnError. 
+func newMockExp(t *testing.T, opts *Options) (*StatsExporter, *errStorage) { + errStore := &errStorage{} + opts.OnError = errStore.onError + exp, err := NewStatsExporter(ctx, opts) + if err != nil { + t.Fatalf("creating exporter failed: %v", err) + } + return exp, errStore +} + +// checkExpProjData checks all data passed to the bundler by bundler.Add(). +func checkExpProjData(t *testing.T, exp *StatsExporter, wantProjData map[string][]*RowData) { + wantProj := map[string]bool{} + for proj := range wantProjData { + wantProj[proj] = true + } + for proj := range exp.projDataMap { + if !wantProj[proj] { + t.Errorf("project in exporter's project data not wanted: %s", proj) + } + } + + for proj, wantRds := range wantProjData { + pd, ok := exp.projDataMap[proj] + if !ok { + t.Errorf("wanted project not found in exporter's project data: %v", proj) + continue + } + // We know that bundler is mocked, so we can check the data in it. + if err := checkRowDataArr(pd.bndler.(*mockBundler).rowDataArr, wantRds); err != nil { + t.Errorf("RowData array mismatch for project %s: %v", proj, err) + } + } +} + +// newMockUploader creates objects to test behavior of projectData.uploadRowData. Other uses are not +// recommended. +func newMockUploader(t *testing.T, opts *Options) (*projectData, *mockMetricClient, *errStorage) { + exp, errStore := newMockExp(t, opts) + pd := exp.newProjectData(project1) + cl := exp.client.(*mockMetricClient) + return pd, cl, errStore +} + +// checkLabels checks data in labels. 
+func checkLabels(t *testing.T, prefix string, labels, wantLabels map[string]string) { + for labelName, value := range labels { + wantValue, ok := wantLabels[labelName] + if !ok { + t.Errorf("%s: label name in time series not wanted: %s", prefix, labelName) + continue + } + if value != wantValue { + t.Errorf("%s: value for label name %s got: %s, want: %s", prefix, labelName, value, wantValue) + } + } + for wantLabelName := range wantLabels { + if _, ok := labels[wantLabelName]; !ok { + t.Errorf("%s: wanted label name not found in time series: %s", prefix, wantLabelName) + } + } +} diff --git a/monitoring/exporter/project_data.go b/monitoring/exporter/project_data.go new file mode 100644 index 000000000..7464193cc --- /dev/null +++ b/monitoring/exporter/project_data.go @@ -0,0 +1,159 @@ +package exporter + +import ( + "fmt" + "time" + + "go.opencensus.io/tag" + "google.golang.org/api/support/bundler" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// maximum number of time series that stackdriver accepts. Only test may change this value. +var MaxTimeSeriesPerUpload = 200 + +// projectData contain per-project data in exporter. It should be created by newProjectData() +type projectData struct { + parent *StatsExporter + projectID string + // We make bundler for each project because call to monitoring RPC can be grouped only in + // project level + bndler expBundler +} + +// We wrap bundler and its maker for testing purpose. +type expBundler interface { + Add(interface{}, int) error + Flush() +} + +var newExpBundler = defaultNewExpBundler + +// Since options in bundler are directly set to its fields and interface does not allow any fields, +// we put option set-up process inside bundler's maker. 
+func defaultNewExpBundler(uploader func(interface{}), delayThreshold time.Duration, countThreshold int) expBundler { + bndler := bundler.NewBundler((*RowData)(nil), uploader) + + // Set options for bundler if they are provided by users. + if 0 < delayThreshold { + bndler.DelayThreshold = delayThreshold + } + if 0 < countThreshold { + bndler.BundleCountThreshold = countThreshold + } + + return bndler +} + +func (e *StatsExporter) newProjectData(projectID string) *projectData { + pd := &projectData{ + parent: e, + projectID: projectID, + } + + pd.bndler = newExpBundler(pd.uploadRowData, e.opts.BundleDelayThreshold, e.opts.BundleCountThreshold) + return pd +} + +// uploadRowData is called by bundler to upload row data, and report any error happened meanwhile. +func (pd *projectData) uploadRowData(bundle interface{}) { + exp := pd.parent + rds := bundle.([]*RowData) + + // reqRds contains RowData objects those are uploaded to stackdriver at given iteration. + // It's main usage is for error reporting. For actual uploading operation, we use req. + // remainingRds are RowData that has not been processed at all. + var reqRds, remainingRds []*RowData + for ; len(rds) != 0; rds = remainingRds { + var req *monitoringpb.CreateTimeSeriesRequest + req, reqRds, remainingRds = pd.makeReq(rds) + if req == nil { + // no need to perform RPC call for empty set of requests. + continue + } + if err := exp.client.CreateTimeSeries(exp.ctx, req); err != nil { + newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err) + // We pass all row data not successfully uploaded. + exp.onError(newErr, reqRds...) + } + } +} + +// makeReq creates a request that's suitable to be passed to create time series RPC call. +// +// reqRds contains rows those are contained in req. Main use of reqRds is to be returned to users if +// creating time series failed. (We don't want users to investigate structure of timeseries.) 
+// remainingRds contains rows those are not used at all in makeReq because of the length limitation +// or request. Another call of makeReq() with remainigRds will handle (some) rows in them. When req +// is nil, then there's nothing to request and reqRds will also contain nothing. +// +// Some rows in rds may fail while converting them to time series, and in that case makeReq() calls +// exporter's onError() directly, not propagating errors to the caller. +func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) { + exp := pd.parent + timeSeries := []*monitoringpb.TimeSeries{} + + var i int + var rd *RowData + for i, rd = range rds { + pt := newPoint(rd.View, rd.Row, rd.Start, rd.End) + if pt.Value == nil { + err := fmt.Errorf("inconsistent data found in view %s", rd.View.Name) + pd.parent.onError(err, rd) + continue + } + resource, err := exp.makeResource(rd) + if err != nil { + newErr := fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err) + pd.parent.onError(newErr, rd) + continue + } + + ts := &monitoringpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: rd.View.Name, + Labels: exp.makeLabels(rd.Row.Tags), + }, + Resource: resource, + Points: []*monitoringpb.Point{pt}, + } + // Growing timeseries and reqRds are done at same time. + timeSeries = append(timeSeries, ts) + reqRds = append(reqRds, rd) + // Don't grow timeseries over the limit. + if len(timeSeries) == MaxTimeSeriesPerUpload { + break + } + } + + // Since i is the last index processed, remainingRds should start from i+1. + remainingRds = rds[i+1:] + if len(timeSeries) == 0 { + req = nil + } else { + req = &monitoringpb.CreateTimeSeriesRequest{ + Name: fmt.Sprintf("projects/%s", pd.projectID), + TimeSeries: timeSeries, + } + } + return req, reqRds, remainingRds +} + +// makeLables constructs label that's ready for being uploaded to stackdriver. 
+func (e *StatsExporter) makeLabels(tags []tag.Tag) map[string]string { + opts := e.opts + labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) + for key, val := range opts.DefaultLabels { + labels[key] = val + } + // If there's overlap When combining exporter's default label and tags, values in tags win. + for _, tag := range tags { + labels[tag.Key.Name()] = tag.Value + } + // Some labels are not for exporting. + for _, key := range opts.UnexportedLabels { + delete(labels, key) + } + return labels +} diff --git a/monitoring/exporter/row_data_to_point.go b/monitoring/exporter/row_data_to_point.go new file mode 100644 index 000000000..6c8fefef9 --- /dev/null +++ b/monitoring/exporter/row_data_to_point.go @@ -0,0 +1,106 @@ +package exporter + +import ( + "time" + + timestamppb "github.com/golang/protobuf/ptypes/timestamp" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + distributionpb "google.golang.org/genproto/googleapis/api/distribution" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// Functions in this file is used to convert RowData to monitoring point that are used by uploading +// RPC calls of monitoring client. All functions in this file are copied from +// contrib.go.opencensus.io/exporter/stackdriver. 
+ +func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { + switch v.Aggregation.Type { + case view.AggTypeLastValue: + return newGaugePoint(v, row, end) + default: + return newCumulativePoint(v, row, start, end) + } +} + +func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { + return &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{ + Seconds: start.Unix(), + Nanos: int32(start.Nanosecond()), + }, + EndTime: ×tamppb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + }, + }, + Value: newTypedValue(v, row), + } +} + +func newGaugePoint(v *view.View, row *view.Row, end time.Time) *monitoringpb.Point { + gaugeTime := ×tamppb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + } + return &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: gaugeTime, + }, + Value: newTypedValue(v, row), + } +} + +func newTypedValue(vd *view.View, r *view.Row) *monitoringpb.TypedValue { + switch v := r.Data.(type) { + case *view.CountData: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: v.Value, + }} + case *view.SumData: + switch vd.Measure.(type) { + case *stats.Int64Measure: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: int64(v.Value), + }} + case *stats.Float64Measure: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v.Value, + }} + } + case *view.DistributionData: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distributionpb.Distribution{ + Count: v.Count, + Mean: v.Mean, + SumOfSquaredDeviation: v.SumOfSquaredDev, + // TODO(songya): uncomment this once Stackdriver supports min/max. 
+ // Range: &distributionpb.Distribution_Range{ + // Min: v.Min, + // Max: v.Max, + // }, + BucketOptions: &distributionpb.Distribution_BucketOptions{ + Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + Bounds: vd.Aggregation.Buckets, + }, + }, + }, + BucketCounts: v.CountPerBucket, + }, + }} + case *view.LastValueData: + switch vd.Measure.(type) { + case *stats.Int64Measure: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: int64(v.Value), + }} + case *stats.Float64Measure: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v.Value, + }} + } + } + return nil +} From fcf87afb31941b67115f4894c8028a3e10aefd9a Mon Sep 17 00:00:00 2001 From: lychung83 Date: Fri, 7 Sep 2018 10:35:52 -0700 Subject: [PATCH 2/6] move and rename files --- monitoring/exporter/{ => stackdriver}/data_test.go | 0 monitoring/exporter/{ => stackdriver}/mock_check_test.go | 0 monitoring/exporter/{ => stackdriver}/project_data.go | 0 monitoring/exporter/{ => stackdriver}/row_data_to_point.go | 0 monitoring/exporter/{exporter.go => stackdriver/stackdriver.go} | 0 .../{exporter_test.go => stackdriver/stackdriver_test.go} | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename monitoring/exporter/{ => stackdriver}/data_test.go (100%) rename monitoring/exporter/{ => stackdriver}/mock_check_test.go (100%) rename monitoring/exporter/{ => stackdriver}/project_data.go (100%) rename monitoring/exporter/{ => stackdriver}/row_data_to_point.go (100%) rename monitoring/exporter/{exporter.go => stackdriver/stackdriver.go} (100%) rename monitoring/exporter/{exporter_test.go => stackdriver/stackdriver_test.go} (100%) diff --git a/monitoring/exporter/data_test.go b/monitoring/exporter/stackdriver/data_test.go similarity index 100% rename from monitoring/exporter/data_test.go rename to monitoring/exporter/stackdriver/data_test.go diff 
--git a/monitoring/exporter/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go similarity index 100% rename from monitoring/exporter/mock_check_test.go rename to monitoring/exporter/stackdriver/mock_check_test.go diff --git a/monitoring/exporter/project_data.go b/monitoring/exporter/stackdriver/project_data.go similarity index 100% rename from monitoring/exporter/project_data.go rename to monitoring/exporter/stackdriver/project_data.go diff --git a/monitoring/exporter/row_data_to_point.go b/monitoring/exporter/stackdriver/row_data_to_point.go similarity index 100% rename from monitoring/exporter/row_data_to_point.go rename to monitoring/exporter/stackdriver/row_data_to_point.go diff --git a/monitoring/exporter/exporter.go b/monitoring/exporter/stackdriver/stackdriver.go similarity index 100% rename from monitoring/exporter/exporter.go rename to monitoring/exporter/stackdriver/stackdriver.go diff --git a/monitoring/exporter/exporter_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go similarity index 100% rename from monitoring/exporter/exporter_test.go rename to monitoring/exporter/stackdriver/stackdriver_test.go From 3f25a3356a5a85c09c30ed53a175c4420f1ce74f Mon Sep 17 00:00:00 2001 From: lychung83 Date: Fri, 7 Sep 2018 10:41:00 -0700 Subject: [PATCH 3/6] fix suggested by easwars --- monitoring/exporter/stackdriver/data_test.go | 86 +++++-- .../exporter/stackdriver/mock_check_test.go | 235 +++++++++++------- .../exporter/stackdriver/project_data.go | 79 +++--- .../exporter/stackdriver/row_data_to_point.go | 66 +++-- .../exporter/stackdriver/stackdriver.go | 110 ++++---- .../exporter/stackdriver/stackdriver_test.go | 194 +++++++++++---- 6 files changed, 488 insertions(+), 282 deletions(-) diff --git a/monitoring/exporter/stackdriver/data_test.go b/monitoring/exporter/stackdriver/data_test.go index 41b6d0d76..db8bd2549 100644 --- a/monitoring/exporter/stackdriver/data_test.go +++ b/monitoring/exporter/stackdriver/data_test.go @@ -1,4 
+1,18 @@ -package exporter +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver import ( "context" @@ -9,16 +23,11 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" ) // This file defines various data needed for testing. -func init() { - // For testing convenience, we reduce maximum time series that metric client accepts. - MaxTimeSeriesPerUpload = 3 -} - const ( label1name = "key_1" label2name = "key_2" @@ -32,11 +41,15 @@ const ( value4 = "value_4" value5 = "value_5" value6 = "value_6" + value7 = "value_7" + value8 = "value_8" metric1name = "metric_1" metric1desc = "this is metric 1" metric2name = "metric_2" metric2desc = "this is metric 2" + metric3name = "metric_3" + metric3desc = "this is metric 3" project1 = "project-1" project2 = "project-2" @@ -45,14 +58,16 @@ const ( var ( ctx = context.Background() + invalidDataErrStr = "invalid data" // This error is used for test to catch some error happpened. - invalidDataError = errors.New("invalid data") + invalidDataError = errors.New(invalidDataErrStr) // This error is used for unexpected error. 
unrecognizedDataError = errors.New("unrecognized data") - key1 = getKey(label1name) - key2 = getKey(label2name) - key3 = getKey(label3name) + key1 = getKey(label1name) + key2 = getKey(label2name) + key3 = getKey(label3name) + projectKey = getKey(ProjectKeyName) view1 = &view.View{ Name: metric1name, @@ -68,9 +83,16 @@ var ( Measure: stats.Int64(metric2name, metric2desc, stats.UnitDimensionless), Aggregation: view.Sum(), } + view3 = &view.View{ + Name: metric3name, + Description: metric3desc, + TagKeys: []tag.Key{projectKey}, + Measure: stats.Int64(metric3name, metric3desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } - // To make verification easy, we require all valid rows should int64 values and all of them - // must be distinct. + // To make verification easy, we require all valid rows should have int64 values and all of + // them must be distinct. view1row1 = &view.Row{ Tags: nil, Data: &view.SumData{Value: 1}, @@ -91,15 +113,22 @@ var ( Tags: []tag.Tag{{key1, value4}, {key2, value5}, {key3, value6}}, Data: &view.SumData{Value: 5}, } + view3row1 = &view.Row{ + Tags: []tag.Tag{{projectKey, project1}}, + Data: &view.SumData{Value: 6}, + } + view3row2 = &view.Row{ + Tags: []tag.Tag{{projectKey, project2}}, + Data: &view.SumData{Value: 7}, + } + view3row3 = &view.Row{ + Tags: []tag.Tag{{projectKey, project1}}, + Data: &view.SumData{Value: 8}, + } // This Row does not have valid Data field, so is invalid. 
invalidRow = &view.Row{Data: nil} - startTime1 = endTime1.Add(-10 * time.Second) - endTime1 = startTime2.Add(-time.Second) - startTime2 = endTime2.Add(-10 * time.Second) - endTime2 = time.Now() - - resource1 = &monitoredrespb.MonitoredResource{ + resource1 = &mrpb.MonitoredResource{ Type: "cloudsql_database", Labels: map[string]string{ "project_id": project1, @@ -107,7 +136,7 @@ var ( "database_id": "cloud-SQL-instance-1", }, } - resource2 = &monitoredrespb.MonitoredResource{ + resource2 = &mrpb.MonitoredResource{ Type: "gce_instance", Labels: map[string]string{ "project_id": project2, @@ -117,6 +146,23 @@ var ( } ) +// Timestamps. We make sure that all time stamps are strictly increasing. +var ( + startTime1, endTime1, startTime2, endTime2 time.Time + startTime3, endTime3, startTime4, endTime4 time.Time +) + +func init() { + ts := time.Now() + for _, t := range []*time.Time{ + &startTime1, &endTime1, &startTime2, &endTime2, + &startTime3, &endTime3, &startTime4, &endTime4, + } { + *t = ts + ts = ts.Add(time.Second) + } +} + func getKey(name string) tag.Key { key, err := tag.NewKey(name) if err != nil { diff --git a/monitoring/exporter/stackdriver/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go index 300ea2eed..f6b198e49 100644 --- a/monitoring/exporter/stackdriver/mock_check_test.go +++ b/monitoring/exporter/stackdriver/mock_check_test.go @@ -1,73 +1,174 @@ -package exporter +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver import ( "context" "fmt" "strings" "testing" - "time" + monitoring "cloud.google.com/go/monitoring/apiv3" gax "github.com/googleapis/gax-go" "google.golang.org/api/option" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + "google.golang.org/api/support/bundler" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) // This file defines various mocks for testing, and checking functions for mocked data. We mock // metric client and bundler because their actions involves RPC calls or non-deterministic behavior. +// Following data are used to store various data generated by exporters' activity. They are used by +// each test to verify intended behavior. Each test should call testDataInit() to clear these data. +var ( + // errStorage records all errors and associated RowData objects reported by exporter. + errStorage []errRowData + // projDataMap is a copy of projDataMap used by each tests. + projDataMap map[string]*projectData + // projRds saves all RowData objects passed to addToBundler call by project ID. Since a + // value of a map is not addressable, we save the pointer to the slice. + projRds map[string]*[]*RowData + // timeSeriesReqs saves all incoming requests for creating time series. + timeSeriesReqs []*mpb.CreateTimeSeriesRequest + // timeSeriesResults holds predefined error values to be returned by mockCreateTimeSerie() + // calls. Each errors in timeSeriesResults are returned per each mockCreateTimeSeries() + // call. If all errors in timeSeriesResults are used, all other mockCreateTimeSeries calls + // will return nil. + timeSeriesResults []error +) + func init() { + // For testing convenience, we reduce maximum time series that metric client accepts. + MaxTimeSeriesPerUpload = 3 + + // Mock functions. 
newMetricClient = mockNewMetricClient - newExpBundler = mockNewExpBundler + createTimeSeries = mockCreateTimeSeries + newBundler = mockNewBundler + addToBundler = mockAddToBundler } -// We define mock Client. +// testDataInit() initializes all data needed for each test. This function must be called at the +// beginning of each test. +func testDataInit() { + projDataMap = nil + projRds = map[string]*[]*RowData{} + timeSeriesReqs = nil + timeSeriesResults = nil + errStorage = nil +} + +// Mocked functions. -type mockMetricClient struct { - // returnErrs holds predefined error values to return. Each errors in returnErrs are - // returned per each CreateTimeSeries() call. If all errors in returnErrs are used, all - // other CreateTimeSeries calls will return nil. - returnErrs []error - // reqs saves all incoming requests. - reqs []*monitoringpb.CreateTimeSeriesRequest +func mockNewMetricClient(_ context.Context, _ ...option.ClientOption) (*monitoring.MetricClient, error) { + return nil, nil } -func (cl *mockMetricClient) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error { - cl.reqs = append(cl.reqs, req) - // Check returnErrs and if not empty, return the first error from it. - if len(cl.returnErrs) == 0 { +func mockCreateTimeSeries(_ *monitoring.MetricClient, _ context.Context, req *mpb.CreateTimeSeriesRequest, _ ...gax.CallOption) error { + timeSeriesReqs = append(timeSeriesReqs, req) + // Check timeSeriesResults and if not empty, return the first error from it. + if len(timeSeriesResults) == 0 { return nil } - err := cl.returnErrs[0] - // delete the returning error. - cl.returnErrs = cl.returnErrs[1:] + err := timeSeriesResults[0] + // Delete the returning error. 
+ timeSeriesResults = timeSeriesResults[1:] return err } -func (cl *mockMetricClient) Close() error { +func mockNewBundler(_ interface{}, _ func(interface{})) *bundler.Bundler { + // We do not return nil but create an empty Bundler object because + // 1. Exporter.newProjectData() is setting fields of Bundler. + // 2. mockAddToBundler needs to get the project ID of the bundler. To do that we need + // different address for each bundler. + return &bundler.Bundler{} +} + +func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error { + // Get the project ID of the bndler by inspecting projDataMap. + var projID string + projIDfound := false + for tempProjID, pd := range projDataMap { + if pd.bndler == bndler { + projID = tempProjID + projIDfound = true + break + } + } + if !projIDfound { + return unrecognizedDataError + } + + rds, ok := projRds[projID] + if !ok { + // For new project ID, create the actual slice and save its pointer. + var rdsSlice []*RowData + rds = &rdsSlice + projRds[projID] = rds + } + *rds = append(*rds, item.(*RowData)) return nil } -func (cl *mockMetricClient) addReturnErrs(errs ...error) { - cl.returnErrs = append(cl.returnErrs, errs...) +// newTest*() functions create exporters and project data used for testing. Each test should call +// One of these functions once and only once, and never call NewExporter() directly. + +// newTestExp creates an exporter which saves error to errStorage. Caller should not set +// opts.OnError. +func newTestExp(t *testing.T, opts *Options) *Exporter { + opts.OnError = testOnError + exp, err := NewExporter(ctx, opts) + if err != nil { + t.Fatalf("creating exporter failed: %v", err) + } + // Expose projDataMap so that mockAddToBundler() can use it. + projDataMap = exp.projDataMap + return exp +} + +// newTestProjData creates a projectData object to test behavior of projectData.uploadRowData. Other +// uses are not recommended. As newTestExp, all errors are saved to errStorage. 
+func newTestProjData(t *testing.T, opts *Options) *projectData { + return newTestExp(t, opts).newProjectData(project1) +} + +// We define a storage for all errors happened in export operation. + +type errRowData struct { + err error + rds []*RowData } -func mockNewMetricClient(_ context.Context, _ ...option.ClientOption) (metricClient, error) { - return &mockMetricClient{}, nil +// testOnError records any incoming error and accompanying RowData array. This function is passed to +// the exporter to record errors. +func testOnError(err error, rds ...*RowData) { + errStorage = append(errStorage, errRowData{err, rds}) } -// checkMetricClient checks all recorded requests in mock metric client. We only compare int64 +// checkMetricClient checks all recorded requests to the metric client. We only compare int64 // values of the time series. To make this work, we assigned different int64 values for all valid // rows in the test. -func checkMetricClient(t *testing.T, cl *mockMetricClient, wantReqsValues [][]int64) { - reqs := cl.reqs - reqsLen, wantReqsLen := len(reqs), len(wantReqsValues) +func checkMetricClient(t *testing.T, wantReqsValues [][]int64) { + reqsLen, wantReqsLen := len(timeSeriesReqs), len(wantReqsValues) if reqsLen != wantReqsLen { t.Errorf("number of requests got: %d, want %d", reqsLen, wantReqsLen) return } for i := 0; i < reqsLen; i++ { prefix := fmt.Sprintf("%d-th request mismatch", i+1) - tsArr := reqs[i].TimeSeries + tsArr := timeSeriesReqs[i].TimeSeries wantTsValues := wantReqsValues[i] tsArrLen, wantTsArrLen := len(tsArr), len(wantTsValues) if tsArrLen != wantTsArrLen { @@ -76,7 +177,7 @@ func checkMetricClient(t *testing.T, cl *mockMetricClient, wantReqsValues [][]in } for j := 0; j < tsArrLen; j++ { // This is how monitoring API stores the int64 value. 
- tsVal := tsArr[j].Points[0].Value.Value.(*monitoringpb.TypedValue_Int64Value).Int64Value + tsVal := tsArr[j].Points[0].Value.Value.(*mpb.TypedValue_Int64Value).Int64Value wantTsVal := wantTsValues[j] if tsVal != wantTsVal { t.Errorf("%s: Value got: %d, want: %d", prefix, tsVal, wantTsVal) @@ -85,41 +186,6 @@ func checkMetricClient(t *testing.T, cl *mockMetricClient, wantReqsValues [][]in } } -// We define mock bundler. - -type mockBundler struct { - // rowDataArr saves all incoming RowData to the bundler. - rowDataArr []*RowData -} - -func (b *mockBundler) Add(rowData interface{}, _ int) error { - b.rowDataArr = append(b.rowDataArr, rowData.(*RowData)) - return nil -} - -func (b *mockBundler) Flush() {} - -func mockNewExpBundler(_ func(interface{}), _ time.Duration, _ int) expBundler { - return &mockBundler{} -} - -// We define a storage for all errors happened in export operation. - -type errStorage struct { - errRds []errRowData -} - -type errRowData struct { - err error - rds []*RowData -} - -// onError records any incoming error and accompanying RowData array. This method is passed to the -// exporter to record errors. -func (e *errStorage) onError(err error, rds ...*RowData) { - e.errRds = append(e.errRds, errRowData{err, rds}) -} - // errRowDataCheck contains data for checking content of error storage. type errRowDataCheck struct { errPrefix, errSuffix string @@ -127,16 +193,15 @@ type errRowDataCheck struct { } // checkErrStorage checks content of error storage. For returned errors, we check prefix and suffix. 
-func checkErrStorage(t *testing.T, errStore *errStorage, wantErrRdCheck []errRowDataCheck) { - errRds := errStore.errRds - gotLen, wantLen := len(errRds), len(wantErrRdCheck) +func checkErrStorage(t *testing.T, wantErrRdCheck []errRowDataCheck) { + gotLen, wantLen := len(errStorage), len(wantErrRdCheck) if gotLen != wantLen { t.Errorf("number of reported errors: %d, want: %d", gotLen, wantLen) return } for i := 0; i < gotLen; i++ { prefix := fmt.Sprintf("%d-th reported error mismatch", i+1) - errRd, wantErrRd := errRds[i], wantErrRdCheck[i] + errRd, wantErrRd := errStorage[i], wantErrRdCheck[i] errStr := errRd.err.Error() if errPrefix := wantErrRd.errPrefix; !strings.HasPrefix(errStr, errPrefix) { t.Errorf("%s: error got: %q, want: prefixed by %q", prefix, errStr, errPrefix) @@ -179,52 +244,30 @@ func checkRowData(rd, wantRd *RowData) error { return nil } -// newMockExp creates mock expoter and error storage storing all errors. Caller need not set -// opts.OnError. -func newMockExp(t *testing.T, opts *Options) (*StatsExporter, *errStorage) { - errStore := &errStorage{} - opts.OnError = errStore.onError - exp, err := NewStatsExporter(ctx, opts) - if err != nil { - t.Fatalf("creating exporter failed: %v", err) - } - return exp, errStore -} - -// checkExpProjData checks all data passed to the bundler by bundler.Add(). -func checkExpProjData(t *testing.T, exp *StatsExporter, wantProjData map[string][]*RowData) { +// checkProjData checks all data passed to the bundler by bundler.Add(). 
+func checkProjData(t *testing.T, wantProjData map[string][]*RowData) { wantProj := map[string]bool{} for proj := range wantProjData { wantProj[proj] = true } - for proj := range exp.projDataMap { + for proj := range projRds { if !wantProj[proj] { t.Errorf("project in exporter's project data not wanted: %s", proj) } } for proj, wantRds := range wantProjData { - pd, ok := exp.projDataMap[proj] + rds, ok := projRds[proj] if !ok { t.Errorf("wanted project not found in exporter's project data: %v", proj) continue } - // We know that bundler is mocked, so we can check the data in it. - if err := checkRowDataArr(pd.bndler.(*mockBundler).rowDataArr, wantRds); err != nil { + if err := checkRowDataArr(*rds, wantRds); err != nil { t.Errorf("RowData array mismatch for project %s: %v", proj, err) } } } -// newMockUploader creates objects to test behavior of projectData.uploadRowData. Other uses are not -// recommended. -func newMockUploader(t *testing.T, opts *Options) (*projectData, *mockMetricClient, *errStorage) { - exp, errStore := newMockExp(t, opts) - pd := exp.newProjectData(project1) - cl := exp.client.(*mockMetricClient) - return pd, cl, errStore -} - // checkLabels checks data in labels. func checkLabels(t *testing.T, prefix string, labels, wantLabels map[string]string) { for labelName, value := range labels { diff --git a/monitoring/exporter/stackdriver/project_data.go b/monitoring/exporter/stackdriver/project_data.go index 7464193cc..258b01bc5 100644 --- a/monitoring/exporter/stackdriver/project_data.go +++ b/monitoring/exporter/stackdriver/project_data.go @@ -1,58 +1,55 @@ -package exporter +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver import ( "fmt" - "time" "go.opencensus.io/tag" "google.golang.org/api/support/bundler" metricpb "google.golang.org/genproto/googleapis/api/metric" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) -// maximum number of time series that stackdriver accepts. Only test may change this value. +// MaxTimeSeriePerUpload is the maximum number of time series that stackdriver accepts. Only test +// may change this value. var MaxTimeSeriesPerUpload = 200 // projectData contain per-project data in exporter. It should be created by newProjectData() type projectData struct { - parent *StatsExporter + parent *Exporter projectID string // We make bundler for each project because call to monitoring RPC can be grouped only in // project level - bndler expBundler + bndler *bundler.Bundler } -// We wrap bundler and its maker for testing purpose. -type expBundler interface { - Add(interface{}, int) error - Flush() -} - -var newExpBundler = defaultNewExpBundler - -// Since options in bundler are directly set to its fields and interface does not allow any fields, -// we put option set-up process inside bundler's maker. -func defaultNewExpBundler(uploader func(interface{}), delayThreshold time.Duration, countThreshold int) expBundler { - bndler := bundler.NewBundler((*RowData)(nil), uploader) - - // Set options for bundler if they are provided by users. 
- if 0 < delayThreshold { - bndler.DelayThreshold = delayThreshold - } - if 0 < countThreshold { - bndler.BundleCountThreshold = countThreshold - } - - return bndler -} - -func (e *StatsExporter) newProjectData(projectID string) *projectData { +func (e *Exporter) newProjectData(projectID string) *projectData { pd := &projectData{ parent: e, projectID: projectID, } - pd.bndler = newExpBundler(pd.uploadRowData, e.opts.BundleDelayThreshold, e.opts.BundleCountThreshold) + pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData) + // Set options for bundler if they are provided by users. + if 0 < e.opts.BundleDelayThreshold { + pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold + } + if 0 < e.opts.BundleCountThreshold { + pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold + } return pd } @@ -66,13 +63,13 @@ func (pd *projectData) uploadRowData(bundle interface{}) { // remainingRds are RowData that has not been processed at all. var reqRds, remainingRds []*RowData for ; len(rds) != 0; rds = remainingRds { - var req *monitoringpb.CreateTimeSeriesRequest + var req *mpb.CreateTimeSeriesRequest req, reqRds, remainingRds = pd.makeReq(rds) if req == nil { - // no need to perform RPC call for empty set of requests. + // No need to perform RPC call for empty set of requests. continue } - if err := exp.client.CreateTimeSeries(exp.ctx, req); err != nil { + if err := createTimeSeries(exp.client, exp.ctx, req); err != nil { newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err) // We pass all row data not successfully uploaded. exp.onError(newErr, reqRds...) @@ -90,9 +87,9 @@ func (pd *projectData) uploadRowData(bundle interface{}) { // // Some rows in rds may fail while converting them to time series, and in that case makeReq() calls // exporter's onError() directly, not propagating errors to the caller. 
-func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) { +func (pd *projectData) makeReq(rds []*RowData) (req *mpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) { exp := pd.parent - timeSeries := []*monitoringpb.TimeSeries{} + timeSeries := []*mpb.TimeSeries{} var i int var rd *RowData @@ -110,13 +107,13 @@ func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeri continue } - ts := &monitoringpb.TimeSeries{ + ts := &mpb.TimeSeries{ Metric: &metricpb.Metric{ Type: rd.View.Name, Labels: exp.makeLabels(rd.Row.Tags), }, Resource: resource, - Points: []*monitoringpb.Point{pt}, + Points: []*mpb.Point{pt}, } // Growing timeseries and reqRds are done at same time. timeSeries = append(timeSeries, ts) @@ -132,7 +129,7 @@ func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeri if len(timeSeries) == 0 { req = nil } else { - req = &monitoringpb.CreateTimeSeriesRequest{ + req = &mpb.CreateTimeSeriesRequest{ Name: fmt.Sprintf("projects/%s", pd.projectID), TimeSeries: timeSeries, } @@ -141,7 +138,7 @@ func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeri } // makeLables constructs label that's ready for being uploaded to stackdriver. -func (e *StatsExporter) makeLabels(tags []tag.Tag) map[string]string { +func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string { opts := e.opts labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) for key, val := range opts.DefaultLabels { diff --git a/monitoring/exporter/stackdriver/row_data_to_point.go b/monitoring/exporter/stackdriver/row_data_to_point.go index 6c8fefef9..8245e003f 100644 --- a/monitoring/exporter/stackdriver/row_data_to_point.go +++ b/monitoring/exporter/stackdriver/row_data_to_point.go @@ -1,20 +1,34 @@ -package exporter +// Copyright 2018 Google Inc. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver import ( "time" - timestamppb "github.com/golang/protobuf/ptypes/timestamp" + tspb "github.com/golang/protobuf/ptypes/timestamp" "go.opencensus.io/stats" "go.opencensus.io/stats/view" - distributionpb "google.golang.org/genproto/googleapis/api/distribution" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + dspb "google.golang.org/genproto/googleapis/api/distribution" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) // Functions in this file is used to convert RowData to monitoring point that are used by uploading // RPC calls of monitoring client. All functions in this file are copied from // contrib.go.opencensus.io/exporter/stackdriver. 
-func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { +func newPoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { switch v.Aggregation.Type { case view.AggTypeLastValue: return newGaugePoint(v, row, end) @@ -23,14 +37,14 @@ func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.P } } -func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { - return &monitoringpb.Point{ - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamppb.Timestamp{ +func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { + return &mpb.Point{ + Interval: &mpb.TimeInterval{ + StartTime: &tspb.Timestamp{ Seconds: start.Unix(), Nanos: int32(start.Nanosecond()), }, - EndTime: ×tamppb.Timestamp{ + EndTime: &tspb.Timestamp{ Seconds: end.Unix(), Nanos: int32(end.Nanosecond()), }, @@ -39,50 +53,50 @@ func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *moni } } -func newGaugePoint(v *view.View, row *view.Row, end time.Time) *monitoringpb.Point { - gaugeTime := ×tamppb.Timestamp{ +func newGaugePoint(v *view.View, row *view.Row, end time.Time) *mpb.Point { + gaugeTime := &tspb.Timestamp{ Seconds: end.Unix(), Nanos: int32(end.Nanosecond()), } - return &monitoringpb.Point{ - Interval: &monitoringpb.TimeInterval{ + return &mpb.Point{ + Interval: &mpb.TimeInterval{ EndTime: gaugeTime, }, Value: newTypedValue(v, row), } } -func newTypedValue(vd *view.View, r *view.Row) *monitoringpb.TypedValue { +func newTypedValue(vd *view.View, r *view.Row) *mpb.TypedValue { switch v := r.Data.(type) { case *view.CountData: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + return &mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ Int64Value: v.Value, }} case *view.SumData: switch vd.Measure.(type) { case *stats.Int64Measure: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + return 
&mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ Int64Value: int64(v.Value), }} case *stats.Float64Measure: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + return &mpb.TypedValue{Value: &mpb.TypedValue_DoubleValue{ DoubleValue: v.Value, }} } case *view.DistributionData: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{ - DistributionValue: &distributionpb.Distribution{ + return &mpb.TypedValue{Value: &mpb.TypedValue_DistributionValue{ + DistributionValue: &dspb.Distribution{ Count: v.Count, Mean: v.Mean, SumOfSquaredDeviation: v.SumOfSquaredDev, // TODO(songya): uncomment this once Stackdriver supports min/max. - // Range: &distributionpb.Distribution_Range{ + // Range: &dspb.Distribution_Range{ // Min: v.Min, // Max: v.Max, // }, - BucketOptions: &distributionpb.Distribution_BucketOptions{ - Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ - ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + BucketOptions: &dspb.Distribution_BucketOptions{ + Options: &dspb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &dspb.Distribution_BucketOptions_Explicit{ Bounds: vd.Aggregation.Buckets, }, }, @@ -93,11 +107,11 @@ func newTypedValue(vd *view.View, r *view.Row) *monitoringpb.TypedValue { case *view.LastValueData: switch vd.Measure.(type) { case *stats.Int64Measure: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + return &mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ Int64Value: int64(v.Value), }} case *stats.Float64Measure: - return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + return &mpb.TypedValue{Value: &mpb.TypedValue_DoubleValue{ DoubleValue: v.Value, }} } diff --git a/monitoring/exporter/stackdriver/stackdriver.go b/monitoring/exporter/stackdriver/stackdriver.go index 3772d348c..973a9b4a8 100644 --- a/monitoring/exporter/stackdriver/stackdriver.go +++ 
b/monitoring/exporter/stackdriver/stackdriver.go @@ -1,4 +1,19 @@ -// Package exporter provides a way to export data from opencensus to multiple GCP projects. +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package stackdriver provides an exporter that uploads data from opencensus to stackdriver +// metrics of multiple GCP projects. // // General assumptions or requirements when using this exporter. // 1. The basic unit of data is a view.Data with only a single view.Row. We define it as a separate @@ -10,7 +25,7 @@ // 3.2. RowData has correcponding GCP projects, and we can determine its project ID. // 3.3. After trimming labels and tags, configuration of all view data matches that of corresponding // stackdriver metric -package exporter +package stackdriver import ( "context" @@ -20,25 +35,24 @@ import ( "time" monitoring "cloud.google.com/go/monitoring/apiv3" - gax "github.com/googleapis/gax-go" "go.opencensus.io/stats/view" "google.golang.org/api/option" "google.golang.org/api/support/bundler" - monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" - monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" ) -// StatsExporter is the exporter that can be registered to opencensus. A StatsExporter object must -// be created by NewStatsExporter(). 
-type StatsExporter struct { +// Exporter is the exporter that can be registered to opencensus. An Exporter object must be +// created by NewExporter(). +type Exporter struct { + // TODO(lawrencechung): If possible, find a way to not store ctx in the struct. ctx context.Context - client metricClient + client *monitoring.MetricClient opts *Options // copy of some option values which may be modified by exporter. getProjectID func(*RowData) (string, error) onError func(error, ...*RowData) - makeResource func(*RowData) (*monitoredrespb.MonitoredResource, error) + makeResource func(*RowData) (*mrpb.MonitoredResource, error) // mu protects access to projDataMap mu sync.Mutex @@ -58,15 +72,16 @@ type Options struct { BundleDelayThreshold time.Duration BundleCountThreshold int - // callback functions provided by user. + // Callback functions provided by user. // GetProjectID is used to filter whether given row data can be applicable to this exporter // and if so, it also determines the projectID of given row data. If // RowDataNotApplicableError is returned, then the row data is not applicable to this // exporter, and it will be silently ignored. Though not recommended, other errors can be // returned, and in that case the error is reported to callers via OnError and the row data - // will not be uploaded to stackdriver. When GetProjectID is not set, all row data will be - // considered not applicable to this exporter. + // will not be uploaded to stackdriver. When GetProjectID is not set, for any row data with + // tag key name "project_id" (it's defined as ProjectKeyName), the value of the tag will be + // its project ID. All other row data will be silently ignored. GetProjectID func(*RowData) (projectID string, err error) // OnError is used to report any error happened while exporting view data fails.
Whenever // this function is called, it's guaranteed that at least one row data is also passed to @@ -78,9 +93,9 @@ type Options struct { // can be returned, and in that case the error is reported to callers via OnError and the // row data will not be uploaded to stackdriver. When MakeResource is not set, global // resource is used for all RowData objects. - MakeResource func(rd *RowData) (*monitoredrespb.MonitoredResource, error) + MakeResource func(rd *RowData) (*mrpb.MonitoredResource, error) - // options concerning labels. + // Options concerning labels. // DefaultLabels store default value of some labels. Labels in DefaultLabels need not be // specified in tags of view data. Default labels and tags of view may have overlapping @@ -94,27 +109,45 @@ type Options struct { UnexportedLabels []string } -// default values for options +// ProjectKeyName is used by defaultGetProjectID to get the project ID of a given row data. +const ProjectKeyName = "project_id" + +// Default values for options. Their semantics are described in Options. + func defaultGetProjectID(rd *RowData) (string, error) { + for _, tag := range rd.Row.Tags { + if tag.Key.Name() == ProjectKeyName { + return tag.Value, nil + } + } return "", RowDataNotApplicableError } func defaultOnError(err error, rds ...*RowData) {} -func defaultMakeResource(rd *RowData) (*monitoredrespb.MonitoredResource, error) { - return &monitoredrespb.MonitoredResource{Type: "global"}, nil +func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) { + return &mrpb.MonitoredResource{Type: "global"}, nil } -// NewStatsExporter creates a StatsExporter object. Once a call to NewStatsExporter is made, any -// fields in opts must not be modified at all. ctx will also be used throughout entire exporter -// operation when making RPC call. -func NewStatsExporter(ctx context.Context, opts *Options) (*StatsExporter, error) { +// Following functions are wrapper of functions that may show non-deterministic behavior. 
Only tests +// can modify these functions. +var ( + newMetricClient = monitoring.NewMetricClient + createTimeSeries = (*monitoring.MetricClient).CreateTimeSeries + newBundler = bundler.NewBundler + addToBundler = (*bundler.Bundler).Add +) + +// NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts +// must not be modified at all. ctx will also be used throughout entire exporter operation when +// making RPC call. +func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) { client, err := newMetricClient(ctx, opts.ClientOptions...) if err != nil { return nil, fmt.Errorf("failed to create a metric client: %v", err) } - e := &StatsExporter{ + e := &Exporter{ ctx: ctx, client: client, opts: opts, @@ -123,37 +156,22 @@ func NewStatsExporter(ctx context.Context, opts *Options) (*StatsExporter, error // We don't want to modify user-supplied options, so save default options directly in // exporter. + e.getProjectID = defaultGetProjectID if opts.GetProjectID != nil { e.getProjectID = opts.GetProjectID - } else { - e.getProjectID = defaultGetProjectID } + e.onError = defaultOnError if opts.OnError != nil { e.onError = opts.OnError - } else { - e.onError = defaultOnError } + e.makeResource = defaultMakeResource if opts.MakeResource != nil { e.makeResource = opts.MakeResource - } else { - e.makeResource = defaultMakeResource } return e, nil } -// We wrap monitoring.MetricClient and it's maker for testing. -type metricClient interface { - CreateTimeSeries(context.Context, *monitoringpb.CreateTimeSeriesRequest, ...gax.CallOption) error - Close() error -} - -var newMetricClient = defaultNewMetricClient - -func defaultNewMetricClient(ctx context.Context, opts ...option.ClientOption) (metricClient, error) { - return monitoring.NewMetricClient(ctx, opts...) -} - // RowData represents a single row in view data. This is our unit of computation. 
We use a single // row instead of view data because a view data consists of multiple rows, and each row may belong // to different projects. @@ -165,7 +183,7 @@ type RowData struct { // ExportView is the method called by opencensus to export view data. It constructs RowData out of // view.Data objects. -func (e *StatsExporter) ExportView(vd *view.Data) { +func (e *Exporter) ExportView(vd *view.Data) { for _, row := range vd.Rows { rd := &RowData{ View: vd.View, @@ -182,7 +200,7 @@ func (e *StatsExporter) ExportView(vd *view.Data) { var RowDataNotApplicableError = errors.New("row data is not applicable to the exporter, so it will be ignored") // exportRowData exports a single row data. -func (e *StatsExporter) exportRowData(rd *RowData) { +func (e *Exporter) exportRowData(rd *RowData) { projID, err := e.getProjectID(rd) if err != nil { // We ignore non-applicable RowData. @@ -193,7 +211,7 @@ func (e *StatsExporter) exportRowData(rd *RowData) { return } pd := e.getProjectData(projID) - switch err := pd.bndler.Add(rd, 1); err { + switch err := addToBundler(pd.bndler, rd, 1); err { case nil: case bundler.ErrOversizedItem: go pd.uploadRowData(rd) @@ -203,7 +221,7 @@ func (e *StatsExporter) exportRowData(rd *RowData) { } } -func (e *StatsExporter) getProjectData(projectID string) *projectData { +func (e *Exporter) getProjectData(projectID string) *projectData { e.mu.Lock() defer e.mu.Unlock() if pd, ok := e.projDataMap[projectID]; ok { @@ -218,7 +236,7 @@ func (e *StatsExporter) getProjectData(projectID string) *projectData { // Close flushes and closes the exporter. Close must be called after the exporter is unregistered // and no further calls to ExportView() are made. Once Close() is returned no further access to the // exporter is allowed in any way. 
-func (e *StatsExporter) Close() error { +func (e *Exporter) Close() error { e.mu.Lock() for _, pd := range e.projDataMap { pd.bndler.Flush() diff --git a/monitoring/exporter/stackdriver/stackdriver_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go index 447f134fa..4f3a3b4b4 100644 --- a/monitoring/exporter/stackdriver/stackdriver_test.go +++ b/monitoring/exporter/stackdriver/stackdriver_test.go @@ -1,18 +1,57 @@ -package exporter +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver import ( "fmt" "testing" "go.opencensus.io/stats/view" - monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" ) // This file contains actual tests. -// TestProjectClassifyNoError tests that exporter can recognize and distribute incoming data by +// TestAll runs all tests defined in this file. 
+func TestAll(t *testing.T) { + testData := []struct { + name string + test func(t *testing.T) + }{ + {"ProjectClassifyNoError", testProjectClassifyNoError}, + {"ProjectClassifyError", testProjectClassifyError}, + {"DefaultProjectClassify", testDefaultProjectClassify}, + {"UploadNoError", testUploadNoError}, + {"UploadTimeSeriesMakeError", testUploadTimeSeriesMakeError}, + {"UploadWithMetricClientError", testUploadWithMetricClientError}, + {"MakeResource", testMakeResource}, + {"MakeLabel", testMakeLabel}, + } + + for _, data := range testData { + run := func(t *testing.T) { + testDataInit() + data.test(t) + } + t.Run(data.name, run) + } +} + +// testProjectClassifyNoError tests that exporter can recognize and distribute incoming data by // its project. -func TestProjectClassifyNoError(t *testing.T) { +func testProjectClassifyNoError(t *testing.T) { viewData1 := &view.Data{ View: view1, Start: startTime1, @@ -37,7 +76,7 @@ func TestProjectClassifyNoError(t *testing.T) { } } - exp, errStore := newMockExp(t, &Options{GetProjectID: getProjectID}) + exp := newTestExp(t, &Options{GetProjectID: getProjectID}) exp.ExportView(viewData1) exp.ExportView(viewData2) @@ -50,13 +89,13 @@ func TestProjectClassifyNoError(t *testing.T) { {view1, startTime1, endTime1, view1row2}, }, } - checkErrStorage(t, errStore, nil) - checkExpProjData(t, exp, wantRowData) + checkErrStorage(t, nil) + checkProjData(t, wantRowData) } -// TestProjectClassifyError tests that exporter can properly handle errors while classifying +// testProjectClassifyError tests that exporter can properly handle errors while classifying // incoming data by its project. 
-func TestProjectClassifyError(t *testing.T) { +func testProjectClassifyError(t *testing.T) { viewData1 := &view.Data{ View: view1, Start: startTime1, @@ -83,14 +122,14 @@ func TestProjectClassifyError(t *testing.T) { } } - exp, errStore := newMockExp(t, &Options{GetProjectID: getProjectID}) + exp := newTestExp(t, &Options{GetProjectID: getProjectID}) exp.ExportView(viewData1) exp.ExportView(viewData2) wantErrRdCheck := []errRowDataCheck{ { errPrefix: "failed to get project ID", - errSuffix: invalidDataError.Error(), + errSuffix: invalidDataErrStr, rds: []*RowData{{view2, startTime2, endTime2, view2row1}}, }, } @@ -100,13 +139,62 @@ func TestProjectClassifyError(t *testing.T) { {view2, startTime2, endTime2, view2row2}, }, } - checkErrStorage(t, errStore, wantErrRdCheck) - checkExpProjData(t, exp, wantRowData) + checkErrStorage(t, wantErrRdCheck) + checkProjData(t, wantRowData) } -// -func TestUploadNoError(t *testing.T) { - pd, cl, errStore := newMockUploader(t, &Options{}) +// testDefaultProjectClassify tests that defaultGetProjectID classifies RowData by tag with key name +// "project_id". +func testDefaultProjectClassify(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1}, + } + viewData2 := &view.Data{ + View: view3, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view3row1, view3row2}, + } + viewData3 := &view.Data{ + View: view2, + Start: startTime3, + End: endTime3, + Rows: []*view.Row{view2row1, view2row2}, + } + viewData4 := &view.Data{ + View: view3, + Start: startTime4, + End: endTime4, + Rows: []*view.Row{view3row3}, + } + + exp := newTestExp(t, &Options{}) + exp.ExportView(viewData1) + exp.ExportView(viewData2) + exp.ExportView(viewData3) + exp.ExportView(viewData4) + + checkErrStorage(t, nil) + // RowData in viewData1 and viewData3 has no project ID tag, so ignored. 
+ wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view3, startTime2, endTime2, view3row1}, + {view3, startTime4, endTime4, view3row3}, + }, + project2: []*RowData{ + {view3, startTime2, endTime2, view3row2}, + }, + } + checkProjData(t, wantRowData) +} + +// testUploadNoError tests that all RowData objects passed to uploadRowData() are grouped by +// slice of length MaxTimeSeriesPerUpload, and passed to createTimeSeries(). +func testUploadNoError(t *testing.T) { + pd := newTestProjData(t, &Options{}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -116,23 +204,23 @@ func TestUploadNoError(t *testing.T) { } pd.uploadRowData(rd) - checkErrStorage(t, errStore, nil) + checkErrStorage(t, nil) wantClData := [][]int64{ {1, 2, 3}, {4, 5}, } - checkMetricClient(t, cl, wantClData) + checkMetricClient(t, wantClData) } -// TestUploadTimeSeriesMakeError tests that errors while creating time series are properly handled. -func TestUploadTimeSeriesMakeError(t *testing.T) { - makeResource := func(rd *RowData) (*monitoredrespb.MonitoredResource, error) { +// testUploadTimeSeriesMakeError tests that errors while creating time series are properly handled. 
+func testUploadTimeSeriesMakeError(t *testing.T) { + makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { if rd.Row == view1row2 { return nil, invalidDataError } return defaultMakeResource(rd) } - pd, cl, errStore := newMockUploader(t, &Options{MakeResource: makeResource}) + pd := newTestProjData(t, &Options{MakeResource: makeResource}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -147,7 +235,7 @@ func TestUploadTimeSeriesMakeError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { errPrefix: "failed to construct resource", - errSuffix: invalidDataError.Error(), + errSuffix: invalidDataErrStr, rds: []*RowData{{view1, startTime1, endTime1, view1row2}}, }, { errPrefix: "inconsistent data found in view", @@ -155,20 +243,20 @@ func TestUploadTimeSeriesMakeError(t *testing.T) { rds: []*RowData{{view2, startTime2, endTime2, invalidRow}}, }, } - checkErrStorage(t, errStore, wantErrRdCheck) + checkErrStorage(t, wantErrRdCheck) wantClData := [][]int64{ {1, 3, 4}, {5}, } - checkMetricClient(t, cl, wantClData) + checkMetricClient(t, wantClData) } -// TestUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time +// testUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time // series create RPC call. 
-func TestUploadWithMetricClientError(t *testing.T) { - pd, cl, errStore := newMockUploader(t, &Options{}) - cl.addReturnErrs(invalidDataError) +func testUploadWithMetricClientError(t *testing.T) { + pd := newTestProjData(t, &Options{}) + timeSeriesResults = append(timeSeriesResults, invalidDataError) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -181,7 +269,7 @@ func TestUploadWithMetricClientError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { errPrefix: "RPC call to create time series failed", - errSuffix: invalidDataError.Error(), + errSuffix: invalidDataErrStr, rds: []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -189,18 +277,18 @@ func TestUploadWithMetricClientError(t *testing.T) { }, }, } - checkErrStorage(t, errStore, wantErrRdCheck) + checkErrStorage(t, wantErrRdCheck) wantClData := [][]int64{ {1, 2, 3}, {4, 5}, } - checkMetricClient(t, cl, wantClData) + checkMetricClient(t, wantClData) } -// TestMakeResource tests that exporter can create monitored resource dynamically. -func TestMakeResource(t *testing.T) { - makeResource := func(rd *RowData) (*monitoredrespb.MonitoredResource, error) { +// testMakeResource tests that exporter can create monitored resource dynamically. 
+func testMakeResource(t *testing.T) { + makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { switch rd.Row { case view1row1: return resource1, nil @@ -210,54 +298,54 @@ func TestMakeResource(t *testing.T) { return nil, unrecognizedDataError } } - pd, cl, errStore := newMockUploader(t, &Options{MakeResource: makeResource}) + pd := newTestProjData(t, &Options{MakeResource: makeResource}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, } pd.uploadRowData(rd) - checkErrStorage(t, errStore, nil) - checkMetricClient(t, cl, [][]int64{{1, 2}}) + checkErrStorage(t, nil) + checkMetricClient(t, [][]int64{{1, 2}}) - tsArr := cl.reqs[0].TimeSeries - for i, wantResource := range []*monitoredrespb.MonitoredResource{resource1, resource2} { + tsArr := timeSeriesReqs[0].TimeSeries + for i, wantResource := range []*mrpb.MonitoredResource{resource1, resource2} { if resource := tsArr[i].Resource; resource != wantResource { - t.Errorf("%d-th time series resource got: %#v, want: %#v", i, resource, wantResource) + t.Errorf("%d-th time series resource got: %#v, want: %#v", i+1, resource, wantResource) } } } -// TestMakeLabel tests that exporter can correctly handle label manipulation process, including +// testMakeLabel tests that exporter can correctly handle label manipulation process, including // merging default label with tags, and removing unexported labels. 
-func TestMakeLabel(t *testing.T) { +func testMakeLabel(t *testing.T) { opts := &Options{ DefaultLabels: map[string]string{ - label1name: value4, - label4name: value5, + label1name: value7, + label4name: value8, }, UnexportedLabels: []string{label3name, label5name}, } - pd, cl, errStore := newMockUploader(t, opts) + pd := newTestProjData(t, opts) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view2, startTime2, endTime2, view2row1}, } pd.uploadRowData(rd) - checkErrStorage(t, errStore, nil) - checkMetricClient(t, cl, [][]int64{{1, 4}}) + checkErrStorage(t, nil) + checkMetricClient(t, [][]int64{{1, 4}}) wantLabels1 := map[string]string{ - label1name: value4, - label4name: value5, + label1name: value7, + label4name: value8, } wantLabels2 := map[string]string{ - // default value for key1 is suppressed, and value defined in tag of view2row1 is + // Default value for key1 is suppressed, and value defined in tag of view2row1 is // used. label1name: value1, label2name: value2, - label4name: value5, + label4name: value8, } - tsArr := cl.reqs[0].TimeSeries + tsArr := timeSeriesReqs[0].TimeSeries for i, wantLabels := range []map[string]string{wantLabels1, wantLabels2} { prefix := fmt.Sprintf("%d-th time series labels mismatch", i+1) checkLabels(t, prefix, tsArr[i].Metric.Labels, wantLabels) From 8fb9793018236f34ea5f9fc5584e1fd6bf92bfac Mon Sep 17 00:00:00 2001 From: Lawrence Chung Date: Fri, 7 Sep 2018 22:37:27 -0700 Subject: [PATCH 4/6] review round 2 fix --- .../exporter/stackdriver/mock_check_test.go | 4 +- .../exporter/stackdriver/project_data.go | 140 +++++------------- .../exporter/stackdriver/stackdriver.go | 106 +++++++++---- .../exporter/stackdriver/stackdriver_test.go | 16 +- 4 files changed, 124 insertions(+), 142 deletions(-) diff --git a/monitoring/exporter/stackdriver/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go index f6b198e49..4a25de816 100644 --- a/monitoring/exporter/stackdriver/mock_check_test.go +++ 
b/monitoring/exporter/stackdriver/mock_check_test.go @@ -127,7 +127,7 @@ func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error { // newTestExp creates an exporter which saves error to errStorage. Caller should not set // opts.OnError. -func newTestExp(t *testing.T, opts *Options) *Exporter { +func newTestExp(t *testing.T, opts Options) *Exporter { opts.OnError = testOnError exp, err := NewExporter(ctx, opts) if err != nil { @@ -140,7 +140,7 @@ // newTestProjData creates a projectData object to test behavior of projectData.uploadRowData. Other // uses are not recommended. As newTestExp, all errors are saved to errStorage. -func newTestProjData(t *testing.T, opts *Options) *projectData { +func newTestProjData(t *testing.T, opts Options) *projectData { return newTestExp(t, opts).newProjectData(project1) } diff --git a/monitoring/exporter/stackdriver/project_data.go b/monitoring/exporter/stackdriver/project_data.go index 258b01bc5..5ffb5050a 100644 --- a/monitoring/exporter/stackdriver/project_data.go +++ b/monitoring/exporter/stackdriver/project_data.go @@ -17,15 +17,14 @@ package stackdriver import ( "fmt" - "go.opencensus.io/tag" "google.golang.org/api/support/bundler" - metricpb "google.golang.org/genproto/googleapis/api/metric" mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) -// MaxTimeSeriePerUpload is the maximum number of time series that stackdriver accepts. Only test -// may change this value. -var MaxTimeSeriesPerUpload = 200 +// MaxTimeSeriesPerUpload is the maximum number of time series that's uploaded to the stackdriver +// at once. Consumer may change this value, but note that stackdriver may reject upload request if +// the number of time series is too large. +var MaxTimeSeriesPerUpload = 100 // projectData contain per-project data in exporter.
It should be created by newProjectData() type projectData struct { @@ -36,121 +35,50 @@ type projectData struct { bndler *bundler.Bundler } -func (e *Exporter) newProjectData(projectID string) *projectData { - pd := &projectData{ - parent: e, - projectID: projectID, - } - - pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData) - // Set options for bundler if they are provided by users. - if 0 < e.opts.BundleDelayThreshold { - pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold - } - if 0 < e.opts.BundleCountThreshold { - pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold - } - return pd -} - // uploadRowData is called by bundler to upload row data, and report any error happened meanwhile. func (pd *projectData) uploadRowData(bundle interface{}) { exp := pd.parent rds := bundle.([]*RowData) - // reqRds contains RowData objects those are uploaded to stackdriver at given iteration. - // It's main usage is for error reporting. For actual uploading operation, we use req. - // remainingRds are RowData that has not been processed at all. - var reqRds, remainingRds []*RowData - for ; len(rds) != 0; rds = remainingRds { - var req *mpb.CreateTimeSeriesRequest - req, reqRds, remainingRds = pd.makeReq(rds) - if req == nil { - // No need to perform RPC call for empty set of requests. - continue - } - if err := createTimeSeries(exp.client, exp.ctx, req); err != nil { - newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err) - // We pass all row data not successfully uploaded. - exp.onError(newErr, reqRds...) - } - } -} + // uploadTs contains TimeSeries objects that need to be uploaded. + var uploadTs []*mpb.TimeSeries = nil + // uploadRds contains RowData objects corresponding to uploadTs. It's used for error reporting + // when upload operation fails. + var uploadRds []*RowData = nil -// makeReq creates a request that's suitable to be passed to create time series RPC call. 
-// -// reqRds contains rows those are contained in req. Main use of reqRds is to be returned to users if -// creating time series failed. (We don't want users to investigate structure of timeseries.) -// remainingRds contains rows those are not used at all in makeReq because of the length limitation -// or request. Another call of makeReq() with remainigRds will handle (some) rows in them. When req -// is nil, then there's nothing to request and reqRds will also contain nothing. -// -// Some rows in rds may fail while converting them to time series, and in that case makeReq() calls -// exporter's onError() directly, not propagating errors to the caller. -func (pd *projectData) makeReq(rds []*RowData) (req *mpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) { - exp := pd.parent - timeSeries := []*mpb.TimeSeries{} - - var i int - var rd *RowData - for i, rd = range rds { - pt := newPoint(rd.View, rd.Row, rd.Start, rd.End) - if pt.Value == nil { - err := fmt.Errorf("inconsistent data found in view %s", rd.View.Name) - pd.parent.onError(err, rd) - continue - } - resource, err := exp.makeResource(rd) + for _, rd := range rds { + ts, err := exp.makeTS(rd) if err != nil { - newErr := fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err) - pd.parent.onError(newErr, rd) + exp.opts.OnError(err, rd) continue } - - ts := &mpb.TimeSeries{ - Metric: &metricpb.Metric{ - Type: rd.View.Name, - Labels: exp.makeLabels(rd.Row.Tags), - }, - Resource: resource, - Points: []*mpb.Point{pt}, - } - // Growing timeseries and reqRds are done at same time. - timeSeries = append(timeSeries, ts) - reqRds = append(reqRds, rd) - // Don't grow timeseries over the limit. - if len(timeSeries) == MaxTimeSeriesPerUpload { - break + // Time series created. We update both uploadTs and uploadRds. 
+ uploadTs = append(uploadTs, ts) + uploadRds = append(uploadRds, rd) + if len(uploadTs) == MaxTimeSeriesPerUpload { + pd.uploadTimeSeries(uploadTs, uploadRds) + uploadTs = nil + uploadRds = nil } } - - // Since i is the last index processed, remainingRds should start from i+1. - remainingRds = rds[i+1:] - if len(timeSeries) == 0 { - req = nil - } else { - req = &mpb.CreateTimeSeriesRequest{ - Name: fmt.Sprintf("projects/%s", pd.projectID), - TimeSeries: timeSeries, - } + // Upload any remaining time series. + if len(uploadTs) != 0 { + pd.uploadTimeSeries(uploadTs, uploadRds) } - return req, reqRds, remainingRds } -// makeLables constructs label that's ready for being uploaded to stackdriver. -func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string { - opts := e.opts - labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) - for key, val := range opts.DefaultLabels { - labels[key] = val - } - // If there's overlap When combining exporter's default label and tags, values in tags win. - for _, tag := range tags { - labels[tag.Key.Name()] = tag.Value +// uploadTimeSeries uploads timeSeries. ts and rds must contain matching data, and ts must not be +// empty. When uploading fails, this function calls exporter's OnError() directly, not propagating +// errors to the caller. +func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) { + exp := pd.parent + req := &mpb.CreateTimeSeriesRequest{ + Name: fmt.Sprintf("projects/%s", pd.projectID), + TimeSeries: ts, } - // Some labels are not for exporting. - for _, key := range opts.UnexportedLabels { - delete(labels, key) + if err := createTimeSeries(exp.client, exp.ctx, req); err != nil { + newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err) + // We pass all row data not successfully uploaded. + exp.opts.OnError(newErr, rds...) 
} - return labels } diff --git a/monitoring/exporter/stackdriver/stackdriver.go b/monitoring/exporter/stackdriver/stackdriver.go index 973a9b4a8..254d42e2d 100644 --- a/monitoring/exporter/stackdriver/stackdriver.go +++ b/monitoring/exporter/stackdriver/stackdriver.go @@ -36,9 +36,12 @@ import ( monitoring "cloud.google.com/go/monitoring/apiv3" "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "google.golang.org/api/option" "google.golang.org/api/support/bundler" + metricpb "google.golang.org/genproto/googleapis/api/metric" mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) // Exporter is the exporter that can be registered to opencensus. An Exporter object must be @@ -47,12 +50,7 @@ type Exporter struct { // TODO(lawrencechung): If possible, find a way to not storing ctx in the struct. ctx context.Context client *monitoring.MetricClient - opts *Options - - // copy of some option values which may be modified by exporter. - getProjectID func(*RowData) (string, error) - onError func(error, ...*RowData) - makeResource func(*RowData) (*mrpb.MonitoredResource, error) + opts Options // mu protects access to projDataMap mu sync.Mutex @@ -67,9 +65,14 @@ type Options struct { // RPC calls. ClientOptions []option.ClientOption - // options for bundles amortizing export requests. Note that a bundle is created for each + // Options for bundles amortizing export requests. Note that a bundle is created for each // project. When not provided, default values in bundle package are used. + + // BundleDelayThreshold determines the max amount of time the exporter can wait before + // uploading data to the stackdriver. BundleDelayThreshold time.Duration + // BundleCountThreshold determines how many RowData objects can be buffered before batch + // uploading them to the backend. BundleCountThreshold int // Callback functions provided by user. 
@@ -85,8 +88,8 @@ type Options struct { GetProjectID func(*RowData) (projectID string, err error) // OnError is used to report any error happened while exporting view data fails. Whenever // this function is called, it's guaranteed that at least one row data is also passed to - // OnError. Row data passed to OnError must not be modified. When OnError is not set, all - // errors happened on exporting are ignored. + // OnError. Row data passed to OnError must not be modified and OnError must be + // non-blocking. When OnError is not set, all errors happened on exporting are ignored. OnError func(error, ...*RowData) // MakeResource creates monitored resource from RowData. It is guaranteed that only RowData // that passes GetProjectID will be given to this function. Though not recommended, error @@ -129,8 +132,8 @@ func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) { return &mrpb.MonitoredResource{Type: "global"}, nil } -// Following functions are wrapper of functions that may show non-deterministic behavior. Only tests -// can modify these functions. +// Following functions are wrapper of functions those will be mocked by tests. Only tests can modify +// these functions. var ( newMetricClient = monitoring.NewMetricClient createTimeSeries = (*monitoring.MetricClient).CreateTimeSeries @@ -141,7 +144,7 @@ var ( // NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts // must not be modified at all. ctx will also be used throughout entire exporter operation when // making RPC call. -func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) { +func NewExporter(ctx context.Context, opts Options) (*Exporter, error) { client, err := newMetricClient(ctx, opts.ClientOptions...) 
if err != nil { return nil, fmt.Errorf("failed to create a metric client: %v", err) @@ -154,19 +157,14 @@ func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) { projDataMap: make(map[string]*projectData), } - // We don't want to modify user-supplied options, so save default options directly in - // exporter. - e.getProjectID = defaultGetProjectID - if opts.GetProjectID != nil { - e.getProjectID = opts.GetProjectID + if e.opts.GetProjectID == nil { + e.opts.GetProjectID = defaultGetProjectID } - e.onError = defaultOnError - if opts.OnError != nil { - e.onError = opts.OnError + if e.opts.OnError == nil { + e.opts.OnError = defaultOnError } - e.makeResource = defaultMakeResource - if opts.MakeResource != nil { - e.makeResource = opts.MakeResource + if e.opts.MakeResource == nil { + e.opts.MakeResource = defaultMakeResource } return e, nil @@ -201,12 +199,12 @@ var RowDataNotApplicableError = errors.New("row data is not applicable to the ex // exportRowData exports a single row data. func (e *Exporter) exportRowData(rd *RowData) { - projID, err := e.getProjectID(rd) + projID, err := e.opts.GetProjectID(rd) if err != nil { // We ignore non-applicable RowData. 
if err != RowDataNotApplicableError { newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err) - e.onError(newErr, rd) + e.opts.OnError(newErr, rd) } return } @@ -217,7 +215,7 @@ func (e *Exporter) exportRowData(rd *RowData) { go pd.uploadRowData(rd) default: newErr := fmt.Errorf("failed to add row data with view %s to bundle for project %s: %v", rd.View.Name, projID, err) - e.onError(newErr, rd) + e.opts.OnError(newErr, rd) } } @@ -233,6 +231,23 @@ func (e *Exporter) getProjectData(projectID string) *projectData { return pd } +func (e *Exporter) newProjectData(projectID string) *projectData { + pd := &projectData{ + parent: e, + projectID: projectID, + } + + pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData) + // Set options for bundler if they are provided by users. + if 0 < e.opts.BundleDelayThreshold { + pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold + } + if 0 < e.opts.BundleCountThreshold { + pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold + } + return pd +} + // Close flushes and closes the exporter. Close must be called after the exporter is unregistered // and no further calls to ExportView() are made. Once Close() is returned no further access to the // exporter is allowed in any way. @@ -248,3 +263,42 @@ func (e *Exporter) Close() error { } return nil } + +// makeTS constructs a time series from a row data. 
+func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) { + pt := newPoint(rd.View, rd.Row, rd.Start, rd.End) + if pt.Value == nil { + return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name) + } + resource, err := e.opts.MakeResource(rd) + if err != nil { + return nil, fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err) + } + ts := &mpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: rd.View.Name, + Labels: e.makeLabels(rd.Row.Tags), + }, + Resource: resource, + Points: []*mpb.Point{pt}, + } + return ts, nil +} + +// makeLables constructs label that's ready for being uploaded to stackdriver. +func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string { + opts := e.opts + labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) + for key, val := range opts.DefaultLabels { + labels[key] = val + } + // If there's overlap When combining exporter's default label and tags, values in tags win. + for _, tag := range tags { + labels[tag.Key.Name()] = tag.Value + } + // Some labels are not for exporting. 
+ for _, key := range opts.UnexportedLabels { + delete(labels, key) + } + return labels +} diff --git a/monitoring/exporter/stackdriver/stackdriver_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go index 4f3a3b4b4..8509423a2 100644 --- a/monitoring/exporter/stackdriver/stackdriver_test.go +++ b/monitoring/exporter/stackdriver/stackdriver_test.go @@ -76,7 +76,7 @@ func testProjectClassifyNoError(t *testing.T) { } } - exp := newTestExp(t, &Options{GetProjectID: getProjectID}) + exp := newTestExp(t, Options{GetProjectID: getProjectID}) exp.ExportView(viewData1) exp.ExportView(viewData2) @@ -122,7 +122,7 @@ func testProjectClassifyError(t *testing.T) { } } - exp := newTestExp(t, &Options{GetProjectID: getProjectID}) + exp := newTestExp(t, Options{GetProjectID: getProjectID}) exp.ExportView(viewData1) exp.ExportView(viewData2) @@ -171,7 +171,7 @@ func testDefaultProjectClassify(t *testing.T) { Rows: []*view.Row{view3row3}, } - exp := newTestExp(t, &Options{}) + exp := newTestExp(t, Options{}) exp.ExportView(viewData1) exp.ExportView(viewData2) exp.ExportView(viewData3) @@ -194,7 +194,7 @@ func testDefaultProjectClassify(t *testing.T) { // testUploadNoError tests that all RowData objects passed to uploadRowData() are grouped by // slice of length MaxTimeSeriesPerUpload, and passed to createTimeSeries(). 
func testUploadNoError(t *testing.T) { - pd := newTestProjData(t, &Options{}) + pd := newTestProjData(t, Options{}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -220,7 +220,7 @@ func testUploadTimeSeriesMakeError(t *testing.T) { } return defaultMakeResource(rd) } - pd := newTestProjData(t, &Options{MakeResource: makeResource}) + pd := newTestProjData(t, Options{MakeResource: makeResource}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -255,7 +255,7 @@ func testUploadTimeSeriesMakeError(t *testing.T) { // testUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time // series create RPC call. func testUploadWithMetricClientError(t *testing.T) { - pd := newTestProjData(t, &Options{}) + pd := newTestProjData(t, Options{}) timeSeriesResults = append(timeSeriesResults, invalidDataError) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, @@ -298,7 +298,7 @@ func testMakeResource(t *testing.T) { return nil, unrecognizedDataError } } - pd := newTestProjData(t, &Options{MakeResource: makeResource}) + pd := newTestProjData(t, Options{MakeResource: makeResource}) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -318,7 +318,7 @@ func testMakeResource(t *testing.T) { // testMakeLabel tests that exporter can correctly handle label manipulation process, including // merging default label with tags, and removing unexported labels. 
func testMakeLabel(t *testing.T) { - opts := &Options{ + opts := Options{ DefaultLabels: map[string]string{ label1name: value7, label4name: value8, From 1e12b1656bd9256b101374e8467aa47b64c842da Mon Sep 17 00:00:00 2001 From: lychung83 Date: Mon, 10 Sep 2018 18:09:42 -0700 Subject: [PATCH 5/6] review round 3 response --- .../exporter/stackdriver/mock_check_test.go | 109 ++++++++++------- .../exporter/stackdriver/project_data.go | 11 +- .../exporter/stackdriver/row_data_to_point.go | 2 +- .../exporter/stackdriver/stackdriver.go | 31 +++-- .../exporter/stackdriver/stackdriver_test.go | 112 +++++++++++++----- 5 files changed, 176 insertions(+), 89 deletions(-) diff --git a/monitoring/exporter/stackdriver/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go index 4a25de816..2392dc08f 100644 --- a/monitoring/exporter/stackdriver/mock_check_test.go +++ b/monitoring/exporter/stackdriver/mock_check_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "strings" - "testing" monitoring "cloud.google.com/go/monitoring/apiv3" gax "github.com/googleapis/gax-go" @@ -50,9 +49,6 @@ var ( ) func init() { - // For testing convenience, we reduce maximum time series that metric client accepts. - MaxTimeSeriesPerUpload = 3 - // Mock functions. newMetricClient = mockNewMetricClient createTimeSeries = mockCreateTimeSeries @@ -126,22 +122,29 @@ func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error { // One of these functions once and only once, and never call NewExporter() directly. // newTestExp creates an exporter which saves error to errStorage. Caller should not set -// opts.OnError. -func newTestExp(t *testing.T, opts Options) *Exporter { +// opts.OnError and opts.BundleCountThreshold. +func newTestExp(opts Options) (*Exporter, error) { opts.OnError = testOnError + // For testing convenience, we reduce the number of timeseris in one upload monitoring API + // call. 
+ opts.BundleCountThreshold = 3 exp, err := NewExporter(ctx, opts) if err != nil { - t.Fatalf("creating exporter failed: %v", err) + return nil, fmt.Errorf("creating exporter failed: %v", err) } // Expose projDataMap so that mockAddToBundler() can use it. projDataMap = exp.projDataMap - return exp + return exp, nil } // newTestProjData creates a projectData object to test behavior of projectData.uploadRowData. Other // uses are not recommended. As newTestExp, all errors are saved to errStorage. -func newTestProjData(t *testing.T, opts Options) *projectData { - return newTestExp(t, opts).newProjectData(project1) +func newTestProjData(opts Options) (*projectData, error) { + exp, err := newTestExp(opts) + if err != nil { + return nil, err + } + return exp.newProjectData(project1), nil } // We define a storage for all errors happened in export operation. @@ -157,22 +160,42 @@ func testOnError(err error, rds ...*RowData) { errStorage = append(errStorage, errRowData{err, rds}) } +// multiError stores a sequence of errors. To convert it to an actual error, call toError(). +type multiError struct { + errs []error +} + +func (me *multiError) addf(format string, args ...interface{}) { + me.errs = append(me.errs, fmt.Errorf(format, args...)) +} + +func (me *multiError) toError() error { + switch len(me.errs) { + case 0: + return nil + case 1: + return me.errs[0] + default: + return fmt.Errorf("multiple errors: %q", me.errs) + } +} + // checkMetricClient checks all recorded requests to the metric client. We only compare int64 // values of the time series. To make this work, we assigned different int64 values for all valid // rows in the test. 
-func checkMetricClient(t *testing.T, wantReqsValues [][]int64) { +func checkMetricClient(wantReqsValues [][]int64) error { reqsLen, wantReqsLen := len(timeSeriesReqs), len(wantReqsValues) if reqsLen != wantReqsLen { - t.Errorf("number of requests got: %d, want %d", reqsLen, wantReqsLen) - return + return fmt.Errorf("number of requests got: %d, want %d", reqsLen, wantReqsLen) } + var errs multiError for i := 0; i < reqsLen; i++ { prefix := fmt.Sprintf("%d-th request mismatch", i+1) tsArr := timeSeriesReqs[i].TimeSeries wantTsValues := wantReqsValues[i] tsArrLen, wantTsArrLen := len(tsArr), len(wantTsValues) if tsArrLen != wantTsArrLen { - t.Errorf("%s: number of time series got: %d, want: %d", prefix, tsArrLen, wantTsArrLen) + errs.addf("%s: number of time series got: %d, want: %d", prefix, tsArrLen, wantTsArrLen) continue } for j := 0; j < tsArrLen; j++ { @@ -180,10 +203,11 @@ func checkMetricClient(t *testing.T, wantReqsValues [][]int64) { tsVal := tsArr[j].Points[0].Value.Value.(*mpb.TypedValue_Int64Value).Int64Value wantTsVal := wantTsValues[j] if tsVal != wantTsVal { - t.Errorf("%s: Value got: %d, want: %d", prefix, tsVal, wantTsVal) + errs.addf("%s: Value got: %d, want: %d", prefix, tsVal, wantTsVal) } } } + return errs.toError() } // errRowDataCheck contains data for checking content of error storage. @@ -193,26 +217,27 @@ type errRowDataCheck struct { } // checkErrStorage checks content of error storage. For returned errors, we check prefix and suffix. 
-func checkErrStorage(t *testing.T, wantErrRdCheck []errRowDataCheck) { +func checkErrStorage(wantErrRdCheck []errRowDataCheck) error { gotLen, wantLen := len(errStorage), len(wantErrRdCheck) if gotLen != wantLen { - t.Errorf("number of reported errors: %d, want: %d", gotLen, wantLen) - return + return fmt.Errorf("number of reported errors: %d, want: %d", gotLen, wantLen) } + var errs multiError for i := 0; i < gotLen; i++ { prefix := fmt.Sprintf("%d-th reported error mismatch", i+1) errRd, wantErrRd := errStorage[i], wantErrRdCheck[i] errStr := errRd.err.Error() if errPrefix := wantErrRd.errPrefix; !strings.HasPrefix(errStr, errPrefix) { - t.Errorf("%s: error got: %q, want: prefixed by %q", prefix, errStr, errPrefix) + errs.addf("%s: error got: %q, want: prefixed by %q", prefix, errStr, errPrefix) } if errSuffix := wantErrRd.errSuffix; !strings.HasSuffix(errStr, errSuffix) { - t.Errorf("%s: error got: %q, want: suffiexd by %q", prefix, errStr, errSuffix) + errs.addf("%s: error got: %q, want: suffiexd by %q", prefix, errStr, errSuffix) } if err := checkRowDataArr(errRd.rds, wantErrRd.rds); err != nil { - t.Errorf("%s: RowData array mismatch: %v", prefix, err) + errs.addf("%s: RowData array mismatch: %v", prefix, err) } } + return errs.toError() } func checkRowDataArr(rds, wantRds []*RowData) error { @@ -220,69 +245,71 @@ func checkRowDataArr(rds, wantRds []*RowData) error { if rdLen != wantRdLen { return fmt.Errorf("number row data got: %d, want: %d", rdLen, wantRdLen) } + var errs multiError for i := 0; i < rdLen; i++ { if err := checkRowData(rds[i], wantRds[i]); err != nil { - return fmt.Errorf("%d-th row data mismatch: %v", i+1, err) + errs.addf("%d-th row data mismatch: %v", i+1, err) } } - return nil + return errs.toError() } func checkRowData(rd, wantRd *RowData) error { + var errs multiError if rd.View != wantRd.View { - return fmt.Errorf("View got: %s, want: %s", rd.View.Name, wantRd.View.Name) + errs.addf("View got: %s, want: %s", rd.View.Name, 
wantRd.View.Name) } if rd.Start != wantRd.Start { - return fmt.Errorf("Start got: %v, want: %v", rd.Start, wantRd.Start) + errs.addf("Start got: %v, want: %v", rd.Start, wantRd.Start) } if rd.End != wantRd.End { - return fmt.Errorf("End got: %v, want: %v", rd.End, wantRd.End) + errs.addf("End got: %v, want: %v", rd.End, wantRd.End) } if rd.Row != wantRd.Row { - return fmt.Errorf("Row got: %v, want: %v", rd.Row, wantRd.Row) + errs.addf("Row got: %v, want: %v", rd.Row, wantRd.Row) } - return nil + return errs.toError() } // checkProjData checks all data passed to the bundler by bundler.Add(). -func checkProjData(t *testing.T, wantProjData map[string][]*RowData) { - wantProj := map[string]bool{} - for proj := range wantProjData { - wantProj[proj] = true - } +func checkProjData(wantProjData map[string][]*RowData) error { + var errs multiError for proj := range projRds { - if !wantProj[proj] { - t.Errorf("project in exporter's project data not wanted: %s", proj) + if _, ok := wantProjData[proj]; !ok { + errs.addf("project in exporter's project data not wanted: %s", proj) } } for proj, wantRds := range wantProjData { rds, ok := projRds[proj] if !ok { - t.Errorf("wanted project not found in exporter's project data: %v", proj) + errs.addf("wanted project not found in exporter's project data: %v", proj) continue } if err := checkRowDataArr(*rds, wantRds); err != nil { - t.Errorf("RowData array mismatch for project %s: %v", proj, err) + errs.addf("RowData array mismatch for project %s: %v", proj, err) } } + return errs.toError() } // checkLabels checks data in labels. 
-func checkLabels(t *testing.T, prefix string, labels, wantLabels map[string]string) { +func checkLabels(prefix string, labels, wantLabels map[string]string) error { + var errs multiError for labelName, value := range labels { wantValue, ok := wantLabels[labelName] if !ok { - t.Errorf("%s: label name in time series not wanted: %s", prefix, labelName) + errs.addf("%s: label name in time series not wanted: %s", prefix, labelName) continue } if value != wantValue { - t.Errorf("%s: value for label name %s got: %s, want: %s", prefix, labelName, value, wantValue) + errs.addf("%s: value for label name %s got: %s, want: %s", prefix, labelName, value, wantValue) } } for wantLabelName := range wantLabels { if _, ok := labels[wantLabelName]; !ok { - t.Errorf("%s: wanted label name not found in time series: %s", prefix, wantLabelName) + errs.addf("%s: wanted label name not found in time series: %s", prefix, wantLabelName) } } + return errs.toError() } diff --git a/monitoring/exporter/stackdriver/project_data.go b/monitoring/exporter/stackdriver/project_data.go index 5ffb5050a..8b39176d3 100644 --- a/monitoring/exporter/stackdriver/project_data.go +++ b/monitoring/exporter/stackdriver/project_data.go @@ -21,16 +21,11 @@ import ( mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) -// MaxTimeSeriePerUpload is the maximum number of time series that's uploaded to the stackdriver -// at once. Consumer may change this value, but note that stackdriver may reject upload request if -// the number of time series is too large. -var MaxTimeSeriesPerUpload = 100 - // projectData contain per-project data in exporter. 
It should be created by newProjectData() type projectData struct { parent *Exporter projectID string - // We make bundler for each project because call to monitoring RPC can be grouped only in + // We make bundler for each project because call to monitoring API can be grouped only in // project level bndler *bundler.Bundler } @@ -55,7 +50,7 @@ func (pd *projectData) uploadRowData(bundle interface{}) { // Time series created. We update both uploadTs and uploadRds. uploadTs = append(uploadTs, ts) uploadRds = append(uploadRds, rd) - if len(uploadTs) == MaxTimeSeriesPerUpload { + if len(uploadTs) == exp.opts.BundleCountThreshold { pd.uploadTimeSeries(uploadTs, uploadRds) uploadTs = nil uploadRds = nil @@ -77,7 +72,7 @@ func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) { TimeSeries: ts, } if err := createTimeSeries(exp.client, exp.ctx, req); err != nil { - newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err) + newErr := fmt.Errorf("monitoring API call to create time series failed for project %s: %v", pd.projectID, err) // We pass all row data not successfully uploaded. exp.opts.OnError(newErr, rds...) } diff --git a/monitoring/exporter/stackdriver/row_data_to_point.go b/monitoring/exporter/stackdriver/row_data_to_point.go index 8245e003f..dff4b3989 100644 --- a/monitoring/exporter/stackdriver/row_data_to_point.go +++ b/monitoring/exporter/stackdriver/row_data_to_point.go @@ -25,7 +25,7 @@ import ( ) // Functions in this file is used to convert RowData to monitoring point that are used by uploading -// RPC calls of monitoring client. All functions in this file are copied from +// monitoring API calls. All functions in this file are copied from // contrib.go.opencensus.io/exporter/stackdriver. 
func newPoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { diff --git a/monitoring/exporter/stackdriver/stackdriver.go b/monitoring/exporter/stackdriver/stackdriver.go index 254d42e2d..38e2117dd 100644 --- a/monitoring/exporter/stackdriver/stackdriver.go +++ b/monitoring/exporter/stackdriver/stackdriver.go @@ -44,6 +44,10 @@ import ( mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) +// MaxTimeSeriesPerUpload is the maximum number of timeseries objects that will be uploaded to +// Stackdriver in one API call. +const MaxTimeSeriesPerUpload = 100 + // Exporter is the exporter that can be registered to opencensus. An Exporter object must be // created by NewExporter(). type Exporter struct { @@ -62,17 +66,19 @@ type Exporter struct { // are valid for use. type Options struct { // ClientOptions designates options for creating metric client, especially credentials for - // RPC calls. + // monitoring API calls. ClientOptions []option.ClientOption // Options for bundles amortizing export requests. Note that a bundle is created for each - // project. When not provided, default values in bundle package are used. + // project. // BundleDelayThreshold determines the max amount of time the exporter can wait before - // uploading data to the stackdriver. + // uploading data to the stackdriver. If this value is not positive, the default value in + // the bundle package is used. BundleDelayThreshold time.Duration // BundleCountThreshold determines how many RowData objects can be buffered before batch - // uploading them to the backend. + // uploading them to the backend. If this value is not between 1 and MaxTimeSeriesPerUpload, + // MaxTimeSeriesPerUpload is used. BundleCountThreshold int // Callback functions provided by user. @@ -143,7 +149,7 @@ var ( // NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts // must not be modified at all. 
ctx will also be used throughout entire exporter operation when -// making RPC call. +// making monitoring API call. func NewExporter(ctx context.Context, opts Options) (*Exporter, error) { client, err := newMetricClient(ctx, opts.ClientOptions...) if err != nil { @@ -157,6 +163,12 @@ func NewExporter(ctx context.Context, opts Options) (*Exporter, error) { projDataMap: make(map[string]*projectData), } + if !(0 < e.opts.BundleDelayThreshold) { + e.opts.BundleDelayThreshold = bundler.DefaultDelayThreshold + } + if !(0 < e.opts.BundleCountThreshold && e.opts.BundleCountThreshold <= MaxTimeSeriesPerUpload) { + e.opts.BundleCountThreshold = MaxTimeSeriesPerUpload + } if e.opts.GetProjectID == nil { e.opts.GetProjectID = defaultGetProjectID } @@ -238,13 +250,8 @@ func (e *Exporter) newProjectData(projectID string) *projectData { } pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData) - // Set options for bundler if they are provided by users. - if 0 < e.opts.BundleDelayThreshold { - pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold - } - if 0 < e.opts.BundleCountThreshold { - pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold - } + pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold + pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold return pd } diff --git a/monitoring/exporter/stackdriver/stackdriver_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go index 8509423a2..b989f548b 100644 --- a/monitoring/exporter/stackdriver/stackdriver_test.go +++ b/monitoring/exporter/stackdriver/stackdriver_test.go @@ -76,7 +76,10 @@ func testProjectClassifyNoError(t *testing.T) { } } - exp := newTestExp(t, Options{GetProjectID: getProjectID}) + exp, err := newTestExp(Options{GetProjectID: getProjectID}) + if err != nil { + t.Fatal(err) + } exp.ExportView(viewData1) exp.ExportView(viewData2) @@ -89,8 +92,12 @@ func testProjectClassifyNoError(t *testing.T) { {view1, startTime1, endTime1, view1row2}, }, } - checkErrStorage(t, nil) - 
checkProjData(t, wantRowData) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } } // testProjectClassifyError tests that exporter can properly handle errors while classifying @@ -122,7 +129,10 @@ func testProjectClassifyError(t *testing.T) { } } - exp := newTestExp(t, Options{GetProjectID: getProjectID}) + exp, err := newTestExp(Options{GetProjectID: getProjectID}) + if err != nil { + t.Fatal(err) + } exp.ExportView(viewData1) exp.ExportView(viewData2) @@ -139,8 +149,12 @@ func testProjectClassifyError(t *testing.T) { {view2, startTime2, endTime2, view2row2}, }, } - checkErrStorage(t, wantErrRdCheck) - checkProjData(t, wantRowData) + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } } // testDefaultProjectClassify tests that defaultGetProjectID classifies RowData by tag with key name @@ -171,13 +185,18 @@ func testDefaultProjectClassify(t *testing.T) { Rows: []*view.Row{view3row3}, } - exp := newTestExp(t, Options{}) + exp, err := newTestExp(Options{}) + if err != nil { + t.Fatal(err) + } exp.ExportView(viewData1) exp.ExportView(viewData2) exp.ExportView(viewData3) exp.ExportView(viewData4) - checkErrStorage(t, nil) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } // RowData in viewData1 and viewData3 has no project ID tag, so ignored. wantRowData := map[string][]*RowData{ project1: []*RowData{ @@ -188,13 +207,18 @@ func testDefaultProjectClassify(t *testing.T) { {view3, startTime2, endTime2, view3row2}, }, } - checkProjData(t, wantRowData) + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } } // testUploadNoError tests that all RowData objects passed to uploadRowData() are grouped by // slice of length MaxTimeSeriesPerUpload, and passed to createTimeSeries(). 
func testUploadNoError(t *testing.T) { - pd := newTestProjData(t, Options{}) + pd, err := newTestProjData(Options{}) + if err != nil { + t.Fatal(err) + } rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -204,12 +228,16 @@ func testUploadNoError(t *testing.T) { } pd.uploadRowData(rd) - checkErrStorage(t, nil) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } wantClData := [][]int64{ {1, 2, 3}, {4, 5}, } - checkMetricClient(t, wantClData) + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } } // testUploadTimeSeriesMakeError tests that errors while creating time series are properly handled. @@ -220,7 +248,10 @@ func testUploadTimeSeriesMakeError(t *testing.T) { } return defaultMakeResource(rd) } - pd := newTestProjData(t, Options{MakeResource: makeResource}) + pd, err := newTestProjData(Options{MakeResource: makeResource}) + if err != nil { + t.Fatal(err) + } rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -243,19 +274,26 @@ func testUploadTimeSeriesMakeError(t *testing.T) { rds: []*RowData{{view2, startTime2, endTime2, invalidRow}}, }, } - checkErrStorage(t, wantErrRdCheck) + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } wantClData := [][]int64{ {1, 3, 4}, {5}, } - checkMetricClient(t, wantClData) + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } } // testUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time -// series create RPC call. +// series create monitoing API call. 
func testUploadWithMetricClientError(t *testing.T) { - pd := newTestProjData(t, Options{}) + pd, err := newTestProjData(Options{}) + if err != nil { + t.Fatal(err) + } timeSeriesResults = append(timeSeriesResults, invalidDataError) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, @@ -268,7 +306,7 @@ func testUploadWithMetricClientError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { - errPrefix: "RPC call to create time series failed", + errPrefix: "monitoring API call to create time series failed", errSuffix: invalidDataErrStr, rds: []*RowData{ {view1, startTime1, endTime1, view1row1}, @@ -277,13 +315,17 @@ func testUploadWithMetricClientError(t *testing.T) { }, }, } - checkErrStorage(t, wantErrRdCheck) + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } wantClData := [][]int64{ {1, 2, 3}, {4, 5}, } - checkMetricClient(t, wantClData) + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } } // testMakeResource tests that exporter can create monitored resource dynamically. 
@@ -298,14 +340,21 @@ func testMakeResource(t *testing.T) { return nil, unrecognizedDataError } } - pd := newTestProjData(t, Options{MakeResource: makeResource}) + pd, err := newTestProjData(Options{MakeResource: makeResource}) + if err != nil { + t.Fatal(err) + } rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, } pd.uploadRowData(rd) - checkErrStorage(t, nil) - checkMetricClient(t, [][]int64{{1, 2}}) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkMetricClient([][]int64{{1, 2}}); err != nil { + t.Error(err) + } tsArr := timeSeriesReqs[0].TimeSeries for i, wantResource := range []*mrpb.MonitoredResource{resource1, resource2} { @@ -325,14 +374,21 @@ func testMakeLabel(t *testing.T) { }, UnexportedLabels: []string{label3name, label5name}, } - pd := newTestProjData(t, opts) + pd, err := newTestProjData(opts) + if err != nil { + t.Fatal(err) + } rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view2, startTime2, endTime2, view2row1}, } pd.uploadRowData(rd) - checkErrStorage(t, nil) - checkMetricClient(t, [][]int64{{1, 4}}) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkMetricClient([][]int64{{1, 4}}); err != nil { + t.Error(err) + } wantLabels1 := map[string]string{ label1name: value7, @@ -348,6 +404,8 @@ func testMakeLabel(t *testing.T) { tsArr := timeSeriesReqs[0].TimeSeries for i, wantLabels := range []map[string]string{wantLabels1, wantLabels2} { prefix := fmt.Sprintf("%d-th time series labels mismatch", i+1) - checkLabels(t, prefix, tsArr[i].Metric.Labels, wantLabels) + if err := checkLabels(prefix, tsArr[i].Metric.Labels, wantLabels); err != nil { + t.Error(err) + } } } From 3641a89107769ddc6c694d58db910bc02445f984 Mon Sep 17 00:00:00 2001 From: lychung83 Date: Tue, 25 Sep 2018 18:14:47 -0700 Subject: [PATCH 6/6] minor correction with fake --- monitoring/exporter/stackdriver/data_test.go | 6 +- 
.../exporter/stackdriver/mock_check_test.go | 5 +- .../exporter/stackdriver/project_data.go | 6 +- .../exporter/stackdriver/row_data_to_point.go | 20 ++- .../exporter/stackdriver/stackdriver.go | 43 ++++-- .../exporter/stackdriver/stackdriver_test.go | 73 ++++++++-- .../exporter/stackdriver/timeseries_fake.go | 130 ++++++++++++++++++ 7 files changed, 249 insertions(+), 34 deletions(-) create mode 100644 monitoring/exporter/stackdriver/timeseries_fake.go diff --git a/monitoring/exporter/stackdriver/data_test.go b/monitoring/exporter/stackdriver/data_test.go index db8bd2549..4612329ec 100644 --- a/monitoring/exporter/stackdriver/data_test.go +++ b/monitoring/exporter/stackdriver/data_test.go @@ -58,11 +58,11 @@ const ( var ( ctx = context.Background() - invalidDataErrStr = "invalid data" + invalidDataStr = "invalid data" // This error is used for test to catch some error happpened. - invalidDataError = errors.New(invalidDataErrStr) + errInvalidData = errors.New(invalidDataStr) // This error is used for unexpected error. 
- unrecognizedDataError = errors.New("unrecognized data") + errUnrecognizedData = errors.New("unrecognized data") key1 = getKey(label1name) key2 = getKey(label2name) diff --git a/monitoring/exporter/stackdriver/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go index 2392dc08f..3aa60e5d8 100644 --- a/monitoring/exporter/stackdriver/mock_check_test.go +++ b/monitoring/exporter/stackdriver/mock_check_test.go @@ -20,7 +20,6 @@ import ( "strings" monitoring "cloud.google.com/go/monitoring/apiv3" - gax "github.com/googleapis/gax-go" "google.golang.org/api/option" "google.golang.org/api/support/bundler" mpb "google.golang.org/genproto/googleapis/monitoring/v3" @@ -72,7 +71,7 @@ func mockNewMetricClient(_ context.Context, _ ...option.ClientOption) (*monitori return nil, nil } -func mockCreateTimeSeries(_ *monitoring.MetricClient, _ context.Context, req *mpb.CreateTimeSeriesRequest, _ ...gax.CallOption) error { +func mockCreateTimeSeries(_ context.Context, _ *monitoring.MetricClient, req *mpb.CreateTimeSeriesRequest) error { timeSeriesReqs = append(timeSeriesReqs, req) // Check timeSeriesResults and if not empty, return the first error from it. if len(timeSeriesResults) == 0 { @@ -104,7 +103,7 @@ func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error { } } if !projIDfound { - return unrecognizedDataError + return errUnrecognizedData } rds, ok := projRds[projID] diff --git a/monitoring/exporter/stackdriver/project_data.go b/monitoring/exporter/stackdriver/project_data.go index 8b39176d3..9e4a0cc71 100644 --- a/monitoring/exporter/stackdriver/project_data.go +++ b/monitoring/exporter/stackdriver/project_data.go @@ -36,10 +36,10 @@ func (pd *projectData) uploadRowData(bundle interface{}) { rds := bundle.([]*RowData) // uploadTs contains TimeSeries objects that needs to be uploaded. - var uploadTs []*mpb.TimeSeries = nil + var uploadTs []*mpb.TimeSeries // uploadRds contains RowData objects corresponds to uploadTs. 
It's used for error reporting // when upload operation fails. - var uploadRds []*RowData = nil + var uploadRds []*RowData for _, rd := range rds { ts, err := exp.makeTS(rd) @@ -71,7 +71,7 @@ func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) { Name: fmt.Sprintf("projects/%s", pd.projectID), TimeSeries: ts, } - if err := createTimeSeries(exp.client, exp.ctx, req); err != nil { + if err := createTimeSeries(exp.ctx, exp.client, req); err != nil { newErr := fmt.Errorf("monitoring API call to create time series failed for project %s: %v", pd.projectID, err) // We pass all row data not successfully uploaded. exp.opts.OnError(newErr, rds...) diff --git a/monitoring/exporter/stackdriver/row_data_to_point.go b/monitoring/exporter/stackdriver/row_data_to_point.go index dff4b3989..bdbbbca0d 100644 --- a/monitoring/exporter/stackdriver/row_data_to_point.go +++ b/monitoring/exporter/stackdriver/row_data_to_point.go @@ -25,7 +25,7 @@ import ( ) // Functions in this file is used to convert RowData to monitoring point that are used by uploading -// monitoring API calls. All functions in this file are copied from +// monitoring API calls. All functions except newStringPoint in this file are copied from // contrib.go.opencensus.io/exporter/stackdriver. func newPoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { @@ -37,6 +37,24 @@ func newPoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { } } +// newStringPoint returns a metric point with string value. 
+func newStringPoint(val string, end time.Time) *mpb.Point { + gaugeTime := &tspb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + } + return &mpb.Point{ + Interval: &mpb.TimeInterval{ + EndTime: gaugeTime, + }, + Value: &mpb.TypedValue{ + Value: &mpb.TypedValue_StringValue{ + StringValue: val, + }, + }, + } +} + func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { return &mpb.Point{ Interval: &mpb.TimeInterval{ diff --git a/monitoring/exporter/stackdriver/stackdriver.go b/monitoring/exporter/stackdriver/stackdriver.go index 38e2117dd..035574822 100644 --- a/monitoring/exporter/stackdriver/stackdriver.go +++ b/monitoring/exporter/stackdriver/stackdriver.go @@ -103,6 +103,10 @@ type Options struct { // row data will not be uploaded to stackdriver. When MakeResource is not set, global // resource is used for all RowData objects. MakeResource func(rd *RowData) (*mrpb.MonitoredResource, error) + // IsValueString tells that whether the value in the row data is a string, and return the + // string value if it is. This function circumvents the restriction that opencensus metrics + // do not support string value. When IsValueString is not set, it always returns false. + IsValueString func(rd *RowData) (string, bool, error) // Options concerning labels. @@ -129,7 +133,7 @@ func defaultGetProjectID(rd *RowData) (string, error) { return tag.Value, nil } } - return "", RowDataNotApplicableError + return "", ErrRowDataNotApplicable } func defaultOnError(err error, rds ...*RowData) {} @@ -138,13 +142,19 @@ func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) { return &mrpb.MonitoredResource{Type: "global"}, nil } +func defaultIsValueString(rd *RowData) (string, bool, error) { + return "", false, nil +} + // Following functions are wrapper of functions those will be mocked by tests. Only tests can modify // these functions. 
var ( newMetricClient = monitoring.NewMetricClient - createTimeSeries = (*monitoring.MetricClient).CreateTimeSeries - newBundler = bundler.NewBundler - addToBundler = (*bundler.Bundler).Add + createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *mpb.CreateTimeSeriesRequest) error { + return c.CreateTimeSeries(ctx, ts) + } + newBundler = bundler.NewBundler + addToBundler = (*bundler.Bundler).Add ) // NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts @@ -178,6 +188,9 @@ func NewExporter(ctx context.Context, opts Options) (*Exporter, error) { if e.opts.MakeResource == nil { e.opts.MakeResource = defaultMakeResource } + if e.opts.IsValueString == nil { + e.opts.IsValueString = defaultIsValueString + } return e, nil } @@ -205,16 +218,16 @@ func (e *Exporter) ExportView(vd *view.Data) { } } -// RowDataNotApplicableError is used to tell that given row data is not applicable to the exporter. +// ErrRowDataNotApplicable is used to tell that given row data is not applicable to the exporter. // See GetProjectID of Options for more detail. -var RowDataNotApplicableError = errors.New("row data is not applicable to the exporter, so it will be ignored") +var ErrRowDataNotApplicable = errors.New("row data is not applicable to the exporter, so it will be ignored") // exportRowData exports a single row data. func (e *Exporter) exportRowData(rd *RowData) { projID, err := e.opts.GetProjectID(rd) if err != nil { // We ignore non-applicable RowData. - if err != RowDataNotApplicableError { + if err != ErrRowDataNotApplicable { newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err) e.opts.OnError(newErr, rd) } @@ -273,10 +286,18 @@ func (e *Exporter) Close() error { // makeTS constructs a time series from a row data. 
func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) { - pt := newPoint(rd.View, rd.Row, rd.Start, rd.End) - if pt.Value == nil { - return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name) + var pt *mpb.Point + if strVal, ok, err := e.opts.IsValueString(rd); err != nil { + return nil, fmt.Errorf("failed to check whether row data is string valued or not in view: %v", rd.View.Name) + } else if ok { + pt = newStringPoint(strVal, rd.End) + } else { + pt = newPoint(rd.View, rd.Row, rd.Start, rd.End) + if pt.Value == nil { + return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name) + } } + resource, err := e.opts.MakeResource(rd) if err != nil { return nil, fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err) @@ -292,7 +313,7 @@ func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) { return ts, nil } -// makeLables constructs label that's ready for being uploaded to stackdriver. +// makeLabels constructs label that's ready for being uploaded to stackdriver. func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string { opts := e.opts labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) diff --git a/monitoring/exporter/stackdriver/stackdriver_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go index b989f548b..c4dcde6af 100644 --- a/monitoring/exporter/stackdriver/stackdriver_test.go +++ b/monitoring/exporter/stackdriver/stackdriver_test.go @@ -20,6 +20,7 @@ import ( "go.opencensus.io/stats/view" mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" ) // This file contains actual tests. 
@@ -36,6 +37,7 @@ func TestAll(t *testing.T) { {"UploadNoError", testUploadNoError}, {"UploadTimeSeriesMakeError", testUploadTimeSeriesMakeError}, {"UploadWithMetricClientError", testUploadWithMetricClientError}, + {"StringValueMetric", testStringValueMetric}, {"MakeResource", testMakeResource}, {"MakeLabel", testMakeLabel}, } @@ -72,7 +74,7 @@ func testProjectClassifyNoError(t *testing.T) { case view1row2: return project2, nil default: - return "", unrecognizedDataError + return "", errUnrecognizedData } } @@ -121,11 +123,11 @@ func testProjectClassifyError(t *testing.T) { case view1row1, view2row2: return project1, nil case view1row2: - return "", RowDataNotApplicableError + return "", ErrRowDataNotApplicable case view2row1: - return "", invalidDataError + return "", errInvalidData default: - return "", unrecognizedDataError + return "", errUnrecognizedData } } @@ -139,7 +141,7 @@ func testProjectClassifyError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { errPrefix: "failed to get project ID", - errSuffix: invalidDataErrStr, + errSuffix: invalidDataStr, rds: []*RowData{{view2, startTime2, endTime2, view2row1}}, }, } @@ -244,7 +246,7 @@ func testUploadNoError(t *testing.T) { func testUploadTimeSeriesMakeError(t *testing.T) { makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { if rd.Row == view1row2 { - return nil, invalidDataError + return nil, errInvalidData } return defaultMakeResource(rd) } @@ -266,7 +268,7 @@ func testUploadTimeSeriesMakeError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { errPrefix: "failed to construct resource", - errSuffix: invalidDataErrStr, + errSuffix: invalidDataStr, rds: []*RowData{{view1, startTime1, endTime1, view1row2}}, }, { errPrefix: "inconsistent data found in view", @@ -294,7 +296,7 @@ func testUploadWithMetricClientError(t *testing.T) { if err != nil { t.Fatal(err) } - timeSeriesResults = append(timeSeriesResults, invalidDataError) + timeSeriesResults = append(timeSeriesResults, nil, 
errInvalidData) rd := []*RowData{ {view1, startTime1, endTime1, view1row1}, {view1, startTime1, endTime1, view1row2}, @@ -307,11 +309,10 @@ func testUploadWithMetricClientError(t *testing.T) { wantErrRdCheck := []errRowDataCheck{ { errPrefix: "monitoring API call to create time series failed", - errSuffix: invalidDataErrStr, + errSuffix: invalidDataStr, rds: []*RowData{ - {view1, startTime1, endTime1, view1row1}, - {view1, startTime1, endTime1, view1row2}, - {view1, startTime1, endTime1, view1row3}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, }, }, } @@ -328,6 +329,52 @@ func testUploadWithMetricClientError(t *testing.T) { } } +// testStringValueMetric tests that exporter can detect string valued metric by IsValueString given +// in option. +func testStringValueMetric(t *testing.T) { + isValueString := func(rd *RowData) (string, bool, error) { + switch rd.Row { + case view1row1: + return "string_value", true, nil + case view1row2: + return "", false, nil + default: + return "", false, errUnrecognizedData + } + } + pd, err := newTestProjData(Options{IsValueString: isValueString}) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + } + pd.uploadRowData(rd) + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to check whether row data is string valued or not", + errSuffix: metric1name, + rds: []*RowData{{view1, startTime1, endTime1, view1row3}}, + }, + } + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + wantVals := []interface{}{"string_value", int64(2)} + tsArr := timeSeriesReqs[0].TimeSeries + tsVals := []interface{}{ + tsArr[0].Points[0].Value.Value.(*mpb.TypedValue_StringValue).StringValue, + tsArr[1].Points[0].Value.Value.(*mpb.TypedValue_Int64Value).Int64Value, + } + for i := 0; i < len(tsVals); i++ { + if tsVal, wantVal := tsVals[i], 
wantVals[i]; tsVal != wantVal { + t.Errorf("%d-th time series value mismatch: got %v, want %v", i+1, tsVal, wantVal) + } + } +} + // testMakeResource tests that exporter can create monitored resource dynamically. func testMakeResource(t *testing.T) { makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { @@ -337,7 +384,7 @@ func testMakeResource(t *testing.T) { case view1row2: return resource2, nil default: - return nil, unrecognizedDataError + return nil, errUnrecognizedData } } pd, err := newTestProjData(Options{MakeResource: makeResource}) diff --git a/monitoring/exporter/stackdriver/timeseries_fake.go b/monitoring/exporter/stackdriver/timeseries_fake.go new file mode 100644 index 000000000..4a3ff1d64 --- /dev/null +++ b/monitoring/exporter/stackdriver/timeseries_fake.go @@ -0,0 +1,130 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "fmt" + "os" + "strings" + + monitoring "cloud.google.com/go/monitoring/apiv3" + metricpb "google.golang.org/genproto/googleapis/api/metric" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +var warningMsg = ` +********************************************************************************* +* DEBUG LOG: THIS PROGRAM IS USING FAKE MONITORING API FOR DEVELOPMENT PURPOSE. 
* +********************************************************************************* +`[1:] + +// WARNING: this file contains development purpose fake version of monitoring API. This code should +// not be used in any production environment at all. + +func init() { + if os.Getenv("CLOUD_SQL_PROXY_DEV") != "" { + fmt.Printf(warningMsg) + createTimeSeries = fakeCreateTimeSeries + } +} + +func fakeCreateTimeSeries(ctx context.Context, cl *monitoring.MetricClient, req *mpb.CreateTimeSeriesRequest) error { + if req == nil { + return fmt.Errorf("request is nil") + } + + var fakeTsArr []*mpb.TimeSeries + for i, ts := range req.TimeSeries { + errPrefix := fmt.Sprintf("timeseries[%d]", i) + name := ts.Metric.Type + sepIndex := strings.IndexByte(name, '/') + if sepIndex == -1 { + return fmt.Errorf("%s: metric name does not have '/': %s", errPrefix, name) + } + fakeName := fmt.Sprintf("custom.googleapis.com/cloudsql%s", name[sepIndex:]) + + fakeRscLabels := make(map[string]string) + var uuid string + for key, val := range ts.Resource.Labels { + var fakeKey string + fakeVal := val + switch key { + case "uuid": + uuid = val + continue + case "project_id": + fakeKey = key + case "region": + fakeKey = "zone" + if val == "UNSPECIFIED" { + fakeVal = "global" + } + case "database_id": + fakeKey = "instance_id" + default: + return fmt.Errorf("unknown label key of monitored resource: %s", key) + } + fakeRscLabels[fakeKey] = fakeVal + } + fakeResource := &mrpb.MonitoredResource{ + Type: "gce_instance", + Labels: fakeRscLabels, + } + + isValueString := false + var strVal string + + fakeVal := ts.Points[0].Value.Value + if strTypeVal, ok := fakeVal.(*mpb.TypedValue_StringValue); ok { + isValueString = true + strVal = strTypeVal.StringValue + fakeVal = &mpb.TypedValue_Int64Value{ + Int64Value: 0, + } + } + fakePt := &mpb.Point{ + Interval: ts.Points[0].Interval, + Value: &mpb.TypedValue{ + Value: fakeVal, + }, + } + + fakeLabels := map[string]string{"uuid": uuid} + for key, val := range 
ts.Metric.Labels { + fakeLabels[key] = val + } + if isValueString { + fakeLabels["string_metric_value"] = strVal + } + + fakeTs := &mpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: fakeName, + Labels: fakeLabels, + }, + Resource: fakeResource, + Points: []*mpb.Point{fakePt}, + } + fakeTsArr = append(fakeTsArr, fakeTs) + } + + fakeReq := &mpb.CreateTimeSeriesRequest{ + Name: req.Name, + TimeSeries: fakeTsArr, + } + return cl.CreateTimeSeries(ctx, fakeReq) +}