Skip to content

Commit

Permalink
[CELEBORN-751] Rename remain rss related class name and filenames etc
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Rename remain rss related class name and filenames etc...

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1664 from AngersZhuuuu/CELEBORN-751.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
  • Loading branch information
AngersZhuuuu committed Jul 4, 2023
1 parent 26aaba1 commit 693172d
Show file tree
Hide file tree
Showing 110 changed files with 1,164 additions and 1,132 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/
To use Celeborn,the following spark configurations should be added.
```properties
# Shuffle manager class name changed in 0.3.0:
# before 0.3.0: org.apache.spark.shuffle.celeborn.RssShuffleManager
# before 0.3.0: org.apache.spark.shuffle.celeborn.RssShuffleManager
# since 0.3.0: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
# must use kryo serializer because java serializer do not support relocation
Expand Down
4 changes: 2 additions & 2 deletions charts/celeborn/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ securityContext:
# diskType only works in Celeborn Worker with hostPath type to manifest local disk type
volumes:
master:
- mountPath: /mnt/rss_ratis
hostPath: /mnt/rss_ratis
- mountPath: /mnt/celeborn_ratis
hostPath: /mnt/celeborn_ratis
type: hostPath
size: 1Gi
worker:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.celeborn.plugin.flink.buffer.CreditListener;
import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
import org.apache.celeborn.plugin.flink.protocol.ReadData;
import org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.readclient.RssBufferStream;

public class RemoteBufferStreamReader extends CreditListener {
private static Logger logger = LoggerFactory.getLogger(RemoteBufferStreamReader.class);
Expand All @@ -45,7 +45,7 @@ public class RemoteBufferStreamReader extends CreditListener {
private boolean isOpened;
private Consumer<ByteBuf> dataListener;
private Consumer<Throwable> failureListener;
private RssBufferStream bufferStream;
private CelebornBufferStream bufferStream;
private volatile boolean closed = false;
private Consumer<RequestMessage> messageConsumer;

Expand Down Expand Up @@ -134,7 +134,7 @@ public void errorReceived(String errorMsg) {

public void dataReceived(ReadData readData) {
logger.debug(
"Rss buffer stream reader get stream id {} received readable bytes {}.",
"Remote buffer stream reader get stream id {} received readable bytes {}.",
readData.getStreamId(),
readData.getFlinkBuffer().readableBytes());
dataListener.accept(readData.getFlinkBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public RemoteShuffleInputGateDelegation(
this.shuffleClient =
FlinkShuffleClientImpl.get(
appUniqueId,
shuffleResource.getRssMetaServiceHost(),
shuffleResource.getRssMetaServicePort(),
shuffleResource.getRssMetaServiceTimestamp(),
shuffleResource.getLifecycleManagerHost(),
shuffleResource.getLifecycleManagerPort(),
shuffleResource.getLifecycleManagerTimestamp(),
celebornConf,
new UserIdentifier("default", "default"));
} catch (DriverChangedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
ThreadUtils.createFactoryWithDefaultExceptionHandler(
"remote-shuffle-master-executor", LOG));
private final ResultPartitionAdapter resultPartitionDelegation;
private final long rssMetaServiceTimestamp;
private final long lifecycleManagerTimestamp;

public RemoteShuffleMaster(
ShuffleMasterContext shuffleMasterContext, ResultPartitionAdapter resultPartitionDelegation) {
this.shuffleMasterContext = shuffleMasterContext;
this.resultPartitionDelegation = resultPartitionDelegation;
this.rssMetaServiceTimestamp = System.currentTimeMillis();
this.lifecycleManagerTimestamp = System.currentTimeMillis();
}

@Override
Expand All @@ -66,7 +66,7 @@ public void registerJob(JobShuffleContext context) {
if (lifecycleManager == null) {
synchronized (RemoteShuffleMaster.class) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(rssMetaServiceTimestamp, jobID);
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
CelebornConf celebornConf =
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
Expand Down Expand Up @@ -136,9 +136,9 @@ public CompletableFuture<RemoteShuffleDescriptor> registerPartitionWithProducer(

RemoteShuffleResource remoteShuffleResource =
new RemoteShuffleResource(
lifecycleManager.getRssMetaServiceHost(),
lifecycleManager.getRssMetaServicePort(),
rssMetaServiceTimestamp,
lifecycleManager.getHost(),
lifecycleManager.getPort(),
lifecycleManagerTimestamp,
shuffleResourceDescriptor);

shuffleResourceTracker.addPartitionResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public class RemoteShuffleOutputGate {
private int mapId;
private int attemptId;
private int partitionId;
private String rssMetaServiceHost;
private int rssMetaServicePort;
private long rssMetaServiceTimestamp;
private String lifecycleManagerHost;
private int lifecycleManagerPort;
private long lifecycleManagerTimestamp;
private UserIdentifier userIdentifier;
private boolean isFirstHandShake = true;

Expand Down Expand Up @@ -109,9 +109,10 @@ public RemoteShuffleOutputGate(
shuffleDesc.getShuffleResource().getMapPartitionShuffleDescriptor().getAttemptId();
this.partitionId =
shuffleDesc.getShuffleResource().getMapPartitionShuffleDescriptor().getPartitionId();
this.rssMetaServiceHost = shuffleDesc.getShuffleResource().getRssMetaServiceHost();
this.rssMetaServicePort = shuffleDesc.getShuffleResource().getRssMetaServicePort();
this.rssMetaServiceTimestamp = shuffleDesc.getShuffleResource().getRssMetaServiceTimestamp();
this.lifecycleManagerHost = shuffleDesc.getShuffleResource().getLifecycleManagerHost();
this.lifecycleManagerPort = shuffleDesc.getShuffleResource().getLifecycleManagerPort();
this.lifecycleManagerTimestamp =
shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
this.flinkShuffleClient = getShuffleClient();
}

Expand Down Expand Up @@ -212,9 +213,9 @@ FlinkShuffleClientImpl getShuffleClient() {
try {
return FlinkShuffleClientImpl.get(
applicationId,
rssMetaServiceHost,
rssMetaServicePort,
rssMetaServiceTimestamp,
lifecycleManagerHost,
lifecycleManagerPort,
lifecycleManagerTimestamp,
celebornConf,
userIdentifier);
} catch (DriverChangedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ public class RemoteShuffleResource implements ShuffleResource {

private static final long serialVersionUID = 6497939083185255973L;

private final String rssMetaServiceHost;
private final int rssMetaServicePort;
private final long rssMetaServiceTimestamp;
private final String lifecycleManagerHost;
private final int lifecycleManagerPort;
private final long lifecycleManagerTimestamp;
private ShuffleResourceDescriptor shuffleResourceDescriptor;

public RemoteShuffleResource(
String rssMetaServiceHost,
int rssMetaServicePort,
long rssMetaServiceTimestamp,
String lifecycleManagerHost,
int lifecycleManagerPort,
long lifecycleManagerTimestamp,
ShuffleResourceDescriptor remoteShuffleDescriptor) {
this.rssMetaServiceHost = rssMetaServiceHost;
this.rssMetaServicePort = rssMetaServicePort;
this.rssMetaServiceTimestamp = rssMetaServiceTimestamp;
this.lifecycleManagerHost = lifecycleManagerHost;
this.lifecycleManagerPort = lifecycleManagerPort;
this.lifecycleManagerTimestamp = lifecycleManagerTimestamp;
this.shuffleResourceDescriptor = remoteShuffleDescriptor;
}

Expand All @@ -42,24 +42,24 @@ public ShuffleResourceDescriptor getMapPartitionShuffleDescriptor() {
return shuffleResourceDescriptor;
}

public String getRssMetaServiceHost() {
return rssMetaServiceHost;
public String getLifecycleManagerHost() {
return lifecycleManagerHost;
}

public int getRssMetaServicePort() {
return rssMetaServicePort;
public int getLifecycleManagerPort() {
return lifecycleManagerPort;
}

public long getRssMetaServiceTimestamp() {
return rssMetaServiceTimestamp;
public long getLifecycleManagerTimestamp() {
return lifecycleManagerTimestamp;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("RemoteShuffleResource{");
sb.append("rssMetaServiceHost='").append(rssMetaServiceHost).append('\'');
sb.append(", rssMetaServicePort=").append(rssMetaServicePort);
sb.append(", rssMetaServiceTimestamp=").append(rssMetaServiceTimestamp);
sb.append("lifecycleManagerHost='").append(lifecycleManagerHost).append('\'');
sb.append(", lifecycleManagerPort=").append(lifecycleManagerPort);
sb.append(", lifecycleManagerTimestamp=").append(lifecycleManagerTimestamp);
sb.append(", shuffleResourceDescriptor=").append(shuffleResourceDescriptor);
sb.append('}');
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;

public class RssBufferStream {
public class CelebornBufferStream {

private static Logger logger = LoggerFactory.getLogger(RssBufferStream.class);
private static Logger logger = LoggerFactory.getLogger(CelebornBufferStream.class);
private CelebornConf conf;
private FlinkTransportClientFactory clientFactory;
private String shuffleKey;
Expand All @@ -50,9 +50,9 @@ public class RssBufferStream {
private boolean isOpenSuccess;
private Object lock = new Object();

public RssBufferStream() {}
public CelebornBufferStream() {}

public RssBufferStream(
public CelebornBufferStream(
FlinkShuffleClientImpl mapShuffleClient,
CelebornConf conf,
FlinkTransportClientFactory dataClientFactory,
Expand Down Expand Up @@ -86,10 +86,10 @@ public void open(
@Override
public void onSuccess(ByteBuffer response) {
StreamHandle streamHandle = (StreamHandle) Message.decode(response);
RssBufferStream.this.streamId = streamHandle.streamId;
CelebornBufferStream.this.streamId = streamHandle.streamId;
synchronized (lock) {
if (!isClosed) {
clientFactory.registerSupplier(RssBufferStream.this.streamId, supplier);
clientFactory.registerSupplier(CelebornBufferStream.this.streamId, supplier);
mapShuffleClient
.getReadClientHandler()
.registerHandler(streamId, messageConsumer, client);
Expand Down Expand Up @@ -134,15 +134,15 @@ public void addCredit(ReadAddCredit addCredit) {
});
}

public static RssBufferStream empty() {
return emptyRssBufferStream;
public static CelebornBufferStream empty() {
return EMPTY_CELEBORN_BUFFER_STREAM;
}

public long getStreamId() {
return streamId;
}

public static RssBufferStream create(
public static CelebornBufferStream create(
FlinkShuffleClientImpl client,
CelebornConf conf,
FlinkTransportClientFactory dataClientFactory,
Expand All @@ -153,12 +153,13 @@ public static RssBufferStream create(
if (locations == null || locations.length == 0) {
return empty();
} else {
return new RssBufferStream(
return new CelebornBufferStream(
client, conf, dataClientFactory, shuffleKey, locations, subIndexStart, subIndexEnd);
}
}

private static final RssBufferStream emptyRssBufferStream = new RssBufferStream();
private static final CelebornBufferStream EMPTY_CELEBORN_BUFFER_STREAM =
new CelebornBufferStream();

private void closeStream() {
if (client != null && client.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public FlinkShuffleClientImpl(
dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
this.flinkTransportClientFactory =
new FlinkTransportClientFactory(context, conf.clientFetchMaxRetriesForEachReplica());
this.setupMetaServiceRef(driverHost, port);
this.setupLifecycleManagerRef(driverHost, port);
this.driverTimestamp = driverTimestamp;
}

public RssBufferStream readBufferedPartition(
public CelebornBufferStream readBufferedPartition(
int shuffleId, int partitionId, int subPartitionIndexStart, int subPartitionIndexEnd)
throws IOException {
String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
Expand All @@ -148,7 +148,7 @@ public RssBufferStream readBufferedPartition(
logger.error("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
throw new PartitionUnRetryAbleException(partitionId + " may be lost.");
} else {
return RssBufferStream.create(
return CelebornBufferStream.create(
this,
conf,
flinkTransportClientFactory,
Expand Down Expand Up @@ -392,7 +392,7 @@ public Optional<PartitionLocation> regionStart(
StatusCode.HARD_SPLIT);
requests.add(req);
PbChangeLocationResponse response =
driverRssMetaService.askSync(
lifecycleManagerRef.askSync(
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public static CelebornConf toCelebornConf(Configuration configuration) {
return tmpCelebornConf;
}

public static String toCelebornAppId(long rssMetaServiceTimestamp, JobID jobID) {
public static String toCelebornAppId(long lifecycleManagerTimestamp, JobID jobID) {
// Workaround for FLINK-19358, use first none ZERO_JOB_ID as celeborn shared appId for all
// other flink jobs
if (!ZERO_JOB_ID.equals(jobID)) {
return rssMetaServiceTimestamp + "-" + jobID.toString();
return lifecycleManagerTimestamp + "-" + jobID.toString();
}

return rssMetaServiceTimestamp + "-" + JobID.generate();
return lifecycleManagerTimestamp + "-" + JobID.generate();
}

public static String toShuffleId(JobID jobID, IntermediateDataSetID dataSetID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setup() throws IOException, InterruptedException {
new FlinkShuffleClientImpl(
"APP", "localhost", 1232, System.currentTimeMillis(), conf, null) {
@Override
public void setupMetaServiceRef(String host, int port) {}
public void setupLifecycleManagerRef(String host, int port) {}
};
when(clientFactory.createClient(primaryLocation.getHost(), primaryLocation.getPushPort(), 1))
.thenAnswer(t -> client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SortBasedPusher extends MemoryConsumer {
private MemoryBlock currentPage = null;
private long pageCursor = -1;

private final ShuffleClient rssShuffleClient;
private final ShuffleClient shuffleClient;
private DataPusher dataPusher;
private final int pushBufferMaxSize;
private final long pushSortMemoryThreshold;
Expand All @@ -78,7 +78,7 @@ public class SortBasedPusher extends MemoryConsumer {

public SortBasedPusher(
TaskMemoryManager memoryManager,
ShuffleClient rssShuffleClient,
ShuffleClient shuffleClient,
String appId,
int shuffleId,
int mapId,
Expand All @@ -97,7 +97,7 @@ public SortBasedPusher(
(int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
memoryManager.getTungstenMemoryMode());

this.rssShuffleClient = rssShuffleClient;
this.shuffleClient = shuffleClient;

this.appId = appId;
this.shuffleId = shuffleId;
Expand Down Expand Up @@ -127,7 +127,7 @@ public SortBasedPusher(
numMappers,
numPartitions,
conf,
rssShuffleClient,
shuffleClient,
afterPush,
mapStatusLengths);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -164,7 +164,7 @@ public long pushData() throws IOException {
currentPartition = partition;
} else {
int bytesWritten =
rssShuffleClient.mergeData(
shuffleClient.mergeData(
shuffleId,
mapId,
attemptNumber,
Expand Down
Loading

0 comments on commit 693172d

Please sign in to comment.