Spark Vector F.A.Q.
Q: What is the API spark-vector implements?
A: The Spark-Vector connector implements the Spark SQL DataSource API. A VectorRelation is defined per Vector table; it extends Spark's BaseRelation with insert capability (the InsertableRelation trait) and is able to push down predicates (implementing the PrunedFilteredScan trait). The main methods implemented are listed below; a simplified outline follows the list.
- schema: returns the data types of the VectorRelation. It does so by contacting Vector via a JDBC connection and issuing a dummy query on the table to obtain the column data types (very similar to JDBCRDD's resolveTable method).
- insert: constructs an InsertRDD that has the given DataFrame's RDD (the data to be loaded) as a parent, and then submits the job that starts loading the RDD data from Spark into Vector. More details here.
- buildScan: constructs a ScanRDD that contacts Vector via a JDBC connection and submits a select statement (possibly with additional predicates) to Vector, whose results are consumed when the job is executed, either through the same connection (in the 0.1.x branch) or through multiple DataStreams in parallel.
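To make the shape of this API concrete, here is a minimal, hypothetical outline of a relation implementing these three methods. It is only a sketch: the class name and the private helpers (resolveSchemaOverJdbc, startVectorLoad, scanVectorTable) are invented for illustration, and the real VectorRelation contains considerably more logic.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Hypothetical outline only; the private helper names are invented.
class VectorLikeRelation(table: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with InsertableRelation with PrunedFilteredScan {

  // schema: resolved by asking Vector over JDBC for the table's column types
  override def schema: StructType = resolveSchemaOverJdbc(table)

  // insert: build an RDD over the DataFrame to be loaded and run the load job
  override def insert(data: DataFrame, overwrite: Boolean): Unit =
    startVectorLoad(table, data)

  // buildScan: push the pruned columns and pushed-down filters into a select
  // statement sent to Vector; results come back as an RDD of rows
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    scanVectorTable(table, requiredColumns, filters)

  // Placeholders for the JDBC/datastream plumbing described in the answers below.
  private def resolveSchemaOverJdbc(table: String): StructType = ???
  private def startVectorLoad(table: String, data: DataFrame): Unit = ???
  private def scanVectorTable(table: String, columns: Array[String], filters: Array[Filter]): RDD[Row] = ???
}
```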
Q: When does spark-vector actually connect to Vector? Is it when we register the temporary table? What happens at that point?
A: Spark-Vector connects to Vector at the following points (a usage sketch follows the list):
- on the driver, when the register temporary table statement is submitted, so that it can obtain the schema (column data types) for the VectorRelation. It does so via JDBC inside a single-use transaction.
- on the driver, when inserting data into Vector: Spark-Vector first contacts Vector via JDBC to inform it that it needs to be prepared to load data into the specified table. Using the same transaction, it obtains the information needed to start loading (authentication roles, and the hostnames and port numbers of the Vector worker nodes).
- on the Spark executors that process a piece of the InsertRDD: they open socket connections to the Vector worker nodes based on the information obtained by the driver.
- sequential unload in 0.1.x: when unloading data out of Vector, the Spark executor processing the ScanRDD connects via JDBC to the Vector leader node, issues a SQL select statement, and consumes its results through the same connection.
- parallel unload in 0.2.x and master: when unloading data out of Vector, the Spark executor processing the ScanRDDs connects via JDBC to the Vector leader node, issues a SQL select statement, and then spawns other executors to consume its results through multiple DataStreams in parallel.
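For illustration, the connection points above map onto ordinary Spark SQL usage roughly as follows. The provider class name and the OPTIONS keys in this sketch are assumptions made for the example (check the project README for the exact spelling); what matters here is when each JDBC or socket connection is made.

```scala
// Registering the relation: the driver connects once over JDBC (single-use
// transaction) to resolve the table's schema.
// NOTE: the provider class and OPTIONS keys below are assumed for illustration.
sqlContext.sql("""
  CREATE TEMPORARY TABLE vector_table
  USING com.actian.spark_vector.sql.DefaultSource
  OPTIONS (host "vector-host", instance "VI", database "dbName", table "my_table")
""")

// Inserting: the driver connects over JDBC to prepare the load and to obtain the
// worker hostnames/ports; the executors then open socket connections to the
// Vector worker nodes to stream the InsertRDD partitions.
sqlContext.sql("INSERT INTO TABLE vector_table SELECT * FROM some_spark_table")

// Scanning: a select statement is issued over JDBC; results are consumed through
// the same connection (0.1.x) or through multiple DataStreams in parallel
// (0.2.x and master).
sqlContext.sql("SELECT col1, col2 FROM vector_table WHERE col1 > 10").show()
```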
Q: What does the transactional model look like? Do we start a new transaction and commit every time we execute a SQL or DataFrame operation on a Vector data source? Do we keep a connection open in between operations? Is there a transaction open in between operations?
A: We start a new transaction and commit it every time we execute an operation.
Q: What determines the number of data streams and the level of parallelism? We have a number of Spark executors that can be fixed on the command line, or automatically determined (I think, when using YARN, right?). We have a number of nodes and we have a number of partitions. How do these variables interact and what determines the level of parallelism in the end?
A: The number of executors E is determined by Spark (indeed, either statically assigned or negotiated with YARN). When loading into a partitioned table with P partitions on N Vector workers/nodes with MPL (the Vector maximum parallelism level), we allocate D Vector datastreams, where D >= P and, if P <= MPL, then D <= MPL as well. The exact value depends on the load on Vector: on an empty system D = MAX(MPL, P), on a loaded system D = P. D is also the number of InsertRDD partitions and, consequently, the number of tasks that Spark will execute (in the last stage) to load data into Vector. These D tasks are allocated to the E executors with locality preference. Note that not all D tasks may run at the same time if D > E.
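As a worked illustration of the rule above (a sketch under the stated assumptions, not the connector's actual code), the choice of D could be expressed like this:

```scala
// Hypothetical helper reflecting the rule described above: on an empty system
// D = MAX(MPL, P), on a loaded system D = P. Not the connector's actual code.
def numDataStreams(p: Int, mpl: Int, systemIsEmpty: Boolean): Int =
  if (systemIsEmpty) math.max(mpl, p) else p

// Example: a table with P = 4 partitions and MPL = 16:
//   numDataStreams(4, 16, systemIsEmpty = true)  == 16  // idle Vector system
//   numDataStreams(4, 16, systemIsEmpty = false) == 4   // busy Vector system
// D also equals the number of InsertRDD partitions, i.e. the number of Spark
// tasks in the final loading stage, scheduled across the E executors.
```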
Q: How does the predicate pushdown currently work? Is it Catalyst who figures it out and makes the lists of columns/predicates? Who generates the WHERE (or the complete SQL query, for that matter) that we send over JDBC to Vector?
A: The list of columns/predicates is indeed figured out by Catalyst, and in VectorRelation's buildScan method these columns/filters are used to generate the appropriate Vector select statement.
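As an illustration of what buildScan receives from Catalyst and what gets sent to Vector over JDBC, here is a minimal, hypothetical sketch of turning the pruned columns and pushed-down filters into a select statement; the connector's actual code handles more filter types, quoting, and typing than this.

```scala
import org.apache.spark.sql.sources._

// Hypothetical sketch: map the filters Spark pushes down into SQL predicates.
// Unsupported filters return None and are re-evaluated by Spark after the scan.
def selectStatement(table: String, columns: Array[String], filters: Array[Filter]): String = {
  def toSql(f: Filter): Option[String] = f match {
    case EqualTo(col, value)     => Some(s"$col = '$value'")
    case GreaterThan(col, value) => Some(s"$col > '$value'")
    case LessThan(col, value)    => Some(s"$col < '$value'")
    case IsNull(col)             => Some(s"$col IS NULL")
    case _                       => None
  }
  val projection = if (columns.isEmpty) "*" else columns.mkString(", ")
  val predicates = filters.flatMap(toSql)
  val where = if (predicates.isEmpty) "" else predicates.mkString(" WHERE ", " AND ", "")
  s"SELECT $projection FROM $table$where"
}

// selectStatement("my_table", Array("a", "b"), Array(GreaterThan("a", 10)))
//   => "SELECT a, b FROM my_table WHERE a > '10'"
```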