Add configuration to exclude topic on file path (#1172)
* Add configuration to exclude topic on file path

* add docs
shibd authored Jan 10, 2025
1 parent 7ddce6a commit d0450e2
Showing 8 changed files with 119 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/aws-s3-sink.md
@@ -139,6 +139,7 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as `30d`, `24h`, `30m`, and `10s`, as well as a plain number of milliseconds; for example, `86400000` is equivalent to `24h` or `1d`. |
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is `true`. If it is set to `false`, the path looks like `pathPrefix/24.45.0.json`. |
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It's recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number in the object path. |
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |
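
As a reading aid for the `timePartitionDuration` row, here is a minimal sketch of how such a value could be normalized to milliseconds. It is not the connector's parser: `DurationSketch` and `toMillis` are hypothetical names, and the handling of the `d`, `h`, `m`, and `s` suffixes is an assumption based only on the examples in the table.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the connector: turns values such as
// "24h" or "86400000" into a number of milliseconds.
final class DurationSketch {

    static long toMillis(String value) {
        String v = value.trim();
        if (v.isEmpty()) {
            throw new IllegalArgumentException("Empty duration");
        }
        char unit = v.charAt(v.length() - 1);
        // A value that ends in a digit is treated as plain milliseconds.
        if (Character.isDigit(unit)) {
            return Long.parseLong(v);
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        switch (unit) {
            case 'd': return TimeUnit.DAYS.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            default: throw new IllegalArgumentException("Unknown unit in: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("24h"));      // 86400000
        System.out.println(toMillis("1d"));       // 86400000
        System.out.println(toMillis("86400000")); // 86400000
    }
}
```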
1 change: 1 addition & 0 deletions docs/azure-blob-storage-sink.md
@@ -120,6 +120,7 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as `30d`, `24h`, `30m`, and `10s`, as well as a plain number of milliseconds; for example, `86400000` is equivalent to `24h` or `1d`. |
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is `true`. If it is set to `false`, the path looks like `pathPrefix/24.45.0.json`. |
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It's recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number in the object path. |
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |
1 change: 1 addition & 0 deletions docs/google-cloud-storage-sink.md
@@ -125,6 +125,7 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| `timePartitionDuration` | String | False | false | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as `30d`, `24h`, `30m`, and `10s`, as well as a plain number of milliseconds; for example, `86400000` is equivalent to `24h` or `1d`. |
| `pathPrefix` | String | False | false | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `partitionerWithTopicName` | Boolean | False | false | true | Indicates whether to include the topic name in the file path. Default is `true`. If it is set to `false`, the path looks like `pathPrefix/24.45.0.json`. |
| `partitionerUseIndexAsOffset` | Boolean | False | false | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It's recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) for more details. |
| `withTopicPartitionNumber` | Boolean | False | false | true | When it is set to `true`, include the topic partition number in the object path. |
| `sliceTopicPartitionPath` | Boolean | False | false | false | When it is set to `true`, split the partitioned topic name into separate folders in the bucket path. |
@@ -86,6 +86,7 @@ public class BlobStoreAbstractConfig implements Serializable {
private String partitionerType;
private String pathPrefix;
private boolean withTopicPartitionNumber = true;
private boolean partitionerWithTopicName = true;
private boolean partitionerUseIndexAsOffset;
private String timePartitionPattern;
private String timePartitionDuration;
@@ -41,34 +41,36 @@ public abstract class AbstractPartitioner<T> implements Partitioner<T> {

     private boolean sliceTopicPartitionPath;
     private boolean withTopicPartitionNumber;
+    private boolean partitionerWithTopicName;
     private boolean useIndexAsOffset;

     @Override
     public void configure(BlobStoreAbstractConfig config) {
         this.sliceTopicPartitionPath = config.isSliceTopicPartitionPath();
         this.withTopicPartitionNumber = config.isWithTopicPartitionNumber();
         this.useIndexAsOffset = config.isPartitionerUseIndexAsOffset();
+        this.partitionerWithTopicName = config.isPartitionerWithTopicName();
     }

     @Override
     public String generatePartitionedPath(String topic, String encodedPartition) {

         List<String> joinList = new ArrayList<>();
-        TopicName topicName = TopicName.get(topic);
-        joinList.add(topicName.getTenant());
-        joinList.add(topicName.getNamespacePortion());
-
-        if (topicName.isPartitioned() && withTopicPartitionNumber) {
-            if (sliceTopicPartitionPath) {
+        if (partitionerWithTopicName) {
+            TopicName topicName = TopicName.get(topic);
+            joinList.add(topicName.getTenant());
+            joinList.add(topicName.getNamespacePortion());
+            if (topicName.isPartitioned() && withTopicPartitionNumber) {
+                if (sliceTopicPartitionPath) {
+                    TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
+                    joinList.add(newTopicName.getLocalName());
+                    joinList.add(Integer.toString(topicName.getPartitionIndex()));
+                } else {
+                    joinList.add(topicName.getLocalName());
+                }
+            } else {
                 TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
                 joinList.add(newTopicName.getLocalName());
-                joinList.add(Integer.toString(topicName.getPartitionIndex()));
-            } else {
-                joinList.add(topicName.getLocalName());
             }
-        } else {
-            TopicName newTopicName = TopicName.get(topicName.getPartitionedTopicName());
-            joinList.add(newTopicName.getLocalName());
         }
         joinList.add(encodedPartition);
         return StringUtils.join(joinList, PATH_SEPARATOR);
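
To make the effect of the new flag concrete, here is a minimal standalone sketch of the path-building logic in the hunk above. It avoids the Pulsar `TopicName` class on purpose: the `tenant`, `namespace`, `localName`, and `partitionIndex` parameters are hypothetical stand-ins for what `TopicName` would parse, `PartitionPathSketch` is not part of the connector, and both the `/` separator and the `-partition-N` suffix are assumptions about Pulsar's conventions rather than something this diff shows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractPartitioner#generatePartitionedPath.
final class PartitionPathSketch {

    // Assumed value of Partitioner.PATH_SEPARATOR.
    static final String PATH_SEPARATOR = "/";

    /** partitionIndex is -1 for a non-partitioned topic. */
    static String generatePath(String tenant, String namespace, String localName, int partitionIndex,
                               boolean withTopicName, boolean withPartitionNumber, boolean slicePath,
                               String encodedPartition) {
        List<String> parts = new ArrayList<>();
        if (withTopicName) {
            parts.add(tenant);
            parts.add(namespace);
            boolean partitioned = partitionIndex >= 0;
            if (partitioned && withPartitionNumber) {
                if (slicePath) {
                    // Topic name and partition index become separate folders.
                    parts.add(localName);
                    parts.add(Integer.toString(partitionIndex));
                } else {
                    // Assumed Pulsar naming for a single partition of a topic.
                    parts.add(localName + "-partition-" + partitionIndex);
                }
            } else {
                parts.add(localName);
            }
        }
        // With withTopicName == false, this is the only path component.
        parts.add(encodedPartition);
        return String.join(PATH_SEPARATOR, parts);
    }

    public static void main(String[] args) {
        // Flag disabled: prints "24.45.0"
        System.out.println(generatePath("public", "default", "my-topic", -1, false, true, false, "24.45.0"));
        // Flag enabled (the default): prints "public/default/my-topic/24.45.0"
        System.out.println(generatePath("public", "default", "my-topic", -1, true, true, false, "24.45.0"));
        // Partitioned topic, sliced path: prints "public/default/my-topic/3/24.45.0"
        System.out.println(generatePath("public", "default", "my-topic", 3, true, true, true, "24.45.0"));
    }
}
```

Note that when the flag is `false`, the tenant and namespace are dropped along with the topic name, so `withTopicPartitionNumber` and `sliceTopicPartitionPath` no longer influence the path.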
@@ -65,6 +65,7 @@ public void loadBasicConfigTest() throws IOException {
Assert.assertEquals(config.get("timePartitionPattern"), cloudStorageSinkConfig.getTimePartitionPattern());
Assert.assertEquals(config.get("timePartitionDuration"), cloudStorageSinkConfig.getTimePartitionDuration());
Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize());
Assert.assertTrue(cloudStorageSinkConfig.isPartitionerWithTopicName());
Assert.assertEquals(10000000L, cloudStorageSinkConfig.getMaxBatchBytes());
}

@@ -243,6 +244,7 @@ public void byteConfigTest() throws IOException {
config.put("timePartitionDuration", "2d");
config.put("batchSize", 10);
config.put("bytesFormatTypeSeparator", "0x10");
config.put("partitionerWithTopicName", "false");
CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
cloudStorageSinkConfig.validate();

@@ -258,6 +260,7 @@ public void byteConfigTest() throws IOException {
Assert.assertEquals(config.get("batchSize"), cloudStorageSinkConfig.getBatchSize());
Assert.assertEquals(config.get("bytesFormatTypeSeparator"),
cloudStorageSinkConfig.getBytesFormatTypeSeparator());
Assert.assertFalse(cloudStorageSinkConfig.isPartitionerWithTopicName());
}

@Test
@@ -90,6 +90,18 @@ public static Object[][] data() {
numberConfig.setTimePartitionPattern("yyyy-MM-dd-HH");
TimePartitioner<Object> numberPartitioner = new TimePartitioner<>();
numberPartitioner.configure(numberConfig);

BlobStoreAbstractConfig withoutTopicNameConfig = new BlobStoreAbstractConfig();
withoutTopicNameConfig.setPartitionerWithTopicName(false);
SimplePartitioner<Object> simpleWithoutTopicNamePartitioner = new SimplePartitioner<>();
simpleWithoutTopicNamePartitioner.configure(withoutTopicNameConfig);

BlobStoreAbstractConfig withoutTopicNameConfig2 = new BlobStoreAbstractConfig();
withoutTopicNameConfig2.setPartitionerWithTopicName(false);
withoutTopicNameConfig2.setTimePartitionDuration("7200000");
withoutTopicNameConfig2.setTimePartitionPattern("yyyy-MM-dd-HH");
TimePartitioner<Object> timeWithoutTopicNamePartitioner = new TimePartitioner<>();
timeWithoutTopicNamePartitioner.configure(withoutTopicNameConfig2);
return new Object[][]{
new Object[]{
simplePartitioner,
@@ -146,6 +158,18 @@ public static Object[][] data() {
+ Partitioner.PATH_SEPARATOR + testMsgIdFileName,
getPartitionedTopic()
},
new Object[]{
simpleWithoutTopicNamePartitioner,
testMsgIdFileName,
testMsgIdFileName,
getTopic()
},
new Object[]{
timeWithoutTopicNamePartitioner,
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMsgIdFileName,
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMsgIdFileName,
getPartitionedTopic()
},
};
}
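
The `timeWithoutTopicNamePartitioner` case above expects the path to start with `2020-09-08-14` for a duration of `7200000` ms and the pattern `yyyy-MM-dd-HH`. The sketch below shows one way such a prefix could be derived. It assumes the time partitioner floors the message timestamp to a multiple of the duration and formats the result in UTC; that behavior, the `TimeBucketSketch` class, and the sample timestamp are all assumptions, since the diff shows neither the `TimePartitioner` implementation nor the test message's timestamp.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of time-based bucketing, not the connector's code.
final class TimeBucketSketch {

    static String timeBucket(long timestampMillis, long durationMillis, String pattern) {
        // Round the timestamp down to the start of its bucket.
        long bucketStart = Math.floorDiv(timestampMillis, durationMillis) * durationMillis;
        // Format the bucket start in UTC (assumed time zone).
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
        return formatter.format(Instant.ofEpochMilli(bucketStart));
    }

    public static void main(String[] args) {
        // Made-up timestamp inside the 14:00-16:00 UTC window.
        long timestamp = Instant.parse("2020-09-08T15:30:00Z").toEpochMilli();
        // Two-hour buckets: prints "2020-09-08-14"
        System.out.println(timeBucket(timestamp, 7_200_000L, "yyyy-MM-dd-HH"));
    }
}
```

With the topic name excluded, a prefix like this followed by the encoded message ID would be the whole partitioned path, which matches the shape of the expected values in the new test rows.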
