Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PipelineJobItemContext.toProgress() #29071

Merged
merged 2 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.shardingsphere.data.pipeline.common.context;

import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;

Expand Down Expand Up @@ -88,4 +88,9 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
* @return inventory records count
*/
long getInventoryRecordsCount();

@Override
default InventoryIncrementalJobItemProgress toProgress() {
return new InventoryIncrementalJobItemProgress(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;

/**
* Pipeline job item context.
Expand Down Expand Up @@ -85,4 +86,11 @@ public interface PipelineJobItemContext {
* @return stopping
*/
boolean isStopping();

/**
* Convert to pipeline job item progress.
*
* @return converted pipeline job item progress
*/
PipelineJobItemProgress toProgress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;

import java.util.Map;

Expand All @@ -34,9 +35,6 @@
// TODO Refactor structure, List<TableProgress>
public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {

@Setter
private JobStatus status = JobStatus.RUNNING;

private final String tableNames;

private final String ignoredTableNames;
Expand All @@ -54,4 +52,19 @@ public final class ConsistencyCheckJobItemProgress implements PipelineJobItemPro
private final Map<String, Object> targetTableCheckPositions;

private final String sourceDatabaseType;

@Setter
private JobStatus status = JobStatus.RUNNING;

public ConsistencyCheckJobItemProgress(final ConsistencyCheckJobItemProgressContext context) {
tableNames = String.join(",", context.getTableNames());
ignoredTableNames = String.join(",", context.getIgnoredTableNames());
checkedRecordsCount = context.getCheckedRecordsCount().get();
recordsCount = context.getRecordsCount();
checkBeginTimeMillis = context.getCheckBeginTimeMillis();
checkEndTimeMillis = context.getCheckEndTimeMillis();
sourceTableCheckPositions = context.getSourceTableCheckPositions();
targetTableCheckPositions = context.getTargetTableCheckPositions();
sourceDatabaseType = context.getSourceDatabaseType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,62 @@
package org.apache.shardingsphere.data.pipeline.common.job.progress;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* Inventory incremental job item progress.
*/
@NoArgsConstructor
@Getter
@Setter
public final class InventoryIncrementalJobItemProgress implements PipelineJobItemProgress {

private JobStatus status = JobStatus.RUNNING;

private DatabaseType sourceDatabaseType;

private String dataSourceName;

private boolean active;

private JobItemInventoryTasksProgress inventory;

private JobItemIncrementalTasksProgress incremental;

private long inventoryRecordsCount;

private long processedRecordsCount;

private long inventoryRecordsCount;
private boolean active;

private JobStatus status = JobStatus.RUNNING;

public InventoryIncrementalJobItemProgress(final InventoryIncrementalJobItemContext context) {
sourceDatabaseType = context.getJobConfig().getSourceDatabaseType();
dataSourceName = context.getDataSourceName();
inventory = getInventoryTasksProgress(context.getInventoryTasks());
incremental = getIncrementalTasksProgress(context.getIncrementalTasks());
inventoryRecordsCount = context.getInventoryRecordsCount();
processedRecordsCount = context.getProcessedRecordsCount();
status = context.getStatus();
}

private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
}

private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
for (PipelineTask each : inventoryTasks) {
inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
Expand All @@ -38,24 +35,20 @@
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -134,35 +127,14 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(context.getStatus());
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
jobItemProgress.setDataSourceName(context.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
return YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
}

@Override
public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
}

private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
for (PipelineTask each : inventoryTasks) {
inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress) jobItemContext.toProgress()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
Expand All @@ -44,7 +43,6 @@
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
Expand Down Expand Up @@ -129,15 +127,7 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
}

private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
ConsistencyCheckJobItemProgressContext progressContext = context.getProgressContext();
String tableNames = String.join(",", progressContext.getTableNames());
String ignoredTableNames = String.join(",", progressContext.getIgnoredTableNames());
ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames, progressContext.getCheckedRecordsCount().get(),
progressContext.getRecordsCount(), progressContext.getCheckBeginTimeMillis(), progressContext.getCheckEndTimeMillis(),
progressContext.getSourceTableCheckPositions(), progressContext.getTargetTableCheckPositions(), progressContext.getSourceDatabaseType());
jobItemProgress.setStatus(context.getStatus());
return YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
return YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress) jobItemContext.toProgress()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration job
public PipelineProcessContext getJobProcessContext() {
return processContext;
}

@Override
public ConsistencyCheckJobItemProgress toProgress() {
ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress(progressContext);
result.setStatus(status);
return result;
}
}