From 9f6473d9098012a68b3316e2ae43c1abd866ae21 Mon Sep 17 00:00:00 2001 From: hongguangwei Date: Tue, 3 Dec 2024 17:05:38 +0800 Subject: [PATCH] [CELEBORN-1731]Support merged kv input for Tez --- ...lebornConcatenatedMergedKeyValueInput.java | 109 ++++++++ ...ebornConcatenatedMergedKeyValuesInput.java | 110 ++++++++ .../CelebornOrderedGroupedMergedKVInput.java | 245 ++++++++++++++++++ 3 files changed, 464 insertions(+) create mode 100644 client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java create mode 100644 client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java create mode 100644 client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java new file mode 100644 index 0000000000..8e534ac5df --- /dev/null +++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link
+ * GroupInputEdge}) and provides a unified view of the input by concatenating all the inputs, one
+ * after another, in the order returned by {@link #getInputs()}.
+ */
+@Public
+public class CelebornConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  /** Reader handed out by the most recent {@link #getReader()} call; null until then. */
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
+
+  public CelebornConcatenatedMergedKeyValueInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /** A {@link KeyValueReader} that drains each constituent input's reader in turn. */
+  public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
+    /** Index of the next constituent input to open (i.e. number of inputs opened so far). */
+    private int currentReaderIndex = 0;
+
+    private KeyValueReader currentReader;
+
+    /**
+     * Advances to the next key-value pair, moving on to the next constituent input whenever the
+     * current one is exhausted.
+     *
+     * @return true if a pair is available, false once every input has been drained
+     * @throws IOException if an underlying reader fails; any non-IO exception raised while
+     *     opening a reader (including the reader-type check below) is wrapped in an IOException
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValueReader)) {
+            throw new TezUncheckedException(
+                "Expected KeyValueReader. " + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValueReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return currentReader.getCurrentValue();
+    }
+
+    /**
+     * Reports coarse progress based on how many constituent inputs have been opened.
+     *
+     * @return a value in (0, 1]; 1 when there are no inputs at all
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      int inputCount = getInputs().size();
+      if (inputCount == 0) {
+        return 1.0f;
+      }
+      // currentReaderIndex equals inputCount once the last reader is open, so the raw ratio
+      // would exceed 1.0; clamp it.
+      return Math.min(1.0f, (1.0f) * (currentReaderIndex + 1) / inputCount);
+    }
+  }
+
+  /** Provides a {@link KeyValueReader} that iterates over the concatenated input data */
+  @Override
+  public KeyValueReader getReader() throws Exception {
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
+  }
+
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Returns the progress of the reader created by {@link #getReader()}.
+   *
+   * @return 0 if no reader has been requested yet, otherwise the reader's progress
+   * @throws ProgressFailedException if the reader fails with an IOException
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    ConcatenatedMergedKeyValueReader reader = concatenatedMergedKeyValueReader;
+    if (reader == null) {
+      // getReader() has not been called yet, so nothing has been consumed.
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java
new file mode 100644
index 0000000000..ca2dd97a98
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or
more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link
+ * GroupInputEdge}) and provides a unified view of the input by concatenating all the inputs, one
+ * after another, in the order returned by {@link #getInputs()}.
+ */
+@Public
+public class CelebornConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+
+  /** Reader handed out by the most recent {@link #getReader()} call; null until then. */
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
+  public CelebornConcatenatedMergedKeyValuesInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /** A {@link KeyValuesReader} that drains each constituent input's reader in turn. */
+  public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
+    /** Index of the next constituent input to open (i.e. number of inputs opened so far). */
+    private int currentReaderIndex = 0;
+
+    private KeyValuesReader currentReader;
+
+    /**
+     * Advances to the next key, moving on to the next constituent input whenever the current one
+     * is exhausted.
+     *
+     * @return true if a key is available, false once every input has been drained
+     * @throws IOException if an underlying reader fails; any non-IO exception raised while
+     *     opening a reader (including the reader-type check below) is wrapped in an IOException
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValuesReader)) {
+            throw new TezUncheckedException(
+                "Expected KeyValuesReader. " + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValuesReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentReader.getCurrentValues();
+    }
+
+    /**
+     * Reports coarse progress based on how many constituent inputs have been opened.
+     *
+     * @return a value in (0, 1]; 1 when there are no inputs at all
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      int inputCount = getInputs().size();
+      if (inputCount == 0) {
+        return 1.0f;
+      }
+      // currentReaderIndex equals inputCount once the last reader is open, so the raw ratio
+      // would exceed 1.0; clamp it.
+      return Math.min(1.0f, (1.0f) * (currentReaderIndex + 1) / inputCount);
+    }
+  }
+
+  /** Provides a {@link KeyValuesReader} that iterates over the concatenated input data */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
+  }
+
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Returns the progress of the reader created by {@link #getReader()}.
+   *
+   * @return 0 if no reader has been requested yet, otherwise the reader's progress
+   * @throws ProgressFailedException if the reader fails with an IOException
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    ConcatenatedMergedKeyValuesReader reader = concatenatedMergedKeyValuesReader;
+    if (reader == null) {
+      // getReader() has not been called yet, so nothing has been consumed.
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java
new file mode 100644
index 0000000000..3d5d5a68ac
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MergedLogicalInput} which merges multiple {@link OrderedGroupedKVInput}s and returns a
+ * single view of these by merging values which belong to the same key.
+ *
+ * <p>Combiners and Secondary Sort are not implemented, so there is no guarantee on the order of
+ * values.
+ */
+@Public
+public class CelebornOrderedGroupedMergedKVInput extends MergedLogicalInput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornOrderedGroupedMergedKVInput.class);
+
+  /** Identity-based set of constituent inputs that have reported ready. */
+  private final Set<Input> completedInputs =
+      Collections.newSetFromMap(new IdentityHashMap<Input, Boolean>());
+
+  public CelebornOrderedGroupedMergedKVInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /** Provides an ordered {@link KeyValuesReader} */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
+  }
+
+  /** Records {@code input} as ready and signals readiness once every constituent input is. */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    synchronized (completedInputs) {
+      completedInputs.add(input);
+      if (completedInputs.size() == getInputs().size()) {
+        informInputReady();
+      }
+    }
+  }
+
+  /**
+   * Merges several {@link KeyValuesReader}s through a priority queue ordered by each reader's
+   * current key, exposing the values of equal keys from different readers as one iterable.
+   */
+  private static class OrderedGroupedMergedKeyValuesReader extends KeyValuesReader {
+    /** Readers positioned on a key, ordered by that key. */
+    private final PriorityQueue<KeyValuesReader> pQueue;
+
+    @SuppressWarnings("rawtypes")
+    private final RawComparator keyComparator;
+
+    /** Readers whose current key has been consumed; advanced on the next call to next(). */
+    private final List<KeyValuesReader> finishedReaders;
+
+    private final ValuesIterable currentValues;
+    private KeyValuesReader nextKVReader;
+    private Object currentKey;
+    private final MergedInputContext context;
+
+    /**
+     * Opens a reader on every input and queues those that have at least one key.
+     *
+     * @param inputs constituent inputs; the first one supplies the key comparator and is cast to
+     *     {@link OrderedGroupedKVInput}
+     * @param context used to notify progress on every call to {@link #next()}
+     */
+    public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context)
+        throws Exception {
+      keyComparator = ((OrderedGroupedKVInput) inputs.get(0)).getInputKeyComparator();
+      pQueue =
+          new PriorityQueue<KeyValuesReader>(
+              inputs.size(), new KVReaderComparator(keyComparator));
+      finishedReaders = new ArrayList<KeyValuesReader>(inputs.size());
+      for (Input input : inputs) {
+        KeyValuesReader reader = (KeyValuesReader) input.getReader();
+        if (reader.next()) {
+          pQueue.add(reader);
+        }
+      }
+      currentValues = new ValuesIterable();
+      this.context = context;
+    }
+
+    /** Advances {@code reader} and re-queues it if it still has a key. */
+    private void advanceAndAddToQueue(KeyValuesReader reader) throws IOException {
+      if (reader.next()) {
+        pQueue.add(reader);
+      }
+    }
+
+    /** Re-queues {@code reader} without advancing it; a null reader is ignored. */
+    private void addToQueue(KeyValuesReader reader) throws IOException {
+      if (reader != null) {
+        pQueue.add(reader);
+      }
+    }
+
+    /**
+     * Moves to the next distinct key across all readers.
+     *
+     * @return true if a key is available, false once all readers are exhausted
+     */
+    @Override
+    public boolean next() throws IOException {
+      // Skip values of current key if not consumed by the user
+      currentValues.discardCurrent();
+
+      for (KeyValuesReader reader : finishedReaders) {
+        // add them back to queue
+        advanceAndAddToQueue(reader);
+      }
+      finishedReaders.clear();
+
+      nextKVReader = pQueue.poll();
+      context.notifyProgress();
+      if (nextKVReader != null) {
+        currentKey = nextKVReader.getCurrentKey();
+        currentValues.moveToNext();
+        return true;
+      } else {
+        hasCompletedProcessing();
+        completedProcessing = true;
+      }
+      return false;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentKey;
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentValues;
+    }
+
+    /** Iterable over the values of the current key; always hands out the same iterator. */
+    private class ValuesIterable implements Iterable<Object> {
+      private ValuesIterator iterator = new ValuesIterator();
+
+      @Override
+      public Iterator<Object> iterator() {
+        return iterator;
+      }
+
+      public void discardCurrent() throws IOException {
+        iterator.discardCurrent();
+      }
+
+      public void moveToNext() throws IOException {
+        iterator.moveToNext();
+      }
+    }
+
+    /** Iterates the values of the current key, spilling over into readers with an equal key. */
+    @SuppressWarnings("unchecked")
+    private class ValuesIterator implements Iterator<Object> {
+
+      /** Values of the reader currently being drained; null when no key is active. */
+      private Iterator<Object> currentValuesIter;
+
+      public void moveToNext() throws IOException {
+        currentValuesIter = nextKVReader.getCurrentValues().iterator();
+      }
+
+      @Override
+      public boolean hasNext() {
+        if (currentValuesIter != null) {
+          if (currentValuesIter.hasNext()) {
+            return true;
+          } else {
+            // Current reader has no more values for this key; see whether the next reader in
+            // key order carries the same key.
+            finishedReaders.add(nextKVReader);
+            nextKVReader = pQueue.poll();
+            try {
+              if (nextKVReader != null
+                  && keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) == 0) {
+                currentValuesIter = nextKVReader.getCurrentValues().iterator();
+                return true;
+              } else { // key changed or no more data.
+                // Add the reader back to queue
+                addToQueue(nextKVReader);
+                currentValuesIter = null;
+                return false;
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        } else {
+          // No current key. next needs to be called.
+          return false;
+        }
+      }
+
+      /** Skips any unread values of the current key across all readers that carry it. */
+      public void discardCurrent() throws IOException {
+        if (currentValuesIter != null) {
+          do {
+            finishedReaders.add(nextKVReader);
+            nextKVReader = pQueue.poll();
+          } while (nextKVReader != null
+              && keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) == 0);
+          addToQueue(nextKVReader);
+          currentValuesIter = null;
+        }
+      }
+
+      /**
+       * Returns the next value of the current key.
+       *
+       * @throws NoSuchElementException if the current key has no further values
+       */
+      @Override
+      public Object next() {
+        // Go through hasNext() so that an exhausted per-reader iterator rolls over to the next
+        // reader with the same key, and so that a missing key fails per the Iterator contract
+        // rather than with a NullPointerException.
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return currentValuesIter.next();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    /** Comparator that compares KeyValuesReader on their current key */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static class KVReaderComparator implements Comparator<KeyValuesReader> {
+
+      private final RawComparator keyComparator;
+
+      public KVReaderComparator(RawComparator keyComparator) {
+        this.keyComparator = keyComparator;
+      }
+
+      @Override
+      public int compare(KeyValuesReader o1, KeyValuesReader o2) {
+        try {
+          return keyComparator.compare(o1.getCurrentKey(), o2.getCurrentKey());
+        } catch (IOException e) {
+          LOG.error("Caught exception while comparing keys in shuffle input", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the mean progress of all constituent inputs, each cast to {@link
+   * OrderedGroupedKVInput}.
+   *
+   * @return the average progress, or 0 when there are no inputs
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    List<Input> inputs = getInputs();
+    if (inputs.isEmpty()) {
+      // Avoid 0/0 (NaN) when there is nothing to average.
+      return 0.0f;
+    }
+    float totalProgress = 0.0f;
+    for (Input input : inputs) {
+      totalProgress += ((OrderedGroupedKVInput) input).getProgress();
+    }
+    return (1.0f) * totalProgress / inputs.size();
+  }
+}