From 3da5909210e234efe4666942d6ca4b2a93885521 Mon Sep 17 00:00:00 2001 From: Nevins Date: Wed, 25 Aug 2021 15:59:53 -0400 Subject: [PATCH] Updates to bulk indexer. Add version and routing fields, optimize the memory useage. update to make field optional, use json api to marshal metadata update comment Signed-off-by: Nevins Bartolomeo --- opensearchutil/bulk_indexer.go | 107 +++++++++++-------- opensearchutil/bulk_indexer_internal_test.go | 65 ++++++++++- 2 files changed, 127 insertions(+), 45 deletions(-) diff --git a/opensearchutil/bulk_indexer.go b/opensearchutil/bulk_indexer.go index feda1aa2f..2d43df61e 100644 --- a/opensearchutil/bulk_indexer.go +++ b/opensearchutil/bulk_indexer.go @@ -32,10 +32,8 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "runtime" - "strconv" "sync" "sync/atomic" "time" @@ -111,16 +109,37 @@ type BulkIndexerStats struct { // BulkIndexerItem represents an indexer item. // type BulkIndexerItem struct { - Index string - Action string - DocumentID string - Body io.Reader - RetryOnConflict *int + Index string + Action string + DocumentID string + Routing *string + Version *int64 + VersionType *string + IfSeqNum *int64 + IfPrimaryTerm *int64 + WaitForActiveShards interface{} + Refresh *string + RequireAlias *bool + Body io.ReadSeeker + RetryOnConflict *int OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item } +type bulkActionMetadata struct { + Index string `json:"_index,omitempty"` + DocumentID string `json:"_id,omitempty"` + Routing *string `json:"routing,omitempty"` + Version *int64 `json:"version,omitempty"` + VersionType *string `json:"version_type,omitempty"` + IfSeqNum *int64 `json:"if_seq_num,omitempty"` + IfPrimaryTerm *int64 `json:"if_primary_term,omitempty"` + WaitForActiveShards interface{} `json:"wait_for_active_shards,omitempty"` + Refresh *string `json:"refresh,omitempty"` + RequireAlias *bool `json:"require_alias,omitempty"` +} + // BulkIndexerResponse represents the OpenSearch response. // type BulkIndexerResponse struct { @@ -404,30 +423,39 @@ func (w *worker) run() { // writeMeta formats and writes the item metadata to the buffer; it must be called under a lock. // func (w *worker) writeMeta(item BulkIndexerItem) error { - w.buf.WriteRune('{') - w.aux = strconv.AppendQuote(w.aux, item.Action) - w.buf.Write(w.aux) - w.aux = w.aux[:0] - w.buf.WriteRune(':') - w.buf.WriteRune('{') - if item.DocumentID != "" { - w.buf.WriteString(`"_id":`) - w.aux = strconv.AppendQuote(w.aux, item.DocumentID) - w.buf.Write(w.aux) - w.aux = w.aux[:0] + var err error + meta := bulkActionMetadata{ + Index: item.Index, + DocumentID: item.DocumentID, + Version: item.Version, + VersionType: item.VersionType, + Routing: item.Routing, + IfPrimaryTerm: item.IfPrimaryTerm, + IfSeqNum: item.IfSeqNum, + WaitForActiveShards: item.WaitForActiveShards, + Refresh: item.Refresh, + RequireAlias: item.RequireAlias, } - if item.Index != "" { - if item.DocumentID != "" { - w.buf.WriteRune(',') - } - w.buf.WriteString(`"_index":`) - w.aux = strconv.AppendQuote(w.aux, item.Index) - w.buf.Write(w.aux) - w.aux = w.aux[:0] + // Can not specify version or seq num if no document ID is passed + if meta.DocumentID == "" { + meta.Version = nil + meta.VersionType = nil + } + w.aux, err = json.Marshal(map[string]bulkActionMetadata{ + item.Action: meta, + }) + if err != nil { + return err + } + _, err = w.buf.Write(w.aux) + if err != nil { + return err + } + w.aux = w.aux[:0] + _, err = w.buf.WriteRune('\n') + if err != nil { + return err } - w.buf.WriteRune('}') - w.buf.WriteRune('}') - w.buf.WriteRune('\n') return nil } @@ -435,30 +463,21 @@ func (w *worker) writeMeta(item BulkIndexerItem) error { // func (w *worker) writeBody(item *BulkIndexerItem) error { if item.Body != nil { - - var getBody func() io.Reader - - if item.OnSuccess != nil || item.OnFailure != nil { - var buf bytes.Buffer - buf.ReadFrom(item.Body) - getBody = func() io.Reader { - r := buf - return ioutil.NopCloser(&r) + if _, err := w.buf.ReadFrom(item.Body); err != nil { + if w.bi.config.OnError != nil { + w.bi.config.OnError(context.Background(), err) } - item.Body = getBody() + return err } - if _, err := w.buf.ReadFrom(item.Body); err != nil { + if _, err := item.Body.Seek(0, io.SeekStart); err != nil { if w.bi.config.OnError != nil { w.bi.config.OnError(context.Background(), err) } return err } - w.buf.WriteRune('\n') - if getBody != nil && (item.OnSuccess != nil || item.OnFailure != nil) { - item.Body = getBody() - } + w.buf.WriteRune('\n') } return nil } diff --git a/opensearchutil/bulk_indexer_internal_test.go b/opensearchutil/bulk_indexer_internal_test.go index 667120b75..13e8a9288 100644 --- a/opensearchutil/bulk_indexer_internal_test.go +++ b/opensearchutil/bulk_indexer_internal_test.go @@ -24,6 +24,7 @@ // specific language governing permissions and limitations // under the License. +//go:build !integration // +build !integration package opensearchutil @@ -671,7 +672,61 @@ func TestBulkIndexer(t *testing.T) { DocumentID: "42", Index: "test", }}, - `{"index":{"_id":"42","_index":"test"}}` + "\n", + `{"index":{"_index":"test","_id":"42"}}` + "\n", + }, + { + "with version and no document", + args{BulkIndexerItem{ + Action: "index", + Index: "test", + Version: int64Pointer(23), + }}, + `{"index":{"_index":"test"}}` + "\n", + }, + { + "with version", + args{BulkIndexerItem{ + Action: "index", + DocumentID: "42", + Index: "test", + Version: int64Pointer(24), + }}, + `{"index":{"_index":"test","_id":"42","version":24}}` + "\n", + }, + { + "with version and version_type", + args{BulkIndexerItem{ + Action: "index", + DocumentID: "42", + Index: "test", + Version: int64Pointer(25), + VersionType: strPointer("external"), + }}, + `{"index":{"_index":"test","_id":"42","version":25,"version_type":"external"}}` + "\n", + }, + { + "wait_for_active_shards", + args{BulkIndexerItem{ + Action: "index", + DocumentID: "42", + Index: "test", + Version: int64Pointer(25), + VersionType: strPointer("external"), + WaitForActiveShards: 1, + }}, + `{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","wait_for_active_shards":1}}` + "\n", + }, + { + "wait_for_active_shards, all", + args{BulkIndexerItem{ + Action: "index", + DocumentID: "42", + Index: "test", + Version: int64Pointer(25), + VersionType: strPointer("external"), + WaitForActiveShards: "all", + }}, + `{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","wait_for_active_shards":"all"}}` + "\n", }, } for _, tt := range tests { @@ -700,3 +755,11 @@ type customJSONDecoder struct{} func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { return json.NewDecoder(r).Decode(blk) } + +func strPointer(s string) *string { + return &s +} + +func int64Pointer(i int64) *int64 { + return &i +}