Skip to content
Xavi edited this page Dec 7, 2018 · 6 revisions

The Bulk API in Elasticsearch enables users to perform many index/update/delete operations in a single request.

Instead of doing it manually (as described here), you might also be interested in Bulk processor. Bulk processor does bulk processing in the background, similar to the BulkProcessor from the Java API.

Here's an example of how to use the Bulk API directly with Elastic.

client, err := elastic.NewClient()
if err != nil {
  // ...
}

// Set up 4 bulk requests: 2 index requests, 1 delete request, 1 update request.
index1Req := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("1").Doc(tweet1)
index2Req := elastic.NewBulkIndexRequest().OpType("create").Index("twitter").Type("tweet").Id("2").Doc(tweet2)
delete1Req := elastic.NewBulkDeleteRequest().Index("twitter").Type("tweet").Id("1")
update2Req := elastic.NewBulkUpdateRequest().Index("twitter").Type("tweet").Id("2").
                Doc(struct {
                Retweets int `json:"retweets"`
        }{
                Retweets: 42,
        })

// Create the bulk and add the 4 requests to it
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(index1Req)
bulkRequest = bulkRequest.Add(index2Req)
bulkRequest = bulkRequest.Add(delete1Req)
bulkRequest = bulkRequest.Add(update2Req)

// NumberOfActions contains the number of requests in a bulk
if bulkRequest.NumberOfActions() != 4 {
  // ...
}

// Do sends the bulk requests to Elasticsearch
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
  // ...
}

// Bulk request actions get cleared
if bulkRequest.NumberOfActions() != 0 {
  // ...
}

// Bulk response contains valuable information about the outcome, 
// e.g. which requests have failed etc.

// Indexed returns information about indexed documents
indexed := bulkResponse.Indexed()
if len(indexed) != 1 {
  // ...
}
if indexed[0].Id != "1" {
  // ...
}
if indexed[0].Status != 201 {
  // ...
}

// Created returns information about created documents
created := bulkResponse.Created()
if len(created) != 1 {
  // ...
}
if created[0].Id != "2" {
  // ...
}
if created[0].Status != 201 {
  // ...
}

// Deleted returns information about documents that were removed
deleted := bulkResponse.Deleted()
if len(deleted) != 1 {
  // ...
}
if deleted[0].Id != "1" {
  // ...
}
if deleted[0].Status != 200 {
  // ...
}
if !deleted[0].Found {
  // ...
}

// Updated returns information about documents that were updated
updated := bulkResponse.Updated()
if len(updated) != 1 {
  // ...
}
if updated[0].Id != "2" {
  // ...
}
if updated[0].Status != 200 {
  // ...
}
if updated[0].Version != 2 {
  // ...
}

// ById returns information about documents by ID
id1Results := bulkResponse.ById("1")
if len(id1Results) != 2 {
  // Document "1" should have been indexed, then deleted
  // ...
}

// ByAction returns information about a certain action.
// Use with "index", "create", "update", "delete".
deletedResults := bulkResponse.ByAction("delete")
...

// Failed() returns information about failed bulk requests
// (those with a HTTP status code outside [200,299].
failedResults := bulkResponse.Failed()
Clone this wiki locally