Add PipelineJobItemContext.toPipelineJobItemProgress()
terrymanu committed Nov 18, 2023
1 parent 5a4f36f commit 4fdc7af
Showing 7 changed files with 77 additions and 50 deletions.
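In short, the commit replaces the field-by-field conversion done in the job API classes with a single call on the context: each PipelineJobItemContext now builds its own PipelineJobItemProgress. Below is a minimal, self-contained sketch of that pattern. The interface and class names mirror the diff, but the bodies are reduced stubs (only dataSourceName is carried over), and ToProgressSketch is a hypothetical driver class, not part of ShardingSphere.

// Reduced stand-ins for the real ShardingSphere types; save as ToProgressSketch.java.

interface PipelineJobItemProgress {
}

interface PipelineJobItemContext {

    // New in this commit: each context converts itself to its own progress type.
    PipelineJobItemProgress toPipelineJobItemProgress();
}

final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress {

    private final String dataSourceName;

    InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) {
        // The progress object reads what it needs from the context,
        // instead of being populated field by field in the job API class.
        dataSourceName = context.getDataSourceName();
    }

    String getDataSourceName() {
        return dataSourceName;
    }
}

interface InventoryIncrementalJobItemContext extends PipelineJobItemContext {

    String getDataSourceName();

    // Covariant default implementation, as in the diff.
    @Override
    default InventoryIncrementalJobItemProgress toPipelineJobItemProgress() {
        return new InventoryIncrementalJobItemProgress(this);
    }
}

public final class ToProgressSketch {

    public static void main(final String[] args) {
        // A one-method lambda stands in for a real job item context.
        InventoryIncrementalJobItemContext context = () -> "ds_0";
        // Callers such as convertJobItemProgress(...) now just ask the context for its progress.
        InventoryIncrementalJobItemProgress progress = context.toPipelineJobItemProgress();
        System.out.println(progress.getDataSourceName());
    }
}

The design choice is to keep the conversion next to the data it reads: the job API classes no longer need to know which fields make up each progress type, and each new context/progress pair only has to implement toPipelineJobItemProgress().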

InventoryIncrementalJobItemContext.java
@@ -17,9 +17,9 @@

package org.apache.shardingsphere.data.pipeline.common.context;

import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;

@@ -88,4 +88,9 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext
* @return inventory records count
*/
long getInventoryRecordsCount();

@Override
default InventoryIncrementalJobItemProgress toPipelineJobItemProgress() {
return new InventoryIncrementalJobItemProgress(this);
}
}

PipelineJobItemContext.java
@@ -19,6 +19,7 @@

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;

/**
* Pipeline job item context.
@@ -85,4 +86,11 @@ public interface PipelineJobItemContext {
* @return stopping
*/
boolean isStopping();

/**
* Convert to pipeline job item progress.
*
* @return converted pipeline job item progress
*/
PipelineJobItemProgress toPipelineJobItemProgress();
}

ConsistencyCheckJobItemProgress.java
@@ -22,6 +22,7 @@
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;

import java.util.Map;

@@ -34,9 +35,6 @@
// TODO Refactor structure, List<TableProgress>
public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {

@Setter
private JobStatus status = JobStatus.RUNNING;

private final String tableNames;

private final String ignoredTableNames;
@@ -54,4 +52,19 @@ public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {
private final Map<String, Object> targetTableCheckPositions;

private final String sourceDatabaseType;

@Setter
private JobStatus status = JobStatus.RUNNING;

public ConsistencyCheckJobItemProgress(final ConsistencyCheckJobItemProgressContext context) {
tableNames = String.join(",", context.getTableNames());
ignoredTableNames = String.join(",", context.getIgnoredTableNames());
checkedRecordsCount = context.getCheckedRecordsCount().get();
recordsCount = context.getRecordsCount();
checkBeginTimeMillis = context.getCheckBeginTimeMillis();
checkEndTimeMillis = context.getCheckEndTimeMillis();
sourceTableCheckPositions = context.getSourceTableCheckPositions();
targetTableCheckPositions = context.getTargetTableCheckPositions();
sourceDatabaseType = context.getSourceDatabaseType();
}
}

InventoryIncrementalJobItemProgress.java
@@ -18,30 +18,62 @@
package org.apache.shardingsphere.data.pipeline.common.job.progress;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* Inventory incremental job item progress.
*/
@NoArgsConstructor
@Getter
@Setter
public final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress {

private JobStatus status = JobStatus.RUNNING;

private DatabaseType sourceDatabaseType;

private String dataSourceName;

private boolean active;

private JobItemInventoryTasksProgress inventory;

private JobItemIncrementalTasksProgress incremental;

private long inventoryRecordsCount;

private long processedRecordsCount;

private long inventoryRecordsCount;
private boolean active;

private JobStatus status = JobStatus.RUNNING;

public InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) {
sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
dataSourceName = context.getDataSourceName();
inventory = getInventoryTasksProgress(context.getInventoryTasks());
incremental = getIncrementalTasksProgress(context.getIncrementalTasks());
inventoryRecordsCount = context.getInventoryRecordsCount();
processedRecordsCount = context.getProcessedRecordsCount();
status = context.getStatus();
}

private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
}

private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
for (PipelineTask each : inventoryTasks) {
inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
}
}

@@ -23,13 +23,10 @@
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
@@ -38,24 +35,20 @@
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -134,35 +127,14 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(context.getStatus());
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
jobItemProgress.setDataSourceName(context.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
return YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
}

@Override
public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
}

private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
for (PipelineTask each : inventoryTasks) {
inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress) jobItemContext.toPipelineJobItemProgress()));
}

@Override

@@ -27,7 +27,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -44,7 +43,6 @@
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -129,15 +127,7 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
}

private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
ConsistencyCheckJobItemProgressContext progressContext = context.getProgressContext();
String tableNames = String.join(",", progressContext.getTableNames());
String ignoredTableNames = String.join(",", progressContext.getIgnoredTableNames());
ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, progressContext.getCheckedRecordsCount().get(),
progressContext.getRecordsCount(), progressContext.getCheckBeginTimeMillis(), progressContext.getCheckEndTimeMillis(),
progressContext.getSourceTableCheckPositions(), progressContext.getTargetTableCheckPositions(), progressContext.getSourceDatabaseType());
jobItemProgress.setStatus(context.getStatus());
return YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
return YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress) jobItemContext.toPipelineJobItemProgress()));
}

@Override

ConsistencyCheckJobItemContext.java
@@ -69,4 +69,11 @@ public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration job
public PipelineProcessContext getJobProcessContext() {
return processContext;
}

@Override
public ConsistencyCheckJobItemProgress toPipelineJobItemProgress() {
ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress(progressContext);
result.setStatus(status);
return result;
}
}
