Skip to content

Commit

Permalink
Merge pull request #186 from teamclairvoyant/REST-158-RB
Browse files Browse the repository at this point in the history
[REST-158] Support for Excel
  • Loading branch information
rahulbhatia023 authored Mar 27, 2024
2 parents dde34df + 9869cc9 commit 7f7d322
Show file tree
Hide file tree
Showing 18 changed files with 343 additions and 131 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ ThisBuild / wartremoverErrors ++= Warts.allBut(
// ----- TOOL VERSIONS ----- //

val catsVersion = "2.10.0"
val dataScalaxyReaderVersion = "2.0.0"
val dataScalaxyReaderTextVersion = "2.0.0"
val dataScalaxyReaderExcelVersion = "1.0.0"
val dataScalaxyTestUtilVersion = "1.0.0"
val dataScalaxyTransformerVersion = "1.2.0"
val dataScalaxyWriterAWSVersion = "2.0.0"
Expand All @@ -103,7 +104,8 @@ val catsDependencies = Seq(
)

val dataScalaxyReaderDependencies = Seq(
"com.clairvoyant.data.scalaxy" %% "reader-text" % dataScalaxyReaderVersion
"com.clairvoyant.data.scalaxy" %% "reader-text" % dataScalaxyReaderTextVersion,
"com.clairvoyant.data.scalaxy" %% "reader-excel" % dataScalaxyReaderExcelVersion
)

val dataScalaxyTestUtilDependencies = Seq(
Expand Down
4 changes: 4 additions & 0 deletions site/docs/response_body/excel/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"label": "Excel",
"position": 1
}
50 changes: 50 additions & 0 deletions site/docs/response_body/excel/excel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Excel

Restonomer can parse the api response of MS Excel file type. User need to configure the checkpoint in below
format:

```hocon
name = "checkpoint_excel_response_dataframe_converter"
data = {
data-request = {
url = "http://localhost:8080/excel-response-converter"
}
data-response = {
body = {
type = "Excel"
excel-format = {
data-address = "'Transactions Report'!A2:G4"
}
}
persistence = {
type = "LocalFileSystem"
file-format = {
type = "ParquetFileFormat"
}
file-path = "/tmp/response_body"
}
}
}
```

## Excel Format Configurations

User can provide below options to the `excel-format` instance:

| Parameter Name | Default Value | Description |
|:-----------------------------------|:---------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| header | true | Boolean flag to tell whether given excel sheet contains header names or not. |
| data-address | A1 | The location of the data to read from. Following address styles are supported: <br/> `B3:` Start cell of the data. Returns all rows below and all columns to the right. <br/> `B3:F35:` Cell range of data. Reading will return only rows and columns in the specified range. <br/> `'My Sheet'!B3:F35:` Same as above, but with a specific sheet. <br/> `MyTable[#All]:` Table of data. Returns all rows and columns in this table. |
| treat-empty-values-as-nulls | true | Treats empty values as null |
| set-error-cells-to-fallback-values | false | If set false errors will be converted to null. If true, any ERROR cell values (e.g. #N/A) will be converted to the zero values of the column's data type. |
| use-plain-number-format | false | If true, format the cells without rounding and scientific notations |
| infer-schema | false | Infers the input schema automatically from data. |
| add-color-columns | false | If it is set to true, adds field with coloured format |
| timestamp-format | "yyyy-mm-dd hh:mm:ss" | String timestamp format |
| excerpt-size | 10 | If set and if schema inferred, number of rows to infer schema from |
| max-rows-in-memory | None | If set, uses a streaming reader which can help with big files (will fail if used with xls format files) |
| max-byte-array-size | None | See https://poi.apache.org/apidocs/5.0/org/apache/poi/util/IOUtils.html#setByteArrayMaxOverride-int- |
| temp-file-threshold | None | Number of bytes at which a zip entry is regarded as too large for holding in memory and the data is put in a temp file instead |
2 changes: 1 addition & 1 deletion site/docs/response_body/text/_category_.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"label": "Text",
"position": 1
"position": 2
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"Created": "2021-07-29 10:35:12",
"Advertiser": "Zola",
"Transaction ID": "1210730000580100000",
"Earnings": "$0.68",
"SID": "wlus9",
"Status": "CONFIRMED",
"ClickPage": "https://www.zola.com/"
},
{
"Created": "2022-04-18 07:23:54",
"Advertiser": "TradeInn",
"Transaction ID": "1220419021230020000",
"Earnings": "$12.48",
"SID": "wles7",
"Status": "CONFIRMED",
"ClickPage": "https://www.tradeinn.com/"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"request": {
"method": "GET",
"url": "/excel-response-converter"
},
"response": {
"status": 200,
"bodyFileName": "sample_excel_response_body.xlsx"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name = "checkpoint_excel_response_dataframe_converter"

data = {
data-request = {
url = "http://localhost:8080/excel-response-converter"
}

data-response = {
body = {
type = "Excel"
excel-format = {
data-address = "'Transactions Report'!A2:G4"
}
}

persistence = {
type = "LocalFileSystem"
file-format = {
type = "ParquetFileFormat"
}
file-path = "/tmp/response_body"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.clairvoyant.restonomer.response_body

import com.clairvoyant.restonomer.common.{IntegrationTestDependencies, MockFileSystemPersistence}

class ExcelResponseToDataFrameConverterIntegrationTest
extends IntegrationTestDependencies
with MockFileSystemPersistence {

override val mappingsDirectory: String = "response_body"

it should "convert the excel file response body into a dataframe" in {
runCheckpoint(checkpointFileName = "checkpoint_excel_response_dataframe_converter.conf")
outputDF should matchExpectedDataFrame("expected_excel_response.json")
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.clairvoyant.restonomer.app

import cats.syntax.eq.*
import com.clairvoyant.restonomer.common.TokenResponsePlaceholders
import com.clairvoyant.restonomer.common.TokenResponsePlaceholders.*
import com.clairvoyant.restonomer.exception.RestonomerException
import com.clairvoyant.restonomer.*
import com.clairvoyant.restonomer.body.{Excel, Text}
import com.clairvoyant.restonomer.http.*
import com.clairvoyant.restonomer.model.*
import com.clairvoyant.restonomer.sttpBackend
import com.jayway.jsonpath.JsonPath
import com.clairvoyant.restonomer.token.TokenHelper.*
import org.apache.spark.sql.{DataFrame, SparkSession}
import sttp.client3.*

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -17,54 +15,32 @@ import scala.concurrent.duration.Duration
object RestonomerWorkflow {

def run(checkpointConfig: CheckpointConfig)(using sparkSession: SparkSession): Unit = {
checkpointConfig.data.dataResponse.body match {
case Text(_, None) => start[String](checkpointConfig, asString)
case Text(_, Some(_)) => start[Array[Byte]](checkpointConfig, asByteArray)
case Excel(_, _) => start[Array[Byte]](checkpointConfig, asByteArray)
}
}

val tokenFunction = checkpointConfig.token
.map { tokenConfig =>
val tokenHttpResponse = Await.result(
RestonomerRequest
.builder(tokenConfig.tokenRequest)
.build
.httpRequest
.send(sttpBackend),
Duration.Inf
)

TokenResponsePlaceholders(tokenConfig.tokenResponsePlaceholder) match {
case ResponseBody =>
(tokenJsonPath: String) =>
JsonPath.read[String](
tokenHttpResponse.body match {
case Left(errorMessage) => throw new RestonomerException(errorMessage)
case Right(responseBody) => responseBody
},
tokenJsonPath
)

case ResponseHeaders =>
(tokenName: String) =>
tokenHttpResponse.headers
.find(_.name === tokenName) match {
case Some(header) => header.value
case None =>
throw new RestonomerException(s"Could not find the value of $tokenName in the token response.")
}
}
}

val dataRestonomerRequest =
private def start[T <: String | Array[Byte]](checkpointConfig: CheckpointConfig, httpResponseType: HttpResponseAs[T])(
using sparkSession: SparkSession
): Unit = {
val restonomerRequest =
RestonomerRequest
.builder(checkpointConfig.data.dataRequest)(using tokenFunction)
.builder[T](
requestConfig = checkpointConfig.data.dataRequest,
httpResponseType = httpResponseType
)(using tokenFunction(checkpointConfig.token))
.build

val dataRestonomerResponse = RestonomerResponse.fetchFromRequest(
httpRequest = dataRestonomerRequest.httpRequest,
compression = checkpointConfig.data.dataResponse.body.compression,
val dataRestonomerResponse = RestonomerResponse.fetchFromRequest[T](
httpRequest = restonomerRequest.httpRequest,
retryConfig = checkpointConfig.data.dataRequest.retry,
restonomerPagination = checkpointConfig.data.dataResponse.pagination
)

val restonomerResponseDF = dataRestonomerResponse.body
.map(checkpointConfig.data.dataResponse.body.read)
.map(checkpointConfig.data.dataResponse.body.read[T])

val restonomerResponseTransformedDF = restonomerResponseDF.map { df =>
checkpointConfig.data.dataResponse.transformations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ sealed trait RestonomerAuthentication {

def validateCredentials(): Unit

def authenticate(httpRequest: Request[Either[String, String], Any]): Request[Either[String, String], Any]
def authenticate[T](httpRequest: Request[Either[String, T], Any]): Request[Either[String, T], Any]

def validateCredentialsAndAuthenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] = {
def validateCredentialsAndAuthenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] = {
validateCredentials()
authenticate(httpRequest)
}
Expand Down Expand Up @@ -67,9 +67,9 @@ case class BasicAuthentication(
)
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] =
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] =
basicToken
.map(httpRequest.auth.basicToken)
.getOrElse(
Expand All @@ -95,9 +95,9 @@ case class BearerAuthentication(
)
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] = httpRequest.auth.bearer(bearerToken)
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] = httpRequest.auth.bearer(bearerToken)

}

Expand Down Expand Up @@ -128,12 +128,12 @@ case class APIKeyAuthentication(
)
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] =
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] =
APIKeyPlaceholders(placeholder) match {
case QueryParam =>
httpRequest.copy[Identity, Either[String, String], Any](
httpRequest.copy[Identity, Either[String, T], Any](
uri = httpRequest.uri.addParam(apiKeyName, apiKeyValue)
)
case RequestHeader => httpRequest.header(apiKeyName, apiKeyValue, replaceExisting = true)
Expand Down Expand Up @@ -170,9 +170,9 @@ case class JWTAuthentication(
}
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] =
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] =
httpRequest.auth.bearer(
Jwt.encode(
claim = JwtClaim(subject = Option(subject)).issuedNow.expiresIn(tokenExpiresIn),
Expand Down Expand Up @@ -209,9 +209,9 @@ case class DigestAuthentication(
)
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] =
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] =
httpRequest.auth.digest(
user = userName,
password = password
Expand All @@ -227,7 +227,7 @@ case class OAuth2Authentication(

override def validateCredentials(): Unit = grantType.validateCredentials()

override def authenticate(httpRequest: Request[Either[String, String], Any]): Request[Either[String, String], Any] =
override def authenticate[T](httpRequest: Request[Either[String, T], Any]): Request[Either[String, T], Any] =
httpRequest.auth.bearer(grantType.getAccessToken)

}
Expand Down Expand Up @@ -307,9 +307,9 @@ case class AwsSignatureAuthentication(
)
}

override def authenticate(
httpRequest: Request[Either[String, String], Any]
): Request[Either[String, String], Any] = {
override def authenticate[T](
httpRequest: Request[Either[String, T], Any]
): Request[Either[String, T], Any] = {
val awsRequest = new DefaultRequest("AWS")
awsRequest.setHttpMethod(HttpMethodName.GET)
awsRequest.setEndpoint(new URI(s"${httpRequest.uri.scheme.get}://${httpRequest.uri.host.get}"))
Expand Down
Loading

0 comments on commit 7f7d322

Please sign in to comment.