Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/linkedin/brooklin into Ev…
Browse files Browse the repository at this point in the history
…entSourceTimestampFeature
  • Loading branch information
sanjay24 committed Apr 13, 2020
2 parents 9a00c2a + 963bf43 commit 23031e6
Showing 1 changed file with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class DatastreamTaskImpl implements DatastreamTask {

// List of partitions the task covers.
private List<Integer> _partitions;
// This list is used to save topic-partition list when partition assignment is enabled.
// TODO: Investigate the requirement to populate both _partition and _partitionV2 and cleanup if required.
private List<String> _partitionsV2;

private List<String> _dependencies;
Expand Down Expand Up @@ -436,8 +438,21 @@ public int hashCode() {
@Override
public String toString() {
// toString() is mainly for logging purpose, feel free to modify the content/format
return String.format("%s(%s), partitionsV2=%s, partitions=%s, dependencies=%s", getDatastreamTaskName(), _connectorType,
String.join(",", _partitionsV2), LogUtils.logNumberArrayInRange(_partitions), _dependencies);
// When DatastreamTaskImpl is created using constructor that passes _partitionsV2, _partitions is not populated.
// When DatastreamTaskImpl is created using constructor that passes _partitions, _partitionsV2 is also populated.
// So, if _partitions is not populated, we can assume that only _partitionsV2 is populated.
String partitionsV2FormatLog = String.join(",", _partitionsV2);
if (_partitions.size() > 0) {
try {
List<Integer> partitionList = _partitionsV2.stream().map(Integer::parseInt).collect(Collectors.toList());
partitionsV2FormatLog = LogUtils.logNumberArrayInRange(partitionList);
} catch (NumberFormatException e) {
LOG.error(e.getMessage());
}
}
return String.format("%s(%s), partitionsV2=%s, partitions=%s, dependencies=%s", getDatastreamTaskName(),
_connectorType, partitionsV2FormatLog, LogUtils.logNumberArrayInRange(_partitions),
_dependencies);
}

public void setZkAdapter(ZkAdapter adapter) {
Expand Down

0 comments on commit 23031e6

Please sign in to comment.