Skip to content

Commit

Permalink
Merge pull request #913 from GoogleCloudPlatform/fix-jdbctobq-load
Browse files Browse the repository at this point in the history
fix: remove duplicated JDBC read/write configuration code in JDBCToBQ template
  • Loading branch information
vanshaj-bhatia authored Mar 4, 2024
2 parents 34e992c + b3e8f98 commit 51f5f15
Showing 1 changed file with 8 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ public void runTemplate() {
jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), jdbcSQLNumPartitions);
}

if (StringUtils.isNotBlank(jdbcFetchSize)) {
jdbcProperties.put(JDBCOptions.JDBC_BATCH_FETCH_SIZE(), jdbcFetchSize);
}

if (StringUtils.isNotBlank(jdbcSessionInitStatement)) {
jdbcProperties.put(JDBCOptions.JDBC_SESSION_INIT_STATEMENT(), jdbcSessionInitStatement);
}

/** Read Input data from JDBC table */
Dataset<Row> inputData = spark.read().format("jdbc").options(jdbcProperties).load();

Expand Down Expand Up @@ -155,78 +163,5 @@ public void validateInput() {
+ "Set all the sql partitioning parameters together"
+ "in resources/conf/template.properties file or at runtime. Refer to jdbc/README.md for more instructions.");
}
LOGGER.info(
"Starting JDBC to BigQuery spark job with following parameters:"
+ "1. {}:{}"
+ "2. {}:{}"
+ "3. {}:{}"
+ "4. {}:{}"
+ "5. {}:{}"
+ "6. {}:{}"
+ "7. {}:{}"
+ "8. {}:{}",
JDBC_TO_BQ_BIGQUERY_LOCATION,
bqLocation,
JDBC_TO_BQ_WRITE_MODE,
bqWriteMode,
JDBC_TO_BQ_SQL,
jdbcSQL,
JDBC_TO_BQ_JDBC_FETCH_SIZE,
jdbcFetchSize,
JDBC_TO_BQ_JDBC_SESSION_INIT_STATEMENT,
jdbcSessionInitStatement,
JDBC_TO_BQ_SQL_PARTITION_COLUMN,
jdbcSQLPartitionColumn,
JDBC_TO_BQ_SQL_UPPER_BOUND,
jdbcSQLUpperBound,
JDBC_TO_BQ_SQL_LOWER_BOUND,
jdbcSQLLowerBound,
JDBC_TO_BQ_SQL_NUM_PARTITIONS,
jdbcSQLNumPartitions);

SparkSession spark = null;

spark =
SparkSession.builder()
.appName("Spark JDBCToBigQuery Job")
.config("temporaryGcsBucket", temporaryGcsBucket)
.enableHiveSupport()
.getOrCreate();

HashMap<String, String> jdbcProperties = new HashMap<>();
jdbcProperties.put(JDBCOptions.JDBC_URL(), jdbcURL);
jdbcProperties.put(JDBCOptions.JDBC_DRIVER_CLASS(), jdbcDriverClassName);
jdbcProperties.put(JDBCOptions.JDBC_URL(), jdbcURL);
jdbcProperties.put(JDBCOptions.JDBC_TABLE_NAME(), jdbcSQL);

if (StringUtils.isNotBlank(concatedPartitionProps)) {
jdbcProperties.put(JDBCOptions.JDBC_PARTITION_COLUMN(), jdbcSQLPartitionColumn);
jdbcProperties.put(JDBCOptions.JDBC_UPPER_BOUND(), jdbcSQLUpperBound);
jdbcProperties.put(JDBCOptions.JDBC_LOWER_BOUND(), jdbcSQLLowerBound);
jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), jdbcSQLNumPartitions);
}

if (StringUtils.isNotBlank(jdbcFetchSize)) {
jdbcProperties.put(JDBCOptions.JDBC_BATCH_FETCH_SIZE(), jdbcFetchSize);
}

if (StringUtils.isNotBlank(jdbcSessionInitStatement)) {
jdbcProperties.put(JDBCOptions.JDBC_SESSION_INIT_STATEMENT(), jdbcSessionInitStatement);
}

/** Read Input data from JDBC table */
Dataset<Row> inputData = spark.read().format("jdbc").options(jdbcProperties).load();

if (StringUtils.isNotBlank(tempTable) && StringUtils.isNotBlank(tempQuery)) {
inputData.createOrReplaceGlobalTempView(tempTable);
inputData = spark.sql(tempQuery);
}

inputData
.write()
.mode(bqWriteMode)
.format("com.google.cloud.spark.bigquery")
.option("table", bqLocation)
.save();
}
}

0 comments on commit 51f5f15

Please sign in to comment.