diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ErrorCategory.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ErrorCategory.java
index ce6c8598385..379b3d48849 100644
--- a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ErrorCategory.java
+++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ErrorCategory.java
@@ -16,6 +16,9 @@
 
 package io.cdap.cdap.api.exception;
 
+import java.util.Objects;
+import javax.annotation.Nullable;
+
 /**
  * Class representing the category of an error.
  *
@@ -25,6 +28,7 @@
  */
 public class ErrorCategory {
   private final ErrorCategoryEnum errorCategory;
+  @Nullable
   private final String subCategory;
 
   /**
@@ -33,8 +37,7 @@ public class ErrorCategory {
    * @param errorCategory The category of the error.
    */
   public ErrorCategory(ErrorCategoryEnum errorCategory) {
-    this.errorCategory = errorCategory;
-    this.subCategory = null;
+    this(errorCategory, null);
   }
 
   /**
@@ -43,7 +46,7 @@ public ErrorCategory(ErrorCategoryEnum errorCategory) {
    * @param errorCategory The category of the error.
    * @param subCategory The sub-category of the error.
    */
-  public ErrorCategory(ErrorCategoryEnum errorCategory, String subCategory) {
+  public ErrorCategory(ErrorCategoryEnum errorCategory, @Nullable String subCategory) {
     this.errorCategory = errorCategory;
     this.subCategory = subCategory;
   }
 
@@ -52,8 +55,24 @@ public ErrorCategory(ErrorCategoryEnum errorCategory, String subCategory) {
    * Returns the category of the error.
    */
   public String getErrorCategory() {
-    return errorCategory == null ? ErrorCategoryEnum.OTHERS.toString() : subCategory == null ?
-      errorCategory.toString() : String.format("%s-'%s'", errorCategory, subCategory);
+    return errorCategory == null ? ErrorCategoryEnum.OTHERS.toString() : subCategory == null
+        ? errorCategory.toString() : String.format("%s-'%s'", errorCategory, subCategory);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ErrorCategory)) {
+      return false;
+    }
+
+    ErrorCategory that = (ErrorCategory) o;
+    return Objects.equals(this.errorCategory, that.errorCategory)
+        && Objects.equals(this.subCategory, that.subCategory);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.errorCategory, this.subCategory);
   }
 
   /*
@@ -64,16 +83,25 @@ public String toString() {
     return getErrorCategory();
   }
 
+  /**
+   * Returns the parent category of the error.
+   */
+  public ErrorCategoryEnum getParentCategory() {
+    return errorCategory == null ? ErrorCategoryEnum.OTHERS : errorCategory;
+  }
+
   /**
    * Enum representing the different categories of errors.
    */
   public enum ErrorCategoryEnum {
-    PLUGIN("Plugin"),
+    ACCESS("Access"),
+    DEPROVISIONING("Deprovisioning"),
     NETWORKING("Networking"),
+    OTHERS("Others"),
+    PLUGIN("Plugin"),
     PROVISIONING("Provisioning"),
-    ACCESS("Access"),
     SCHEDULES_AND_TRIGGERS("Schedules and Triggers"),
-    OTHERS("Others");
+    STARTING("Starting");
 
     private final String displayName;
diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/FailureDetailsProvider.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/FailureDetailsProvider.java
index c2aac29a017..e94d012334a 100644
--- a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/FailureDetailsProvider.java
+++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/FailureDetailsProvider.java
@@ -21,6 +21,8 @@
 
 /**
  * Interface for providing failure details.
+ * While implementing the interface, please don't forget to add the class to the
+ * {io.cdap.cdap.logging.ErrorLogsClassifier#ALLOWLIST_CLASSES} list.
  */
 public interface FailureDetailsProvider {
@@ -69,4 +71,43 @@ default ErrorCategory getErrorCategory() {
   default ErrorType getErrorType() {
     return ErrorType.UNKNOWN;
   }
+
+  /**
+   * Returns whether the error is coming from a dependent service.
+   *
+   * @return true if the error is a dependency service error, false otherwise.
+   */
+  default boolean isDependency() {
+    return false;
+  }
+
+  /**
+   * Returns the type of error code.
+   *
+   * @return the type of error code.
+   */
+  @Nullable
+  default ErrorCodeType getErrorCodeType() {
+    return null;
+  }
+
+  /**
+   * Returns the error code.
+   *
+   * @return the error code.
+   */
+  @Nullable
+  default String getErrorCode() {
+    return null;
+  }
+
+  /**
+   * Returns the URL to the documentation.
+   *
+   * @return the URL to the documentation.
+   */
+  @Nullable
+  default String getSupportedDocumentationUrl() {
+    return null;
+  }
 }
diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ProgramFailureException.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ProgramFailureException.java
index ac22007992d..6a1818ed7af 100644
--- a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ProgramFailureException.java
+++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/ProgramFailureException.java
@@ -70,46 +70,39 @@ public String getErrorReason() {
 
   @Override
   public ErrorCategory getErrorCategory() {
+    if (errorCategory == null) {
+      return FailureDetailsProvider.super.getErrorCategory();
+    }
     return errorCategory;
   }
 
   @Override
   public ErrorType getErrorType() {
-    return errorType == null ? ErrorType.UNKNOWN : errorType;
+    if (errorType == null) {
+      return FailureDetailsProvider.super.getErrorType();
+    }
+    return errorType;
   }
 
-  /**
-   * Returns whether the error is coming from a dependent service.
-   *
-   * @return true if the error is a dependency service error, false otherwise.
-   */
+  @Override
   public boolean isDependency() {
     return dependency;
   }
 
-  /**
-   * Returns the type of error code.
-   *
-   * @return the type of error code.
-   */
+  @Nullable
+  @Override
   public ErrorCodeType getErrorCodeType() {
     return errorCodeType;
   }
 
-  /**
-   * Returns the error code.
-   *
-   * @return the error code.
-   */
+  @Nullable
+  @Override
   public String getErrorCode() {
     return errorCode;
   }
 
-  /**
-   * Returns the URL to the documentation.
-   *
-   * @return the URL to the documentation.
-   */
+  @Nullable
+  @Override
   public String getSupportedDocumentationUrl() {
     return supportedDocumentationUrl;
   }
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/DefaultProvisionerContext.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/DefaultProvisionerContext.java
index 182ba3f8da1..f77b9d39f4c 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/DefaultProvisionerContext.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/DefaultProvisionerContext.java
@@ -16,6 +16,8 @@
 
 package io.cdap.cdap.internal.provision;
 
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
 import io.cdap.cdap.api.metrics.MetricsCollectionService;
 import io.cdap.cdap.common.conf.Constants;
 import io.cdap.cdap.common.logging.LoggingContext;
@@ -62,6 +64,7 @@ public class DefaultProvisionerContext implements ProvisionerContext {
   private final String profileName;
   private final LoggingContext loggingContext;
   private final Executor executor;
+  private final ErrorCategory errorCategory;
 
   DefaultProvisionerContext(ProgramRunId programRunId, String provisionerName,
       Map<String, String> properties,
@@ -69,7 +72,7 @@ public class DefaultProvisionerContext implements ProvisionerContext {
       @Nullable VersionInfo appCDAPVersion, LocationFactory locationFactory,
       RuntimeMonitorType runtimeMonitorType, MetricsCollectionService metricsCollectionService,
       @Nullable String profileName, Executor executor,
-      LoggingContext loggingContext) {
+      LoggingContext loggingContext, ErrorCategory errorCategory) {
     this.programRun = new ProgramRun(programRunId.getNamespace(), programRunId.getApplication(),
         programRunId.getProgram(), programRunId.getRun());
     this.programRunInfo = new ProgramRunInfo.Builder()
@@ -92,6 +95,7 @@ public class DefaultProvisionerContext implements ProvisionerContext {
     this.metricsCollectionService = metricsCollectionService;
     this.provisionerName = provisionerName;
     this.executor = executor;
+    this.errorCategory = errorCategory;
   }
 
   @Override
@@ -146,6 +150,11 @@ public String getProfileName() {
     return profileName;
   }
 
+  @Override
+  public ErrorCategory getErrorCategory() {
+    return errorCategory == null ?
+        new ErrorCategory(ErrorCategoryEnum.OTHERS) : errorCategory;
+  }
+
   @Override
   public ProvisionerMetrics getMetrics(Map<String, String> context) {
     Map<String, String> tags = new HashMap<>(context);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/ProvisioningService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/ProvisioningService.java
index 8fbed80b5bb..d8285714763 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/ProvisioningService.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/ProvisioningService.java
@@ -23,6 +23,11 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import com.google.inject.Inject;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.cdap.api.macro.InvalidMacroException;
 import io.cdap.cdap.api.macro.MacroEvaluator;
 import io.cdap.cdap.api.macro.MacroParserOptions;
@@ -226,7 +231,7 @@ public ClusterStatus getClusterStatus(ProgramRunId programRunId, ProgramOptions
             Networks.getAddress(cConf, Constants.NETWORK_PROXY_ADDRESS), null, null);
       }
       context = createContext(cConf, programOptions, programRunId, userId, properties,
-          defaultSSHContext);
+          defaultSSHContext, new ErrorCategory(ErrorCategoryEnum.OTHERS));
     } catch (InvalidMacroException e) {
       // This shouldn't happen
       runWithProgramLogging(programRunId, systemArgs,
@@ -363,15 +368,18 @@ public Runnable provision(ProvisionRequest provisionRequest, StructuredTableCont
     Map<String, String> args = programOptions.getArguments().asMap();
     String name = SystemArguments.getProfileProvisioner(args);
     Provisioner provisioner = provisionerInfo.get().provisioners.get(name);
+    ErrorCategory errorCategory = new ErrorCategory(ErrorCategoryEnum.PROVISIONING);
     // any errors seen here will transition the state straight to deprovisioned since no cluster create was attempted
     if (provisioner == null) {
+      String errorMessage = String.format("Could not provision cluster for the run because "
+          + "provisioner %s does not exist.", name);
+      String errorReason = String.format("Provisioner %s does not exist.", name);
+      ProgramFailureException ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.SYSTEM, false, null);
       runWithProgramLogging(
           programRunId, args,
-          () -> LOG.error(
-              "Could not provision cluster for the run because provisioner {} does not exist.",
-              name));
-      programStateWriter.error(programRunId,
-          new IllegalStateException("Provisioner does not exist."));
+          () -> LOG.error(errorMessage, ex));
+      programStateWriter.error(programRunId, ex);
       provisionerNotifier.deprovisioned(programRunId);
       return () -> {
       };
@@ -385,15 +393,17 @@
     Set unfulfilledRequirements =
         getUnfulfilledRequirements(provisioner.getCapabilities(), requirements);
     if (!unfulfilledRequirements.isEmpty()) {
+      String errorMessage = String.format("'%s' cannot be run using profile '%s' because "
+          + "the profile does not meet all plugin requirements. Following requirements "
+          + "were not met by the listed plugins: '%s'", programRunId.getProgram(), name,
+          groupByRequirement(unfulfilledRequirements));
+      String errorReason = String.format("Provisioner %s does not meet all the requirements for "
+          + "the program %s to run.", name, programRunId.getProgram());
+      ProgramFailureException ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.SYSTEM, false, null);
       runWithProgramLogging(programRunId, args, () ->
-          LOG.error(String.format(
-              "'%s' cannot be run using profile '%s' because the profile does not met all "
-                  + "plugin requirements. Following requirements were not meet by the listed "
-                  + "plugins: '%s'", programRunId.getProgram(), name,
-              groupByRequirement(unfulfilledRequirements))));
-      programStateWriter.error(programRunId,
-          new IllegalArgumentException("Provisioner does not meet all the "
-              + "requirements for the program to run."));
+          LOG.error(errorMessage, ex));
+      programStateWriter.error(programRunId, ex);
       provisionerNotifier.deprovisioned(programRunId);
       return () -> {
       };
@@ -428,7 +438,7 @@ public Optional<RuntimeJobManager> getRuntimeJobManager(ProgramRunId programRunI
     String user = programOptions.getArguments().getOption(ProgramOptionConstants.USER_ID);
     Map<String, String> properties = SystemArguments.getProfileProperties(systemArgs);
     ProvisionerContext context = createContext(cConf, programOptions, programRunId, user,
-        properties, null);
+        properties, null, new ErrorCategory(ErrorCategoryEnum.OTHERS));
     return provisioner.getRuntimeJobManager(context)
         .map(manager -> new RuntimeJobManagerCallWrapper(provisioner.getClass().getClassLoader(),
             manager));
@@ -665,6 +675,7 @@ private Runnable createProvisionTask(ProvisioningTaskInfo taskInfo, Provisioner
     ProgramRunId programRunId = taskInfo.getProgramRunId();
     ProgramOptions programOptions = taskInfo.getProgramOptions();
     Map<String, String> systemArgs = programOptions.getArguments().asMap();
+    ErrorCategory errorCategory = new ErrorCategory(ErrorCategoryEnum.PROVISIONING);
     ProvisionerContext context;
     try {
       SSHContext sshContext = new DefaultSSHContext(
@@ -672,21 +683,28 @@ private Runnable createProvisionTask(ProvisioningTaskInfo taskInfo, Provisioner
           locationFactory.create(taskInfo.getSecureKeysDir()), createSSHKeyPair(taskInfo));
       context = createContext(cConf, programOptions, programRunId, taskInfo.getUser(),
-          taskInfo.getProvisionerProperties(), sshContext);
+          taskInfo.getProvisionerProperties(), sshContext, errorCategory);
     } catch (IOException e) {
+      String errorReason = "Failed to load ssh key.";
+      String errorMessage =
+          String.format("Failed to load ssh key with message: %s", e.getMessage());
+      Exception ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.SYSTEM, false, e);
       runWithProgramLogging(taskInfo.getProgramRunId(), systemArgs,
-          () -> LOG.error("Failed to load ssh key. The run will be marked as failed.", e));
-      programStateWriter.error(programRunId,
-          new IllegalStateException("Failed to load ssh key.", e));
+          () -> LOG.error("The run will be marked as failed.", ex));
+      programStateWriter.error(programRunId, ex);
       provisionerNotifier.deprovisioning(taskInfo.getProgramRunId());
       return () -> {
       };
     } catch (InvalidMacroException e) {
-      runWithProgramLogging(taskInfo.getProgramRunId(), systemArgs,
-          () -> LOG.error("Could not evaluate macros while provisoning. "
-              + "The run will be marked as failed.", e));
-      programStateWriter.error(programRunId,
-          new IllegalStateException("Could not evaluate macros while provisioning", e));
+      String errorReason = "Could not evaluate macros while provisioning.";
+      String errorMessage = String.format("Could not evaluate macros with message: %s",
+          e.getMessage());
+      ProgramFailureException ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.USER, false, e);
+      runWithProgramLogging(programRunId, systemArgs,
+          () -> LOG.error("The run will be marked as failed.", ex));
+      programStateWriter.error(programRunId, ex);
       provisionerNotifier.deprovisioning(taskInfo.getProgramRunId());
       return () -> {
       };
@@ -715,13 +733,20 @@
   private Runnable createDeprovisionTask(ProvisioningTaskInfo taskInfo, Provisioner provisioner,
       Consumer taskCleanup) {
     Map<String, String> properties = taskInfo.getProvisionerProperties();
+    ErrorCategory errorCategory = new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING);
     ProvisionerContext context;
     SSHKeyPair sshKeyPair = null;
     try {
       sshKeyPair = createSSHKeyPair(taskInfo);
     } catch (IOException e) {
-      LOG.warn("Failed to load ssh key. No SSH key will be available for the deprovision task", e);
+      String errorReason = "Failed to load ssh key.";
+      String errorMessage =
+          String.format("Failed to load ssh key with message: %s", e.getMessage());
+      Exception ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.SYSTEM, false, e);
+      LOG.warn("Failed to load ssh key. No SSH key will be available for the deprovision task",
+          ex);
     }
 
     ProgramRunId programRunId = taskInfo.getProgramRunId();
@@ -732,12 +757,15 @@ private Runnable createDeprovisionTask(ProvisioningTaskInfo taskInfo, Provisione
           Networks.getAddress(cConf, Constants.NETWORK_PROXY_ADDRESS), null, sshKeyPair);
       context = createContext(cConf, taskInfo.getProgramOptions(), programRunId,
           taskInfo.getUser(),
-          properties,
-          sshContext);
+          properties, sshContext, errorCategory);
     } catch (InvalidMacroException e) {
+      String errorReason = "Could not evaluate macros while deprovisioning.";
+      String errorMessage = String.format("Could not evaluate macros with message: %s",
+          e.getMessage());
+      ProgramFailureException ex = ErrorUtils.getProgramFailureException(errorCategory, errorReason,
+          errorMessage, ErrorType.USER, false, e);
       runWithProgramLogging(programRunId, systemArgs,
-          () -> LOG.error("Could not evaluate macros while deprovisoning. "
-              + "The cluster will be marked as orphaned.", e));
+          () -> LOG.error("The cluster will be marked as orphaned.", ex));
       provisionerNotifier.orphaned(programRunId);
       return () -> {
       };
@@ -850,9 +878,8 @@ private Location createKeysDirectory(ProgramRunId programRunId) {
   }
 
   private ProvisionerContext createContext(CConfiguration cConf, ProgramOptions programOptions,
-      ProgramRunId programRunId, String userId,
-      Map<String, String> properties,
-      @Nullable SSHContext sshContext) {
+      ProgramRunId programRunId, String userId, Map<String, String> properties,
+      @Nullable SSHContext sshContext, ErrorCategory errorCategory) {
     RuntimeMonitorType runtimeMonitorType = SystemArguments.getRuntimeMonitorType(cConf,
         programOptions);
     Map<String, String> systemArgs = programOptions.getArguments().asMap();
@@ -866,9 +893,8 @@ private ProvisionerContext createContext(CConfiguration cConf, ProgramOptions pr
     LoggingContext loggingContext = LoggingContextHelper.getLoggingContextWithRunId(programRunId,
         systemArgs);
     return new DefaultProvisionerContext(programRunId, provisionerName, evaluated, sparkCompat,
-        sshContext,
-        appCDAPVersion, locationFactory, runtimeMonitorType,
-        metricsCollectionService, profileName, contextExecutor, loggingContext);
+        sshContext, appCDAPVersion, locationFactory, runtimeMonitorType, metricsCollectionService,
+        profileName, contextExecutor, loggingContext, errorCategory);
   }
 
   /**
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/task/ProvisioningTask.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/task/ProvisioningTask.java
index 4cd655642c1..0f813beba86 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/task/ProvisioningTask.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/task/ProvisioningTask.java
@@ -17,6 +17,10 @@
 
 package io.cdap.cdap.internal.provision.task;
 
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.FailureDetailsProvider;
+import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.cdap.common.async.RepeatedTask;
 import io.cdap.cdap.common.lang.Exceptions;
 import io.cdap.cdap.common.logging.LogSamplers;
@@ -105,14 +109,16 @@ public final long executeOnce() throws Exception {
     }
 
     // Get the sub-task to execute
+    ProvisioningOp.Type type = currentTaskInfo.getProvisioningOp().getType();
     ProvisioningSubtask subtask = subTasks.get(state);
     if (subtask == null) {
       // should never happen
-      throw new IllegalStateException(
-          String.format("Invalid state '%s' in provisioning task for program run '%s'. "
-              + "This means there is a bug in provisioning state machine. "
-              + "Please reach out to the development team.",
-              state, programRunId));
+      String errorReason = String.format("Invalid state '%s' in provisioning task for "
+          + "program run '%s'.", state, programRunId);
+      String errorMessage = String.format("%s This means there is a bug in the provisioning "
+          + "state machine. Please reach out to the development team.", errorReason);
+      throw ErrorUtils.getProgramFailureException(provisionerContext.getErrorCategory(),
+          errorReason, errorMessage, ErrorType.SYSTEM, false, null);
     }
     if (subtask == EndSubtask.INSTANCE) {
       LOG.debug("Completed {} task for program run {}.",
@@ -158,10 +164,16 @@
     } catch (InterruptedException e) {
       throw e;
     } catch (Throwable e) {
-      LOG.error("{} task failed in {} state for program run {} due to {}.",
-          currentTaskInfo.getProvisioningOp().getType(), state, programRunId,
-          Exceptions.condenseThrowableMessage(e), e);
-      handleSubtaskFailure(currentTaskInfo, e);
+      String errorReason = String.format("'%s' task failed in '%s' state for program run '%s'",
+          currentTaskInfo.getProvisioningOp().getType(), state, programRunId);
+      ProgramFailureException ex = null;
+      if (!(e instanceof FailureDetailsProvider)) {
+        ex = ErrorUtils.getProgramFailureException(provisionerContext.getErrorCategory(),
+            errorReason, Exceptions.condenseThrowableMessage(e), ErrorType.UNKNOWN, false, e);
+      }
+      LOG.error("{} due to {}.", errorReason,
+          Exceptions.condenseThrowableMessage(ex == null ? e : ex), ex == null ? e : ex);
+      handleSubtaskFailure(currentTaskInfo, ex);
       ProvisioningOp failureOp = new ProvisioningOp(currentTaskInfo.getProvisioningOp().getType(),
           ProvisioningOp.Status.FAILED);
       ProvisioningTaskInfo failureInfo = new ProvisioningTaskInfo(currentTaskInfo, failureOp,
diff --git a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java
index 77b929554ff..e37d4229b63 100644
--- a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java
+++ b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java
@@ -340,12 +340,16 @@ private ClassLoader getExtensionParentClassLoader() {
     return new FilterClassLoader(getClass().getClassLoader(), new FilterClassLoader.Filter() {
       @Override
       public boolean acceptResource(String resource) {
-        return resource.startsWith("org/slf4j") || filter.acceptResource(resource);
+        return resource.startsWith("org/slf4j")
+            || resource.startsWith("io/cdap/cdap/api/exception")
+            || filter.acceptResource(resource);
       }
 
       @Override
       public boolean acceptPackage(String packageName) {
-        return packageName.startsWith("org.slf4j") || filter.acceptPackage(packageName);
+        return packageName.startsWith("org.slf4j")
+            || packageName.startsWith("io.cdap.cdap.api.exception")
+            || filter.acceptPackage(packageName);
       }
     });
   }
diff --git a/cdap-coverage/pom.xml b/cdap-coverage/pom.xml
index d0c3b7e2763..b599f4c4a73 100644
--- a/cdap-coverage/pom.xml
+++ b/cdap-coverage/pom.xml
@@ -108,16 +108,6 @@
       <artifactId>cdap-security</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-spi</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-base</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>io.cdap.cdap</groupId>
       <artifactId>cdap-tms</artifactId>
       <version>${project.version}</version>
@@ -233,11 +223,6 @@
       <artifactId>cdap-program-report</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-1.0</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
      <groupId>io.cdap.cdap</groupId>
       <artifactId>cdap-data-fabric-tests</artifactId>
       <version>${project.version}</version>
@@ -283,26 +268,6 @@
       <artifactId>cdap-app-fabric-tests</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-1.0-cdh5.5.0</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-1.0-cdh</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-1.1</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-hbase-compat-1.2-cdh5.7.0</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>io.cdap.cdap</groupId>
       <artifactId>cdap-kubernetes</artifactId>
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java
index 93a440c39ee..010c4043252 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/common/DataprocUtils.java
@@ -29,8 +29,15 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.CharStreams;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
 import io.cdap.cdap.runtime.spi.provisioner.ProvisionerMetrics;
+import io.cdap.cdap.runtime.spi.provisioner.RetryableProvisionException;
+import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRetryableException;
+import io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -80,11 +87,6 @@ public final class DataprocUtils {
   private static final int SET_CUSTOM_TIME_MAX_RETRY = 6;
   private static final int SET_CUSTOM_TIME_MAX_SLEEP_MILLIS_BEFORE_RETRY = 20000;
 
-  public static final String TROUBLESHOOTING_DOCS_URL_KEY = "troubleshootingDocsURL";
-  // Empty url will ensure help messages don't appear by default in Dataproc error messages.
-  // This property needs to be overridden in cdap-site.
-  public static final String TROUBLESHOOTING_DOCS_URL_DEFAULT = "";
-
   /**
    * resources required by Runtime Job (io.cdap.cdap.runtime.spi.runtimejob.RuntimeJob) that will be
    * running on driver pool nodes.
@@ -255,14 +257,23 @@ public static List parseIpRanges(List ranges) {
   /**
    * Get network from the metadata server.
    */
-  public static String getSystemNetwork(ConnectionProvider connectionProvider) {
+  public static String getSystemNetwork(ConnectionProvider connectionProvider,
+      ErrorCategory errorCategory) {
     try {
       String network = getMetadata(connectionProvider, "instance/network-interfaces/0/network");
       // will be something like projects/<project>/networks/default
       return network.substring(network.lastIndexOf('/') + 1);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Unable to get the network from the environment. "
-          + "Please explicitly set the network.", e);
+    } catch (Exception e) {
+      String errorReason = "Unable to get the network from the environment. "
+          + "Please explicitly set the network.";
+      throw new DataprocRuntimeException.Builder()
+          .withCause(e)
+          .withErrorCategory(errorCategory)
+          .withErrorReason(errorReason)
+          .withErrorMessage(String.format("%s with message: %s", errorReason, e.getMessage()))
+          .withDependency(true)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
   }
 
@@ -274,9 +285,17 @@ public static String getSystemZone(ConnectionProvider connectionProvider) {
       String zone = getMetadata(connectionProvider, "instance/zone");
       // will be something like projects/<project>/zones/us-east1-b
       return zone.substring(zone.lastIndexOf('/') + 1);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Unable to get the zone from the environment. "
-          + "Please explicitly set the zone.", e);
+    } catch (Exception e) {
+      String errorReason = "Unable to get the zone from the environment. "
+          + "Please explicitly set the zone.";
+      throw new DataprocRuntimeException.Builder()
+          .withCause(e)
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorReason)
+          .withErrorMessage(String.format("%s with message: %s", errorReason, e.getMessage()))
+          .withDependency(true)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
   }
 
@@ -286,8 +305,14 @@ public static String getRegionFromZone(String zone) {
     int idx = zone.lastIndexOf("-");
     if (idx <= 0) {
-      throw new IllegalArgumentException(
-          "Invalid zone. Zone must be in the format of <region>-<zone>");
+      String errorReason = "Invalid zone. Zone must be in the format of <region>-<zone>";
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorReason)
+          .withErrorMessage(errorReason)
+          .withDependency(true)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
     return zone.substring(0, idx);
   }
 
@@ -295,12 +320,21 @@
   /**
    * Get project id from the metadata server.
    */
-  public static String getSystemProjectId(ConnectionProvider connectionProvider) {
+  public static String getSystemProjectId(ConnectionProvider connectionProvider,
+      ErrorCategory errorCategory) {
     try {
       return getMetadata(connectionProvider, "project/project-id");
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Unable to get project id from the environment. "
-          + "Please explicitly set the project id and account key.", e);
+    } catch (Exception e) {
+      String errorReason = "Unable to get project id from the environment. "
+          + "Please explicitly set the project id and account key.";
+      throw new DataprocRuntimeException.Builder()
+          .withCause(e)
+          .withErrorCategory(errorCategory)
+          .withErrorReason(errorReason)
+          .withErrorMessage(e.getMessage())
+          .withDependency(true)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
   }
 
@@ -402,7 +436,9 @@ private static String getMetadata(ConnectionProvider connectionProvider, String
       if (exception instanceof IOException) {
         throw (IOException) exception;
       }
-      throw new RuntimeException("Error fetching metadata from server", exception);
+      throw new RuntimeException(
+          String.format("Error fetching metadata from server with response code: %s", statusCode),
+          exception);
     }
 
   /**
@@ -507,17 +543,47 @@ private static void updateTemporaryHoldOnGcsObject(Storage storage, String bucke
   }
 
   /**
-   * Get a user friendly message pointing to an external troubleshooting doc.
-   *
-   * @param troubleshootingDocsUrl Url for the troubleshooting doc
-   * @return user friendly message pointing to an external troubleshooting doc.
+   * Returns error reason as {@link String} based on {@link ApiException} status code.
+   */
+  public static String getErrorReason(String reason, Throwable e) {
+    if (!(e instanceof ApiException)) {
+      return reason;
+    }
+    ApiException ex = (ApiException) e;
+    int statusCode = ex.getStatusCode().getCode().getHttpStatusCode();
+    StringBuilder message = new StringBuilder();
+    message.append(String.format("%s %s", statusCode, reason));
+    if (!Strings.isNullOrEmpty(ex.getReason())) {
+      message.append(String.format(": %s", ex.getReason()));
+    }
+    message.append(String.format(". %s",
+        ErrorUtils.getActionErrorByStatusCode(statusCode).getCorrectiveAction()));
+    return message.toString();
+  }
+
+  /**
+   * Returns {@link DataprocRuntimeException} or throws {@link RetryableProvisionException}
+   * based on {@link ApiException} status code.
    */
-  public static String getTroubleshootingHelpMessage(@Nullable String troubleshootingDocsUrl) {
-    if (Strings.isNullOrEmpty(troubleshootingDocsUrl)) {
-      return "";
+  public static DataprocRuntimeException handleApiException(@Nullable String operationId,
+      ApiException e, String errorReason, ErrorCategory errorCategory)
+      throws RetryableProvisionException {
+    if (e.getStatusCode().getCode().getHttpStatusCode() / 100 != 4) {
+      throw new DataprocRetryableException(operationId, e);
     }
-    return String.format("For troubleshooting Dataproc errors, refer to %s",
-        troubleshootingDocsUrl);
+    int statusCode = e.getStatusCode().getCode().getHttpStatusCode();
+    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
+    return new DataprocRuntimeException.Builder()
+        .withCause(e)
+        .withErrorCategory(errorCategory)
+        .withOperationId(operationId)
+        .withErrorMessage(e.getMessage())
+        .withErrorReason(getErrorReason(errorReason, e))
+        .withErrorType(pair.getErrorType())
+        .withErrorCodeType(ErrorCodeType.HTTP)
+        .withErrorCode(String.valueOf(statusCode))
+        .withDependency(true)
+        .build();
   }
 
   private DataprocUtils() {
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
index 6e52bd7508d..9b1fcb9d05d 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/AbstractDataprocProvisioner.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.common.DataprocImageVersion;
 import io.cdap.cdap.runtime.spi.common.DataprocUtils;
 import io.cdap.cdap.runtime.spi.provisioner.Capabilities;
@@ -71,7 +72,7 @@ public abstract class AbstractDataprocProvisioner implements Provisioner {
   /**
    * In reuse scenario we can't find "our" cluster by cluster name, so let's put it into the label
    *
-   * @see {@link DataprocProvisioner#getAllocatedClusterName(ProvisionerContext)}
+   * @see DataprocProvisioner#findCluster(String, DataprocClient)
    */
   public static final String LABEL_RUN_KEY = "cdap-run-key";
 
@@ -237,7 +238,6 @@ protected boolean isDefaultContextProperty(String property) {
     }
     return ImmutableSet.of(DataprocConf.RUNTIME_JOB_MANAGER, DataprocUtils.BUCKET,
         DataprocConf.TOKEN_ENDPOINT_KEY,
-        DataprocUtils.TROUBLESHOOTING_DOCS_URL_KEY,
         DataprocConf.ENCRYPTION_KEY_NAME, DataprocConf.ROOT_URL,
         DataprocConf.COMPUTE_HTTP_REQUEST_CONNECTION_TIMEOUT,
         DataprocConf.COMPUTE_HTTP_REQUEST_READ_TIMEOUT).contains(property);
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClient.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClient.java
index 447d1ead296..7d3c5f9b6ef 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClient.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClient.java
@@ -20,7 +20,6 @@
 import com.google.api.gax.longrunning.OperationFuture;
 import com.google.api.gax.rpc.AlreadyExistsException;
 import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.InvalidArgumentException;
 import com.google.api.gax.rpc.NotFoundException;
 import com.google.api.services.compute.Compute;
 import com.google.api.services.compute.model.Network;
@@ -46,6 +45,7 @@
 import com.google.cloud.dataproc.v1.SoftwareConfig;
 import com.google.cloud.dataproc.v1.UpdateClusterRequest;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import com.google.longrunning.Operation;
 import com.google.longrunning.OperationsClient;
@@ -53,7 +53,10 @@
 import com.google.protobuf.Empty;
 import com.google.protobuf.FieldMask;
 import com.google.rpc.Status;
-import io.cdap.cdap.error.api.ErrorTagProvider.ErrorTag;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.runtime.spi.common.DataprocUtils;
 import io.cdap.cdap.runtime.spi.provisioner.Node;
 import io.cdap.cdap.runtime.spi.provisioner.RetryableProvisionException;
@@ -98,35 +101,44 @@ abstract class DataprocClient implements AutoCloseable {
   private static final Set<String> ERROR_INFO_REASONS = ImmutableSet.of(
       "rateLimitExceeded", "resourceQuotaExceeded");
+  private static final String CONFIGURATION = "Configuration";
 
   protected final DataprocConf conf;
   private final ClusterControllerClient client;
   private final ComputeFactory computeFactory;
   private Compute compute;
+  protected final ErrorCategory errorCategory;
 
   protected DataprocClient(DataprocConf conf, ClusterControllerClient client,
-      ComputeFactory computeFactory) {
+      ComputeFactory computeFactory, ErrorCategory errorCategory) {
     this.conf = conf;
     this.client = client;
     this.computeFactory = computeFactory;
+    this.errorCategory = errorCategory == null
+        ? DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION : errorCategory;
   }
 
-  private static String findNetwork(Compute compute, String project)
-      throws IOException, RetryableProvisionException {
+  private static String findNetwork(Compute compute, String project, ErrorCategory errorCategory)
+      throws RetryableProvisionException {
     List<Network> networks;
     try {
       NetworkList networkList = compute.networks().list(project).execute();
       networks = networkList.getItems();
     } catch (Exception e) {
       handleRetryableExceptions(e);
-      throw new DataprocRuntimeException(e);
+      String errorReason = String.format("Unable to list networks in project '%s'", project);
+      if (e instanceof GoogleJsonResponseException) {
+        throw handleGoogleJsonResponseException((GoogleJsonResponseException) e,
+            errorReason, errorCategory);
+      }
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorReason, e.getMessage(), true, null, ErrorType.UNKNOWN, e);
     }
 
     if (networks == null || networks.isEmpty()) {
-      throw new DataprocRuntimeException(
-          String.format(
-              "Unable to find any networks in project '%s'. Please create a network in the project.",
-              project),
-          ErrorTag.CONFIGURATION);
+      String errorMessage = String.format("Unable to find any networks in project '%s'. "
+          + "Please create a network in the project.", project);
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
     }
 
     for (Network network : networks) {
@@ -141,10 +153,12 @@ private static String findNetwork(Compute compute, String project)
   /**
    * Extracts and returns the zone name from the given full zone URI.
    */
-  private static String getZone(String zoneUri) {
+  private static String getZone(String zoneUri, ErrorCategory errorCategory) {
     int idx = zoneUri.lastIndexOf("/");
     if (idx <= 0) {
-      throw new DataprocRuntimeException("Invalid zone uri " + zoneUri, ErrorTag.CONFIGURATION);
+      String errorMessage = String.format("Invalid zone uri %s", zoneUri);
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
     }
     return zoneUri.substring(idx + 1);
   }
 
@@ -166,8 +180,7 @@ private static String getZone(String zoneUri) {
    * @throws RetryableProvisionException if there was a non 4xx error code returned
    */
   ClusterOperationMetadata createCluster(String name, String imageVersion,
-      Map<String, String> labels,
-      boolean privateInstance, @Nullable SSHPublicKey publicKey)
+      Map<String, String> labels, boolean privateInstance, @Nullable SSHPublicKey publicKey)
       throws RetryableProvisionException, InterruptedException, IOException {
 
     String operationId = null;
@@ -333,10 +346,15 @@ ClusterOperationMetadata createCluster(String name, String imageVersion,
     } catch (ExecutionException e) {
       cleanUpClusterAfterCreationFailure(name);
       Throwable cause = e.getCause();
+      String errorReason = String.format("Dataproc cluster create operation%s failed.",
+          operationId == null ? "" : String.format(" %s", operationId));
       if (cause instanceof ApiException) {
-        throw handleApiException(operationId, (ApiException) cause);
+        throw DataprocUtils.handleApiException(operationId, (ApiException) cause, errorReason,
+            errorCategory);
       }
-      throw new DataprocRuntimeException(operationId, cause);
+      throw getDataprocRuntimeException(errorCategory, errorReason,
+          cause == null ? e.getMessage() : cause.getMessage(), true, operationId,
+          ErrorType.UNKNOWN, cause == null ? e : cause);
     }
   }
 
@@ -344,11 +362,13 @@ protected void setNetworkConfigs(Compute compute, GceClusterConfig.Builder clust
       boolean privateInstance) throws RetryableProvisionException, IOException {
     String network = conf.getNetwork();
     String systemNetwork = null;
+    ErrorCategory category = new ErrorCategory(errorCategory.getParentCategory(), CONFIGURATION);
     try {
       systemNetwork = DataprocUtils.getSystemNetwork(
-          (url) -> (HttpURLConnection) url.openConnection());
-    } catch (IllegalArgumentException e) {
+          (url) -> (HttpURLConnection) url.openConnection(), category);
+    } catch (DataprocRuntimeException e) {
       // expected when not running on GCP, ignore
+      LOG.trace("Unable to automatically detect the network.", e);
     }
 
     String projectId = conf.getProjectId();
@@ -356,9 +376,10 @@ protected void setNetworkConfigs(Compute compute, GceClusterConfig.Builder clust
     String systemProjectId = null;
     try {
       systemProjectId = DataprocUtils.getSystemProjectId(
-          (url) -> (HttpURLConnection) url.openConnection());
-    } catch (IllegalArgumentException e) {
+          (url) -> (HttpURLConnection) url.openConnection(), category);
+    } catch (DataprocRuntimeException e) {
       // expected when not running on GCP, ignore
+      LOG.trace("Unable to automatically detect the project.", e);
     }
 
     if (network == null && projectId.equals(systemProjectId)) {
@@ -367,12 +388,13 @@
       network = systemNetwork;
     } else if (network == null) {
       // Otherwise, pick a network from the configured project using the Compute API
-      network = findNetwork(compute, networkHostProjectId);
+      network = findNetwork(compute, networkHostProjectId, errorCategory);
     }
     if (network == null) {
-      throw new DataprocRuntimeException(
-          "Unable to automatically detect a network, please explicitly set a network.",
-          ErrorTag.CONFIGURATION);
+      String errorMessage = "Unable to automatically detect a network, "
+          + "please explicitly set a network.";
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
     }
 
     Network networkInfo = getNetworkInfo(networkHostProjectId, network, compute);
@@ -380,11 +402,11 @@
     String subnet = conf.getSubnet();
     List<String> subnets = networkInfo.getSubnetworks();
     if (subnet != null && !subnetExists(subnets, subnet)) {
-      throw new DataprocRuntimeException(
-          String.format(
-              "Subnet '%s' does not exist in network '%s' in project '%s'. Please use a different subnet.",
-              subnet, network, networkHostProjectId),
-          ErrorTag.CONFIGURATION);
+      String errorMessage = String.format(
+          "Subnet '%s' does not exist in network '%s' in project '%s'. "
+              + "Please use a different subnet.", subnet, network, networkHostProjectId);
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
     }
 
     // if the network uses custom subnets, a subnet must be provided to the dataproc api
@@ -392,16 +414,16 @@
     boolean autoCreateSubnet =
         networkInfo.getAutoCreateSubnetworks() != null && networkInfo.getAutoCreateSubnetworks();
     if (!autoCreateSubnet) {
       // if the network uses custom subnets but none exist, error out
+      String errorMessage = String.format("Network '%s' in project '%s' does not contain any "
+          + "subnets. Please create a subnet or use a different network.", network,
+          networkHostProjectId);
       if (subnets == null || subnets.isEmpty()) {
-        throw new DataprocRuntimeException(
-            String.format("Network '%s' in project '%s' does not contain any subnets. "
-                + "Please create a subnet or use a different network.", network,
-                networkHostProjectId),
-            ErrorTag.CONFIGURATION);
+        throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+            CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
       }
     }
 
-    subnet = chooseSubnet(network, subnets, subnet, conf.getRegion());
+    subnet = chooseSubnet(network, subnets, subnet, conf.getRegion(), errorCategory);
     // subnets are unique within a location, not within a network, which is why these configs are mutually exclusive.
     clusterConfig.setSubnetworkUri(subnet);
 
@@ -439,7 +461,7 @@ private static boolean subnetExists(List subnets, String subnet) {
   // will be choosen and the region will be picked on basis of the given zone. If a subnet name is not provided then
   // any subnetwork in the region of the given zone will be picked.
   private static String chooseSubnet(String network, List<String> subnets, @Nullable String subnet,
-      String region) {
+      String region, ErrorCategory errorCategory) {
     for (String currentSubnet : subnets) {
       // if a subnet name is given then get the region of that subnet based on the zone
       if (subnet != null && !currentSubnet.endsWith("subnetworks/" + subnet)) {
@@ -449,30 +471,35 @@
         return currentSubnet;
       }
     }
-    throw new DataprocRuntimeException(
-        String.format("Could not find %s in network '%s' that are for region '%s'",
-            subnet == null ? "any subnet" :
-                String.format("a subnet named '%s", subnet), network, region),
-        ErrorTag.CONFIGURATION);
+    String errorMessage = String.format("Could not find %s in network '%s' for region '%s'",
+        subnet == null ? "any subnet" : String.format("a subnet named '%s'", subnet),
+        network, region);
+    throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+        CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
   }
 
   private Network getNetworkInfo(String project, String network, Compute compute)
-      throws IOException, RetryableProvisionException {
+      throws RetryableProvisionException {
     Network networkObj;
     try {
       networkObj = compute.networks().get(project, network).execute();
     } catch (Exception e) {
       handleRetryableExceptions(e);
-      throw new DataprocRuntimeException(e);
+      String errorReason = String.format("Unable to get details of network '%s' in project '%s'",
+          network, project);
+      if (e instanceof GoogleJsonResponseException) {
+        throw handleGoogleJsonResponseException((GoogleJsonResponseException) e,
+            errorReason, errorCategory);
+      }
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorReason, e.getMessage(), true, null, ErrorType.UNKNOWN, e);
     }
     if (networkObj == null) {
-      throw new DataprocRuntimeException(
-          String.format(
-              "Unable to find network '%s' in project '%s'. Please specify another network.",
-              network, project),
-          ErrorTag.CONFIGURATION
-      );
+      String errorMessage = String.format("Unable to find network '%s' in project '%s'. "
+          + "Please specify another network.", network, project);
+      throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+          CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
     }
     return networkObj;
   }
 
@@ -487,12 +514,14 @@ private boolean isInternalIpOnly(Network network, boolean privateInstance) {
     String systemProjectId = null;
     String systemNetwork = null;
     try {
+      ErrorCategory category = new ErrorCategory(errorCategory.getParentCategory(), CONFIGURATION);
       systemProjectId = DataprocUtils.getSystemProjectId(
-          (url) -> (HttpURLConnection) url.openConnection());
+          (url) -> (HttpURLConnection) url.openConnection(), category);
       systemNetwork = DataprocUtils.getSystemNetwork(
-          (url) -> (HttpURLConnection) url.openConnection());
-    } catch (IllegalArgumentException e) {
+          (url) -> (HttpURLConnection) url.openConnection(), category);
+    } catch (DataprocRuntimeException e) {
       // expected when not running on GCP, ignore
+      LOG.trace("Unable to automatically detect the network or project.", e);
     }
 
     // Use private IP only cluster if privateInstance is true or if the compute profile required
@@ -526,11 +555,11 @@
     }
 
     // If there is no network connectivity and yet private ip only cluster is requested, raise an exception
-    throw new DataprocRuntimeException(
-        String.format(
-            "Direct network connectivity is needed for private Dataproc cluster between VPC %s/%s and %s/%s",
-            systemProjectId, systemNetwork, conf.getNetworkHostProjectId(), network.getName()),
-        ErrorTag.CONFIGURATION);
+    String errorMessage = String.format("Direct network connectivity is needed for private "
+        + "Dataproc cluster between VPC %s/%s and %s/%s", systemProjectId, systemNetwork,
+        conf.getNetworkHostProjectId(), network.getName());
+    throw getDataprocRuntimeException(new ErrorCategory(errorCategory.getParentCategory(),
+        CONFIGURATION), errorMessage, errorMessage, true, null, ErrorType.USER, null);
   }
 
   private static PeeringState getPeeringState(String systemProjectId, String systemNetwork,
@@ -580,7 +609,8 @@ private void cleanUpClusterAfterCreationFailure(String name) {
    * @param labels Key/Value pairs to set on the Dataproc cluster.
    */
   void updateClusterLabels(String clusterName,
-      Map<String, String> labels) throws RetryableProvisionException, InterruptedException {
+      Map<String, String> labels) throws RetryableProvisionException,
+      InterruptedException {
     updateClusterLabels(clusterName, labels, Collections.emptyList());
   }
 
@@ -599,12 +629,18 @@ Future updateClusterLabels(String clusterName,
     if (labelsToSet.isEmpty() && labelsToRemove.isEmpty()) {
       return CompletableFuture.completedFuture(null);
     }
+    String operationId = null;
     try {
+      String errorMessage = String.format("Dataproc cluster '%s' does not exist or is not in "
+          + "running state", clusterName);
       Cluster cluster = getDataprocCluster(clusterName)
          .filter(c -> c.getStatus().getState() == ClusterStatus.State.RUNNING)
-          .orElseThrow(() -> new DataprocRuntimeException(
-              "Dataproc cluster " + clusterName + " does not exist or not in running state",
-              ErrorTag.CONFIGURATION));
+          .orElseThrow(() -> new DataprocRuntimeException.Builder()
+              .withErrorCategory(errorCategory)
+              .withErrorReason(errorMessage)
+              .withErrorMessage(errorMessage)
+              .withErrorType(ErrorType.USER)
+              .build());
 
       Map<String, String> existingLabels = cluster.getLabelsMap();
       // If the labels to set are already exist and labels to remove are not set,
       // no need to update the cluster labelsToSet.
@@ -629,6 +665,7 @@ Future updateClusterLabels(String clusterName,
               .setUpdateMask(updateMask)
               .build());
 
+      operationId = operationFuture.getName();
       ClusterOperationMetadata metadata = operationFuture.getMetadata().get();
       int numWarnings = metadata.getWarningsCount();
       if (numWarnings > 0) {
@@ -638,10 +675,15 @@ Future updateClusterLabels(String clusterName,
       return operationFuture;
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
+      String errorReason = String.format("Dataproc cluster update operation%s failed.",
+          operationId == null ? "" : String.format(" %s", operationId));
       if (cause instanceof ApiException) {
-        throw handleApiException((ApiException) cause);
+        throw DataprocUtils.handleApiException(operationId, (ApiException) cause,
+            errorReason, errorCategory);
       }
-      throw new DataprocRuntimeException(cause);
+      throw getDataprocRuntimeException(errorCategory, errorReason,
+          cause == null ? e.getMessage() : cause.getMessage(), true, operationId,
+          ErrorType.UNKNOWN, cause == null ? e : cause);
     }
   }
 
@@ -670,6 +712,8 @@ Optional deleteCluster(String name)
       return Optional.of(operationFuture.getMetadata().get());
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
+      String errorReason = String.format("Dataproc cluster delete operation%s failed.",
+          operationId == null ? "" : String.format(" %s", operationId));
       if (cause instanceof ApiException) {
         ApiException apiException = (ApiException) cause;
         if (apiException.getStatusCode().getCode().getHttpStatusCode() == 404) {
@@ -692,9 +736,12 @@ Optional deleteCluster(String name)
         } catch (Exception e1) {
           // if there was an error getting the cluster information, ignore it and handle the original delete error
         }
-        throw handleApiException(operationId, (ApiException) cause);
+        throw DataprocUtils.handleApiException(operationId, (ApiException) cause, errorReason,
+            errorCategory);
       }
-      throw new DataprocRuntimeException(operationId, cause);
+      throw getDataprocRuntimeException(errorCategory, errorReason,
+          cause == null ? e.getMessage() : cause.getMessage(), true, operationId,
+          ErrorType.UNKNOWN, cause == null ? e : cause);
     }
   }
 
@@ -719,20 +766,18 @@ String getClusterFailureMsg(String name) throws RetryableProvisionException {
     // same name, was deleted, then this current cluster was created). This won't happen in practice for ephemeral
     // clusters, but it's still not great to have this possibility in the implementation.
     // https://cdap.atlassian.net/browse/CDAP-19641
-    String resourceName = String.format("projects/%s/regions/%s/operations", conf.getProjectId(),
-        conf.getRegion());
+    String projectId = conf.getProjectId();
+    String region = conf.getRegion();
+    String resourceName = String.format("projects/%s/regions/%s/operations", projectId, region);
     String filter = String.format("clusterName=%s AND operationType=CREATE", name);
     OperationsClient.ListOperationsPagedResponse operationsResponse;
     try {
       operationsResponse = client.getOperationsClient().listOperations(resourceName, filter);
     } catch (ApiException e) {
-      if (e.getStatusCode().getCode().getHttpStatusCode() / 100 != 4) {
-        // if there was an API exception that was not a 4xx, we can just try again
-        throw new RetryableProvisionException(e);
-      }
-      // otherwise, it's not a retryable failure
-      throw e;
+      String errorReason = String.format("Dataproc clusters list operation failed in "
+          + "project '%s' and region '%s'.", projectId, region);
+      throw DataprocUtils.handleApiException(null, e, errorReason, errorCategory);
     }
 
     OperationsClient.ListOperationsPage page = operationsResponse.getPage();
@@ -812,7 +857,9 @@ private io.cdap.cdap.runtime.spi.provisioner.Cluster convertClusterUnchecked(Clu
     try {
       return convertCluster(cluster);
     } catch (IOException e) {
-      throw new DataprocRuntimeException(e);
+      String errorReason = "Unable to convert the Dataproc cluster object into a provisioner "
+          + "cluster object.";
+      throw getDataprocRuntimeException(errorCategory, errorReason, e.getMessage(), false,
+          null, ErrorType.SYSTEM, e);
     }
   }
 
@@ -825,7 +872,7 @@ private io.cdap.cdap.runtime.spi.provisioner.Cluster convertClusterUnchecked(Clu
    */
   private io.cdap.cdap.runtime.spi.provisioner.Cluster convertCluster(Cluster cluster)
       throws IOException {
-    String zone = getZone(cluster.getConfig().getGceClusterConfig().getZoneUri());
+    String zone = getZone(cluster.getConfig().getGceClusterConfig().getZoneUri(), errorCategory);
     List<Node> nodes = new ArrayList<>();
 
     for (String masterName : cluster.getConfig().getMasterConfig().getInstanceNamesList()) {
@@ -854,8 +901,10 @@ private Optional getDataprocCluster(String name) throws RetryableProvis
         // if there was an API exception that was not a 4xx, we can just try again
         throw new RetryableProvisionException(e);
       }
+      String reason = String.format("Failed to get details of dataproc cluster '%s' in project '%s'"
+          + " and region '%s'.", name, conf.getProjectId(), conf.getRegion());
       // otherwise, it's not a retryable failure
-      throw new DataprocRuntimeException(e);
+      throw DataprocUtils.handleApiException(null, e, reason, errorCategory);
     }
   }
 
@@ -882,8 +931,8 @@ public Stream getClusters(
    * @throws RetryableProvisionException if there was a non 4xx error code returned
    */
   public Stream<io.cdap.cdap.runtime.spi.provisioner.Cluster> getClusters(
-      Map<String, String> labels,
-      Predicate<Cluster> postFilter) throws RetryableProvisionException {
+      Map<String, String> labels, Predicate<Cluster> postFilter)
+      throws RetryableProvisionException {
     try {
       ListClustersRequest.Builder builder = ListClustersRequest.newBuilder()
@@ -899,7 +948,8 @@ public Stream getClusters(
           .filter(postFilter)
           .map(this::convertClusterUnchecked);
     } catch (ApiException e) {
-      throw handleApiException(e);
+      String errorReason = "Dataproc clusters list operation failed.";
+      throw DataprocUtils.handleApiException(null, e, errorReason, errorCategory);
     }
   }
 
@@ -941,24 +991,49 @@ public void close() {
     client.close();
   }
 
-  // if there was an API exception that was not a 4xx, we can just try again
-  protected RetryableProvisionException handleApiException(ApiException e)
-      throws RetryableProvisionException {
-    return handleApiException(null, e);
+  private static DataprocRuntimeException getDataprocRuntimeException(ErrorCategory errorCategory,
+      String errorReason, String errorMessage, boolean dependency, @Nullable String operationId,
+      @Nullable ErrorType errorType, @Nullable Throwable cause) {
+    if (cause != null) {
+      List<Throwable> causalChain = Throwables.getCausalChain(cause);
+      for (Throwable e : causalChain) {
+        if (e instanceof DataprocRuntimeException) {
+          return (DataprocRuntimeException) e;
+        }
+      }
+    }
+    return new DataprocRuntimeException.Builder()
+        .withOperationId(operationId)
+        .withErrorCategory(errorCategory)
+        .withErrorReason(errorReason)
+        .withErrorMessage(errorMessage)
+        .withDependency(dependency)
+        .withCause(cause)
+        .withErrorType(errorType == null ? ErrorType.UNKNOWN : errorType)
+        .build();
   }
 
-  private RetryableProvisionException handleApiException(@Nullable String operationId,
-      ApiException e)
-      throws RetryableProvisionException {
-    if (e.getStatusCode().getCode().getHttpStatusCode() / 100 != 4) {
-      throw new DataprocRetryableException(operationId, e);
-    }
-    String helpMessage = DataprocUtils.getTroubleshootingHelpMessage(
-        conf.getTroubleshootingDocsUrl());
-    if (e instanceof InvalidArgumentException) {
-      throw new DataprocRuntimeException(operationId, helpMessage, e, ErrorTag.USER);
+  protected static DataprocRuntimeException handleGoogleJsonResponseException(
+      GoogleJsonResponseException e, String errorReason, ErrorCategory errorCategory) {
+    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getStatusCode());
+
+    if (Strings.isNullOrEmpty(errorReason)) {
+      errorReason = String.format("'%s', %s. %s", e.getStatusCode(), e.getStatusMessage(),
+          pair.getCorrectiveAction());
+    } else {
+      errorReason = String.format("%s with status '%s' %s. %s", errorReason, e.getStatusCode(),
+          e.getStatusMessage(), pair.getCorrectiveAction());
     }
-    throw new DataprocRuntimeException(operationId, helpMessage, e);
+    return new DataprocRuntimeException.Builder()
+        .withCause(e)
+        .withErrorMessage(e.getMessage())
+        .withErrorCategory(new ErrorCategory(errorCategory.getParentCategory(), CONFIGURATION))
+        .withErrorReason(errorReason)
+        .withErrorType(pair.getErrorType())
+        .withDependency(true)
+        .withErrorCodeType(ErrorCodeType.HTTP)
+        .withErrorCode(String.valueOf(e.getStatusCode()))
+        .build();
   }
 
   //Throws retryable Exception for the cases that are transient in nature
@@ -977,8 +1052,7 @@ protected static void handleRetryableExceptions(Exception e) throws RetryablePro
       throw new RetryableProvisionException(e);
     }
 
-    if (statusCode == HttpURLConnection.HTTP_FORBIDDEN
-        || statusCode == DataprocUtils.RESOURCE_EXHAUSTED) {
+    if (statusCode == DataprocUtils.RESOURCE_EXHAUSTED) {
       boolean isRetryAble = gError.getDetails().getErrors().stream()
           .anyMatch(errorInfo -> ERROR_INFO_REASONS.contains(errorInfo.getReason()));
       if (isRetryAble) {
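The new DataprocUtils.handleApiException helper is not part of this diff; judging from its call sites above (its result is always thrown, and it takes an optional operation id, the ApiException, a reason string, and an ErrorCategory), a minimal sketch could look like the following. The body here is an assumption, not the actual DataprocUtils implementation:

    // Hypothetical sketch only; the real method lives in DataprocUtils and is not shown here.
    public static DataprocRuntimeException handleApiException(@Nullable String operationId,
        ApiException e, String errorReason, ErrorCategory errorCategory)
        throws RetryableProvisionException {
      int statusCode = e.getStatusCode().getCode().getHttpStatusCode();
      // Non-4xx failures are treated as transient, mirroring the old handleApiException behavior.
      if (statusCode / 100 != 4) {
        throw new RetryableProvisionException(e);
      }
      ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
      return new DataprocRuntimeException.Builder()
          .withOperationId(operationId)
          .withCause(e)
          .withErrorCategory(errorCategory)
          .withErrorReason(errorReason)
          .withErrorMessage(e.getMessage())
          .withErrorType(pair.getErrorType())
          .withErrorCodeType(ErrorCodeType.HTTP)
          .withErrorCode(String.valueOf(statusCode))
          .withDependency(true)
          .build();
    }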
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClientFactory.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClientFactory.java
index fbec6302e93..f80170dab55 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClientFactory.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocClientFactory.java
@@ -16,6 +16,8 @@
 package io.cdap.cdap.runtime.spi.provisioner.dataproc;
 
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.runtime.spi.provisioner.RetryableProvisionException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 
@@ -28,11 +30,13 @@ public interface DataprocClientFactory {
    * Create a {@link DataprocClient} for clusters that do not need SSH access.
    *
    * @param conf configuration about the dataproc clusters to operate on
+   * @param errorCategory the error category to attach to failures raised by the client
    * @return a DataprocClient
    * @throws IOException if there was an exception reading the credentials
    */
-  default DataprocClient create(DataprocConf conf) throws GeneralSecurityException, IOException {
-    return create(conf, false);
+  default DataprocClient create(DataprocConf conf, ErrorCategory errorCategory)
+      throws GeneralSecurityException, IOException, RetryableProvisionException {
+    return create(conf, false, errorCategory);
   }
 
   /**
@@ -41,9 +45,10 @@ default DataprocClient create(DataprocConf conf) throws GeneralSecurityException
    * @param conf configuration about the dataproc clusters to operate on
    * @param requireSSH whether the cluster should be open to SSH connections. When false, the
    *     client can avoid making various Compute calls to fetch additional information.
+   * @param errorCategory the error category to attach to failures raised by the client
    * @return a DataprocClient
    * @throws IOException if there was an exception reading the credentials
    */
-  DataprocClient create(DataprocConf conf, boolean requireSSH)
-      throws IOException, GeneralSecurityException;
+  DataprocClient create(DataprocConf conf, boolean requireSSH, ErrorCategory errorCategory)
+      throws IOException, GeneralSecurityException, RetryableProvisionException;
 }
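With the third parameter added, any DataprocClientFactory implemented as a lambda needs updating as well; the test setup later in this diff does exactly this. For example (clusterControllerClient and compute are assumed to be in scope):

    DataprocClientFactory factory = (conf, requireSsh, errorCategory) -> requireSsh
        ? new SshDataprocClient(conf, clusterControllerClient, dconf -> compute, errorCategory)
        : new RuntimeMonitorDataprocClient(conf, clusterControllerClient, dconf -> compute,
            errorCategory);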
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocConf.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocConf.java
index b3e5b560b73..c7420138950 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocConf.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocConf.java
@@ -175,18 +175,13 @@ final class DataprocConf {
   private final long clusterReuseUpdateMaxMs;
   private final String clusterReuseKey;
   private final boolean enablePredefinedAutoScaling;
+  private final boolean disableLocalCaching;
+  private final boolean gcsCacheEnabled;
   private final int computeReadTimeout;
   private final int computeConnectionTimeout;
-  private final boolean gcsCacheEnabled;
-  private final String troubleshootingDocsUrl;
-
-  public String getTroubleshootingDocsUrl() {
-    return troubleshootingDocsUrl;
-  }
-
   private DataprocConf(@Nullable String accountKey, String region, String zone, String projectId,
       @Nullable String networkHostProjectId, @Nullable String network, @Nullable String subnet,
       int masterNumNodes, int masterCpus, int masterMemoryMb,
@@ -209,8 +204,7 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
       long clusterReuseThresholdMinutes, long clusterReuseRetryDelayMs,
       long clusterReuseRetryMaxMs, long clusterReuseUpdateMaxMs, @Nullable String clusterReuseKey,
       boolean enablePredefinedAutoScaling, int computeReadTimeout, int computeConnectionTimeout,
-      @Nullable String rootUrl, boolean gcsCacheEnabled, boolean disableLocalCaching,
-      String troubleshootingDocsUrl) {
+      @Nullable String rootUrl, boolean gcsCacheEnabled, boolean disableLocalCaching) {
     this.accountKey = accountKey;
     this.region = region;
     this.zone = zone;
@@ -272,7 +266,6 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
     this.rootUrl = rootUrl;
     this.gcsCacheEnabled = gcsCacheEnabled;
     this.disableLocalCaching = disableLocalCaching;
-    this.troubleshootingDocsUrl = troubleshootingDocsUrl;
   }
 
   String getRegion() {
@@ -587,7 +580,8 @@ static DataprocConf create(Map properties) {
     String projectId = getString(properties, PROJECT_ID_KEY);
     if (projectId == null || AUTO_DETECT.equals(projectId)) {
       projectId = DataprocUtils.getSystemProjectId(
-          (url -> (HttpURLConnection) url.openConnection()));
+          (url -> (HttpURLConnection) url.openConnection()),
+          DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION);
     }
 
     String zone = getString(properties, "zone");
@@ -776,10 +770,8 @@ static DataprocConf create(Map properties) {
         properties.getOrDefault(DataprocUtils.GCS_CACHE_ENABLED, "true"));
 
     // If true, artifacts will not be cached locally.
     final boolean disableLocalCaching =
-        Boolean.parseBoolean(properties.getOrDefault(DataprocUtils.LOCAL_CACHE_DISABLED, "false"));
-    final String troubleshootingDocsUrl =
-        properties.getOrDefault(DataprocUtils.TROUBLESHOOTING_DOCS_URL_KEY,
-            DataprocUtils.TROUBLESHOOTING_DOCS_URL_DEFAULT);
+        Boolean.parseBoolean(properties.getOrDefault(DataprocUtils.LOCAL_CACHE_DISABLED,
+            "false"));
 
     final String scopesProperty = String.format("%s,%s",
         Optional.ofNullable(getString(properties, SCOPES)).orElse(""), CLOUD_PLATFORM_SCOPE);
@@ -806,7 +798,7 @@ static DataprocConf create(Map properties) {
         clusterReuseEnabled, clusterReuseThresholdMinutes, clusterReuseRetryDelayMs,
         clusterReuseRetryMaxMs, clusterReuseUpdateMaxMs, clusterReuseKey,
         enablePredefinedAutoScaling, computeReadTimeout, computeConnectionTimeout, rootUrl,
-        gcsCacheEnabled, disableLocalCaching, troubleshootingDocsUrl);
+        gcsCacheEnabled, disableLocalCaching);
   }
 
   // the UI never sends nulls, it only sends empty strings.
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java
index c2f6937f37b..86c07c75e9b 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisioner.java
@@ -21,7 +21,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableSet;
-import io.cdap.cdap.error.api.ErrorTagProvider.ErrorTag;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
 import io.cdap.cdap.runtime.spi.ProgramRunInfo;
 import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
 import io.cdap.cdap.runtime.spi.common.DataprocImageVersion;
@@ -92,13 +93,18 @@ public void validateProperties(Map properties) {
         getSystemContext().getProperties().get(PRIVATE_INSTANCE));
 
     if (privateInstance && conf.isPreferExternalIp()) {
-      // When prefer external IP is set to true it means only Dataproc external ip can be used to for communication
-      // the instance being private instance is incapable of using external ip for communication
-      throw new DataprocRuntimeException(
-          "The instance is incapable of using external ip for communication with Dataproc cluster. "
-
-              + "Please correct profile configuration by deselecting preferExternalIP.",
-          ErrorTag.CONFIGURATION);
+      // When preferExternalIP is set to true, only the Dataproc external IP can be used for
+      // communication; a private instance is incapable of using an external IP for
+      // communication.
+      String errorMessage = "The instance is incapable of using an external IP for communication "
+          + "with the Dataproc cluster. Please correct the profile configuration by deselecting "
+          + "preferExternalIP.";
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorMessage)
+          .withErrorMessage(errorMessage)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
 
     // Validate Network Tags as per https://cloud.google.com/vpc/docs/add-remove-network-tags
@@ -107,22 +113,37 @@ public void validateProperties(Map properties) {
     // Lower case letters and dashes allowed only.
     List<String> networkTags = conf.getNetworkTags();
     if (!networkTags.stream().allMatch(e -> NETWORK_TAGS_PATTERN.matcher(e).matches())) {
-      throw new DataprocRuntimeException("Invalid Network Tags: Ensure tag length is max 63 chars"
-          + " and contains lowercase letters, numbers and dashes only. ",
-          ErrorTag.CONFIGURATION);
+      String errorMessage = "Invalid network tags: ensure tag length is max 63 chars"
+          + " and contains lowercase letters, numbers and dashes only.";
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorMessage)
+          .withErrorMessage(errorMessage)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
     if (networkTags.size() > 64) {
-      throw new DataprocRuntimeException("Exceed Max number of tags. Only Max of 64 allowed. ",
-          ErrorTag.CONFIGURATION);
+      String errorMessage = "Exceeded the maximum number of network tags. Only a maximum of 64 "
+          + "is allowed.";
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorMessage)
+          .withErrorMessage(errorMessage)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
 
     if (!isAutoscalingFieldsValid(conf, properties)) {
-      throw new DataprocRuntimeException(
-          String.format("Invalid configs : %s, %s, %s. These are not allowed when %s is enabled ",
-              DataprocConf.WORKER_NUM_NODES, DataprocConf.SECONDARY_WORKER_NUM_NODES,
-              DataprocConf.AUTOSCALING_POLICY, DataprocConf.PREDEFINED_AUTOSCALE_ENABLED),
-          ErrorTag.CONFIGURATION);
+      String errorMessage = String.format("Invalid configs: %s, %s, %s. "
+              + "These are not allowed when %s is enabled.",
+          DataprocConf.WORKER_NUM_NODES, DataprocConf.SECONDARY_WORKER_NUM_NODES,
+          DataprocConf.AUTOSCALING_POLICY, DataprocConf.PREDEFINED_AUTOSCALE_ENABLED);
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorMessage)
+          .withErrorMessage(errorMessage)
+          .withErrorType(ErrorType.USER)
+          .build();
     }
   }
 
@@ -169,7 +190,8 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
       }
     }
 
-    try (DataprocClient client = clientFactory.create(conf, sshPublicKey != null)) {
+    try (DataprocClient client = clientFactory.create(conf, sshPublicKey != null,
+        context.getErrorCategory())) {
       Cluster reused = tryReuseCluster(client, context, conf);
       if (reused != null) {
         DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
@@ -198,9 +220,20 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
         if (comparableImageVersion == null) {
           LOG.warn("Unable to extract Dataproc version from string '{}'.", imageVersion);
         } else if (DATAPROC_1_5_VERSION.compareTo(comparableImageVersion) > 0) {
-          throw new DataprocRuntimeException(
-              "Dataproc cluster must be version 1.5 or greater for pipeline execution.",
-              ErrorTag.CONFIGURATION);
+          String errorMessage = "Dataproc cluster must be version 1.5 or greater "
+              + "for pipeline execution.";
+          ErrorCategory errorCategory =
+              DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION;
+          if (context.getErrorCategory() != null) {
+            errorCategory =
+                new ErrorCategory(context.getErrorCategory().getParentCategory(), "Configuration");
+          }
+          throw new DataprocRuntimeException.Builder()
+              .withErrorCategory(errorCategory)
+              .withErrorReason(errorMessage)
+              .withErrorMessage(errorMessage)
+              .withErrorType(ErrorType.USER)
+              .build();
         }
       }
 
@@ -260,8 +293,7 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
    */
   @Nullable
   private Cluster tryReuseCluster(DataprocClient client, ProvisionerContext context,
-      DataprocConf conf)
-      throws RetryableProvisionException, InterruptedException {
+      DataprocConf conf) throws RetryableProvisionException, InterruptedException {
     if (!isReuseSupported(conf)) {
       LOG.debug(
           "Not checking cluster reuse, enabled: {}, skip delete: {}, idle ttl: {}, reuse threshold: {}",
@@ -378,6 +410,7 @@ private Cluster tryReuseCluster(DataprocClient client, ProvisionerContex
    * Waits for the cluster to be found by the run label. The operation may take some time.
    * Note that we don't want to wait for the whole operation to finish, we just need to be sure
    * that cluster can be found by the labels.
+   *
    * @param client dataproc client
    * @param conf provisioner configuration
    * @param updateLabelsFuture future for the cluster update operation
@@ -387,9 +420,7 @@ private Cluster tryReuseCluster(DataprocClient client, ProvisionerContex
    *     finished.
    */
   private static void waitForLabelsUpdateToApply(DataprocClient client, DataprocConf conf,
-      Future<?> updateLabelsFuture, String clusterName,
-      Map<String, String> runLabels)
-      throws Exception {
+      Future<?> updateLabelsFuture, String clusterName, Map<String, String> runLabels) throws Exception {
     boolean wasDone = false;
     while (client.getClusters(runLabels).count() == 0) {
       if (wasDone) {
@@ -446,7 +477,7 @@ public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluste
       return ClusterStatus.NOT_EXISTS;
     }
 
-    try (DataprocClient client = clientFactory.create(conf)) {
+    try (DataprocClient client = clientFactory.create(conf, context.getErrorCategory())) {
       status = client.getClusterStatus(clusterName);
       DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
           "provisioner.clusterStatus.response.count");
@@ -463,7 +494,7 @@ public String getClusterFailureMsg(ProvisionerContext context, Cluster cluster)
     DataprocConf conf = DataprocConf.create(createContextProperties(context));
     String clusterName = cluster.getName();
 
-    try (DataprocClient client = clientFactory.create(conf)) {
+    try (DataprocClient client = clientFactory.create(conf, context.getErrorCategory())) {
       return client.getClusterFailureMsg(clusterName);
     }
   }
@@ -472,7 +503,8 @@ public String getClusterFailureMsg(ProvisionerContext context, Cluster cluster)
   public Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) throws Exception {
     DataprocConf conf = DataprocConf.create(createContextProperties(context));
     String clusterName = cluster.getName();
-    try (DataprocClient client = clientFactory.create(conf, shouldUseSsh(context, conf))) {
+    try (DataprocClient client = clientFactory.create(conf, shouldUseSsh(context, conf),
+        context.getErrorCategory())) {
       Optional<Cluster> existing = client.getCluster(clusterName);
       DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
           "provisioner.clusterDetail.response.count");
@@ -491,7 +523,7 @@ protected void doDeleteCluster(ProvisionerContext context, Cluster cluster, Data
       return;
     }
     String clusterName = cluster.getName();
-    try (DataprocClient client = clientFactory.create(conf)) {
+    try (DataprocClient client = clientFactory.create(conf, context.getErrorCategory())) {
       if (isReuseSupported(conf)) {
         long reuseUntil = System.currentTimeMillis()
             + TimeUnit.MINUTES.toMillis(
@@ -521,7 +553,7 @@ protected String getClusterName(ProvisionerContext context) throws Exception {
     DataprocConf conf = DataprocConf.create(createContextProperties(context));
     String clusterKey = getRunKey(context);
     if (isReuseSupported(conf)) {
-      try (DataprocClient client = clientFactory.create(conf)) {
+      try (DataprocClient client = clientFactory.create(conf, context.getErrorCategory())) {
        Optional<Cluster> allocatedCluster = findCluster(clusterKey, client);
        return allocatedCluster.map(Cluster::getName).orElse(clusterKey);
       }
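The null-guarded derivation of a "Configuration" sub-category from context.getErrorCategory() appears both here and again in ExistingDataprocProvisioner below; a small helper could factor it out. The sketch is hypothetical and not part of this change:

    // Hypothetical helper, not part of this diff.
    private static ErrorCategory configurationCategory(@Nullable ErrorCategory contextCategory) {
      return contextCategory == null
          ? DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION
          : new ErrorCategory(contextCategory.getParentCategory(), "Configuration");
    }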
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocRuntimeException.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocRuntimeException.java
index 2554940df67..eb1055da086 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocRuntimeException.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocRuntimeException.java
@@ -16,13 +16,13 @@
 package io.cdap.cdap.runtime.spi.provisioner.dataproc;
 
+import com.google.api.gax.rpc.ApiException;
 import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import io.cdap.cdap.error.api.ErrorTagProvider;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.FailureDetailsProvider;
 import javax.annotation.Nullable;
 
 /**
@@ -30,64 +30,250 @@
  * #toString()} implementation that doesn't include this exception class name and with the root
  * cause error message.
  */
-public class DataprocRuntimeException extends RuntimeException implements ErrorTagProvider {
+public class DataprocRuntimeException extends RuntimeException implements FailureDetailsProvider {
 
-  private final Set<ErrorTag> errorTags = new HashSet<>();
+  private static final String TROUBLESHOOTING_MESSAGE_FORMAT =
+      "For troubleshooting Dataproc errors, refer to %s";
+  public static final String TROUBLESHOOTING_DOC_URL =
+      "https://cloud.google.com/dataproc/docs/support/troubleshoot-cluster-creation";
+  public static final ErrorCategory ERROR_CATEGORY_PROVISIONING_CONFIGURATION =
+      new ErrorCategory(ErrorCategoryEnum.PROVISIONING, "Configuration");
 
-  public DataprocRuntimeException(String message, ErrorTag... tags) {
-    super(message);
-    errorTags.addAll(Arrays.asList(tags));
-    errorTags.add(ErrorTag.DEPENDENCY);
-  }
+  private final ErrorCategory errorCategory;
+  private final String errorReason;
+  private final ErrorType errorType;
+  private final boolean dependency;
+  private final ErrorCodeType errorCodeType;
+  private final String errorCode;
+  private final String supportedDocumentationUrl;
 
-  public DataprocRuntimeException(Throwable cause, String message, ErrorTag... tags) {
-    super(message, cause);
-    errorTags.addAll(Arrays.asList(tags));
-    errorTags.add(ErrorTag.DEPENDENCY);
+  private DataprocRuntimeException(@Nullable String operationId, Throwable cause,
+      ErrorCategory errorCategory, String errorReason, String errorMessage, ErrorType errorType,
+      boolean dependency, ErrorCodeType errorCodeType, String errorCode,
+      String supportedDocumentationUrl) {
+    super(createMessage(operationId, errorMessage, dependency), cause);
+    this.errorCategory = errorCategory;
+    this.errorReason = errorReason;
+    this.errorType = errorType;
+    this.dependency = dependency;
+    this.errorCodeType = errorCodeType;
+    this.errorCode = errorCode;
+    this.supportedDocumentationUrl = supportedDocumentationUrl;
   }
 
-  public DataprocRuntimeException(Throwable cause, ErrorTag... tags) {
-    this(cause, "", tags);
+  private static String createMessage(@Nullable String operationId, String errorMessage,
+      boolean dependency) {
+    StringBuilder message = new StringBuilder();
+    if (operationId != null) {
+      message.append("Dataproc operation ")
+          .append(operationId)
+          .append(" failure: ")
+          .append(errorMessage);
+    } else if (dependency) {
+      message.append("Dataproc operation failure: ")
+          .append(errorMessage);
+    } else {
+      message.append(errorMessage);
+    }
+    return message.toString();
   }
 
-  public DataprocRuntimeException(@Nullable String operationId, Throwable cause, ErrorTag... tags) {
-    this(operationId, null, cause, tags);
+  @Nullable
+  @Override
+  public String getErrorReason() {
+    if (!Strings.isNullOrEmpty(errorReason) && errorReason.endsWith(".")
+        && !Strings.isNullOrEmpty(getSupportedDocumentationUrl())) {
+      return String.format("%s %s", errorReason,
+          String.format(TROUBLESHOOTING_MESSAGE_FORMAT, getSupportedDocumentationUrl()));
+    }
+    return errorReason;
   }
 
-  public DataprocRuntimeException(@Nullable String operationId,
-      @Nullable String troubleshootingMessage,
-      Throwable cause, ErrorTag... tags) {
-    super(createMessage(operationId, troubleshootingMessage, cause), cause);
-    errorTags.addAll(Arrays.asList(tags));
+  @Nullable
+  @Override
+  public String getFailureStage() {
+    return null;
   }
 
   @Override
-  public String toString() {
-    return String.format("ErrorTags: %s, Msg: %s", errorTags,
-        getMessage() != null ? getMessage() : "");
+  public ErrorCategory getErrorCategory() {
+    if (errorCategory == null) {
+      return FailureDetailsProvider.super.getErrorCategory();
+    }
+    return errorCategory;
   }
 
-  private static String createMessage(@Nullable String operationId,
-      @Nullable String troubleShootingMessage, Throwable cause) {
-    StringBuilder message = new StringBuilder();
-    if (operationId != null) {
-      message.append("Dataproc operation ")
-          .append(operationId)
-          .append(" failure: ")
-          .append(Throwables.getRootCause(cause).getMessage());
-    } else {
-      message.append("Dataproc operation failure: ")
-          .append(Throwables.getRootCause(cause).getMessage());
-    }
-    if (!Strings.isNullOrEmpty(troubleShootingMessage)) {
-      message.append("\n")
-          .append(troubleShootingMessage);
+  @Override
+  public ErrorType getErrorType() {
+    if (errorType == null) {
+      return FailureDetailsProvider.super.getErrorType();
     }
-    return message.toString();
+    return errorType;
+  }
+
+  @Override
+  public boolean isDependency() {
+    return dependency;
+  }
+
+  @Nullable
+  @Override
+  public ErrorCodeType getErrorCodeType() {
+    return errorCodeType;
+  }
+
+  @Nullable
+  @Override
+  public String getErrorCode() {
+    return errorCode;
   }
 
+  @Nullable
   @Override
-  public Set<ErrorTag> getErrorTags() {
-    return Collections.unmodifiableSet(errorTags);
+  public String getSupportedDocumentationUrl() {
+    if (getCause() instanceof ApiException && Strings.isNullOrEmpty(supportedDocumentationUrl)) {
+      return TROUBLESHOOTING_DOC_URL;
+    }
+    return supportedDocumentationUrl;
+  }
+
+  /**
+   * Builder class for DataprocRuntimeException.
+   */
+  public static class Builder {
+    private ErrorCategory errorCategory;
+    private String errorReason;
+    private String errorMessage;
+    private ErrorType errorType;
+    private Throwable cause;
+    private ErrorCodeType errorCodeType;
+    private String errorCode;
+    private boolean dependency;
+    private String supportedDocumentationUrl;
+    private String operationId;
+
+    /**
+     * Sets the error category for the DataprocRuntimeException.
+     *
+     * @param errorCategory The category of the error.
+     * @return The current Builder instance.
+     */
+    public Builder withErrorCategory(ErrorCategory errorCategory) {
+      this.errorCategory = errorCategory;
+      return this;
+    }
+
+    /**
+     * Sets the error reason for the DataprocRuntimeException.
+     *
+     * @param errorReason The reason for the error.
+     * @return The current Builder instance.
+     */
+    public Builder withErrorReason(String errorReason) {
+      this.errorReason = errorReason;
+      return this;
+    }
+
+    /**
+     * Sets the error message for the DataprocRuntimeException.
+     *
+     * @param errorMessage The detailed error message.
+     * @return The current Builder instance.
+     */
+    public Builder withErrorMessage(String errorMessage) {
+      this.errorMessage = errorMessage;
+      return this;
+    }
+
+    /**
+     * Sets the error type for the DataprocRuntimeException.
+     *
+     * @param errorType The type of error (SYSTEM, USER, UNKNOWN).
+     * @return The current Builder instance.
+     */
+    public Builder withErrorType(ErrorType errorType) {
+      this.errorType = errorType;
+      return this;
+    }
+
+    /**
+     * Sets the cause for the DataprocRuntimeException.
+     *
+     * @param cause the cause (which is saved for later retrieval by the getCause() method).
+     * @return The current Builder instance.
+     */
+    public Builder withCause(Throwable cause) {
+      this.cause = cause;
+      return this;
+    }
+
+    /**
+     * Sets the dependency flag for the DataprocRuntimeException.
+     *
+     * @param dependency True if the error is from a dependent service, false otherwise.
+     * @return The current Builder instance.
+     */
+    public Builder withDependency(boolean dependency) {
+      this.dependency = dependency;
+      return this;
+    }
+
+    /**
+     * Sets the error code type for the DataprocRuntimeException.
+     *
+     * @param errorCodeType The type of error code.
+     * @return The current Builder instance.
+     */
+    public Builder withErrorCodeType(ErrorCodeType errorCodeType) {
+      this.errorCodeType = errorCodeType;
+      return this;
+    }
+
+    /**
+     * Sets the error code for the DataprocRuntimeException.
+     *
+     * @param errorCode The error code.
+     * @return The current Builder instance.
+     */
+    public Builder withErrorCode(String errorCode) {
+      this.errorCode = errorCode;
+      return this;
+    }
+
+    /**
+     * Sets the supported documentation URL for the DataprocRuntimeException.
+     *
+     * @param supportedDocumentationUrl The URL to the documentation.
+     * @return The current Builder instance.
+     */
+    public Builder withSupportedDocumentationUrl(String supportedDocumentationUrl) {
+      this.supportedDocumentationUrl = supportedDocumentationUrl;
+      return this;
+    }
+
+    /**
+     * Sets the dataproc operationId for the DataprocRuntimeException.
+     *
+     * @param operationId the dataproc operationId.
+     * @return The current Builder instance.
+     */
+    public Builder withOperationId(String operationId) {
+      this.operationId = operationId;
+      return this;
+    }
+
+    /**
+     * Builds and returns a new instance of DataprocRuntimeException.
+     *
+     * @return A new DataprocRuntimeException instance.
+     */
+    public DataprocRuntimeException build() {
+      return new DataprocRuntimeException(operationId, cause, errorCategory, errorReason,
+          errorMessage, errorType, dependency, errorCodeType, errorCode,
+          supportedDocumentationUrl);
+    }
   }
 }
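For reference, a typical construction with the new builder looks like this; the values are illustrative only:

    throw new DataprocRuntimeException.Builder()
        .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
        .withErrorReason("Dataproc cluster must be version 1.5 or greater for pipeline execution.")
        .withErrorMessage("Unsupported image version: 1.3") // hypothetical message
        .withErrorType(ErrorType.USER)
        .build();

Because the reason ends with a period and the cause here is not an ApiException, getErrorReason() returns it unchanged; with an ApiException cause, the troubleshooting link is appended.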
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocTool.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocTool.java
index 3315f7d4e1a..52b05787842 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocTool.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocTool.java
@@ -118,7 +118,7 @@ public static void main(String[] args) throws Exception {
     String name = commandLine.getOptionValue('n');
     DataprocClientFactory clientFactory = new DefaultDataprocClientFactory(
         new GoogleComputeFactory());
-    try (DataprocClient client = clientFactory.create(conf, commandLine.hasOption('l'))) {
+    try (DataprocClient client = clientFactory.create(conf, commandLine.hasOption('l'), null)) {
       if (PROVISION.equalsIgnoreCase(command)) {
         ClusterOperationMetadata createOp = client.createCluster(name, imageVersion,
             Collections.emptyMap(), false,
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DefaultDataprocClientFactory.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DefaultDataprocClientFactory.java
index 83ed31aec7a..2ddb623be99 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DefaultDataprocClientFactory.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DefaultDataprocClientFactory.java
@@ -18,8 +18,12 @@
 
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.dataproc.v1.ClusterControllerClient;
 import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.runtime.spi.common.DataprocUtils;
+import io.cdap.cdap.runtime.spi.provisioner.RetryableProvisionException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.Optional;
@@ -40,11 +44,27 @@ public DefaultDataprocClientFactory(ComputeFactory computeFactory) {
   }
 
   @Override
-  public DataprocClient create(DataprocConf conf, boolean requireSSH)
-      throws IOException, GeneralSecurityException {
-    ClusterControllerClient clusterControllerClient = getClusterControllerClient(conf);
-    return requireSSH ? new SshDataprocClient(conf, clusterControllerClient, computeFactory) :
-        new RuntimeMonitorDataprocClient(conf, clusterControllerClient, computeFactory);
+  public DataprocClient create(DataprocConf conf, boolean requireSSH, ErrorCategory errorCategory)
+      throws IOException, GeneralSecurityException, RetryableProvisionException {
+    ClusterControllerClient clusterControllerClient;
+    try {
+      clusterControllerClient = getClusterControllerClient(conf);
+    } catch (Exception e) {
+      String errorReason = "Unable to create dataproc cluster controller client.";
+      if (e instanceof ApiException) {
+        throw DataprocUtils.handleApiException(null, (ApiException) e, errorReason, errorCategory);
+      }
+      throw new DataprocRuntimeException.Builder()
+          .withCause(e)
+          .withErrorCategory(errorCategory)
+          .withErrorReason(errorReason)
+          .withErrorMessage(String.format("%s %s: %s", errorReason, e.getClass().getName(),
+              e.getMessage()))
+          .build();
+    }
+    return requireSSH ? new SshDataprocClient(conf, clusterControllerClient, computeFactory,
+        errorCategory) : new RuntimeMonitorDataprocClient(conf, clusterControllerClient,
+        computeFactory, errorCategory);
   }
 
   private static ClusterControllerClient getClusterControllerClient(DataprocConf conf)
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ExistingDataprocProvisioner.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ExistingDataprocProvisioner.java
index 17cce82fa79..11588e69e7a 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ExistingDataprocProvisioner.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/ExistingDataprocProvisioner.java
@@ -17,7 +17,8 @@
 package io.cdap.cdap.runtime.spi.provisioner.dataproc;
 
 import com.google.common.base.Strings;
-import io.cdap.cdap.error.api.ErrorTagProvider.ErrorTag;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
 import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
 import io.cdap.cdap.runtime.spi.common.DataprocImageVersion;
 import io.cdap.cdap.runtime.spi.provisioner.Cluster;
@@ -81,9 +82,13 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
       String sshUser = contextProperties.get(SSH_USER);
       String sshKey = contextProperties.get(SSH_KEY);
       if (Strings.isNullOrEmpty(sshUser) || Strings.isNullOrEmpty(sshKey)) {
-        throw new DataprocRuntimeException(
-            "SSH User and key are required for monitoring through SSH.",
-            ErrorTag.CONFIGURATION);
+        String errorMessage = "SSH user and key are required for monitoring through SSH.";
+        throw new DataprocRuntimeException.Builder()
+            .withErrorCategory(context.getErrorCategory())
+            .withErrorReason(errorMessage)
+            .withErrorMessage(errorMessage)
+            .withErrorType(ErrorType.USER)
+            .build();
       }
 
       SSHKeyPair sshKeyPair = new SSHKeyPair(new SSHPublicKey(sshUser, ""),
@@ -93,7 +98,7 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
     }
 
     String clusterName = contextProperties.get(CLUSTER_NAME);
-    try (DataprocClient client = CLIENT_FACTORY.create(conf)) {
+    try (DataprocClient client = CLIENT_FACTORY.create(conf, context.getErrorCategory())) {
       try {
         client.updateClusterLabels(clusterName, getCommonDataprocLabels(context));
       } catch (DataprocRuntimeException e) {
@@ -105,11 +110,16 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
           LOG.debug("Cannot update cluster labels due to {}", e.getMessage());
         }
       }
+      final String errorMessage = String.format("Dataproc cluster '%s' does not exist or is not "
+          + "in running state.", clusterName);
       Cluster cluster = client.getCluster(clusterName)
           .filter(c -> c.getStatus() == ClusterStatus.RUNNING)
-          .orElseThrow(() -> new DataprocRuntimeException("Dataproc cluster " + clusterName
-              + " does not exist or not in running state.",
-              ErrorTag.CONFIGURATION));
+          .orElseThrow(() -> new DataprocRuntimeException.Builder()
+              .withErrorCategory(context.getErrorCategory())
+              .withErrorReason(errorMessage)
+              .withErrorMessage(errorMessage)
+              .withErrorType(ErrorType.USER)
+              .build());
 
       // Determine cluster version and fail if version is smaller than 1.5
       Optional<String> optImageVer = client.getClusterImageVersion(clusterName);
@@ -119,9 +129,19 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
       } else if (!optComparableImageVer.isPresent()) {
         LOG.warn("Unable to extract Dataproc version from string '{}'.", optImageVer.get());
       } else if (DATAPROC_1_5_VERSION.compareTo(optComparableImageVer.get()) > 0) {
-        throw new DataprocRuntimeException(
-            "Dataproc cluster must be version 1.5 or greater for pipeline execution.",
-            ErrorTag.CONFIGURATION);
+        String errorReason = "Dataproc cluster must be version 1.5 or greater "
+            + "for pipeline execution.";
+        ErrorCategory errorCategory =
+            DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION;
+        if (context.getErrorCategory() != null) {
+          errorCategory =
+              new ErrorCategory(context.getErrorCategory().getParentCategory(), "Configuration");
+        }
+        throw new DataprocRuntimeException.Builder()
+            .withErrorCategory(errorCategory)
+            .withErrorReason(errorReason)
+            .withErrorMessage(errorReason)
+            .withErrorType(ErrorType.USER)
+            .build();
       }
 
       return cluster;
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/RuntimeMonitorDataprocClient.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/RuntimeMonitorDataprocClient.java
index 172a651eb13..36e7ef70e46 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/RuntimeMonitorDataprocClient.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/RuntimeMonitorDataprocClient.java
@@ -17,6 +17,7 @@
 package io.cdap.cdap.runtime.spi.provisioner.dataproc;
 
 import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.provisioner.Node;
 
 import java.util.Collections;
@@ -26,8 +27,8 @@ class RuntimeMonitorDataprocClient extends DataprocClient {
 
   RuntimeMonitorDataprocClient(DataprocConf conf, ClusterControllerClient client,
-      ComputeFactory computeFactory) {
-    super(conf, client, computeFactory);
+      ComputeFactory computeFactory, ErrorCategory errorCategory) {
+    super(conf, client, computeFactory, errorCategory);
   }
 
   @Override
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/SshDataprocClient.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/SshDataprocClient.java
index b55aa46df52..a1f0bb8d7f6 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/SshDataprocClient.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/SshDataprocClient.java
@@ -24,6 +24,7 @@
 import com.google.api.services.compute.model.Network;
 import com.google.cloud.dataproc.v1.ClusterControllerClient;
 import com.google.cloud.dataproc.v1.GceClusterConfig;
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.common.DataprocUtils;
 import io.cdap.cdap.runtime.spi.common.IPRange;
 import io.cdap.cdap.runtime.spi.provisioner.Node;
@@ -59,8 +60,8 @@ class SshDataprocClient extends DataprocClient {
       "192.168.0.0/16"));
 
   SshDataprocClient(DataprocConf conf, ClusterControllerClient client,
-      ComputeFactory computeFactory) {
-    super(conf, client, computeFactory);
+      ComputeFactory computeFactory, ErrorCategory errorCategory) {
+    super(conf, client, computeFactory, errorCategory);
   }
 
   @Override
@@ -133,13 +134,23 @@ protected Node getNode(Node.Type type, String zone, String nodeName) throws IOEx
    * @throws IOException If failed to discover those firewall rules
    */
   private List<String> getFirewallTargetTags(Network network, boolean useInternalIp)
-      throws IOException, RetryableProvisionException {
+      throws RetryableProvisionException {
     FirewallList firewalls;
+    String project = conf.getNetworkHostProjectId();
     try {
-      firewalls = getOrCreateCompute().firewalls().list(conf.getNetworkHostProjectId()).execute();
+      firewalls = getOrCreateCompute().firewalls().list(project).execute();
     } catch (Exception e) {
       handleRetryableExceptions(e);
-      throw new DataprocRuntimeException(e);
+      String errorReason = String.format("Unable to list firewalls in project '%s'.", project);
+      if (e instanceof GoogleJsonResponseException) {
+        throw handleGoogleJsonResponseException((GoogleJsonResponseException) e,
+            errorReason, errorCategory);
+      }
+      throw new DataprocRuntimeException.Builder()
+          .withErrorCategory(DataprocRuntimeException.ERROR_CATEGORY_PROVISIONING_CONFIGURATION)
+          .withErrorReason(errorReason)
+          .withErrorMessage(e.getMessage())
+          .build();
     }
 
     List<String> tags = new ArrayList<>();
diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java
index 94cdf06a571..51c82bdc036 100644
--- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java
+++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java
@@ -20,7 +20,6 @@
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.rpc.AlreadyExistsException;
 import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.ResourceExhaustedException;
 import com.google.api.gax.rpc.StatusCode;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.WriteChannel;
@@ -49,11 +48,12 @@
 import com.google.cloud.storage.StorageRetryStrategy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteStreams;
-import io.cdap.cdap.error.api.ErrorTagProvider;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.runtime.spi.CacheableLocalFile;
 import io.cdap.cdap.runtime.spi.ProgramRunInfo;
 import io.cdap.cdap.runtime.spi.VersionInfo;
@@ -79,8 +79,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.filesystem.LocalLocationFactory;
@@ -361,29 +359,35 @@ public void launch(RuntimeJobInfo runtimeJobInfo) throws Exception {
       }
       DataprocUtils.emitMetric(provisionerContext, submitJobMetric.build());
     } catch (Exception e) {
-      String errorMessage = String.format("Error while launching job %s on cluster %s.",
+      String errorReason = String.format("Error while launching job %s on cluster %s.",
          getJobId(runInfo), clusterName);
       // delete all uploaded gcs files in case of exception
       DataprocUtils.deleteGcsPath(getStorageClient(), bucket, runRootPath);
       DataprocUtils.emitMetric(provisionerContext, submitJobMetric.setException(e).build());
-      // ResourceExhaustedException indicates Dataproc agent running on master node isn't emitting heartbeat.
-      // This usually indicates master VM crashing due to OOM.
-      if (e instanceof ResourceExhaustedException) {
-        String message = String.format("%s Cluster can't accept jobs presently: %s",
-            errorMessage,
-            Throwables.getRootCause(e).getMessage());
-        String helpMessage = DataprocUtils.getTroubleshootingHelpMessage(
-            provisionerProperties.getOrDefault(
-                DataprocUtils.TROUBLESHOOTING_DOCS_URL_KEY,
-                DataprocUtils.TROUBLESHOOTING_DOCS_URL_DEFAULT));
-
-        String combined = Stream.of(message, helpMessage)
-            .filter(s -> !Strings.isNullOrEmpty(s))
-            .collect(Collectors.joining("\n"));
-
-        throw new DataprocRuntimeException(e, combined, ErrorTagProvider.ErrorTag.USER);
+      // ResourceExhaustedException indicates Dataproc agent running on master node
+      // isn't emitting heartbeat. This usually indicates master VM crashing due to OOM.
+      ErrorCategory errorCategory = new ErrorCategory(ErrorCategoryEnum.STARTING);
+      if (e instanceof ApiException) {
+        int statusCode =
+            ((ApiException) e).getStatusCode().getCode().getHttpStatusCode();
+        ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
+        throw new DataprocRuntimeException.Builder()
+            .withCause(e)
+            .withErrorCategory(errorCategory)
+            .withErrorMessage(e.getMessage())
+            .withErrorReason(DataprocUtils.getErrorReason(errorReason, e))
+            .withErrorType(pair.getErrorType())
+            .withErrorCodeType(ErrorCodeType.HTTP)
+            .withErrorCode(String.valueOf(statusCode))
+            .withDependency(true)
+            .build();
       }
-      throw new DataprocRuntimeException(e, errorMessage);
+      throw new DataprocRuntimeException.Builder()
+          .withErrorMessage(e.getMessage())
+          .withErrorReason(errorReason)
+          .withErrorCategory(errorCategory)
+          .withCause(e)
+          .build();
     } finally {
       if (disableLocalCaching) {
         DataprocUtils.deleteDirectoryContents(tempDir);
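Callers that catch the launch failure can now read the classified details through the FailureDetailsProvider accessors. A short sketch, where jobManager, runtimeJobInfo, and LOG are illustrative names rather than identifiers from this diff:

    try {
      jobManager.launch(runtimeJobInfo);
    } catch (DataprocRuntimeException e) {
      // Each accessor below is defined by FailureDetailsProvider, implemented above.
      LOG.error("Launch failed: category={}, type={}, code={} {}, reason={}",
          e.getErrorCategory(), e.getErrorType(), e.getErrorCodeType(), e.getErrorCode(),
          e.getErrorReason());
    }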
assertTrue(String.format("Got exception: %s, expected: %s", e.getClass().getName(), + RetryableProvisionException.class.getName()), e instanceof RetryableProvisionException); + assertTrue(e.getCause() instanceof SocketTimeoutException); + } } @Test @@ -139,10 +147,15 @@ public void rateLimitThrowsRetryableException() throws Exception { GoogleJsonResponseException gError = new GoogleJsonResponseException(builder, googleJsonError); Mockito.when(listMock.execute()).thenThrow(gError); - DataprocClient client = sshDataprocClientFactory.create(dataprocConf); - thrown.expect(RetryableProvisionException.class); - thrown.expectCause(IsInstanceOf.instanceOf(GoogleJsonResponseException.class)); + DataprocClient client = sshDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)); + try { client.createCluster("name", "2.0", Collections.emptyMap(), true, null); + } catch (Exception e) { + assertTrue(String.format("Got exception: %s, expected: %s", e.getClass().getName(), + RetryableProvisionException.class.getName()), e instanceof RetryableProvisionException); + assertTrue(e.getCause() instanceof GoogleJsonResponseException); + } } @Test @@ -162,10 +175,15 @@ public void nonRateLimitDoesNotThrowsRetryableException() throws Exception { GoogleJsonResponseException gError = new GoogleJsonResponseException(builder, googleJsonError); Mockito.when(listMock.execute()).thenThrow(gError); - DataprocClient client = sshDataprocClientFactory.create(dataprocConf); - thrown.expect(DataprocRuntimeException.class); - thrown.expectCause(IsInstanceOf.instanceOf(GoogleJsonResponseException.class)); - client.createCluster("name", "2.0", Collections.emptyMap(), true, null); + DataprocClient client = sshDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)); + try { + client.createCluster("name", "2.0", Collections.emptyMap(), true, null); + } catch (Exception e) { + assertTrue(String.format("Got exception: %s, expected: %s", e.getClass().getName(), + DataprocRuntimeException.class.getName()), e instanceof DataprocRuntimeException); + assertTrue(e.getCause() instanceof GoogleJsonResponseException); + } } @@ -177,7 +195,8 @@ public void apiExceptionWithNon4xxThrowsRetryableException() throws Exception { PowerMockito.when(clusterControllerClientMock.listClusters(Mockito.any())).thenThrow(e); thrown.expect(RetryableProvisionException.class); thrown.expectCause(IsInstanceOf.instanceOf(ApiException.class)); - sshDataprocClientFactory.create(dataprocConf).getClusters(new HashMap<>()); + sshDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)).getClusters(new HashMap<>()); } @Test @@ -188,7 +207,8 @@ public void apiExceptionWith4xxNotThrowRetryableException() throws Exception { PowerMockito.when(clusterControllerClientMock.listClusters(Mockito.any())).thenThrow(e); thrown.expect(DataprocRuntimeException.class); thrown.expectCause(IsInstanceOf.instanceOf(ApiException.class)); - sshDataprocClientFactory.create(dataprocConf).getClusters(new HashMap<>()); + sshDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)).getClusters(new HashMap<>()); } @Test @@ -210,8 +230,9 @@ public void testCreateClusterThrowsDataprocRetryableException() throws Exception thrown.expect(DataprocRetryableException.class); thrown.expectMessage(String.format("Dataproc operation %s failure: %s", operationId, errorMessage)); thrown.expectCause(IsInstanceOf.instanceOf(ApiException.class)); - 
mockDataprocClientFactory.create(dataprocConf).createCluster("name", "2.0", - Collections.emptyMap(), true, null); + mockDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)).createCluster("name", "2.0", + Collections.emptyMap(), true, null); } @Test @@ -235,11 +256,16 @@ public void testCreateClusterThrowsDataprocRuntimeException() throws Exception { Mockito.when(clusterControllerClientMock.getCluster(Mockito.any(GetClusterRequest.class))) .thenThrow(new NotFoundException(new Exception("Cluster not found!"), HttpJsonStatusCode.of(404), false)); - thrown.expect(DataprocRuntimeException.class); - thrown.expectMessage(String.format("Dataproc operation %s failure: %s", operationId, errorMessage)); - thrown.expectCause(IsInstanceOf.instanceOf(ApiException.class)); - mockDataprocClientFactory.create(dataprocConf).createCluster("name", "2.0", - Collections.emptyMap(), true, null); + try { + mockDataprocClientFactory.create(dataprocConf, + new ErrorCategory(ErrorCategoryEnum.PROVISIONING)).createCluster("name", "2.0", + Collections.emptyMap(), true, null); + } catch (Exception e) { + assertTrue(e instanceof DataprocRuntimeException); + assertTrue(e.getCause() instanceof ApiException); + assertEquals(String.format("Dataproc operation %s failure: java.io.IOException: %s", + operationId, errorMessage), e.getMessage()); + } } @Test @@ -271,16 +297,16 @@ public void testCreateClusterThrowsUserError() throws Exception { .thenThrow(new NotFoundException(new Exception("Cluster not found!"), HttpJsonStatusCode.of(404), false)); try { - mockDataprocClientFactory.create(conf) + mockDataprocClientFactory.create(conf, new ErrorCategory(ErrorCategoryEnum.PROVISIONING)) .createCluster("name", "2.0", Collections.emptyMap(), true, null); fail("Exception not thrown by createCluster()."); } catch (DataprocRuntimeException e) { - assertTrue("Thrown exception doesn't contain user error tag.", - e.getErrorTags().contains(ErrorTagProvider.ErrorTag.USER)); + assertEquals("Thrown exception doesn't contain user error.", e.getErrorType(), + ErrorType.USER); assertEquals("Exception cause is not of type InvalidArgumentException", - e.getCause().getClass(), InvalidArgumentException.class); - assertTrue("Error message doesn't contain troubleshooting docs link.", - e.getMessage().contains("https://abc.com/troubleshooting")); + e.getCause().getClass(), InvalidArgumentException.class); + assertEquals("Error message doesn't contain troubleshooting docs link.", + e.getSupportedDocumentationUrl(), DataprocRuntimeException.TROUBLESHOOTING_DOC_URL); } // Ensure help message is absent when troubleshooting docs url is missing. 
@@ -288,8 +314,9 @@ public void testCreateClusterThrowsUserError() throws Exception {
     conf = DataprocConf.create(properties);
     thrown.expect(DataprocRuntimeException.class);
     thrown.expectMessage(not(containsString("refer to")));
-    mockDataprocClientFactory.create(conf).createCluster("name", "2.0",
-        Collections.emptyMap(), true, null);
+    mockDataprocClientFactory.create(conf,
+        new ErrorCategory(ErrorCategoryEnum.PROVISIONING)).createCluster("name", "2.0",
+        Collections.emptyMap(), true, null);
   }
 
   @Test
@@ -312,7 +339,8 @@ public void testDeleteClusterThrowsException() throws Exception {
     thrown.expect(DataprocRuntimeException.class);
     thrown.expectMessage(String.format("Dataproc operation %s failure: %s", operationId, errorMessage));
     thrown.expectCause(IsInstanceOf.instanceOf(IOException.class));
-    mockDataprocClientFactory.create(dataprocConf).deleteCluster(clusterName);
+    mockDataprocClientFactory.create(dataprocConf,
+        new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING)).deleteCluster(clusterName);
   }
 
   @Test
@@ -321,7 +349,8 @@ public void testGetClusterStatusCapturesErrorMessage() throws GeneralSecurityExc
     Cluster cluster = Cluster.newBuilder().setStatus(ClusterStatus.newBuilder()
         .setState(ClusterStatus.State.ERROR)).build();
     // PowerMockito.when(clusterControllerClientMock.getCluster(Mockito.any())).thenReturn(cluster);
-    DataprocClient client = sshDataprocClientFactory.create(dataprocConf);
+    DataprocClient client = sshDataprocClientFactory.create(dataprocConf,
+        new ErrorCategory(ErrorCategoryEnum.OTHERS));
     OperationsClient operationsClient = PowerMockito.mock(OperationsClient.class);
     PowerMockito.when(clusterControllerClientMock.getOperationsClient()).thenReturn(operationsClient);
diff --git a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
index f39485155e9..ac19e694dc2 100644
--- a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
+++ b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/DataprocProvisionerTest.java
@@ -19,6 +19,8 @@
 import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
 import com.google.cloud.dataproc.v1.ClusterStatus.State;
 import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
 import io.cdap.cdap.runtime.spi.MockVersionInfo;
 import io.cdap.cdap.runtime.spi.ProgramRunInfo;
 import io.cdap.cdap.runtime.spi.SparkCompat;
@@ -70,7 +72,7 @@ public class DataprocProvisionerTest {
 
   @Before
   public void init() {
-    provisioner = new DataprocProvisioner((conf, requireSsh) -> dataprocClient);
+    provisioner = new DataprocProvisioner((conf, requireSsh, errorCategory) -> dataprocClient);
     MockProvisionerSystemContext provisionerSystemContext = new MockProvisionerSystemContext();
 
     // default system properties defined by DataprocProvisioner
@@ -306,6 +308,7 @@ public void testClusterCreateNoReuse() throws Exception {
     context.setProgramRunInfo(programRunInfo);
     context.setSparkCompat(SparkCompat.SPARK3_2_12);
     context.addProperty(DataprocConf.CLUSTER_REUSE_ENABLED, "false");
+    context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.PROVISIONING));
 
     Mockito.when(dataprocClient.getCluster("cdap-app-runId"))
         .thenReturn(Optional.empty());
@@ -342,11 +345,11 @@ public void testClusterReuseOnCreate() throws Exception {
         .setRun("runId")
         .build();
     context.setProgramRunInfo(programRunInfo);
+    context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.PROVISIONING));
 
     //A. Check with existing client, probably after a retry
     Mockito.when(dataprocClient.getClusters(
-        Collections.singletonMap(AbstractDataprocProvisioner.LABEL_RUN_KEY,
-            "cdap-app-runId")))
+        Collections.singletonMap(AbstractDataprocProvisioner.LABEL_RUN_KEY, "cdap-app-runId")))
         .thenAnswer(i -> Stream.of(cluster));
     Mockito.when(cluster.getStatus())
         .thenReturn(ClusterStatus.RUNNING);
@@ -364,8 +367,7 @@ public void testClusterReuseOnCreate() throws Exception {
         AbstractDataprocProvisioner.LABEL_REUSE_KEY, conf.getClusterReuseKey(),
         AbstractDataprocProvisioner.LABEL_PROFILE, "testProfile");
-    Mockito.when(dataprocClient.getClusters(Mockito.eq(reuseClusterFilter),
-        Mockito.any()))
+    Mockito.when(dataprocClient.getClusters(Mockito.eq(reuseClusterFilter), Mockito.any()))
         //B.1. When there is no good cluster found, a retry should happen
         .thenAnswer(i -> {
           //Ensure we call the predicate
@@ -389,9 +391,8 @@ public void testClusterReuseOnCreate() throws Exception {
 
     Mockito.verify(dataprocClient)
         .updateClusterLabels("cluster2",
-            Collections.singletonMap(AbstractDataprocProvisioner.LABEL_RUN_KEY,
-                "cdap-app-runId"), Collections.singleton(
-                AbstractDataprocProvisioner.LABEL_REUSE_UNTIL));
+            Collections.singletonMap(AbstractDataprocProvisioner.LABEL_RUN_KEY, "cdap-app-runId"),
+            Collections.singleton(AbstractDataprocProvisioner.LABEL_REUSE_UNTIL));
   }
 
   @Test
@@ -401,6 +402,7 @@ public void testClusterMarkedForReuseOnDelete() throws Exception {
     context.addProperty("region", "testRegion");
     context.addProperty("idleTTL", "5");
     context.addProperty(DataprocConf.SKIP_DELETE, "true");
+    context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING));
     DataprocConf conf = DataprocConf.create(
         provisioner.createContextProperties(context));
     Mockito.when(cluster.getName())
@@ -408,9 +410,8 @@ public void testClusterMarkedForReuseOnDelete() throws Exception {
         .thenReturn("testClusterName");
     provisioner.doDeleteCluster(context, cluster, conf);
 
     Mockito.verify(dataprocClient)
-        .updateClusterLabels(Mockito.eq("testClusterName"),
-            addedLabelsCaptor.capture(), Mockito.eq(Collections.singleton(
-                AbstractDataprocProvisioner.LABEL_RUN_KEY)));
+        .updateClusterLabels(Mockito.eq("testClusterName"), addedLabelsCaptor.capture(),
+            Mockito.eq(Collections.singleton(AbstractDataprocProvisioner.LABEL_RUN_KEY)));
     Assert.assertEquals(
         Collections.singleton(AbstractDataprocProvisioner.LABEL_REUSE_UNTIL),
         addedLabelsCaptor.getValue()
diff --git a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockDataprocClient.java b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockDataprocClient.java
index 8d98be19803..61ff7834214 100644
--- a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockDataprocClient.java
+++ b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockDataprocClient.java
@@ -19,12 +19,14 @@
 import com.google.api.services.compute.Compute;
 import com.google.cloud.dataproc.v1.ClusterControllerClient;
 import com.google.cloud.dataproc.v1.GceClusterConfig;
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.provisioner.Node;
 import java.util.Collections;
 
 public class MockDataprocClient extends DataprocClient {
-  public MockDataprocClient(DataprocConf conf, ClusterControllerClient client, ComputeFactory computeFactory) {
-    super(conf, client, computeFactory);
+  public MockDataprocClient(DataprocConf conf, ClusterControllerClient client,
+      ComputeFactory computeFactory, ErrorCategory errorCategory) {
+    super(conf, client, computeFactory, errorCategory);
   }
 
   @Override
diff --git a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockProvisionerContext.java b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockProvisionerContext.java
index 4ad1c3fec5d..255f7b75137 100644
--- a/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockProvisionerContext.java
+++ b/cdap-runtime-ext-dataproc/src/test/java/io/cdap/cdap/runtime/spi/provisioner/dataproc/MockProvisionerContext.java
@@ -16,6 +16,7 @@
 
 package io.cdap.cdap.runtime.spi.provisioner.dataproc;
 
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.ProgramRunInfo;
 import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
 import io.cdap.cdap.runtime.spi.SparkCompat;
@@ -41,6 +42,7 @@ public class MockProvisionerContext implements ProvisionerContext {
   private VersionInfo appCDAPVersionInfo;
   private String cdapVersion;
   private String profileName;
+  private ErrorCategory errorCategory;
 
   public MockProvisionerContext() {
     this(null);
@@ -93,6 +95,10 @@ public void setSparkCompat(SparkCompat sparkCompat) {
     this.sparkCompat = sparkCompat;
   }
 
+  public void setErrorCategory(ErrorCategory errorCategory) {
+    this.errorCategory = errorCategory;
+  }
+
   @Override
   public String getCDAPVersion() {
     return cdapVersion;
@@ -156,4 +162,10 @@ public <T> CompletionStage<T> execute(Callable<T> callable) {
     }
     return result;
   }
+
+  @Nullable
+  @Override
+  public ErrorCategory getErrorCategory() {
+    return errorCategory;
+  }
 }
diff --git a/cdap-runtime-spi/pom.xml b/cdap-runtime-spi/pom.xml
index e6c600c65c9..3b36be3dd46 100644
--- a/cdap-runtime-spi/pom.xml
+++ b/cdap-runtime-spi/pom.xml
@@ -39,11 +39,16 @@
     <dependency>
      <groupId>io.cdap.twill</groupId>
      <artifactId>twill-api</artifactId>
    </dependency>
-    <dependency>
-      <groupId>io.cdap.cdap</groupId>
-      <artifactId>cdap-error-api</artifactId>
-      <version>${project.version}</version>
-    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-error-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-api-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
diff --git a/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/provisioner/ProvisionerContext.java b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/provisioner/ProvisionerContext.java
index eb3df70eef5..b3eac440395 100644
--- a/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/provisioner/ProvisionerContext.java
+++ b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/provisioner/ProvisionerContext.java
@@ -16,6 +16,7 @@
 
 package io.cdap.cdap.runtime.spi.provisioner;
 
+import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.runtime.spi.ProgramRunInfo;
 import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
 import io.cdap.cdap.runtime.spi.SparkCompat;
@@ -118,4 +119,10 @@ default String getCDAPVersion() {
    */
   @Nullable
   String getProfileName();
+
+  /**
+   * @return error category.
+   */
+  @Nullable
+  ErrorCategory getErrorCategory();
 }
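Note: the new `getErrorCategory()` accessor is deliberately `@Nullable`, so provisioner code consuming it needs its own fallback. A minimal sketch of that pattern, using a hypothetical `ErrorCategories` helper that is not part of this change:

```java
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import javax.annotation.Nullable;

// Hypothetical helper, for illustration only: fall back to OTHERS when the
// platform did not set a category on the ProvisionerContext, e.g.
// ErrorCategories.orDefault(context.getErrorCategory()).
final class ErrorCategories {

  private ErrorCategories() {
  }

  static ErrorCategory orDefault(@Nullable ErrorCategory category) {
    return category == null ? new ErrorCategory(ErrorCategoryEnum.OTHERS) : category;
  }
}
```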
diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java
index 6b2d18f9bf3..0ec1df82b82 100644
--- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java
+++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/ErrorLogsClassifier.java
@@ -21,6 +21,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
 import com.google.gson.Gson;
 import io.cdap.cdap.api.dataset.lib.CloseableIterator;
 import io.cdap.cdap.api.exception.ErrorType;
@@ -31,24 +32,33 @@
 import io.cdap.cdap.proto.ErrorClassificationResponse;
 import io.cdap.http.HttpResponder;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import joptsimple.internal.Strings;
 
 /**
  * Classifies error logs and returns {@link ErrorClassificationResponse}.
  * TODO -
- *  - Add rule based classification.
- *  - Handle cases when stage name is not present in the mdc.
  */
 public class ErrorLogsClassifier {
   private static final Gson GSON = new Gson();
+  private static final String DATAPROC_RUNTIME_EXCEPTION =
+      "io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException";
+  private static final ImmutableList<String> ALLOWLIST_CLASSES =
+      ImmutableList.<String>builder().add(DATAPROC_RUNTIME_EXCEPTION).build();
   private static final LoadingCache<String, Boolean> cache = CacheBuilder.newBuilder()
       .maximumSize(5000)
       .build(new CacheLoader<String, Boolean>() {
         @Override
         public Boolean load(String className) {
           try {
-            return FailureDetailsProvider.class.isAssignableFrom(Class.forName(className));
+            return ALLOWLIST_CLASSES.contains(className)
+                || FailureDetailsProvider.class.isAssignableFrom(Class.forName(className));
           } catch (Exception e) {
             return false; // Handle missing class
           }
@@ -71,6 +81,7 @@ private static boolean isFailureDetailsProviderInstance(String className) {
    */
   public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder) {
     Map<String, ErrorClassificationResponse> responseMap = new HashMap<>();
+    Set<ErrorClassificationResponse> responseSet = new HashSet<>();
     while (logIter.hasNext()) {
       ILoggingEvent logEvent = logIter.next().getLoggingEvent();
       Map<String, String> mdc = logEvent.getMDCPropertyMap();
@@ -78,32 +89,44 @@ public void classify(CloseableIterator<LogEvent> logIter, HttpResponde
       IThrowableProxy throwableProxy = logEvent.getThrowableProxy();
       while (throwableProxy != null) {
         if (isFailureDetailsProviderInstance(throwableProxy.getClassName())) {
-          populateResponseMap(throwableProxy, responseMap, stageName, mdc);
+          populateResponse(throwableProxy, mdc, stageName, responseMap, responseSet);
         }
         throwableProxy = throwableProxy.getCause();
       }
     }
-    responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responseMap.values()));
+    List<ErrorClassificationResponse> responses = new ArrayList<>(responseMap.values());
+    responses.addAll(responseSet);
+    responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responses));
   }
 
-  private void populateResponseMap(IThrowableProxy throwableProxy,
-      Map<String, ErrorClassificationResponse> responseMap, String stageName,
-      Map<String, String> mdc) {
-    // populate responseMap if absent.
-    responseMap.putIfAbsent(stageName, getClassificationResponse(stageName, mdc, throwableProxy));
+  private void populateResponse(IThrowableProxy throwableProxy,
+      Map<String, String> mdc, String stageName,
+      Map<String, ErrorClassificationResponse> responseMap,
+      Set<ErrorClassificationResponse> responseSet) {
+    boolean stageNotPresent = Strings.isNullOrEmpty(stageName);
+    if (stageNotPresent) {
+      responseSet.add(getClassificationResponse(stageName, mdc, throwableProxy,
+          mdc.get(Logging.TAG_ERROR_CATEGORY)));
+      return;
+    }
+
+    String errorCategory = String.format("%s-'%s'", mdc.get(Logging.TAG_ERROR_CATEGORY), stageName);
+    responseMap.putIfAbsent(stageName,
+        getClassificationResponse(stageName, mdc, throwableProxy, errorCategory));
     if (WrappedStageException.class.getName().equals(
         responseMap.get(stageName).getThrowableClassName())) {
       // WrappedStageException takes lower precedence than other FailureDetailsProvider exceptions.
-      responseMap.put(stageName, getClassificationResponse(stageName, mdc, throwableProxy));
+      responseMap.put(stageName,
+          getClassificationResponse(stageName, mdc, throwableProxy, errorCategory));
     }
   }
 
   private ErrorClassificationResponse getClassificationResponse(String stageName,
-      Map<String, String> mdc, IThrowableProxy throwableProxy) {
+      Map<String, String> mdc, IThrowableProxy throwableProxy, String errorCategory) {
     return new ErrorClassificationResponse.Builder()
         .setStageName(stageName)
-        .setErrorCategory(String.format("%s-'%s'", mdc.get(Logging.TAG_ERROR_CATEGORY), stageName))
+        .setErrorCategory(errorCategory)
         .setErrorReason(mdc.get(Logging.TAG_ERROR_REASON))
         .setErrorMessage(throwableProxy.getMessage())
         .setErrorType(mdc.get(Logging.TAG_ERROR_TYPE) == null ? ErrorType.UNKNOWN.name() :
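Note: with a failed stage present, the classifier now builds the category string once and reuses it for both the `putIfAbsent` call and the `WrappedStageException` override path; stage-less errors keep the bare category and go into the `Set` instead of the per-stage `Map`. A worked example of the resulting format (the values here are hypothetical):

```java
// Hypothetical values standing in for mdc.get(Logging.TAG_ERROR_CATEGORY)
// and the failed-stage tag; staged errors are reported as CATEGORY-'stage'.
String category = "Plugin";        // error category display name from the MDC
String stageName = "mysql-source"; // failed stage name from the MDC
String formatted = String.format("%s-'%s'", category, stageName);
// formatted == "Plugin-'mysql-source'"; a stage-less error would be
// reported with just "Plugin".
```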
diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java
index b835d6aaf51..719145acf8c 100644
--- a/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java
+++ b/cdap-watchdog/src/main/java/io/cdap/cdap/logging/appender/LogAppender.java
@@ -133,22 +133,20 @@ private void addErrorClassificationTags(ILoggingEvent event, Map<String, String>
     if (throwable instanceof FailureDetailsProvider) {
       FailureDetailsProvider provider = (FailureDetailsProvider) throwable;
       boolean stageKeyAbsent = !modifiableMdc.containsKey(Constants.Logging.TAG_FAILED_STAGE);
-      if (stageKeyAbsent) {
+      if (stageKeyAbsent && provider.getFailureStage() != null) {
         modifiableMdc.put(Constants.Logging.TAG_FAILED_STAGE, provider.getFailureStage());
       }
       modifiableMdc.put(Constants.Logging.TAG_ERROR_CATEGORY,
           provider.getErrorCategory().getErrorCategory());
-      modifiableMdc.put(Constants.Logging.TAG_ERROR_REASON, provider.getErrorReason());
+      if (provider.getErrorReason() != null) {
+        modifiableMdc.put(Constants.Logging.TAG_ERROR_REASON, provider.getErrorReason());
+      }
       modifiableMdc.put(Constants.Logging.TAG_ERROR_TYPE, provider.getErrorType().name());
-    }
-
-    if (throwable instanceof ProgramFailureException) {
-      modifiableMdc.put(Constants.Logging.TAG_DEPENDENCY,
-          String.valueOf(((ProgramFailureException) throwable).isDependency()));
-      ErrorCodeType errorCodeType = ((ProgramFailureException) throwable).getErrorCodeType();
-      String errorCode = ((ProgramFailureException) throwable).getErrorCode();
-      String supportedDocUrl =
-          ((ProgramFailureException) throwable).getSupportedDocumentationUrl();
+      modifiableMdc.put(Constants.Logging.TAG_DEPENDENCY, String.valueOf(
+          provider.isDependency()));
+      ErrorCodeType errorCodeType = provider.getErrorCodeType();
+      String errorCode = provider.getErrorCode();
+      String supportedDocUrl = provider.getSupportedDocumentationUrl();
       if (errorCodeType != null) {
         modifiableMdc.put(Constants.Logging.TAG_ERROR_CODE_TYPE, errorCodeType.name());
       }
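Note: after this rework, every FailureDetailsProvider (not only ProgramFailureException) contributes the dependency flag, error code, code type, and documentation URL tags, and the null-prone tags are now guarded. A sketch of a provider-aware throwable, assuming the interface's remaining accessors have null-returning defaults as the new default methods suggest (the class and its values are hypothetical):

```java
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.FailureDetailsProvider;

// Hypothetical throwable: logging it would tag the event's MDC with the
// error category ("Starting"), the error type ("UNKNOWN" via the interface
// default) and the dependency flag ("false"), while the failed-stage,
// reason, error-code, code-type and documentation-URL tags are skipped
// because their accessors return null here.
class ClusterStartFailure extends RuntimeException implements FailureDetailsProvider {

  ClusterStartFailure(String message) {
    super(message);
  }

  @Override
  public ErrorCategory getErrorCategory() {
    return new ErrorCategory(ErrorCategoryEnum.STARTING);
  }
}
```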