diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java index df5743660c..63a1ebda08 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java @@ -98,5 +98,6 @@ public void readFields(DataInput in) throws IOException { } din.in = in; message = cin.readMessage(parser, ExtensionRegistry.getEmptyRegistry()); + cin.resetSizeCounter(); } } diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java index fd3154d904..4f24d30a88 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java @@ -23,12 +23,14 @@ import java.io.EOFException; import java.io.IOException; +import java.lang.reflect.Field; import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import com.google.protobuf.CodedInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -135,6 +137,51 @@ public void testService() throws Exception { scanner.close(); } + @Test + public void testProtoMessageSizeReset() throws Exception { + // This test case is to confirm that cin.resetSizeCounter() was indeed called + ProtoHistoryLoggingService service = createService(false); + service.start(); + TezDAGID dagId = TezDAGID.getInstance(appId, 0); + List protos = new ArrayList<>(); + for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) { + protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent())); + service.handle(event); + } + service.stop(); + + TezProtoLoggers loggers = new TezProtoLoggers(); + Assert.assertTrue(loggers.setup(service.getConfig(), clock)); + + // Verify dag events are logged. + DatePartitionedLogger dagLogger = loggers.getDagEventsLogger(); + Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1"); + try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath)) { + assertEventsRead(reader, protos, 1, protos.size()); + + int totalBytesRead = getTotalBytesRead(reader); + // cin.resetSizeCounter() in ProtoMessageWritable.java ensures that + // totalBytesRead will always be 0. For reference read javadoc of CodedInputStream. + Assert.assertEquals(totalBytesRead, 0); + } + } + + private static int getTotalBytesRead(ProtoMessageReader reader) throws NoSuchFieldException, + IllegalAccessException { + // writable is a private field in ProtoMessageReader.java + Field f = reader.getClass().getDeclaredField("writable"); + f.setAccessible(true); + ProtoMessageWritable writable = (ProtoMessageWritable) f.get(reader); + + // cin is a private filed in ProtoMessageWritable.java + Field c = writable.getClass().getDeclaredField("cin"); + c.setAccessible(true); + CodedInputStream cin = (CodedInputStream) c.get(writable); + + // Goal is to get value of: reader.writable.cin.getTotalBytesRead() + return cin.getTotalBytesRead(); + } + @Test public void testServiceSplitEvents() throws Exception { ProtoHistoryLoggingService service = createService(true);