Skip to content

Commit

Permalink
Add bulkDocSection caching
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Oct 30, 2024
1 parent 48d6efe commit 4941c99
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class RfsMigrateDocuments {
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

private static final String DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG = "[" +
" {" +
" \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"" +
" }" +
"]";

public static class DurationConverter implements IStringConverter<Duration> {
@Override
public Duration convert(String value) {
Expand Down Expand Up @@ -254,12 +260,15 @@ public static void main(String[] args) throws Exception {


String docTransformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.docTransformationParams);
IJsonTransformer docTransformer = null;
if (docTransformerConfig != null) {
log.atInfo().setMessage("Doc Transformations config string: {}")
.addArgument(docTransformerConfig).log();
docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);
} else {
log.atInfo().setMessage("Using default transformation config: {}")
.addArgument(DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG).log();
docTransformerConfig = DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG;
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class BulkDocSection {
private final ObjectNode indexCommand;
private final ObjectNode source;

private StringBuilder bulkDocSectionStringCache = null;

public BulkDocSection(String id, String indexName, String type, String docBody) {
this.docId = id;
this.indexCommand = createIndexCommand(id, indexName, type);
Expand Down Expand Up @@ -68,12 +70,17 @@ public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSec
return builder.toString();
}

public StringBuilder asStringBuilder() {
private StringBuilder asStringBuilder() {
if (this.bulkDocSectionStringCache != null) {
return this.bulkDocSectionStringCache;
}

StringBuilder builder = new StringBuilder();
try {
String indexCommand = asBulkIndex();
String sourceJson = asBulkSource();
String indexCommand = asBulkIndexString();
String sourceJson = asBulkSourceString();
builder.append(indexCommand).append(NEWLINE).append(sourceJson);
bulkDocSectionStringCache = builder;
return builder;
} catch (JsonProcessingException e) {
throw new RuntimeException(SERIALIZATION_ERROR_MESSAGE, e);
Expand All @@ -84,18 +91,18 @@ public String asString() {
return asStringBuilder().toString();
}

private String asString(ObjectNode node) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(node);
}

private String asBulkIndex() throws JsonProcessingException {
private String asBulkIndexString() throws JsonProcessingException {
return asString(this.indexCommand);
}

private String asBulkSource() throws JsonProcessingException {
private String asBulkSourceString() throws JsonProcessingException {
return asString(this.source);
}

private String asString(ObjectNode node) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(node);
}

@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
var indexMap = OBJECT_MAPPER.convertValue(this.indexCommand, HashMap.class);
Expand Down

0 comments on commit 4941c99

Please sign in to comment.