Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send dynamic workunit split message through HDFS channel #3856

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

package org.apache.gobblin.cluster;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys;
import org.apache.gobblin.runtime.messaging.DynamicWorkUnitProducer;
import org.apache.gobblin.runtime.messaging.MessageBuffer;
import org.apache.gobblin.runtime.messaging.data.SplitWorkUnitMessage;
import org.apache.gobblin.runtime.messaging.hdfs.FileSystemMessageBuffer;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
Expand Down Expand Up @@ -77,6 +83,7 @@
public class GobblinHelixTask implements Task {

private final TaskConfig taskConfig;
private final Config dynamicConfig;
private final String applicationName;
private final String instanceName;

Expand All @@ -91,6 +98,8 @@ public class GobblinHelixTask implements Task {
private String helixTaskId;
private EventBus eventBus;
private boolean isCanceled;
private DynamicWorkUnitProducer dynamicWorkUnitProducer;
private Path appWorkPath;

public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
Expand All @@ -104,12 +113,11 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
this.applicationName = builder.getApplicationName();
this.instanceName = builder.getInstanceName();
this.taskMetrics = taskMetrics;
this.appWorkPath = builder.getAppWorkPath();
getInfoFromTaskConfig();

Path jobStateFilePath = GobblinClusterUtils
.getJobStateFilePath(stateStores.haveJobStateStore(),
builder.getAppWorkPath(),
this.jobId);
.getJobStateFilePath(stateStores.haveJobStateStore(), appWorkPath, this.jobId);

Integer partitionNum = getPartitionForHelixTask(taskDriver);
if (partitionNum == null) {
Expand All @@ -119,7 +127,7 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,

// Dynamic config is considered as part of JobState in SingleTask
// Important to distinguish between dynamicConfig and Config
final Config dynamicConfig = builder.getDynamicConfig()
dynamicConfig = builder.getDynamicConfig()
.withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
.withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
Expand Down Expand Up @@ -165,6 +173,8 @@ public TaskResult run() {
this.isCanceled = false;
long startTime = System.currentTimeMillis();
log.info("Actual task {} started. [{} {}]", this.taskId, this.applicationName, this.instanceName);
this.buildDynamicWorkUnitProducer();
this.sendSplitWorkUnitMessage();
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
Expand Down Expand Up @@ -239,4 +249,22 @@ public void cancel() {
log.warn("Cancel called for an uninitialized Gobblin helix task for jobId {}.", jobId);
}
}


/**
 * Builds the {@link DynamicWorkUnitProducer} used by this task to send dynamic-workunit
 * messages over HDFS. The producer is backed by a {@link FileSystemMessageBuffer} whose
 * base directory is the application work path ({@code appWorkPath}); the buffer itself is
 * created under the well-known dynamic-workunit sub-folder.
 */
private void buildDynamicWorkUnitProducer() {
  // Point the file-system buffer at the application work directory.
  Config bufferConfig = dynamicConfig.withValue(
      DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_PATH,
      ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
  FileSystemMessageBuffer.Factory bufferFactory = new FileSystemMessageBuffer.Factory(bufferConfig);
  MessageBuffer messageBuffer = bufferFactory.getBuffer(DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_FOLDER);
  this.dynamicWorkUnitProducer = new DynamicWorkUnitProducer(messageBuffer);
}

/**
 * Publishes a {@link SplitWorkUnitMessage} for this helix task's workunit id through the
 * {@link DynamicWorkUnitProducer}. Delivery is best-effort: an {@link IOException} from the
 * producer is logged and swallowed so that a messaging failure does not fail the task itself.
 */
private void sendSplitWorkUnitMessage() {
  SplitWorkUnitMessage message = SplitWorkUnitMessage.builder()
      .workUnitId(helixTaskId)
      .build();
  try {
    dynamicWorkUnitProducer.produce(message);
  } catch (IOException e) {
    log.error("Failed to send splitWorkUnitMessage. ", e);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.messaging;

/**
 * Configuration keys and default values for the dynamic-workunit messaging feature.
 *
 * <p>All keys share the {@code gobblin.dynamic.workunit.} prefix; the HDFS-channel keys add a
 * further {@code hdfs.} segment. This class is a pure constants holder and is not meant to be
 * instantiated.
 */
public class DynamicWorkUnitConfigKeys {
  public static final String DYNAMIC_WORKUNIT_PREFIX = "gobblin.dynamic.workunit.";

  public static final String DYNAMIC_WORKUNIT_HDFS_CHANNEL_NAME = DYNAMIC_WORKUNIT_PREFIX + "hdfs";
  public static final String DYNAMIC_WORKUNIT_HDFS_PREFIX = DYNAMIC_WORKUNIT_PREFIX + "hdfs.";

  // Base HDFS path under which message files are written/read.
  public static final String DYNAMIC_WORKUNIT_HDFS_PATH = DYNAMIC_WORKUNIT_HDFS_PREFIX + "path";
  // Consumer-side polling interval, in milliseconds.
  public static final String DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS = DYNAMIC_WORKUNIT_HDFS_PREFIX + "pollingRate";
  public static final long DYNAMIC_WORKUNIT_HDFS_DEFAULT_POLLING_RATE = 30000;

  // Sub-folder (relative to the application work path) holding dynamic-workunit message files.
  public static final String DYNAMIC_WORKUNIT_HDFS_FOLDER = "dynamic-workunit";

  // Constants holder — prevent instantiation.
  private DynamicWorkUnitConfigKeys() {
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.runtime.messaging.hdfs;

import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.gobblin.runtime.messaging.MessageBuffer;
import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitSerde;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys.*;


/**
 * {@link FileSystem} based message buffer for sending and receiving {@link DynamicWorkUnitMessage}.
 *
 * <p>Producers persist each message as an individual file under a per-channel directory
 * ({@code <basePath>/<channelName>}); consumers register a subscriber and a shared scheduled
 * task polls the directory, deserializing and deleting each message file before handing the
 * batch to every subscriber.
 *
 * <p>Thread-safety: {@link #subscribe(Consumer)} may be called concurrently with the polling
 * task; access to the subscriber list and the scheduling flag is synchronized below.
 */
@Slf4j
public class FileSystemMessageBuffer implements MessageBuffer<DynamicWorkUnitMessage> {

  // Dynamic Workunit message specific file suffix
  private static final String MESSAGE_FILE_SUFFIX = ".wumessage";

  // True once the shared polling task has been scheduled; guarded by
  // startPollingIfNotAlreadyPolling() being synchronized.
  private volatile boolean isPollingTaskScheduled = false;

  @Getter
  private final String channelName;
  private final FileSystem fs;
  @Getter
  private final Path path;
  private final long pollingRateMillis;
  // Mutated by subscribe() and read by the polling thread — all access is synchronized on the
  // list itself.
  private final List<Consumer<List<DynamicWorkUnitMessage>>> subscribers = new ArrayList<>();

  private FileSystemMessageBuffer(FileSystem fs, String channelName, long pollingRateMillis, Path parentWorkingDir) {
    this.fs = fs;
    this.channelName = channelName;
    this.pollingRateMillis = pollingRateMillis;
    this.path = parentWorkingDir;
  }

  /**
   * Serializes {@code item} and writes it as a new file (overwriting any existing file with the
   * same name) under this buffer's directory.
   *
   * @throws IOException if the file cannot be created or written
   */
  @Override
  public void publish(DynamicWorkUnitMessage item) throws IOException {
    byte[] serializedMsg = DynamicWorkUnitSerde.serialize(item);
    try (FSDataOutputStream os = fs.create(this.getFilePathForMessage(item), true)) {
      os.write(serializedMsg);
      log.info("Persisted message into HDFS path {} for workunit {}.", path.toString(), item.getWorkUnitId());
    } catch (IOException ioException) {
      log.error("Failed to persist message into HDFS path {} for workunit {}.",
          path.toString(), item.getWorkUnitId(), ioException);
      throw ioException;
    }
  }

  /**
   * Registers {@code subscriber} to receive every batch of newly polled messages and lazily
   * starts the polling task on the first subscription.
   */
  @Override
  public void subscribe(Consumer<List<DynamicWorkUnitMessage>> subscriber) {
    synchronized (subscribers) {
      subscribers.add(subscriber);
    }
    startPollingIfNotAlreadyPolling();
  }

  /**
   * Only need one scheduled executor task / thread for polling. Synchronized so concurrent
   * first-subscribers cannot schedule the task twice.
   */
  private synchronized void startPollingIfNotAlreadyPolling() {
    if (!isPollingTaskScheduled) {
      scheduleTaskForPollingNewMessages();
      isPollingTaskScheduled = true;
    }
  }

  private void scheduleTaskForPollingNewMessages() {
    Factory.getExecutorInstance().scheduleAtFixedRate(() -> {
      List<DynamicWorkUnitMessage> newMessages = this.getNewMessages();
      if (!CollectionUtils.isEmpty(newMessages)) {
        // Snapshot the subscriber list so subscribe() can run concurrently with delivery.
        List<Consumer<List<DynamicWorkUnitMessage>>> currentSubscribers;
        synchronized (subscribers) {
          currentSubscribers = new ArrayList<>(subscribers);
        }
        currentSubscribers.forEach(s -> s.accept(newMessages));
      }
    },
    0, pollingRateMillis, TimeUnit.MILLISECONDS);
  }

  /**
   * Reads, deserializes, and deletes every message file currently in this buffer's directory.
   * I/O failures are logged and whatever was read so far is returned (best-effort polling).
   */
  private List<DynamicWorkUnitMessage> getNewMessages() {
    List<DynamicWorkUnitMessage> messageList = new LinkedList<>();
    try {
      FileStatus[] fileStatus = fs.listStatus(path);
      for (FileStatus status : fileStatus) {
        Path singlePath = status.getPath();
        if (!fs.isFile(singlePath)) {
          continue;
        }
        // try-with-resources: the original leaked this FSDataInputStream.
        try (FSDataInputStream fis = fs.open(singlePath)) {
          messageList.add(DynamicWorkUnitSerde.deserialize(ByteStreams.toByteArray(fis)));
        }
        if (!fs.delete(singlePath, true)) {
          // A message file that survives deletion would be redelivered on the next poll.
          log.warn("Failed to delete consumed message file {}.", singlePath.toString());
        }
        log.debug("Fetched message from HDFS path {}.", singlePath.toString());
      }
    } catch (IOException ioException) {
      log.error("Failed to get message from HDFS path {}.", path.toString(), ioException);
    }

    return messageList;
  }

  /**
   * Builds a unique file path for {@code message}: {@code <workUnitId>_<timestampMillis>} plus
   * the message suffix, under this buffer's directory.
   */
  private Path getFilePathForMessage(DynamicWorkUnitMessage message) {
    String messageFileName = message.getWorkUnitId() + "_" + System.currentTimeMillis();
    Path messagePath = PathUtils.mergePaths(path, new Path(messageFileName));
    return PathUtils.addExtension(messagePath, MESSAGE_FILE_SUFFIX);
  }

  /**
   * Creates {@link FileSystemMessageBuffer} instances from config. Resolves the base path and
   * {@link FileSystem} from {@link DynamicWorkUnitConfigKeys#DYNAMIC_WORKUNIT_HDFS_PATH} and
   * owns the single shared polling executor.
   */
  public static class Factory implements MessageBuffer.Factory<DynamicWorkUnitMessage> {
    // Shared across all buffers; lazily created in getExecutorInstance(). NOTE(review): this is a
    // non-daemon thread and is never shut down — confirm that is acceptable for task containers.
    private static volatile ScheduledExecutorService executorSingleton;
    private final Config cfg;
    private Path basePath;
    private FileSystem fileSystem;
    private long pollingRate;

    public Factory(Config cfg) {
      this.cfg = cfg;
      this.setup();
    }

    /**
     * Returns a buffer whose channel directory is {@code <basePath>/<hdfsFolder>}.
     */
    @Override
    public MessageBuffer<DynamicWorkUnitMessage> getBuffer(String hdfsFolder) {
      Path workunitPath = new Path(basePath, hdfsFolder);
      log.info("HDFS directory for dynamic workunit is: " + workunitPath);

      return new FileSystemMessageBuffer(fileSystem, hdfsFolder, pollingRate, workunitPath);
    }

    // Resolves base path, file system, and polling rate from config; fails fast if the file
    // system for the configured path cannot be created.
    private void setup() {
      String fsPathString = this.cfg.getString(DYNAMIC_WORKUNIT_HDFS_PATH);
      this.basePath = new Path(fsPathString);
      try {
        this.fileSystem = this.basePath.getFileSystem(new Configuration());
      } catch (IOException e) {
        throw new RuntimeException("Unable to detect workunit directory file system:", e);
      }
      pollingRate = ConfigUtils.getLong(this.cfg, DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS,
          DYNAMIC_WORKUNIT_HDFS_DEFAULT_POLLING_RATE);
    }

    // Lazy, synchronized singleton: one polling thread shared by every buffer in the JVM.
    private synchronized static ScheduledExecutorService getExecutorInstance() {
      if (executorSingleton == null) {
        executorSingleton = Executors.newScheduledThreadPool(1);
      }
      return executorSingleton;
    }
  }
}
Loading