Add in data validation for HTTP requests and responses
pflooky committed Dec 5, 2024
1 parent e301fc7 commit c15adfc
Showing 18 changed files with 209 additions and 47 deletions.
@@ -109,6 +109,7 @@ object Constants {
lazy val STATIC = "static"
lazy val CLUSTERING_POSITION = "clusteringPos"
lazy val METADATA_IDENTIFIER = "metadataIdentifier"
lazy val VALIDATION_IDENTIFIER = "validationIdentifier"
lazy val FIELD_LABEL = "label"
lazy val IS_PII = "isPII"
lazy val HTTP_PARAMETER_TYPE = "httpParamType"
@@ -6,7 +6,7 @@ import Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_DATA_SOURCE_NAME, DEFAULT_FIELD
import scala.language.implicitConversions

case class Plan(
name: String = "Default plan",
name: String = "default_plan",
description: String = "Data generation plan",
tasks: List[TaskSummary] = List(),
sinkOptions: Option[SinkOptions] = None,
@@ -23,7 +23,7 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
metadataConfig: MetadataConfig, flagsConfig: FlagsConfig, generationConfig: GenerationConfig)(implicit sparkSession: SparkSession) {

private val LOGGER = Logger.getLogger(getClass.getName)
private lazy val sinkFactory = new SinkFactory(flagsConfig, metadataConfig)
private lazy val sinkFactory = new SinkFactory(flagsConfig, metadataConfig, foldersConfig)
private lazy val recordTrackingProcessor = new RecordTrackingProcessor(foldersConfig.recordTrackingFolderPath)
private lazy val validationRecordTrackingProcessor = new RecordTrackingProcessor(foldersConfig.recordTrackingForValidationFolderPath)
private lazy val maxRetries = 3
@@ -87,3 +87,5 @@ case class SinkResult(

def durationInSeconds: Long = Duration.between(startTime, endTime).toSeconds
}

case class RealTimeSinkResult(result: String = "")
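
RealTimeSinkResult is a thin wrapper around whatever string a real-time sink hands back per pushed row; for the HTTP sink that ends up being the serialised request/response pair (see the new HttpResult model further down), while sinks with nothing to report return an empty string. An illustrative value only, not produced by this commit's code:

// Illustrative payload; the real string is produced by the HTTP sink via Jackson
val sample = RealTimeSinkResult(
  result = """{"request":{"method":"GET","url":"http://localhost:8080/pets/1"},"response":{"statusCode":200,"statusText":"OK"}}"""
)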
@@ -2,13 +2,15 @@ package io.github.datacatering.datacaterer.core.sink

import com.google.common.util.concurrent.RateLimiter
import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE}
import io.github.datacatering.datacaterer.api.model.{FlagsConfig, MetadataConfig, Step}
import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, Step}
import io.github.datacatering.datacaterer.core.exception.{FailedSaveDataDataFrameV2Exception, FailedSaveDataException}
import io.github.datacatering.datacaterer.core.model.Constants.{BATCH, DEFAULT_ROWS_PER_SECOND, FAILED, FINISHED, PER_COLUMN_INDEX_COL, STARTED}
import io.github.datacatering.datacaterer.core.model.SinkResult
import io.github.datacatering.datacaterer.core.model.{RealTimeSinkResult, SinkResult}
import io.github.datacatering.datacaterer.core.sink.http.model.HttpResult
import io.github.datacatering.datacaterer.core.util.ConfigUtil
import io.github.datacatering.datacaterer.core.util.GeneratorUtil.determineSaveTiming
import io.github.datacatering.datacaterer.core.util.MetadataUtil.getFieldMetadata
import io.github.datacatering.datacaterer.core.util.ValidationUtil.cleanValidationIdentifier
import org.apache.log4j.Logger
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.functions.col
@@ -19,7 +21,8 @@ import scala.util.{Failure, Success, Try}

class SinkFactory(
val flagsConfig: FlagsConfig,
val metadataConfig: MetadataConfig
val metadataConfig: MetadataConfig,
val foldersConfig: FoldersConfig
)(implicit val sparkSession: SparkSession) {

private val LOGGER = Logger.getLogger(getClass.getName)
@@ -120,7 +123,7 @@ class SinkFactory(

private def saveRealTimeGuava(dataSourceName: String, df: DataFrame, format: String, connectionConfig: Map[String, String],
step: Step, rowsPerSecond: String, count: String, startTime: LocalDateTime): SinkResult = {
implicit val tryEncoder: Encoder[Try[Unit]] = Encoders.kryo[Try[Unit]]
implicit val tryEncoder: Encoder[Try[RealTimeSinkResult]] = Encoders.kryo[Try[RealTimeSinkResult]]
val permitsPerSecond = Math.max(rowsPerSecond.toInt, 1)

//TODO should return back the response so it could be saved for potential validations (i.e. validate HTTP request and response)
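
As an aside on the encoder change above: Spark has no built-in encoder for scala.util.Try, hence the Kryo encoder for Try[RealTimeSinkResult] before rows are mapped. Loosely, the mapping now looks like the following sketch, where pushAllRows and processor are hypothetical stand-ins and the per-row rate limiting and retries of saveRealTimeGuava are omitted:

import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders}
import scala.util.Try

def pushAllRows(df: DataFrame, processor: SinkProcessor[_]): Dataset[Try[RealTimeSinkResult]] = {
  implicit val tryEncoder: Encoder[Try[RealTimeSinkResult]] = Encoders.kryo[Try[RealTimeSinkResult]]
  // each push now returns the raw sink response instead of Unit, so both failures
  // and successful responses can be inspected afterwards
  df.map(row => Try(processor.pushRowToSink(row)))
}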
@@ -147,7 +150,13 @@

case class CheckExceptionAndSuccess(optException: Dataset[Option[Throwable]], isSuccess: Boolean)

private def checkExceptionAndSuccess(dataSourceName: String, format: String, step: Step, count: String, saveResult: Dataset[Try[Unit]]): CheckExceptionAndSuccess = {
private def checkExceptionAndSuccess(
dataSourceName: String,
format: String,
step: Step,
count: String,
saveResult: Dataset[Try[RealTimeSinkResult]]
): CheckExceptionAndSuccess = {
implicit val optionThrowableEncoder: Encoder[Option[Throwable]] = Encoders.kryo[Option[Throwable]]
val optException = saveResult.map {
case Failure(exception) => Some(exception)
@@ -164,6 +173,8 @@
} else {
true
}

saveRealTimeResponses(step, saveResult)
CheckExceptionAndSuccess(optException, isSuccess)
}

@@ -270,4 +281,17 @@ class SinkFactory(
if (!dfWithoutOmitFields.storageLevel.useMemory) dfWithoutOmitFields.cache()
dfWithoutOmitFields
}

private def saveRealTimeResponses(step: Step, saveResult: Dataset[Try[RealTimeSinkResult]]): Unit = {
import sparkSession.implicits._
val resultJson = saveResult.map(tryRes => tryRes.getOrElse(RealTimeSinkResult()).result)
val jsonSchema = sparkSession.read.json(resultJson).schema
val topLevelFieldNames = jsonSchema.fields.map(f => s"result.${f.name}")
val parsedResult = resultJson.selectExpr(s"FROM_JSON(value, '${jsonSchema.toDDL}') AS result")
.selectExpr(topLevelFieldNames: _*)
val cleanStepName = cleanValidationIdentifier(step.name)
parsedResult.write
.mode(SaveMode.Overwrite)
.json(s"${foldersConfig.recordTrackingForValidationFolderPath}/$cleanStepName")
}
}
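
With saveRealTimeResponses in place, each step's request/response records land as JSON files under the validation record-tracking folder, keyed by the cleaned step name. A sketch of how they could be read back for a validation pass, assuming a scope like SinkFactory's where sparkSession and foldersConfig are available and using a made-up step name:

val stored = sparkSession.read
  .json(s"${foldersConfig.recordTrackingForValidationFolderPath}/my_http_step")

// column names follow the HttpResult model, e.g. check every captured response was a 2xx
val non2xx = stored.filter("response.statusCode < 200 OR response.statusCode >= 300")
assert(non2xx.isEmpty, s"found ${non2xx.count()} non-2xx responses")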
@@ -3,9 +3,11 @@ package io.github.datacatering.datacaterer.core.sink
import io.github.datacatering.datacaterer.api.model.Constants.{HTTP, JMS}
import io.github.datacatering.datacaterer.api.model.Step
import io.github.datacatering.datacaterer.core.exception.UnsupportedRealTimeDataSourceFormat
import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult
import io.github.datacatering.datacaterer.core.sink.http.HttpSinkProcessor
import io.github.datacatering.datacaterer.core.sink.http.model.HttpResult
import io.github.datacatering.datacaterer.core.sink.jms.JmsSinkProcessor
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

import java.util.concurrent.LinkedBlockingQueue
@@ -17,7 +19,7 @@ trait SinkProcessor[T] {

def createConnection(connectionConfig: Map[String, String], step: Step): T

def pushRowToSink(row: Row): Unit
def pushRowToSink(row: Row): RealTimeSinkResult

def close: Unit
}
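
Every SinkProcessor implementation now has to surface a RealTimeSinkResult from pushRowToSink. A minimal conforming sketch, assuming only the members visible in this trait (EchoSinkProcessor is hypothetical; the HTTP and JMS processors below are the real implementations):

object EchoSinkProcessor extends SinkProcessor[Unit] {

  override def createConnection(connectionConfig: Map[String, String], step: Step): Unit = ()

  override def pushRowToSink(row: Row): RealTimeSinkResult = {
    // a real processor would send the row somewhere and capture the raw response;
    // here the row itself is echoed back as the "response" payload
    RealTimeSinkResult(row.mkString(","))
  }

  override def close: Unit = ()
}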
@@ -2,21 +2,29 @@ package io.github.datacatering.datacaterer.core.sink.http

import io.github.datacatering.datacaterer.api.model.Constants.DEFAULT_REAL_TIME_HEADERS_DATA_TYPE
import io.github.datacatering.datacaterer.api.model.Step
import io.github.datacatering.datacaterer.core.exception.{AddHttpHeaderException, FailedHttpRequestException}
import io.github.datacatering.datacaterer.core.exception.AddHttpHeaderException
import io.github.datacatering.datacaterer.core.model.Constants.{DEFAULT_HTTP_METHOD, HTTP_HEADER_COL_PREFIX, REAL_TIME_BODY_COL, REAL_TIME_HEADERS_COL, REAL_TIME_METHOD_COL, REAL_TIME_URL_COL}
import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult
import io.github.datacatering.datacaterer.core.sink.http.model.HttpResult
import io.github.datacatering.datacaterer.core.sink.{RealTimeSinkProcessor, SinkProcessor}
import io.github.datacatering.datacaterer.core.util.HttpUtil.getAuthHeader
import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
import io.github.datacatering.datacaterer.core.util.RowUtil.getRowValue
import io.netty.handler.ssl.SslContextBuilder
import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.asynchttpclient.Dsl.asyncHttpClient
import org.asynchttpclient.{AsyncHttpClient, DefaultAsyncHttpClientConfig, ListenableFuture, Request, Response}

import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import javax.net.ssl.X509TrustManager
import scala.annotation.tailrec
import scala.compat.java8.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object HttpSinkProcessor extends RealTimeSinkProcessor[Unit] with Serializable {
@@ -26,6 +34,7 @@ object HttpSinkProcessor extends RealTimeSinkProcessor[Unit] with Serializable {
var connectionConfig: Map[String, String] = _
var step: Step = _
var http: AsyncHttpClient = buildClient
implicit val httpResultEncoder: Encoder[HttpResult] = Encoders.kryo[HttpResult]

override val expectedSchema: Map[String, String] = Map(
REAL_TIME_URL_COL -> StringType.typeName,
@@ -68,11 +77,11 @@ object HttpSinkProcessor extends RealTimeSinkProcessor[Unit] with Serializable {
}
}

override def pushRowToSink(row: Row): Unit = {
pushRowToSinkFuture(row)
override def pushRowToSink(row: Row): RealTimeSinkResult = {
RealTimeSinkResult(pushRowToSinkFuture(row))
}

private def pushRowToSinkFuture(row: Row): Unit = {
private def pushRowToSinkFuture(row: Row): String = {
if (http.isClosed) {
http = buildClient
}
@@ -85,24 +94,27 @@ object HttpSinkProcessor extends RealTimeSinkProcessor[Unit] with Serializable {
}
}

private def handleResponse(value: ListenableFuture[Response], request: Request) = {
value.toCompletableFuture
.exceptionally(error => {
LOGGER.error(s"Failed to send HTTP request, url=${request.getUri}, method=${request.getMethod}", error)
throw error
})
.whenComplete((resp, error) => {
if (error == null && resp.getStatusCode >= 200 && resp.getStatusCode < 300) {
LOGGER.debug(s"Successful HTTP request, url=${resp.getUri}, method=${request.getMethod}, status-code=${resp.getStatusCode}, " +
s"status-text=${resp.getStatusText}, response-body=${resp.getResponseBody}")
//TODO can save response body along with request in file for validations, save as parquet or json?
// save request url, request headers, request body, response status code, response status text, response body, response headers, response cookies
private def handleResponse(value: ListenableFuture[Response], request: Request): String = {
val res = value.toCompletableFuture
.toScala
.map(HttpResult.fromRequestAndResponse(request, _))

res.onComplete {
case Success(value) =>
val resp = value.response
if (resp.statusCode >= 200 && resp.statusCode < 300) {
LOGGER.debug(s"Successful HTTP request, url=${request.getUrl}, method=${request.getMethod}, status-code=${resp.statusCode}, " +
s"status-text=${resp.statusText}, response-body=${resp.body}")
} else {
LOGGER.error(s"Failed HTTP request, url=${resp.getUri}, method=${request.getMethod}, status-code=${resp.getStatusCode}, " +
s"status-text=${resp.getStatusText}")
if (error == null) throw FailedHttpRequestException() else throw error
LOGGER.error(s"Failed HTTP request, url=${request.getUrl}, method=${request.getMethod}, status-code=${resp.statusCode}, " +
s"status-text=${resp.statusText}")
}
}).join()
case Failure(exception) =>
LOGGER.error(s"Failed to send HTTP request, url=${request.getUri}, method=${request.getMethod}", exception)
throw exception
}
val resAsJson = res.map(ObjectMapperUtil.jsonObjectMapper.writeValueAsString)
Await.result(resAsJson, Duration.create(5, TimeUnit.SECONDS))
}

def createHttpRequest(row: Row, connectionConfig: Option[Map[String, String]] = None): Request = {
@@ -155,3 +167,4 @@ object HttpSinkProcessor extends RealTimeSinkProcessor[Unit] with Serializable {
asyncHttpClient(config)
}
}
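
The string returned from handleResponse is an HttpResult (request plus response, defined in the new model file below) serialised with the project's Jackson mapper and awaited for up to five seconds, so each push stays synchronous per row. Roughly, the persisted payload can be pictured as follows; all values here are made up:

val illustrative = HttpResult(
  request = HttpRequest(method = "POST", url = "http://localhost:8080/pets",
    headers = Map("Content-Type" -> "application/json"), body = """{"name":"rex"}"""),
  response = HttpResponse(contentType = "application/json", headers = Map(),
    body = """{"id":1}""", statusCode = 201, statusText = "Created")
)
val json = ObjectMapperUtil.jsonObjectMapper.writeValueAsString(illustrative)
// json is what pushRowToSink wraps in RealTimeSinkResult and what
// SinkFactory.saveRealTimeResponses later parses and writes out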

@@ -0,0 +1,58 @@
package io.github.datacatering.datacaterer.core.sink.http.model

import io.github.datacatering.datacaterer.core.util.HttpUtil.getHeadersAsMap
import org.asynchttpclient.{Request, Response}

case class HttpRequest(
method: String = "",
url: String = "",
headers: Map[String, String] = Map(),
body: String = ""
)

object HttpRequest {

def fromRequest(request: Request): HttpRequest = {
HttpRequest(
request.getMethod,
request.getUri.toString,
getHeadersAsMap(request.getHeaders),
request.getStringData
)
}
}

case class HttpResponse(
contentType: String = "",
headers: Map[String, String] = Map(),
body: String = "",
statusCode: Int = 200,
statusText: String = ""
)

object HttpResponse {

def fromResponse(response: Response): HttpResponse = {
//response body contains new line characters
HttpResponse(
response.getContentType,
getHeadersAsMap(response.getHeaders),
response.getResponseBody,
response.getStatusCode,
response.getStatusText
)
}
}

case class HttpResult(request: HttpRequest = HttpRequest(), response: HttpResponse = HttpResponse())

object HttpResult {

def fromRequest(request: Request): HttpResult = {
HttpResult(HttpRequest.fromRequest(request))
}

def fromRequestAndResponse(request: Request, response: Response): HttpResult = {
HttpResult(HttpRequest.fromRequest(request), HttpResponse.fromResponse(response))
}
}
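
Because the stored records are plain JSON of these case classes, they can also be mapped back outside Spark for ad-hoc checks. A hedged round trip using Jackson's Scala module; the mapper construction is illustrative and the project would normally reuse ObjectMapperUtil.jsonObjectMapper:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
val json =
  """{"request":{"method":"GET","url":"http://localhost:8080/pets/1","headers":{},"body":""},
    |"response":{"contentType":"application/json","headers":{},"body":"","statusCode":200,"statusText":"OK"}}""".stripMargin
val parsed = mapper.readValue(json, classOf[HttpResult])
assert(parsed.response.statusCode == 200 && parsed.request.method == "GET")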
@@ -4,10 +4,12 @@ import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_REAL_TIME
import io.github.datacatering.datacaterer.api.model.Step
import io.github.datacatering.datacaterer.core.exception.{FailedJmsMessageCreateException, FailedJmsMessageGetBodyException, FailedJmsMessageSendException}
import io.github.datacatering.datacaterer.core.model.Constants.{REAL_TIME_BODY_COL, REAL_TIME_HEADERS_COL, REAL_TIME_PARTITION_COL}
import io.github.datacatering.datacaterer.core.model.RealTimeSinkResult
import io.github.datacatering.datacaterer.core.sink.http.model.{HttpRequest, HttpResult}
import io.github.datacatering.datacaterer.core.sink.{RealTimeSinkProcessor, SinkProcessor}
import io.github.datacatering.datacaterer.core.util.RowUtil.getRowValue
import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType}

import java.nio.charset.StandardCharsets
@@ -32,11 +34,12 @@ object JmsSinkProcessor extends RealTimeSinkProcessor[(MessageProducer, Session,
REAL_TIME_HEADERS_COL -> DEFAULT_REAL_TIME_HEADERS_DATA_TYPE
)

override def pushRowToSink(row: Row): Unit = {
override def pushRowToSink(row: Row): RealTimeSinkResult = {
val body = tryGetBody(row)
val (messageProducer, session, connection) = getConnectionFromPool
val message = tryCreateMessage(body, messageProducer, session, connection)
trySendMessage(row, messageProducer, session, connection, message)
RealTimeSinkResult("")
}

private def trySendMessage(row: Row, messageProducer: MessageProducer, session: Session, connection: Connection, message: TextMessage): Unit = {
@@ -1,6 +1,8 @@
package io.github.datacatering.datacaterer.core.util

import io.github.datacatering.datacaterer.api.converter.Converters.toScalaList
import io.github.datacatering.datacaterer.api.model.Constants.{PASSWORD, USERNAME}
import io.netty.handler.codec.http.HttpHeaders

import java.util.Base64

@@ -16,4 +18,10 @@ object HttpUtil {
Map()
}
}

def getHeadersAsMap(httpHeaders: HttpHeaders): Map[String, String] = {
toScalaList(httpHeaders.entries())
.map(m => m.getKey -> m.getValue)
.toMap
}
}
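
getHeadersAsMap flattens Netty's multi-valued HttpHeaders into a Scala Map, with later duplicate keys winning. A small illustrative usage:

import io.netty.handler.codec.http.DefaultHttpHeaders

val headers = new DefaultHttpHeaders()
  .add("Content-Type", "application/json")
  .add("Accept", "application/json")

HttpUtil.getHeadersAsMap(headers)
// Map("Content-Type" -> "application/json", "Accept" -> "application/json")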
@@ -24,7 +24,17 @@ class UniqueFieldsUtil(plan: Plan, executableTasks: List[(TaskSummary, Task)])(i
val columns = previouslyGenerated._1.columns
LOGGER.debug(s"Only keeping unique values for generated data for columns, " +
s"data-source-step=$dataSourceStep, columns=${columns.mkString(",")}")
val dfWithUnique = finalDf.dropDuplicates(columns)
//check if it is a nested column, need to bring column to top level before dropping duplicates
val dfWithUnique = if (columns.exists(c => c.contains("."))) {
LOGGER.debug("Nested columns exist, required to bring to top level data frame before dropping duplicates")
val mappedCols = columns.map(c => if (c.contains(".")) c -> s"_dedup_$c" else c -> c).toMap
val dedupCols = mappedCols.values.filter(c => c.startsWith("_dedup_")).toList
finalDf.withColumns(mappedCols.filter(_._2.startsWith("_dedup_")).map(c => c._2 -> col(c._1)))
.dropDuplicates(mappedCols.values.toList)
.drop(dedupCols: _*)
} else {
finalDf.dropDuplicates(columns)
}
//if there is a perColumn count, then need to create unique set of values, then run a left semi join with original dataset
finalDf = if (step.count.perColumn.isDefined) {
getUniqueWithPerColumnCount(step, finalDf, previouslyGenerated, columns, dfWithUnique)
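
The nested-column branch above exists because Dataset.dropDuplicates only resolves top-level attribute names, so a dotted column such as account.id must be projected to the top level first. A self-contained sketch of the same trick on made-up data; the helper column here uses an underscore instead of keeping the dot, purely for readability:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("b", 3))
  .toDF("id", "n")
  .selectExpr("named_struct('id', id) AS account", "n")

// bring the nested field to the top level, drop duplicates on it, then remove the helper column
val deduped = df
  .withColumn("_dedup_account_id", col("account.id"))
  .dropDuplicates("_dedup_account_id")
  .drop("_dedup_account_id")

deduped.show()  // one row kept per distinct account.id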
@@ -0,0 +1,7 @@
package io.github.datacatering.datacaterer.core.util

object ValidationUtil {

def cleanValidationIdentifier(identifier: String): String = identifier.replaceAll("[{}]", "")

}
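
cleanValidationIdentifier simply strips curly braces, which, as an assumption about its intended use, appear when step or validation identifiers are derived from templated HTTP paths. For example:

ValidationUtil.cleanValidationIdentifier("GET/pets/{id}")  // "GET/pets/id"
ValidationUtil.cleanValidationIdentifier("my_http_step")   // unchanged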