From d175ee160d6b33ddc92cf8417f2f9e6dcc07c97a Mon Sep 17 00:00:00 2001
From: azexcy <13588031592@qq.com>
Date: Mon, 16 Oct 2023 10:29:43 +0800
Subject: [PATCH] Improve and add TODO

---
 .../importer/sink/PipelineDataSourceSink.java | 78 +++++++++----------
 1 file changed, 38 insertions(+), 40 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 03cf54dd93f2f..0995292e0dbb9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -105,10 +105,7 @@ private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, f
             flushInternal(dataSource, each.getBatchDeleteDataRecords());
             flushInternal(dataSource, each.getBatchInsertDataRecords());
             flushInternal(dataSource, each.getBatchUpdateDataRecords());
-            if (each.getNonBatchRecords().isEmpty()) {
-                continue;
-            }
-            tryFlush(dataSource, each.getNonBatchRecords(), false);
+            sequentialFlush(dataSource, each.getNonBatchRecords());
         }
         return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
     }
@@ -117,14 +114,14 @@ private void flushInternal(final DataSource dataSource, final List<DataRecord> b
         if (null == buffer || buffer.isEmpty()) {
             return;
         }
-        tryFlush(dataSource, buffer, true);
+        tryFlush(dataSource, buffer);
     }
 
     @SneakyThrows(InterruptedException.class)
-    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer, final boolean enableTransaction) {
+    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
         for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
             try {
-                doFlush(dataSource, buffer, enableTransaction);
+                doFlush(dataSource, buffer);
                 return;
             } catch (final SQLException ex) {
                 log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
@@ -136,18 +133,33 @@ private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer
         }
     }
 
-    private void doFlush(final DataSource dataSource, final List<DataRecord> buffer, final boolean enableTransaction) throws SQLException {
+    private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
+            boolean enableTransaction = buffer.size() > 1;
             if (enableTransaction) {
                 connection.setAutoCommit(false);
             }
-            Set<String> dataRecordTypes = buffer.stream().map(DataRecord::getType).collect(Collectors.toSet());
-            if (dataRecordTypes.size() > 1) {
-                for (DataRecord each : buffer) {
-                    execute(connection, Collections.singletonList(each));
-                }
-            } else {
-                execute(connection, buffer);
+            switch (buffer.get(0).getType()) {
+                case IngestDataChangeType.INSERT:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+                    }
+                    executeBatchInsert(connection, buffer);
+                    break;
+                case IngestDataChangeType.UPDATE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+                    }
+                    executeUpdate(connection, buffer);
+                    break;
+                case IngestDataChangeType.DELETE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+                    }
+                    executeBatchDelete(connection, buffer);
+                    break;
+                default:
+                    break;
             }
             if (enableTransaction) {
                 connection.commit();
@@ -155,31 +167,6 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer,
         }
     }
 
-    private void execute(final Connection connection, final List<DataRecord> buffer) throws SQLException {
-        switch (buffer.get(0).getType()) {
-            case IngestDataChangeType.INSERT:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
-                }
-                executeBatchInsert(connection, buffer);
-                break;
-            case IngestDataChangeType.UPDATE:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
-                }
-                executeUpdate(connection, buffer);
-                break;
-            case IngestDataChangeType.DELETE:
-                if (null != rateLimitAlgorithm) {
-                    rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
-                }
-                executeBatchDelete(connection, buffer);
-                break;
-            default:
-                break;
-        }
-    }
-
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
         String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
@@ -261,6 +248,17 @@ private void executeBatchDelete(final Connection connection, final List<DataRec
         }
     }
 
+    private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+        // TODO It would be better to use a transaction here, but DELETE statements sometimes take no effect when executed inside a transaction on PostgreSQL.
+        try {
+            for (DataRecord each : buffer) {
+                doFlush(dataSource, Collections.singletonList(each));
+            }
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
+    }
+
     @Override
     public void close() {
         PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
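
Note: because the new doFlush body is spread across added and unchanged context lines in the hunk above, it is reproduced below as it reads once the patch is applied, purely as a review aid. This is only an excerpt assembled from the hunks, not additional code; members such as rateLimitAlgorithm and the executeBatchInsert/executeUpdate/executeBatchDelete helpers belong to the rest of PipelineDataSourceSink and are unchanged by this patch.

    // doFlush now decides about the transaction itself: a same-type batch with more than one
    // record is flushed inside a transaction, while a single record is written with auto-commit.
    private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean enableTransaction = buffer.size() > 1;
            if (enableTransaction) {
                connection.setAutoCommit(false);
            }
            // All records in the buffer share one type, so dispatch once on the first record.
            switch (buffer.get(0).getType()) {
                case IngestDataChangeType.INSERT:
                    if (null != rateLimitAlgorithm) {
                        rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
                    }
                    executeBatchInsert(connection, buffer);
                    break;
                case IngestDataChangeType.UPDATE:
                    if (null != rateLimitAlgorithm) {
                        rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
                    }
                    executeUpdate(connection, buffer);
                    break;
                case IngestDataChangeType.DELETE:
                    if (null != rateLimitAlgorithm) {
                        rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
                    }
                    executeBatchDelete(connection, buffer);
                    break;
                default:
                    break;
            }
            if (enableTransaction) {
                connection.commit();
            }
        }
    }

The mixed-type, non-batchable records are handled by the new sequentialFlush shown in the last hunk, which writes them one by one through single-record doFlush calls (hence no transaction, see the TODO).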