From 14c4c5bb55c685a3c86d32a00042e199e2ab4dcd Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 11 Jan 2024 14:36:36 -0800 Subject: [PATCH] send dunamic workunit split message through HDFS channel --- .../gobblin/cluster/GobblinHelixTask.java | 36 +++- .../messaging/DynamicWorkUnitConfigKeys.java | 31 +++ .../hdfs/FileSystemMessageBuffer.java | 181 ++++++++++++++++++ .../data/FileSystemMessageBufferTest.java | 109 +++++++++++ .../gobblin/yarn/YarnAutoScalingManager.java | 19 ++ .../org/apache/gobblin/yarn/YarnService.java | 4 +- 6 files changed, 375 insertions(+), 5 deletions(-) create mode 100644 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConfigKeys.java create mode 100644 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/hdfs/FileSystemMessageBuffer.java create mode 100644 gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/FileSystemMessageBufferTest.java diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index 3e7d2707ede..6d335f1f1d1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -17,6 +17,7 @@ package org.apache.gobblin.cluster; +import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; @@ -24,6 +25,11 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.TaskCreationException; import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys; +import org.apache.gobblin.runtime.messaging.DynamicWorkUnitProducer; +import org.apache.gobblin.runtime.messaging.MessageBuffer; +import org.apache.gobblin.runtime.messaging.data.SplitWorkUnitMessage; +import org.apache.gobblin.runtime.messaging.hdfs.FileSystemMessageBuffer; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; @@ -77,6 +83,7 @@ public class GobblinHelixTask implements Task { private final TaskConfig taskConfig; + private final Config dynamicConfig; private final String applicationName; private final String instanceName; @@ -91,6 +98,8 @@ public class GobblinHelixTask implements Task { private String helixTaskId; private EventBus eventBus; private boolean isCanceled; + private DynamicWorkUnitProducer dynamicWorkUnitProducer; + private Path appWorkPath; public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder, TaskCallbackContext taskCallbackContext, @@ -104,12 +113,11 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder, this.applicationName = builder.getApplicationName(); this.instanceName = builder.getInstanceName(); this.taskMetrics = taskMetrics; + this.appWorkPath = builder.getAppWorkPath(); getInfoFromTaskConfig(); Path jobStateFilePath = GobblinClusterUtils - .getJobStateFilePath(stateStores.haveJobStateStore(), - builder.getAppWorkPath(), - this.jobId); + .getJobStateFilePath(stateStores.haveJobStateStore(), appWorkPath, this.jobId); Integer partitionNum = getPartitionForHelixTask(taskDriver); if (partitionNum == null) { @@ -119,7 +127,7 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder, // Dynamic config is considered as part of JobState in SingleTask // Important to distinguish between dynamicConfig and Config - final Config dynamicConfig = builder.getDynamicConfig() + dynamicConfig = builder.getDynamicConfig() .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName())) .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId())) .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName())) @@ -165,6 +173,8 @@ public TaskResult run() { this.isCanceled = false; long startTime = System.currentTimeMillis(); log.info("Actual task {} started. [{} {}]", this.taskId, this.applicationName, this.instanceName); + this.buildDynamicWorkUnitProducer(); + this.sendSplitWorkUnitMessage(); try (Closer closer = Closer.create()) { closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName)); closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey)); @@ -239,4 +249,22 @@ public void cancel() { log.warn("Cancel called for an uninitialized Gobblin helix task for jobId {}.", jobId); } } + + + private void buildDynamicWorkUnitProducer() { + MessageBuffer fileSystemMessageBuffer = new FileSystemMessageBuffer.Factory( + dynamicConfig.withValue(DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_PATH, + ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()))).getBuffer( + DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_FOLDER); + dynamicWorkUnitProducer = new DynamicWorkUnitProducer(fileSystemMessageBuffer); + } + + private void sendSplitWorkUnitMessage() { + SplitWorkUnitMessage splitWorkUnitMessage = SplitWorkUnitMessage.builder().workUnitId(helixTaskId).build(); + try { + this.dynamicWorkUnitProducer.produce(splitWorkUnitMessage); + } catch (IOException ioException) { + log.error("Failed to send splitWorkUnitMessage. ", ioException); + } + } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConfigKeys.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConfigKeys.java new file mode 100644 index 00000000000..c1459321409 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConfigKeys.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.messaging; + +public class DynamicWorkUnitConfigKeys { + public static final String DYNAMIC_WORKUNIT_PREFIX = "gobblin.dynamic.workunit."; + + public static final String DYNAMIC_WORKUNIT_HDFS_CHANNEL_NAME = DYNAMIC_WORKUNIT_PREFIX + "hdfs"; + public static final String DYNAMIC_WORKUNIT_HDFS_PREFIX = DYNAMIC_WORKUNIT_PREFIX + "hdfs."; + + public static final String DYNAMIC_WORKUNIT_HDFS_PATH = DYNAMIC_WORKUNIT_HDFS_PREFIX + "path"; + public static final String DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS = DYNAMIC_WORKUNIT_HDFS_PREFIX + "pollingRate"; + public static final long DYNAMIC_WORKUNIT_HDFS_DEFAULT_POLLING_RATE = 30000; + + public static final String DYNAMIC_WORKUNIT_HDFS_FOLDER = "dynamic-workunit"; +} \ No newline at end of file diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/hdfs/FileSystemMessageBuffer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/hdfs/FileSystemMessageBuffer.java new file mode 100644 index 00000000000..e75c02a91a8 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/hdfs/FileSystemMessageBuffer.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.runtime.messaging.hdfs; + +import com.google.common.io.ByteStreams; +import com.typesafe.config.Config; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.gobblin.runtime.messaging.MessageBuffer; +import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage; +import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitSerde; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys.*; + + +/** + * Implements {@link FileSystem} based message buffer for sending and receiving {@link DynamicWorkUnitMessage}. + */ +@Slf4j +public class FileSystemMessageBuffer implements MessageBuffer { + + // Dynamic Workunit message specific file suffix + private static final String messageFileSuffix = ".wumessage"; + + private boolean isPollingTaskScheduled = false; + + @Getter + private final String channelName; + private final FileSystem fs; + @Getter + private final Path path; + private final long pollingRateMillis; + private final List>> subscribers = new ArrayList<>(); + + private FileSystemMessageBuffer(FileSystem fs, String channelName, long pollingRateMillis, Path parentWorkingDir) { + this.fs = fs; + this.channelName = channelName; + this.pollingRateMillis = pollingRateMillis; + this.path = parentWorkingDir; + } + + @Override + public void publish(DynamicWorkUnitMessage item) throws IOException { + byte[] serializedMsg = DynamicWorkUnitSerde.serialize(item); + try (FSDataOutputStream os = fs.create(this.getFilePathForMessage(item), true)) { + os.write(serializedMsg); + log.info("Persisted message into HDFS path {} for workunit {}.", path.toString(), item.getWorkUnitId()); + } catch (IOException ioException) { + log.error("Failed to persist message into HDFS path {} for workunit {}.", + path.toString(), item.getWorkUnitId(), ioException); + throw ioException; + } + } + + @Override + public void subscribe(Consumer> subscriber) { + subscribers.add(subscriber); + startPollingIfNotAlreadyPolling(); + } + + /** + * Only need one scheduled executor task / thread for polling + */ + private void startPollingIfNotAlreadyPolling() { + if (!isPollingTaskScheduled) { + scheduleTaskForPollingNewMessages(); + isPollingTaskScheduled = true; + } + } + + private void scheduleTaskForPollingNewMessages() { + Factory.getExecutorInstance().scheduleAtFixedRate(() -> { + List newMessages = this.getNewMessages(); + if (!CollectionUtils.isEmpty(newMessages)) { + subscribers.forEach(s -> s.accept(newMessages)); + } + }, + 0, pollingRateMillis, TimeUnit.MILLISECONDS); + } + + private List getNewMessages() { + List messageList = new LinkedList<>(); + try { + FileStatus[] fileStatus = fs.listStatus(path); + for(FileStatus status : fileStatus){ + Path singlePath = status.getPath(); + if (fs.isFile(singlePath)) + { + FSDataInputStream fis = fs.open(singlePath); + DynamicWorkUnitMessage message = DynamicWorkUnitSerde.deserialize(ByteStreams.toByteArray(fis)); + messageList.add(message); + fs.delete(singlePath, true); + log.debug("Fetched message from HDFS path {}.", singlePath.toString()); + } + } + } catch (IOException ioException) { + log.error("Failed to get message from HDFS path {}.", path.toString(), ioException); + } + + return messageList; + } + + private Path getFilePathForMessage(DynamicWorkUnitMessage message) { + String messageFileName = message.getWorkUnitId() + "_" + System.currentTimeMillis(); + Path messagePath = PathUtils.mergePaths(path, new Path(messageFileName)); + return PathUtils.addExtension(messagePath, messageFileSuffix); + } + + public static class Factory implements MessageBuffer.Factory { + private static volatile ScheduledExecutorService executorSingleton; + private final Config cfg; + private Path basePath; + private FileSystem fileSystem; + private long pollingRate; + public Factory(Config cfg) { + this.cfg = cfg; + this.setup(); + } + + @Override + public MessageBuffer getBuffer(String HDFSFolder) { + Path workunitPath = new Path(basePath, HDFSFolder); + log.info("HDFS directory for dynamic workunit is: " + workunitPath); + + return new FileSystemMessageBuffer(fileSystem, HDFSFolder, pollingRate, workunitPath); + } + + private void setup() { + String fsPathString = this.cfg.getString(DYNAMIC_WORKUNIT_HDFS_PATH); + this.basePath = new Path(fsPathString); + try { + this.fileSystem = this.basePath.getFileSystem(new Configuration()); + } catch (IOException e) { + throw new RuntimeException("Unable to detect workunit directory file system:", e); + } + pollingRate = ConfigUtils.getLong(this.cfg, DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS, + DYNAMIC_WORKUNIT_HDFS_DEFAULT_POLLING_RATE); + } + + private synchronized static ScheduledExecutorService getExecutorInstance() { + if (executorSingleton == null) { + executorSingleton = Executors.newScheduledThreadPool(1); + } + return executorSingleton; + } + } +} \ No newline at end of file diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/FileSystemMessageBufferTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/FileSystemMessageBufferTest.java new file mode 100644 index 00000000000..7cb3914794b --- /dev/null +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/FileSystemMessageBufferTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.messaging.data; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.runtime.messaging.DynamicWorkUnitConsumer; +import org.apache.gobblin.runtime.messaging.hdfs.FileSystemMessageBuffer; + +import static org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_PATH; +import static org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.times; + + +public class FileSystemMessageBufferTest { + + FileSystemMessageBuffer messageBuffer; + FileSystem fs; + File baseDir; + Path path; + + @BeforeClass + public void testConstructBuffer() + throws IOException { + baseDir = Files.createTempDir(); + this.fs = FileSystem.get(baseDir.toURI(), new Configuration()); + Config cfg = ConfigFactory.empty() + .withValue(DYNAMIC_WORKUNIT_HDFS_PATH, ConfigValueFactory.fromAnyRef(baseDir.getAbsolutePath())) + .withValue(DYNAMIC_WORKUNIT_HDFS_POLLING_RATE_MILLIS, ConfigValueFactory.fromAnyRef(500)); + FileSystemMessageBuffer.Factory factory = new FileSystemMessageBuffer.Factory(cfg); + messageBuffer = (FileSystemMessageBuffer) factory.getBuffer("HDFS_channel"); + path = messageBuffer.getPath(); + + } + + @Test + public void testPublish() throws IOException { + String workUnitId = "workUnit0"; + List laggingPartitions = new ArrayList<>(Arrays.asList("partition-0","partition-1")); + SplitWorkUnitMessage splitMessage = new SplitWorkUnitMessage(workUnitId, laggingPartitions); + + messageBuffer.publish(splitMessage); + fs.exists(path); + FileStatus[] fileStatus = fs.listStatus(path); + Assert.assertEquals(fileStatus.length, 1); + FSDataInputStream fis = fs.open(fileStatus[0].getPath()); + DynamicWorkUnitMessage message = DynamicWorkUnitSerde.deserialize(ByteStreams.toByteArray(fis)); + + Assert.assertEquals(message.getWorkUnitId(), workUnitId); + Assert.assertEquals(((SplitWorkUnitMessage) message).getLaggingTopicPartitions().size(), 2); + + } + + @Test (dependsOnMethods={"testPublish"}) + public void testSubscribe() throws IOException, InterruptedException { + DynamicWorkUnitConsumer consumer = Mockito.mock(DynamicWorkUnitConsumer.class); + + messageBuffer.subscribe(consumer); + Thread.sleep(1000); + Mockito.verify(consumer, times(1)).accept(any(List.class)); + + // After reading all messages in the path, files should be cleaned up + Assert.assertEquals(fs.listStatus(path).length, 0); + + } + + @AfterClass + public void cleanup() throws IOException { + fs.delete(new Path(baseDir.getPath()), true); + } + +} \ No newline at end of file diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index 21fc47c4a46..cdcad157cef 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -17,6 +17,7 @@ package org.apache.gobblin.yarn; +import com.typesafe.config.ConfigValueFactory; import java.util.ArrayDeque; import java.util.Collections; import java.util.Comparator; @@ -32,6 +33,11 @@ import java.util.stream.Collectors; import org.apache.commons.compress.utils.Sets; +import org.apache.gobblin.runtime.messaging.DynamicWorkUnitConfigKeys; +import org.apache.gobblin.runtime.messaging.DynamicWorkUnitConsumer; +import org.apache.gobblin.runtime.messaging.MessageBuffer; +import org.apache.gobblin.runtime.messaging.handler.SplitMessageHandler; +import org.apache.gobblin.runtime.messaging.hdfs.FileSystemMessageBuffer; import org.apache.gobblin.stream.WorkUnitChangeEvent; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; @@ -92,6 +98,7 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60; private int taskAttemptsThreshold; private final boolean splitWorkUnitReachThreshold; + MessageBuffer fileSystemMessageBuffer; private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize"; @@ -136,6 +143,8 @@ public YarnAutoScalingManager(GobblinApplicationMaster appMaster) { DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD); this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config, SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD, DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD); + + this.startDynamicWorkUnitConsumer(); } @Override @@ -322,6 +331,16 @@ boolean isInstanceUnused(String participant){ } } + private void startDynamicWorkUnitConsumer() { + this.fileSystemMessageBuffer = new FileSystemMessageBuffer.Factory( + config.withValue(DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_PATH, + ConfigValueFactory.fromAnyRef(yarnService.getAppWorkDir().toString()))).getBuffer( + DynamicWorkUnitConfigKeys.DYNAMIC_WORKUNIT_HDFS_FOLDER); + DynamicWorkUnitConsumer dynamicWorkUnitConsumer = new DynamicWorkUnitConsumer(Collections.singletonList(new SplitMessageHandler())); + fileSystemMessageBuffer.subscribe(dynamicWorkUnitConsumer); + } + + /** * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time. * This data structure prevents temporary fluctuation in the number of active helix partitions as the size of queue diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java index 370b8ba87fe..111b44cd949 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java @@ -139,6 +139,8 @@ public class YarnService extends AbstractIdleService { private final Configuration yarnConfiguration; private final FileSystem fs; + @Getter + private final Path appWorkDir; private final Optional gobblinMetrics; private final Optional eventSubmitter; @@ -296,6 +298,7 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); + this.appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); } @SuppressWarnings("unused") @@ -629,7 +632,6 @@ private void requestContainer(Optional preferredNode, Resource resource) protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) throws IOException { - Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); Map resourceMap = Maps.newHashMap();