Raw source data often needs to undergo some transformations before it is pushed to Pinot. These transformations include extracting records from nested objects, applying simple transform functions on certain columns, and filtering out unwanted columns, as well as more advanced operations like joining datasets. Typically, a preprocessing job is needed to perform these operations. For streaming data sources, such transformations require users to write a Samza job and create another intermediate topic. Writing preprocessing jobs, especially for simple transformations, adds an extra step to user onboarding, can introduce inconsistencies between the batch and stream data sources, and increases maintenance and operator overhead.
Here are some of the ingestion transformations supported by Pinot.
Records can be filtered as they are being ingested. A filter function can be specified in the filterConfig section of the ingestionConfig in the table config.
tableConfig: {
  tableName: ...,
  tableType: ...,
  "ingestionConfig": {
    "filterConfig": {
      "filterFunction": "<expression>"
    }
  }
}
If the expression evaluates to true, the record will be filtered out. The expressions allowed here are within the scope of the transform functions supported in Pinot today, i.e. Groovy expressions or any of the inbuilt functions.
Consider a table with a column timestamp. Filter out records which are older than timestamp 1589007600000:
"ingestionConfig": {
"filterConfig": {
"filterFunction": “Groovy({timestamp < 1589007600000}, timestamp)”
}
}
Consider a table with a string column campaign and a multi-value double column prices. Filter out records where campaign is X or Y and the sum of all elements in prices is less than 100:
"ingestionConfig": {
"filterConfig": {
"filterFunction": "Groovy({(campaign == \"X\" || campaign == \"Y\") && prices.sum() < 100}, prices, campaign)"
}
}
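The examples above use Groovy, but any of the inbuilt functions can also be used in the filter expression. As a minimal sketch, assuming the inbuilt strcmp function and an illustrative status column with a 'DELETED' marker value, soft-deleted records could be dropped with:

"ingestionConfig": {
  "filterConfig": {
    "filterFunction": "strcmp(status, 'DELETED') = 0"
  }
}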
Transform functions can be defined on columns in the ingestion config of the table config. For example:
{% code title="pinot-table-offline.json" %}
{
  "tableName": "myTable",
  ...
  "ingestionConfig": {
    "transformConfigs": [{
      "columnName": "maxPrice",
      "transformFunction": "Groovy({prices.max()}, prices)" // groovy function
    },
    {
      "columnName": "hoursSinceEpoch",
      "transformFunction": "toEpochHours(timestamp)" // inbuilt function
    }]
  }
}
{% endcode %}
In this example, we're assuming that the prices field is available in the source data and maxPrice is expected in the Pinot schema. Similarly, timestamp is available in the source data and hoursSinceEpoch is expected in the Pinot schema.
{% hint style="warning" %} Note
Currently, the arguments must come from the source data. Other columns from the Pinot schema can be used, as long as those columns have NOT themselves been created through transformations. In other words, chaining of transformations is not supported (x = f(y) and z = f(x) is not supported). {% endhint %}
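For example, reusing the columns from the snippet above, a config like the following sketch would not work, because maxPrice is itself produced by a transformation (the maxPriceCents column is hypothetical):

"transformConfigs": [{
  "columnName": "maxPrice",
  "transformFunction": "Groovy({prices.max()}, prices)"
},
{
  "columnName": "maxPriceCents",
  "transformFunction": "Groovy({maxPrice * 100}, maxPrice)" // not supported: maxPrice is a derived column
}]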
Currently, we have support for two kinds of functions:
- Groovy functions
- Inbuilt functions
Groovy functions can be defined using the syntax:
Groovy({groovy script}, argument1, argument2...argumentN)
Here are some examples of commonly needed functions. Any valid Groovy expression can be used.
Concat firstName and lastName to get fullName:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "fullName",
"transformFunction": "Groovy({firstName+' '+lastName}, firstName, lastName)"
}
}
Find the max value in array bids:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "maxBid",
"transformFunction": "Groovy({bids.max{ it.toBigDecimal() }}, bids)"
}
}
Convert timestamp from MILLISECONDS to HOURS:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "hoursSinceEpoch",
"transformFunction": "Groovy({timestamp/(1000*60*60)}, timestamp)"
}
}
Simply change the name of the column from user_id to userId:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "userId",
"transformFunction": "Groovy({user_id}, user_id)"
}
}
If eventType is IMPRESSION, set impressions to 1. Similarly for CLICK:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "impressions",
"transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1: 0}, eventType)"
},
{
"columnName": "clicks",
"transformFunction": "Groovy({eventType == 'CLICK' ? 1: 0}, eventType)"
}
}
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys, to maintain the mapping between keys and values.
1) The keys of the map as map2_keys
2) The values of the map as map2_values
"ingestionConfig": {
"transformConfigs": [{
"columnName": "map2_keys",
"transformFunction": "Groovy({map2.sort()*.key}, map2)"
},
{
"columnName": "map2_values",
"transformFunction": "Groovy({map2.sort()*.value}, map2)"
}
}
We have several inbuilt functions that can be used directly as ingestion transform functions.
These are functions which enable commonly needed time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
Function name | Description |
---|---|
toEpochSeconds | Converts epoch millis to epoch seconds. Usage: `"toEpochSeconds(millis)"` |
toEpochMinutes | Converts epoch millis to epoch minutes. Usage: `"toEpochMinutes(millis)"` |
toEpochHours | Converts epoch millis to epoch hours. Usage: `"toEpochHours(millis)"` |
toEpochDays | Converts epoch millis to epoch days. Usage: `"toEpochDays(millis)"` |
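As a sketch, one of these can be wired into a transformConfig like this (the source column timestampMillis is an illustrative name):

"ingestionConfig": {
  "transformConfigs": [{
    "columnName": "daysSinceEpoch",
    "transformFunction": "toEpochDays(timestampMillis)"
  }]
}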
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding to the nearest rounding bucket. For example, `1588469352000` (2020-05-03 01:29:12) is `26474489` minutesSinceEpoch. `toEpochMinutesRounded(1588469352000, 10) = 26474480` (2020-05-03 01:20:00).
Function Name | Description |
---|---|
toEpochSecondsRounded | Converts epoch millis to epoch seconds, rounding to the nearest rounding bucket. Usage: `"toEpochSecondsRounded(millis, 30)"` |
toEpochMinutesRounded | Converts epoch millis to epoch minutes, rounding to the nearest rounding bucket. Usage: `"toEpochMinutesRounded(millis, 10)"` |
toEpochHoursRounded | Converts epoch millis to epoch hours, rounding to the nearest rounding bucket. Usage: `"toEpochHoursRounded(millis, 6)"` |
toEpochDaysRounded | Converts epoch millis to epoch days, rounding to the nearest rounding bucket. Usage: `"toEpochDaysRounded(millis, 7)"` |
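For instance, to bucket an epoch-millis column into 15-minute buckets (the column names here are illustrative), a transformConfig might look like:

"ingestionConfig": {
  "transformConfigs": [{
    "columnName": "fifteenMinuteBucket",
    "transformFunction": "toEpochMinutesRounded(timestampMillis, 15)"
  }]
}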
fromEpochXXX
Converts from an epoch granularity to milliseconds.
Function Name | Description |
---|---|
fromEpochSeconds | Converts from epoch seconds to milliseconds |
fromEpochMinutes | Converts from epoch minutes to milliseconds |
fromEpochHours | Converts from epoch hours to milliseconds |
fromEpochDays | Converts from epoch days to milliseconds |
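For example, if the source data has a daysSinceEpoch field (illustrative name) and the Pinot schema expects epoch milliseconds, a sketch of the config would be:

"ingestionConfig": {
  "transformConfigs": [{
    "columnName": "millisSinceEpoch",
    "transformFunction": "fromEpochDays(daysSinceEpoch)"
  }]
}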
Simple date format
Converts simple date format strings to milliseconds and vice-a-versa, as per the provided pattern string.
Function name | Description |
---|---|
toDateTime | Converts from milliseconds to a formatted date time string, as per the provided pattern |
fromDateTime | Converts a formatted date time string to milliseconds, as per the provided pattern |
{% hint style="info" %} Note
Letters that are not part of the SimpleDateFormat legend (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) need to be escaped. For example:
"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
{% endhint %}
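As a sketch, both directions could be used in a transformConfig like the following (the column names and patterns are illustrative):

"ingestionConfig": {
  "transformConfigs": [{
    "columnName": "dateStr",
    "transformFunction": "toDateTime(millisSinceEpoch, 'yyyy-MM-dd')"
  },
  {
    "columnName": "eventTimeMillis",
    "transformFunction": "fromDateTime(eventTimeStr, 'yyyy-MM-dd HH:mm:ss')"
  }]
}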
Function name | Description |
---|---|
json_format | Converts a JSON/AVRO complex object to a string. This JSON map can then be queried using the jsonExtractScalar function. |
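For example, a nested source field could be stored as a JSON string column (the userProfile field and userProfileJson column are illustrative names):

"ingestionConfig": {
  "transformConfigs": [{
    "columnName": "userProfileJson",
    "transformFunction": "json_format(userProfile)"
  }]
}

At query time, a value inside it could then be read with something like jsonExtractScalar(userProfileJson, '$.age', 'INT').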
There are two kinds of flattening:
This is not natively supported yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow with the key $MULTIPLE_RECORDS_KEY$. The segment generation drivers will treat this as a special case and handle the multiple-records case.
Feature TBD