Skip to content

Commit

Permalink
Updates to bulk indexer. Add version and routing fields, optimize the…
Browse files Browse the repository at this point in the history
… memory usage.

update to make field optional, use json api to marshal metadata

update comment

Signed-off-by: Nevins Bartolomeo <[email protected]>
  • Loading branch information
Nevins authored and VijayanB committed Feb 4, 2022
1 parent 79d499d commit 3da5909
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 45 deletions.
107 changes: 63 additions & 44 deletions opensearchutil/bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -111,16 +109,37 @@ type BulkIndexerStats struct {
// BulkIndexerItem represents an indexer item.
//
type BulkIndexerItem struct {
Index string
Action string
DocumentID string
Body io.Reader
RetryOnConflict *int
Index string
Action string
DocumentID string
Routing *string
Version *int64
VersionType *string
IfSeqNum *int64
IfPrimaryTerm *int64
WaitForActiveShards interface{}
Refresh *string
RequireAlias *bool
Body io.ReadSeeker
RetryOnConflict *int

OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
}

// bulkActionMetadata is the JSON shape of the per-item metadata object that
// writeMeta marshals, keyed by the item's action, onto the action line of a
// bulk request.
//
// Every field carries omitempty, so empty strings, nil pointers and a nil
// WaitForActiveShards are left out of the serialized line entirely.
//
// encoding/json emits struct fields in declaration order, so the order below
// is the key order on the wire ("_index" before "_id", and so on); do not
// reorder fields without updating anything that compares the raw output.
type bulkActionMetadata struct {
	Index       string  `json:"_index,omitempty"`
	DocumentID  string  `json:"_id,omitempty"`
	Routing     *string `json:"routing,omitempty"`
	Version     *int64  `json:"version,omitempty"`
	VersionType *string `json:"version_type,omitempty"`
	// The API names the sequence-number condition "if_seq_no" (not
	// "if_seq_num"); the Go field keeps its name, only the wire key differs.
	IfSeqNum      *int64 `json:"if_seq_no,omitempty"`
	IfPrimaryTerm *int64 `json:"if_primary_term,omitempty"`
	// WaitForActiveShards is untyped so callers can supply either a number
	// or a string such as "all".
	WaitForActiveShards interface{} `json:"wait_for_active_shards,omitempty"`
	Refresh             *string     `json:"refresh,omitempty"`
	RequireAlias        *bool       `json:"require_alias,omitempty"`
}

// BulkIndexerResponse represents the OpenSearch response.
//
type BulkIndexerResponse struct {
Expand Down Expand Up @@ -404,61 +423,61 @@ func (w *worker) run() {
// writeMeta formats and writes the item metadata to the buffer; it must be called under a lock.
//
func (w *worker) writeMeta(item BulkIndexerItem) error {
w.buf.WriteRune('{')
w.aux = strconv.AppendQuote(w.aux, item.Action)
w.buf.Write(w.aux)
w.aux = w.aux[:0]
w.buf.WriteRune(':')
w.buf.WriteRune('{')
if item.DocumentID != "" {
w.buf.WriteString(`"_id":`)
w.aux = strconv.AppendQuote(w.aux, item.DocumentID)
w.buf.Write(w.aux)
w.aux = w.aux[:0]
var err error
meta := bulkActionMetadata{
Index: item.Index,
DocumentID: item.DocumentID,
Version: item.Version,
VersionType: item.VersionType,
Routing: item.Routing,
IfPrimaryTerm: item.IfPrimaryTerm,
IfSeqNum: item.IfSeqNum,
WaitForActiveShards: item.WaitForActiveShards,
Refresh: item.Refresh,
RequireAlias: item.RequireAlias,
}
if item.Index != "" {
if item.DocumentID != "" {
w.buf.WriteRune(',')
}
w.buf.WriteString(`"_index":`)
w.aux = strconv.AppendQuote(w.aux, item.Index)
w.buf.Write(w.aux)
w.aux = w.aux[:0]
// Can not specify version or seq num if no document ID is passed
if meta.DocumentID == "" {
meta.Version = nil
meta.VersionType = nil
}
w.aux, err = json.Marshal(map[string]bulkActionMetadata{
item.Action: meta,
})
if err != nil {
return err
}
_, err = w.buf.Write(w.aux)
if err != nil {
return err
}
w.aux = w.aux[:0]
_, err = w.buf.WriteRune('\n')
if err != nil {
return err
}
w.buf.WriteRune('}')
w.buf.WriteRune('}')
w.buf.WriteRune('\n')
return nil
}

// writeBody writes the item body to the buffer; it must be called under a lock.
//
func (w *worker) writeBody(item *BulkIndexerItem) error {
if item.Body != nil {

var getBody func() io.Reader

if item.OnSuccess != nil || item.OnFailure != nil {
var buf bytes.Buffer
buf.ReadFrom(item.Body)
getBody = func() io.Reader {
r := buf
return ioutil.NopCloser(&r)
if _, err := w.buf.ReadFrom(item.Body); err != nil {
if w.bi.config.OnError != nil {
w.bi.config.OnError(context.Background(), err)
}
item.Body = getBody()
return err
}

if _, err := w.buf.ReadFrom(item.Body); err != nil {
if _, err := item.Body.Seek(0, io.SeekStart); err != nil {
if w.bi.config.OnError != nil {
w.bi.config.OnError(context.Background(), err)
}
return err
}
w.buf.WriteRune('\n')

if getBody != nil && (item.OnSuccess != nil || item.OnFailure != nil) {
item.Body = getBody()
}
w.buf.WriteRune('\n')
}
return nil
}
Expand Down
65 changes: 64 additions & 1 deletion opensearchutil/bulk_indexer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package opensearchutil
Expand Down Expand Up @@ -671,7 +672,61 @@ func TestBulkIndexer(t *testing.T) {
DocumentID: "42",
Index: "test",
}},
`{"index":{"_id":"42","_index":"test"}}` + "\n",
`{"index":{"_index":"test","_id":"42"}}` + "\n",
},
{
"with version and no document",
args{BulkIndexerItem{
Action: "index",
Index: "test",
Version: int64Pointer(23),
}},
`{"index":{"_index":"test"}}` + "\n",
},
{
"with version",
args{BulkIndexerItem{
Action: "index",
DocumentID: "42",
Index: "test",
Version: int64Pointer(24),
}},
`{"index":{"_index":"test","_id":"42","version":24}}` + "\n",
},
{
"with version and version_type",
args{BulkIndexerItem{
Action: "index",
DocumentID: "42",
Index: "test",
Version: int64Pointer(25),
VersionType: strPointer("external"),
}},
`{"index":{"_index":"test","_id":"42","version":25,"version_type":"external"}}` + "\n",
},
{
"wait_for_active_shards",
args{BulkIndexerItem{
Action: "index",
DocumentID: "42",
Index: "test",
Version: int64Pointer(25),
VersionType: strPointer("external"),
WaitForActiveShards: 1,
}},
`{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","wait_for_active_shards":1}}` + "\n",
},
{
"wait_for_active_shards, all",
args{BulkIndexerItem{
Action: "index",
DocumentID: "42",
Index: "test",
Version: int64Pointer(25),
VersionType: strPointer("external"),
WaitForActiveShards: "all",
}},
`{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","wait_for_active_shards":"all"}}` + "\n",
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -700,3 +755,11 @@ type customJSONDecoder struct{}
// UnmarshalFromReader decodes a single JSON value read from r into blk using
// the standard library's streaming decoder and returns any decode error.
func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {
	dec := json.NewDecoder(r)
	return dec.Decode(blk)
}

// strPointer returns a pointer to a fresh copy of s, for populating optional
// *string fields in test fixtures.
func strPointer(s string) *string {
	p := new(string)
	*p = s
	return p
}

// int64Pointer returns a pointer to a fresh copy of i, for populating
// optional *int64 fields in test fixtures.
func int64Pointer(i int64) *int64 {
	p := new(int64)
	*p = i
	return p
}

0 comments on commit 3da5909

Please sign in to comment.