Skip to content

Commit

Permalink
TEZ-4540: Reading proto data more than 2GB from multiple splits fails (
Browse files Browse the repository at this point in the history
…#334) (Raghav Aggarwal reviewed by Laszlo Bodor)
  • Loading branch information
Aggarwal-Raghav authored Jun 20, 2024
1 parent e08d027 commit 0ac505b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,6 @@ public void readFields(DataInput in) throws IOException {
}
din.in = in;
message = cin.readMessage(parser, ExtensionRegistry.getEmptyRegistry());
cin.resetSizeCounter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@

import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -135,6 +137,51 @@ public void testService() throws Exception {
scanner.close();
}

@Test
public void testProtoMessageSizeReset() throws Exception {
// This test case is to confirm that cin.resetSizeCounter() was indeed called
ProtoHistoryLoggingService service = createService(false);
service.start();
TezDAGID dagId = TezDAGID.getInstance(appId, 0);
List<HistoryEventProto> protos = new ArrayList<>();
for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
service.handle(event);
}
service.stop();

TezProtoLoggers loggers = new TezProtoLoggers();
Assert.assertTrue(loggers.setup(service.getConfig(), clock));

// Verify dag events are logged.
DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath)) {
assertEventsRead(reader, protos, 1, protos.size());

int totalBytesRead = getTotalBytesRead(reader);
// cin.resetSizeCounter() in ProtoMessageWritable.java ensures that
// totalBytesRead will always be 0. For reference read javadoc of CodedInputStream.
Assert.assertEquals(totalBytesRead, 0);
}
}

private static int getTotalBytesRead(ProtoMessageReader<HistoryEventProto> reader) throws NoSuchFieldException,
IllegalAccessException {
// writable is a private field in ProtoMessageReader.java
Field f = reader.getClass().getDeclaredField("writable");
f.setAccessible(true);
ProtoMessageWritable<HistoryEventProto> writable = (ProtoMessageWritable<HistoryEventProto>) f.get(reader);

// cin is a private filed in ProtoMessageWritable.java
Field c = writable.getClass().getDeclaredField("cin");
c.setAccessible(true);
CodedInputStream cin = (CodedInputStream) c.get(writable);

// Goal is to get value of: reader.writable.cin.getTotalBytesRead()
return cin.getTotalBytesRead();
}

@Test
public void testServiceSplitEvents() throws Exception {
ProtoHistoryLoggingService service = createService(true);
Expand Down

0 comments on commit 0ac505b

Please sign in to comment.