Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use error message type from spark commons: 2170 #2177

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ import org.apache.spark.sql.functions.{col, explode, lit, size, struct, typedLit
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, Encoder, Encoders}
import za.co.absa.enceladus.plugins.api.postprocessor.PostProcessor
import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams, SchemaRegistrySecurityParams}
import za.co.absa.enceladus.plugins.builtin.common.mq.kafka.{KafkaConnectionParams, KafkaSecurityParams}
import za.co.absa.enceladus.plugins.builtin.errorsender.DceError
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.KafkaErrorSenderPluginImpl.SingleErrorStardardized
import KafkaErrorSenderPluginImpl._
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
import za.co.absa.enceladus.utils.error.ErrorMessage.ErrorCodes
import za.co.absa.enceladus.utils.modules._
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
import za.co.absa.abris.config.ToAvroConfig
import za.co.absa.enceladus.plugins.builtin.errorsender.mq.kafka.KafkaErrorSenderPlugin.{avroKeySchemaRegistryConfig, avroValueSchemaRegistryConfig, registerSchemas}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is 166 characters long; please wrap it to stay within the project's line-length limit.

import za.co.absa.enceladus.utils.error.EnceladusErrorMessage.ErrorCodes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor:
There are some unused imports in this file; please remove them.


import scala.util.{Failure, Success, Try}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
<apache.commons.configuration2.version>2.4</apache.commons.configuration2.version>
<abris.version>6.2.0</abris.version>
<absa.commons.version>1.1.0</absa.commons.version>
<absa.spark.commons.version>0.4.0</absa.spark.commons.version>
<absa.spark.commons.version>0.5.0</absa.spark.commons.version>
<atum.version>3.9.0</atum.version>
<bower.chart.js.version>2.7.3</bower.chart.js.version>
<bson.codec.jsr310.version>3.5.4</bson.codec.jsr310.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package za.co.absa.enceladus.common

import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.enceladus.utils.implicits.EnceladusDataFrameImplicits.EnceladusDataframeEnhancements

object ErrorColNormalization {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import org.apache.spark.sql.functions.{col, size, sum}
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.atum.core.Atum
import za.co.absa.enceladus.utils.config.PathWithFs
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.general.ProjectMetadata
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

object PerformanceMetricTools extends ProjectMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,18 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfigParser
import za.co.absa.enceladus.conformance.datasource.PartitioningUtils
import za.co.absa.enceladus.conformance.interpreter.rules._
import za.co.absa.enceladus.conformance.interpreter.rules.custom.CustomConformanceRule
import za.co.absa.enceladus.conformance.interpreter.rules.mapping.{MappingRuleInterpreter, MappingRuleInterpreterBroadcast,
MappingRuleInterpreterGroupExplode}
import za.co.absa.enceladus.conformance.interpreter.rules.mapping._
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.config.PathWithFs
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.udf.ConformanceUDFLibrary
import za.co.absa.spark.commons.utils.explode.ExplosionContext
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.commons.lang.extensions.SeqExtension._
import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class DynamicInterpreter()(implicit inputFs: FileSystem) {
private val log = LoggerFactory.getLogger(this.getClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession}
import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, InterpreterContextArgs}
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.ExplodeTools
import za.co.absa.spark.commons.utils.explode.ExplosionContext

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import za.co.absa.enceladus.model.MappingTable
import za.co.absa.enceladus.model.conformanceRule.MappingConformanceRule
import za.co.absa.enceladus.model.dataFrameFilter.DataFrameFilter
import za.co.absa.enceladus.conformance.interpreter.rules.ValidationException
import za.co.absa.enceladus.utils.error.Mapping
import za.co.absa.enceladus.utils.validation.ExpressionValidator
import za.co.absa.spark.commons.errorhandling.ErrorMessage.Mapping
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

import scala.util.Try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.error._
import za.co.absa.enceladus.utils.transformations.ArrayTransformations
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays
import za.co.absa.spark.commons.sql.functions.col_of_path

Expand All @@ -52,7 +52,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con

val res = handleArrays(rule.outputColumn, withUniqueId) { dfIn =>
val joined = joinDatasetAndMappingTable(mapTable, dfIn)
val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.outputColumn),
array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
typedLit(mappings))
Expand Down Expand Up @@ -94,7 +94,7 @@ case class MappingRuleInterpreter(rule: MappingConformanceRule, conformance: Con
.select($"conf.*", col(s"err.${ErrorMessage.errorColumnName}")).drop(idField)
}

