Skip to content

Commit

Permalink
process patches in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshi0301 committed Jan 30, 2024
1 parent d6bd3cf commit d9ddecd
Showing 1 changed file with 52 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.CompletableFuture;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -146,7 +147,7 @@ private void loadBootstrapTypeDefs() {
AtlasPatchRegistry patchRegistry = new AtlasPatchRegistry(graph);

if (modelsDirContents != null && modelsDirContents.length > 0) {
// Arrays.sort(modelsDirContents);
Arrays.sort(modelsDirContents);

for (File folder : modelsDirContents) {
if (folder.isFile()) {
Expand Down Expand Up @@ -180,7 +181,7 @@ private void loadModelsInFolder(File typesDir, AtlasPatchRegistry patchRegistry)
LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName );
} else {
// sort the files by filename
// Arrays.sort(typeDefFiles);
Arrays.sort(typeDefFiles);

for (File typeDefFile : typeDefFiles) {
if (typeDefFile.isFile()) {
Expand Down Expand Up @@ -438,8 +439,8 @@ private static boolean isTypeUpdateApplicable(AtlasBaseTypeDef oldTypeDef, Atlas

private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegistry) {
String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;

if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
Expand All @@ -449,7 +450,7 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis
// sort the files by filename
Arrays.sort(typePatchFiles);

PatchHandler[] patchHandlers = new PatchHandler[] {
PatchHandler[] patchHandlers = new PatchHandler[]{
new UpdateEnumDefPatchHandler(typeDefStore, typeRegistry),
new AddAttributePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributePatchHandler(typeDefStore, typeRegistry),
Expand All @@ -469,54 +470,68 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis
}
}

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (File typePatchFile : typePatchFiles) {
if (typePatchFile.isFile()) {
String patchFile = typePatchFile.getAbsolutePath();
if (!typePatchFile.isFile()) {
continue;
}

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String patchFile = typePatchFile.getAbsolutePath();
LOG.info("Applying patches in file {}", patchFile);

try {
String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8);
String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8);
TypeDefPatches patches = AtlasType.fromJson(jsonStr, TypeDefPatches.class);

if (patches == null || CollectionUtils.isEmpty(patches.getPatches())) {
if (patches == null || patches.getPatches().isEmpty()) {
LOG.info("No patches in file {}", patchFile);

continue;
return;
}

int patchIndex = 0;
for (TypeDefPatch patch : patches.getPatches()) {
PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());
applyPatches(patchFile, patches, patchRegistry, patchHandlerRegistry);
} catch (Throwable t) {
LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
}
});

if (patchHandler == null) {
LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile);
continue;
}
futures.add(future);
}

if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
PatchStatus status;
// Wait for all futures to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
}

try {
status = patchHandler.applyPatch(patch);
} catch (AtlasBaseException ex) {
status = FAILED;
private void applyPatches(String patchFile, TypeDefPatches patches, AtlasPatchRegistry patchRegistry, Map<String, PatchHandler> patchHandlerRegistry) {
int patchIndex = 0;
for (TypeDefPatch patch : patches.getPatches()) {
PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());

LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.",
patch.getId(), status.toString(), patch.getAction(), patchFile);
}
if (patchHandler == null) {
LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile);
continue;
}

patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
} else {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
}
}
} catch (Throwable t) {
LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
}
}
if (!patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
continue;
}

PatchStatus status = applyPatch(patchHandler, patch, patchFile);
patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
}
}

private PatchStatus applyPatch(PatchHandler patchHandler, TypeDefPatch patch, String patchFile) {
try {
return patchHandler.applyPatch(patch);
} catch (AtlasBaseException ex) {
LOG.error("Failed to apply {} (status: FAILED; action: {}) in file: {}. Ignored.",
patch.getId(), patch.getAction(), patchFile);
return FAILED;
}
}

Expand Down

0 comments on commit d9ddecd

Please sign in to comment.