Skip to content

Commit

Permalink
Refactor to address more review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vnarayanan committed Jun 26, 2024
1 parent 83cd25f commit c0c29b4
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
int dagIdentifier, int vertexIdx, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
int dagIdentifier,
int vertexIdx,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
Objects.requireNonNull(applicationId, "applicationId is null");
Objects.requireNonNull(dagName, "dagName is null");
Objects.requireNonNull(vertexName, "vertexName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ public void initialize() throws IOException {
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(
getContext().getApplicationId(),
getContext().getDagIdentifier()));
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));
jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex());
committer = getOutputCommitter(getContext());
jobContext = getJobContextFromVertexContext(getContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.OutputContext;

@Private
public final class Utils {
Expand Down Expand Up @@ -66,7 +68,11 @@ public static Counter getMRCounter(TezCounter tezCounter) {
return new MRCounters.MRCounter(tezCounter);
}

public static String getDAGID(ApplicationId id, int dagIdentifier) {
return TezDAGID.getInstance(id, dagIdentifier).toString();
public static String getDAGID(OutputCommitterContext context) {
return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString();
}

public static String getDAGID(OutputContext context) {
return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
}
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(),
getContext().getDagIdentifier()));
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));
TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testJobUUIDSet() throws Exception {
String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID);
assertNotEquals(dagID, invalidDAGID);
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID);
assertEquals(dagID, Utils.getDAGID(outputContext.getApplicationId(), outputContext.getDagIdentifier()));
assertEquals(dagID, Utils.getDAGID(outputContext));
}

@Test(timeout = 5000)
Expand Down

0 comments on commit c0c29b4

Please sign in to comment.