private def inclErrorNullArr(mappings: Seq[Mapping], schema: StructType): Column = {
private def inclErrorNullArr(mappings: Seq[ErrorMessage.Mapping], schema: StructType): Column = {
val paths = mappings.flatMap { mapping =>
schema.getAllArraysInPath(mapping.mappedDatasetColumn)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.broadcast.{BroadcastUtils, LocalMappingTable}
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.hats.transformations.NestedArrayTransformations
import za.co.absa.spark.hats.transformations.NestedArrayTransformations.GetFieldFunction

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import za.co.absa.enceladus.conformance.interpreter.{ExplosionState, Interpreter
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.{ConformanceRule, MappingConformanceRule}
import za.co.absa.enceladus.model.{Dataset => ConfDataset}
import za.co.absa.enceladus.utils.error._
import za.co.absa.enceladus.utils.udf.ConformanceUDFNames
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.sql.functions.col_of_path
import za.co.absa.spark.commons.utils.explode.ExplosionContext
import za.co.absa.spark.commons.utils.{ExplodeTools, SchemaUtils}
Expand All @@ -46,7 +46,7 @@ case class MappingRuleInterpreterGroupExplode(rule: MappingConformanceRule,
val (mapTable, defaultValues) = conformPreparation(df, enableCrossJoin = true)
val (explodedDf, expCtx) = explodeIfNeeded(df, explosionState)

val mappings = rule.attributeMappings.map(x => Mapping(x._1, x._2)).toSeq
val mappings = rule.attributeMappings.map(x => ErrorMessage.Mapping(x._1, x._2)).toSeq
val mappingErrUdfCall = call_udf(ConformanceUDFNames.confMappingErr, lit(rule.allOutputColumns().keys.mkString(",")),
array(rule.attributeMappings.values.toSeq.map(col_of_path(_).cast(StringType)): _*),
typedLit(mappings))
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, Explosi
import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule
import za.co.absa.enceladus.model.{conformanceRule, Dataset => ConfDataset}
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, TZNormalizedSparkTestBase}
import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class MyCustomRule(
order: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package za.co.absa.enceladus.conformance.interpreter.rules.mapping
import org.apache.spark.sql.functions.{array, typedLit}
import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory.{simpleMappingRule, simpleMappingRuleMultipleOutputs, simpleMappingRuleMultipleOutputsWithDefaults, simpleMappingRuleWithDefaultValue}
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.JsonUtils
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import org.apache.spark.sql.functions._
import za.co.absa.enceladus.conformance.interpreter.DynamicInterpreter
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.NestedTestCaseFactory._
import za.co.absa.enceladus.conformance.interpreter.rules.testcasefactories.SimpleTestCaseFactory._
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.spark.commons.utils.JsonUtils
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements


class MappingRuleBroadcastSuite extends MappingInterpreterSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples

import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{MappingTable, Dataset => ConfDataset}
import za.co.absa.enceladus.utils.error._
import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage

case class Outer(order: Int, a: Seq[Inner], myFlag: Boolean)
case class OuterErr(order: Int, a: Seq[Inner], myFlag: Boolean, errCol: Seq[ErrorMessage])
Expand Down Expand Up @@ -142,7 +143,7 @@ object ArraySamples {
ConformedInner2(2, "two", "twoDrop me :)", "Hello world", new ConformedInner3("Hello world"), "myConf", "HELLO WORLD")
)),
ConformedInner(Seq()),
ConformedInner(null)), Seq(ErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(Mapping("ind", "a.c.d"), Mapping("otherFlag", "myFlag"))))),
ConformedInner(null)), Seq(EnceladusErrorMessage.confMappingErr("a.c.conformedD", List("0", "true"), List(ErrorMessage.Mapping("ind", "a.c.d"), ErrorMessage.Mapping("otherFlag", "myFlag"))))),
ConformedOuter(2, Seq(), Seq()),
ConformedOuter(3, null, Seq()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package za.co.absa.enceladus.conformance.samples

import za.co.absa.enceladus.model.conformanceRule._
import za.co.absa.enceladus.model.{Dataset, DefaultValue, MappingTable}
import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage

object EmployeeConformance {
val countryMT = MappingTable(name = "country", version = 0, hdfsPath = "src/test/testData/country", schemaName = "country", schemaVersion = 0)
Expand Down Expand Up @@ -65,8 +66,8 @@ object EmployeeConformance {
ConformedRole(1), ConformedEmployeeId = "2"),
ConformedEmployee(employee_id = 3, name = "John", surname = "Doe3", dept= 3, role = 2, country = "SWE", conformed_country = null, conformed_department = "Unknown dept",
conformed_role = "External dev", errCol= List(
ErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(Mapping("country_code", "country"))),
ErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(Mapping("dept_id", "dept")))
EnceladusErrorMessage.confMappingErr("conformed_country", Seq("SWE"), Seq(ErrorMessage.Mapping("country_code", "country"))),
EnceladusErrorMessage.confMappingErr("conformed_department", Seq("3"), Seq(ErrorMessage.Mapping("dept_id", "dept")))
), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)", ConformedRole(2), ConformedEmployeeId = "3"),
ConformedEmployee(employee_id = 4, name = "John", surname = "Doe4", dept= 1, role = 2, country = "IN", conformed_country = "India", conformed_department = "Ingestion Squad",
conformed_role = "Ingestion Developer", errCol= List(), MyLiteral = "abcdef", MyUpperLiteral = "ABCDEF", Concatenated = "abcdefABCDEF", SparkConfAttr = "hello :)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import za.co.absa.enceladus.dao.EnceladusDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.standardization.config.StandardizationConfig
import za.co.absa.enceladus.standardization.fixtures.TempFileFixture
import za.co.absa.enceladus.utils.error.ErrorMessage
import za.co.absa.enceladus.utils.testUtils.TZNormalizedSparkTestBase
import za.co.absa.spark.commons.errorhandling.ErrorMessage
import za.co.absa.standardization.ValidationException
import za.co.absa.standardization.{RecordIdGeneration, Standardization}
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig}
Expand All @@ -47,6 +47,7 @@ class StandardizationRerunSuite extends FixtureAnyFunSuite with TZNormalizedSpar
private val tmpFilePrefix = "test-input-"
private val tmpFileSuffix = ".csv"


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary extra empty line; please remove it.

