The `DataSource` framework is a utility framework that helps with configuring and reading `DataFrame`s.
This framework provides for reading from a given path with the specified format, like `avro`, `parquet`, `orc`, `json`, `csv`, `jdbc`, `delta`...
The framework is composed of two main traits:
- `DataSource`, which is created based on a `DataSourceConfiguration` class and provides two main functions:
  ```scala
  def reader(implicit spark: SparkSession): Reader
  def read(implicit spark: SparkSession): Try[DataFrame]
  ```
- `DataSourceConfiguration`: a marker trait to define `DataSource` configuration classes
The framework provides the following predefined `DataSource` implementations:
- `FileDataSource`
- `JdbcDataSource`
- `GenericDataSource`
- `FileStreamDataSource`
- `KafkaStreamDataSource`
- `GenericStreamDataSource`
For convenience, the `DataSourceFactory` trait and the default implementation are provided.
To create a `DataSource` out of a given `DataSourceConfiguration` instance, one can call

```scala
DataSource(someDataSourceConfigurationInstance)
```
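As a brief illustration, a `DataSource` obtained this way can be read into a `DataFrame`. The sketch below is a minimal example, with the `SparkSession` and the concrete configuration instance left abstract, and the `Try` handling shown being just one possible way of consuming the result.

```scala
import scala.util.{ Failure, Success, Try }
import org.apache.spark.sql.{ DataFrame, SparkSession }
import org.tupol.spark.io._

// Minimal sketch: read() returns a Try[DataFrame], so failures can be
// handled explicitly instead of being thrown.
implicit def spark: SparkSession = ???
def someDataSourceConfigurationInstance: DataSourceConfiguration = ???

val result: Try[DataFrame] = DataSource(someDataSourceConfigurationInstance).read
result match {
  case Success(dataFrame) => dataFrame.show()
  case Failure(error)     => println(s"Failed to read the data: $error")
}
```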
Also, in order to easily extract the configuration from a given Typesafe `Config` instance,
the `FormatAwareDataSourceConfiguration` factory is provided:

```scala
FormatAwareDataSourceConfiguration(someTypesafeConfigurationInstance)
```
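For example, a configuration could be extracted from a Typesafe `Config` along the following lines; note that the `format` and `path` keys below are assumptions made for illustration, not a statement of the library's exact configuration contract.

```scala
import com.typesafe.config.ConfigFactory
import org.tupol.spark.io._

// Hypothetical configuration shape; the "format" and "path" keys are
// assumed here for illustration purposes only.
val typesafeConfig = ConfigFactory.parseString(
  """
    |format: csv
    |path: "/tmp/some-input-path"
    |""".stripMargin
)

// The factory picks the concrete configuration type based on the declared format.
val dataSourceConfiguration = FormatAwareDataSourceConfiguration(typesafeConfig)
```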
There is a convenience implicit decorator for the Spark session, available through the

```scala
import org.tupol.spark.io._
import org.tupol.spark.io.implicits._
```

import statements.
The `org.tupol.spark.io` package contains the implicit factories for data sources, and `org.tupol.spark.io.implicits` contains the actual `SparkSession` decorator.
This allows us to create the `DataSource` by calling the `source()` function on the given Spark session, passing a `DataSourceConfiguration` instance.
```scala
import org.apache.spark.sql.SparkSession
import org.tupol.spark.io._
import org.tupol.spark.io.implicits._

def spark: SparkSession = ???
def dataSourceConfiguration: DataSourceConfiguration = ???
val dataSource = spark.source(dataSourceConfiguration)
```
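The resulting `dataSource` is consumed through the same `read` contract described earlier, for example:

```scala
import scala.util.Try
import org.apache.spark.sql.DataFrame

// read() takes the SparkSession implicitly; here it is passed explicitly.
val data: Try[DataFrame] = dataSource.read(spark)
```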
For streaming sources:

```scala
import org.apache.spark.sql.SparkSession
import org.tupol.spark.io._
import org.tupol.spark.io.implicits._

def spark: SparkSession = ???
def dataSourceConfiguration: DataSourceConfiguration = ???
val dataSource = spark.streamingSource(dataSourceConfiguration)
```
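Since the streaming variants are listed among the predefined `DataSource` implementations, they should expose the same `read` contract, the difference being that the resulting `DataFrame` is a streaming one:

```scala
import scala.util.Try
import org.apache.spark.sql.DataFrame

// Same contract as the batch case; the resulting DataFrame is a streaming
// one, which can be checked through DataFrame.isStreaming.
val streamingData: Try[DataFrame] = dataSource.read(spark)
streamingData.foreach(dataFrame => assert(dataFrame.isStreaming))
```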