diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java new file mode 100644 index 00000000000..9e473475d54 --- /dev/null +++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.sort.CelebornSortBasedPusher;
+import org.apache.tez.runtime.library.utils.CodecUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * A {@link KeyValuesWriter} that assigns every record to a partition with the configured
+ * {@link Partitioner} and hands it to a {@link CelebornSortBasedPusher}.
+ *
+ * <p>After {@link #close()} the per-partition record counts, and the per-partition byte sizes
+ * when partition stats reporting is enabled, are taken from the pusher and exposed through the
+ * getters.
+ */
+public class CelebornUnorderedPartitionedKVWriter extends KeyValuesWriter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVWriter.class);
+
+  /** Memory assumed for shuffle data when no positive amount was assigned: 64 MB. */
+  private static final long DEFAULT_AVAILABLE_MEMORY_BYTES = 64L * 1024 * 1024;
+
+  /**
+   * Largest size handed to the pusher, whose size arguments are {@code int}. It stays slightly
+   * below {@link Integer#MAX_VALUE} in case the pusher allocates a single array of that size
+   * (assumption - confirm against CelebornSortBasedPusher).
+   */
+  private static final long MAX_PUSHER_MEMORY_BYTES = Integer.MAX_VALUE - 8;
+
+  protected final OutputContext outputContext;
+  protected final Configuration conf;
+  protected final RawLocalFileSystem localFs;
+  protected final Partitioner partitioner;
+  protected final Class keyClass;
+  protected final Class valClass;
+  protected final Serializer keySerializer;
+  protected final Serializer valSerializer;
+  protected final SerializationFactory serializationFactory;
+  protected final Serialization keySerialization;
+  protected final Serialization valSerialization;
+  protected final int numOutputs;
+  protected final CompressionCodec codec;
+
+  protected final TezCounter outputRecordBytesCounter;
+  protected final TezCounter outputRecordsCounter;
+  protected final TezCounter outputBytesWithOverheadCounter;
+
+  /** Memory actually used for the pusher, after defaulting and capping. */
+  private final long availableMemory;
+
+  private int[] numRecordsPerPartition;
+  /** {@code null} when partition stats reporting is disabled. */
+  private long[] sizePerPartition;
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  final TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;
+
+  private final CelebornSortBasedPusher pusher;
+
+  /**
+   * Builds the writer and the pusher it delegates to.
+   *
+   * @param outputContext context used for counters and progress notification
+   * @param conf runtime configuration (key/value classes, partitioner, codec, partition stats)
+   * @param numOutputs number of partitions
+   * @param availableMemoryBytes memory assigned for buffering; non-positive values fall back to
+   *     64 MB and values beyond the {@code int} range are capped
+   * @param celebornTezWriter writer passed through to the pusher
+   * @param celebornConf Celeborn configuration passed through to the pusher
+   * @throws RuntimeException if the local file system, the codec or the partitioner cannot be
+   *     created
+   */
+  public CelebornUnorderedPartitionedKVWriter(
+      OutputContext outputContext,
+      Configuration conf,
+      int numOutputs,
+      long availableMemoryBytes,
+      CelebornTezWriter celebornTezWriter,
+      CelebornConf celebornConf) {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    try {
+      this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to obtain the raw local file system", e);
+    }
+    this.numOutputs = numOutputs;
+
+    // k/v serialization
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+    serializationFactory = new SerializationFactory(this.conf);
+    keySerialization = serializationFactory.getSerialization(keyClass);
+    valSerialization = serializationFactory.getSerialization(valClass);
+    keySerializer = keySerialization.getSerializer(keyClass);
+    valSerializer = valSerialization.getSerializer(valClass);
+
+    outputRecordBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+    outputBytesWithOverheadCounter =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+
+    // stats
+    reportPartitionStats =
+        TezRuntimeConfiguration.ReportPartitionStats.fromString(
+            conf.get(
+                TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+                TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    sizePerPartition = reportPartitionStats.isEnabled() ? new long[numOutputs] : null;
+    numRecordsPerPartition = new int[numOutputs];
+
+    // compression
+    try {
+      this.codec = CodecUtils.getCodec(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create the compression codec", e);
+    }
+
+    LOG.info(
+        "Instantiating Partitioner: [{}]",
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS));
+
+    try {
+      this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to instantiate the partitioner", e);
+    }
+
+    // A plain long-to-int cast wraps around for assignments of 2 GB or more and keeps negative
+    // values negative, so the amount is normalised before it is narrowed.
+    long usableMemory =
+        availableMemoryBytes > 0 ? availableMemoryBytes : DEFAULT_AVAILABLE_MEMORY_BYTES;
+    availableMemory = Math.min(usableMemory, MAX_PUSHER_MEMORY_BYTES);
+    pusher =
+        new CelebornSortBasedPusher(
+            keySerializer,
+            valSerializer,
+            (int) availableMemory,
+            // second size argument is 80% of the first, as in the original call
+            (int) (availableMemory * 0.8),
+            null,
+            outputRecordBytesCounter,
+            outputRecordsCounter,
+            celebornTezWriter,
+            celebornConf,
+            false);
+  }
+
+  /**
+   * Writes every value of {@code iterable} under the same key.
+   *
+   * @param key key shared by all values
+   * @param iterable values to write
+   * @throws IOException if inserting a record into the pusher fails
+   */
+  @Override
+  public void write(Object key, Iterable iterable) throws IOException {
+    Iterator<?> values = iterable.iterator();
+    while (values.hasNext()) {
+      write(key, values.next());
+    }
+  }
+
+  /**
+   * Partitions one record and inserts it into the pusher.
+   *
+   * @param key record key
+   * @param value record value
+   * @throws IOException if inserting the record into the pusher fails
+   * @throws IllegalStateException if the writer has already been closed
+   */
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    if (isShutdown.get()) {
+      throw new IllegalStateException("Writer already closed");
+    }
+    pusher.insert(key, value, partitioner.getPartition(key, value, numOutputs));
+  }
+
+  /**
+   * Closes the pusher and captures its per-partition statistics. Only the first call does any
+   * work, so a repeated close does not close the pusher a second time.
+   *
+   * @throws IOException if closing the pusher fails
+   */
+  public void close() throws IOException {
+    if (!isShutdown.compareAndSet(false, true)) {
+      return;
+    }
+    pusher.close();
+    updateTezCountersAndNotify();
+  }
+
+  /** Copies the pusher's per-partition statistics and notifies the context of progress. */
+  private void updateTezCountersAndNotify() {
+    numRecordsPerPartition = pusher.getRecordsPerPartition();
+    // sizePerPartition stays null when partition stats reporting is disabled.
+    if (sizePerPartition != null) {
+      sizePerPartition = pusher.getBytesPerPartition();
+    }
+    outputContext.notifyProgress();
+  }
+
+  /** Returns the record count per partition; all zeros until {@link #close()} has run. */
+  public int[] getNumRecordsPerPartition() {
+    return numRecordsPerPartition;
+  }
+
+  /** Returns whether the configured partition stats mode is the precise one. */
+  public boolean reportDetailedPartitionStats() {
+    return reportPartitionStats.isPrecise();
+  }
+
+  /** Returns the byte size per partition, or {@code null} when stats reporting is disabled. */
+  public long[] getPartitionStats() {
+    return sizePerPartition;
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
new file mode 100644
index 00000000000..83ce22b6468
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.output; + +import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*; + +import java.io.IOException; +import java.util.BitSet; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Deflater; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.Preconditions; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.AbstractLogicalOutput; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.library.api.KeyValuesWriter; +import org.apache.tez.runtime.library.api.Partitioner; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; +import org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter; +import 
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link CelebornUnorderedKVOutput} is a {@link LogicalOutput} that writes key-value pairs through
+ * a {@link CelebornUnorderedPartitionedKVWriter}. It replaces the configured partitioner with
+ * {@link CustomPartitioner}, so all records written by one task go to a single partition.
+ */
+@Public
+public class CelebornUnorderedKVOutput extends AbstractLogicalOutput {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CelebornUnorderedKVOutput.class);
+
+  @VisibleForTesting Configuration conf;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private CelebornUnorderedPartitionedKVWriter kvWriter;
+  private final Deflater deflater;
+  private boolean sendEmptyPartitionDetails;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  // NOTE(review): these two are static only because CustomPartitioner is registered by class
+  // name in the configuration and has no reference to this instance. Every instance in the JVM
+  // overwrites them, so two outputs with different settings would interfere; passing the values
+  // through the Configuration would remove the shared state.
+  private static int mapId;
+  private static boolean broadcastOrOneToOne;
+
+  private final int numMappers;
+  private final int numOutputs;
+  private final int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+
+  /**
+   * Derives the attempt id and task id from the context's unique identifier.
+   *
+   * @param outputContext context of this output
+   * @param numPhysicalOutputs number of physical outputs (partitions)
+   */
+  public CelebornUnorderedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.numOutputs = getNumPhysicalOutputs();
+    this.numMappers = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+    deflater = TezCommonUtils.newBestCompressionDeflater();
+  }
+
+  /**
+   * Builds the configuration, forces {@link CustomPartitioner} as partitioner, requests the
+   * initial memory and reads the Celeborn connection settings.
+   *
+   * @return an empty list; no events are produced at initialization
+   */
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
+    this.conf.setInt(
+        TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, getNumPhysicalOutputs());
+    this.conf.set(
+        TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, CustomPartitioner.class.getName());
+    sendEmptyPartitionDetails =
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext()
+        .requestInitialMemory(
+            UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+                conf, getContext().getTotalMemoryAvailableToTask()),
+            memoryUpdateCallbackHandler);
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+    broadcastOrOneToOne = conf.getBoolean(TEZ_BROADCAST_OR_ONETOONE, false);
+    return Collections.emptyList();
+  }
+
+  /** Creates the Celeborn writer and the key-value writer; later calls are no-ops. */
+  @Override
+  public synchronized void start() throws Exception {
+    if (isStarted.get()) {
+      return;
+    }
+    memoryUpdateCallbackHandler.validateUpdateReceived();
+    CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    // NOTE(review): mapId is passed for two consecutive arguments, as in the original call;
+    // confirm against the CelebornTezWriter constructor.
+    CelebornTezWriter celebornTezWriter =
+        new CelebornTezWriter(
+            shuffleId,
+            mapId,
+            mapId,
+            attemptId,
+            numMappers,
+            numOutputs,
+            celebornConf,
+            appId,
+            host,
+            port,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(),
+                celebornConf.quotaUserSpecificUserName()));
+    this.kvWriter =
+        new CelebornUnorderedPartitionedKVWriter(
+            getContext(),
+            conf,
+            numOutputs,
+            memoryUpdateCallbackHandler.getMemoryAssigned(),
+            celebornTezWriter,
+            celebornConf);
+    isStarted.set(true);
+  }
+
+  /**
+   * Returns the key-value writer.
+   *
+   * @throws IllegalStateException if the output has not been started
+   */
+  @Override
+  public synchronized KeyValuesWriter getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return kvWriter;
+  }
+
+  /** Incoming events are ignored by this output. */
+  @Override
+  public void handleEvents(List<Event> outputEvents) {}
+
+  /**
+   * Closes the writer and builds the events describing the output. An output that was never
+   * started produces the events for empty output instead. The output counters are reported to
+   * the statistics reporter in both cases.
+   *
+   * @return the events to send downstream
+   */
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (isStarted.get()) {
+      kvWriter.close();
+      returnEvents = generateEvents();
+      kvWriter = null;
+    } else {
+      // All values go through placeholders so a "{}" inside a vertex name cannot shift them.
+      LOG.warn(
+          "{}: Attempting to close output {} of type {} before it was started. "
+              + "Generating empty events",
+          getContext().getInputOutputVertexNames(),
+          getContext().getDestinationVertexName(),
+          this.getClass().getSimpleName());
+      returnEvents = new LinkedList<>();
+      ShuffleUtils.generateEventsForNonStartedOutput(
+          returnEvents,
+          getNumPhysicalOutputs(),
+          getContext(),
+          false,
+          true,
+          TezCommonUtils.newBestCompressionDeflater());
+    }
+
+    // This works for non-started outputs since new counters will be created with an initial value
+    // of 0
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    long outputRecords =
+        getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+    return returnEvents;
+  }
+
+  /**
+   * Builds the final events from the writer's per-partition record counts. Must be called after
+   * the writer was closed and before the writer reference is cleared.
+   */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    // Collected for logging only; the events are built from the record itself.
+    BitSet emptyPartitionDetails = new BitSet();
+    int partitionCount = celebornTezPerPartitionRecord.size();
+    for (int i = 0; i < partitionCount; i++) {
+      TezIndexRecord indexRecord = celebornTezPerPartitionRecord.getIndex(i);
+      if (!indexRecord.hasData()) {
+        emptyPartitionDetails.set(i);
+      }
+    }
+    int emptyPartitions = emptyPartitionDetails.cardinality();
+    if (emptyPartitions > 0) {
+      LOG.info("Output has {} empty partition(s) out of {}", emptyPartitions, partitionCount);
+    }
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        kvWriter.getPartitionStats(),
+        kvWriter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+
+  /**
+   * Partitioner that ignores key and value: it returns the task id of the output when the
+   * broadcast/one-to-one flag is set and partition 0 otherwise.
+   */
+  @InterfaceAudience.Private
+  public static class CustomPartitioner implements Partitioner {
+
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      return broadcastOrOneToOne ? mapId : 0;
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
new file mode 100644
index 00000000000..2802895b666
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.output; + +import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Deflater; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.Preconditions; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.*; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter; +import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter; +import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.client.CelebornTezWriter; +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.tez.plugin.util.CelebornTezUtils; + +/** + * 
{@link CelebornUnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can be used to
+ * write Key-Value pairs through a {@link CelebornUnorderedPartitionedKVWriter}. The key-value
+ * pairs are written to the correct partition based on the configured Partitioner.
+ */
+@Public
+public class CelebornUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVOutput.class);
+
+  @VisibleForTesting Configuration conf;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private CelebornUnorderedPartitionedKVWriter kvWriter;
+  private final Deflater deflater;
+
+  boolean sendEmptyPartitionDetails;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  // Per-instance on purpose: as a static field it was overwritten by every output created in
+  // the same JVM.
+  private final int mapId;
+  private final int numMappers;
+  private final int numOutputs;
+  private final int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+
+  /**
+   * Derives the attempt id and task id from the context's unique identifier.
+   *
+   * @param outputContext context of this output
+   * @param numPhysicalOutputs number of physical outputs (partitions)
+   */
+  public CelebornUnorderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.numOutputs = getNumPhysicalOutputs();
+    this.numMappers = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+    deflater = TezCommonUtils.newBestCompressionDeflater();
+  }
+
+  /**
+   * Builds the configuration, requests the initial memory and reads the Celeborn connection
+   * settings.
+   *
+   * @return an empty list; no events are produced at initialization
+   */
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
+    this.conf.setInt(
+        TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, getNumPhysicalOutputs());
+    sendEmptyPartitionDetails =
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext()
+        .requestInitialMemory(
+            UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+                conf, getContext().getTotalMemoryAvailableToTask()),
+            memoryUpdateCallbackHandler);
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+
+    return Collections.emptyList();
+  }
+
+  /** Creates the Celeborn writer and the key-value writer; later calls are no-ops. */
+  @Override
+  public synchronized void start() throws Exception {
+    if (isStarted.get()) {
+      return;
+    }
+    memoryUpdateCallbackHandler.validateUpdateReceived();
+    CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    // NOTE(review): mapId is passed for two consecutive arguments, as in the original call;
+    // confirm against the CelebornTezWriter constructor.
+    CelebornTezWriter celebornTezWriter =
+        new CelebornTezWriter(
+            shuffleId,
+            mapId,
+            mapId,
+            attemptId,
+            numMappers,
+            numOutputs,
+            celebornConf,
+            appId,
+            host,
+            port,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(),
+                celebornConf.quotaUserSpecificUserName()));
+    this.kvWriter =
+        new CelebornUnorderedPartitionedKVWriter(
+            getContext(),
+            conf,
+            numOutputs,
+            memoryUpdateCallbackHandler.getMemoryAssigned(),
+            celebornTezWriter,
+            celebornConf);
+    isStarted.set(true);
+  }
+
+  /**
+   * Returns the key-value writer.
+   *
+   * @throws IllegalStateException if the output has not been started
+   */
+  @Override
+  public synchronized Writer getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return kvWriter;
+  }
+
+  /** Incoming events are ignored by this output. */
+  @Override
+  public void handleEvents(List<Event> outputEvents) {}
+
+  /**
+   * Closes the writer and builds the events describing the output. An output that was never
+   * started produces the events for empty output instead. The output counters are reported to
+   * the statistics reporter in both cases.
+   *
+   * @return the events to send downstream
+   */
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (isStarted.get()) {
+      kvWriter.close();
+      returnEvents = generateEvents();
+      kvWriter = null;
+    } else {
+      // All values go through placeholders so a "{}" inside a vertex name cannot shift them.
+      LOG.warn(
+          "{}: Attempting to close output {} of type {} before it was started. "
+              + "Generating empty events",
+          getContext().getInputOutputVertexNames(),
+          getContext().getDestinationVertexName(),
+          this.getClass().getSimpleName());
+      returnEvents = new LinkedList<>();
+      ShuffleUtils.generateEventsForNonStartedOutput(
+          returnEvents,
+          getNumPhysicalOutputs(),
+          getContext(),
+          false,
+          true,
+          TezCommonUtils.newBestCompressionDeflater());
+    }
+
+    // This works for non-started outputs since new counters will be created with an initial value
+    // of 0
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    long outputRecords =
+        getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+    return returnEvents;
+  }
+
+  /**
+   * Builds the final events from the writer's per-partition record counts. Must be called after
+   * the writer was closed and before the writer reference is cleared.
+   */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        kvWriter.getPartitionStats(),
+        kvWriter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
new file mode 100644
index 00000000000..6a8e4e5769f
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.sort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+
+/**
+ * A {@link TezSpillRecord} whose index entries only tell whether a partition received any
+ * records. The flag is derived from per-partition record counts when they were supplied, and
+ * from the parent's index entries when the record was created from an index file.
+ */
+public class CelebornTezPerPartitionRecord extends TezSpillRecord {
+  private final int numPartitions;
+
+  /** Records per partition; {@code null} only for the index-file constructor. */
+  private final int[] numRecordsPerPartition;
+
+  /**
+   * Creates a record in which every partition is reported as empty.
+   *
+   * @param numPartitions number of partitions
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions) {
+    this(numPartitions, null);
+  }
+
+  /**
+   * Creates a record backed by the given per-partition record counts.
+   *
+   * @param numPartitions number of partitions
+   * @param numRecordsPerPartition record count of each partition; {@code null} means that no
+   *     partition has records
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions, int[] numRecordsPerPartition) {
+    super(numPartitions);
+    this.numPartitions = numPartitions;
+    // Never store null here: getIndex() would otherwise throw a NullPointerException.
+    this.numRecordsPerPartition =
+        numRecordsPerPartition != null ? numRecordsPerPartition : new int[numPartitions];
+  }
+
+  /**
+   * Creates a record through the parent's index-file constructor.
+   *
+   * @param indexFileName index file passed to the parent constructor
+   * @param job configuration passed to the parent constructor
+   * @throws IOException if the parent constructor fails
+   */
+  public CelebornTezPerPartitionRecord(Path indexFileName, Configuration job) throws IOException {
+    super(indexFileName, job);
+    // No record counts exist in this mode, so size and has-data flags come from the parent.
+    this.numPartitions = super.size();
+    this.numRecordsPerPartition = null;
+  }
+
+  /** Returns the number of partitions described by this record. */
+  @Override
+  public int size() {
+    return numPartitions;
+  }
+
+  /**
+   * Returns an index record for partition {@code i} that carries only the has-data flag.
+   *
+   * @param i partition index
+   * @return a record whose {@code hasData()} is true when the partition is not empty
+   */
+  @Override
+  public CelebornTezIndexRecord getIndex(int i) {
+    boolean partitionHasData =
+        numRecordsPerPartition != null
+            ? numRecordsPerPartition[i] != 0
+            : super.getIndex(i).hasData();
+    CelebornTezIndexRecord indexRecord = new CelebornTezIndexRecord();
+    indexRecord.setData(partitionHasData);
+    return indexRecord;
+  }
+
+  /** Index record that overrides {@link #hasData()} with an explicitly assigned flag. */
+  static class CelebornTezIndexRecord extends TezIndexRecord {
+    private boolean hasData;
+
+    private void setData(boolean hasData) {
+      this.hasData = hasData;
+    }
+
+    @Override
+    public boolean hasData() {
+      return hasData;
+    }
+  }
+}