private val csvContent: String =
"""101|102|1|2019-05-04|2019-05-04
|201|202|2|2019-05-05|2019-05-05
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{expr, udf}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SparkSession}
import za.co.absa.enceladus.utils.error.{ErrorMessage, Mapping}
import za.co.absa.enceladus.utils.error.EnceladusErrorMessage
import za.co.absa.spark.commons.errorhandling.ErrorMessage

object BroadcastUtils {
// scalastyle:off null
Expand Down Expand Up @@ -108,7 +109,7 @@ object BroadcastUtils {
*/
def getErrorUdf(mappingTable: Broadcast[LocalMappingTable],
outputColumns: Seq[String],
mappings: Seq[Mapping])(implicit spark: SparkSession): UserDefinedFunction = {
mappings: Seq[ErrorMessage.Mapping])(implicit spark: SparkSession): UserDefinedFunction = {

val numberOfArguments = mappingTable.value.keyTypes.size

Expand All @@ -117,7 +118,7 @@ object BroadcastUtils {
null
} else {
val strings: Seq[String] = key.map(a => safeToString(a))
ErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
EnceladusErrorMessage.confMappingErr(outputColumns.mkString(","), strings, mappings)
}
}
val errorMessageType = ScalaReflection.schemaFor[ErrorMessage].dataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.enceladus.utils.error
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import za.co.absa.standardization.config.DefaultErrorCodesConfig
import za.co.absa.spark.commons.errorhandling.ErrorMessage

/**
* Case class to represent an error message
Expand All @@ -29,13 +30,13 @@ import za.co.absa.standardization.config.DefaultErrorCodesConfig
* @param rawValues - Sequence of raw values (which are the potential culprits of the error)
* @param mappings - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
*/
case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)
//case class ErrorMessage(errType: String, errCode: String, errMsg: String, errCol: String, rawValues: Seq[String], mappings: Seq[Mapping] = Seq())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead (commented-out) code here and below; please delete it rather than leaving it commented out.

//case class Mapping(mappingTableColumn: String, mappedDatasetColumn: String)

object ErrorMessage {
val errorColumnName = "errCol"
object EnceladusErrorMessage {
// val errorColumnName = "errCol"

def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[Mapping]): ErrorMessage = ErrorMessage(
def confMappingErr(errCol: String, rawValues: Seq[String], mappings: Seq[ErrorMessage.Mapping]): ErrorMessage = ErrorMessage(
errType = "confMapError",
errCode = ErrorCodes.ConfMapError,
errMsg = "Conformance Error - Null produced by mapping conformance rule",
Expand Down
Loading