-
Notifications
You must be signed in to change notification settings - Fork 361
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[CELEBORN-1731] Support merged kv input for Tez
### What changes were proposed in this pull request? Add MergedKVInput ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #2974 from GH-Gloway/1731. Authored-by: hongguangwei <[email protected]> Signed-off-by: mingji <[email protected]>
- Loading branch information
Showing
3 changed files
with
464 additions
and
0 deletions.
There are no files selected for viewing
109 changes: 109 additions & 0 deletions
109
...in/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.tez.runtime.library.input; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
import org.apache.hadoop.classification.InterfaceAudience.Public; | ||
import org.apache.tez.dag.api.GroupInputEdge; | ||
import org.apache.tez.dag.api.TezUncheckedException; | ||
import org.apache.tez.runtime.api.*; | ||
import org.apache.tez.runtime.library.api.KeyValueReader; | ||
|
||
/** | ||
* Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link | ||
* GroupInputEdge} and provide a unified view of the input. It concatenates all the inputs to | ||
* provide a unified view | ||
*/ | ||
@Public | ||
public class CelebornConcatenatedMergedKeyValueInput extends MergedLogicalInput { | ||
private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader; | ||
|
||
public CelebornConcatenatedMergedKeyValueInput(MergedInputContext context, List<Input> inputs) { | ||
super(context, inputs); | ||
} | ||
|
||
public class ConcatenatedMergedKeyValueReader extends KeyValueReader { | ||
private int currentReaderIndex = 0; | ||
private KeyValueReader currentReader; | ||
|
||
@Override | ||
public boolean next() throws IOException { | ||
while ((currentReader == null) || !currentReader.next()) { | ||
if (currentReaderIndex == getInputs().size()) { | ||
hasCompletedProcessing(); | ||
completedProcessing = true; | ||
getContext().notifyProgress(); | ||
return false; | ||
} | ||
try { | ||
Reader reader = getInputs().get(currentReaderIndex).getReader(); | ||
if (!(reader instanceof KeyValueReader)) { | ||
throw new TezUncheckedException( | ||
"Expected KeyValueReader. " + "Got: " + reader.getClass().getName()); | ||
} | ||
currentReader = (KeyValueReader) reader; | ||
currentReaderIndex++; | ||
getContext().notifyProgress(); | ||
} catch (Exception e) { | ||
// An InterruptedException is not expected here since this works off of | ||
// underlying readers which take care of throwing IOInterruptedExceptions | ||
if (e instanceof IOException) { | ||
throw (IOException) e; | ||
} else { | ||
throw new IOException(e); | ||
} | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
@Override | ||
public Object getCurrentKey() throws IOException { | ||
return currentReader.getCurrentKey(); | ||
} | ||
|
||
@Override | ||
public Object getCurrentValue() throws IOException { | ||
return currentReader.getCurrentValue(); | ||
} | ||
|
||
public float getProgress() throws IOException, InterruptedException { | ||
return (1.0f) * (currentReaderIndex + 1) / getInputs().size(); | ||
} | ||
} | ||
|
||
/** Provides a {@link KeyValueReader} that iterates over the concatenated input data */ | ||
@Override | ||
public KeyValueReader getReader() throws Exception { | ||
concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader(); | ||
return concatenatedMergedKeyValueReader; | ||
} | ||
|
||
@Override | ||
public void setConstituentInputIsReady(Input input) { | ||
informInputReady(); | ||
} | ||
|
||
@Override | ||
public float getProgress() throws ProgressFailedException, InterruptedException { | ||
try { | ||
return concatenatedMergedKeyValueReader.getProgress(); | ||
} catch (IOException e) { | ||
throw new ProgressFailedException("getProgress encountered IOException ", e); | ||
} | ||
} | ||
} |
110 changes: 110 additions & 0 deletions
110
...n/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.tez.runtime.library.input; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
import org.apache.hadoop.classification.InterfaceAudience.Public; | ||
import org.apache.tez.dag.api.GroupInputEdge; | ||
import org.apache.tez.dag.api.TezUncheckedException; | ||
import org.apache.tez.runtime.api.*; | ||
import org.apache.tez.runtime.library.api.KeyValuesReader; | ||
|
||
/** | ||
* Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link | ||
* GroupInputEdge} and provide a unified view of the input. It concatenates all the inputs to | ||
* provide a unified view | ||
*/ | ||
@Public | ||
public class CelebornConcatenatedMergedKeyValuesInput extends MergedLogicalInput { | ||
|
||
private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader; | ||
|
||
public CelebornConcatenatedMergedKeyValuesInput(MergedInputContext context, List<Input> inputs) { | ||
super(context, inputs); | ||
} | ||
|
||
public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader { | ||
private int currentReaderIndex = 0; | ||
private KeyValuesReader currentReader; | ||
|
||
@Override | ||
public boolean next() throws IOException { | ||
while ((currentReader == null) || !currentReader.next()) { | ||
if (currentReaderIndex == getInputs().size()) { | ||
hasCompletedProcessing(); | ||
completedProcessing = true; | ||
getContext().notifyProgress(); | ||
return false; | ||
} | ||
try { | ||
Reader reader = getInputs().get(currentReaderIndex).getReader(); | ||
if (!(reader instanceof KeyValuesReader)) { | ||
throw new TezUncheckedException( | ||
"Expected KeyValuesReader. " + "Got: " + reader.getClass().getName()); | ||
} | ||
currentReader = (KeyValuesReader) reader; | ||
currentReaderIndex++; | ||
getContext().notifyProgress(); | ||
} catch (Exception e) { | ||
// An InterruptedException is not expected here since this works off of | ||
// underlying readers which take care of throwing IOInterruptedExceptions | ||
if (e instanceof IOException) { | ||
throw (IOException) e; | ||
} else { | ||
throw new IOException(e); | ||
} | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
@Override | ||
public Object getCurrentKey() throws IOException { | ||
return currentReader.getCurrentKey(); | ||
} | ||
|
||
@Override | ||
public Iterable<Object> getCurrentValues() throws IOException { | ||
return currentReader.getCurrentValues(); | ||
} | ||
|
||
public float getProgress() throws IOException, InterruptedException { | ||
return (1.0f) * (currentReaderIndex + 1) / getInputs().size(); | ||
} | ||
} | ||
|
||
/** Provides a {@link KeyValuesReader} that iterates over the concatenated input data */ | ||
@Override | ||
public KeyValuesReader getReader() throws Exception { | ||
concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader(); | ||
return concatenatedMergedKeyValuesReader; | ||
} | ||
|
||
@Override | ||
public void setConstituentInputIsReady(Input input) { | ||
informInputReady(); | ||
} | ||
|
||
@Override | ||
public float getProgress() throws ProgressFailedException, InterruptedException { | ||
try { | ||
return concatenatedMergedKeyValuesReader.getProgress(); | ||
} catch (IOException e) { | ||
throw new ProgressFailedException("getProgress encountered IOException ", e); | ||
} | ||
} | ||
} |
Oops, something went wrong.