Skip to content

Commit fafc2c5

Browse files
authored
Merge pull request #43 from SourceLabOrg/sp/newEndPoints
add support for new kafka connect reset end points
2 parents e778172 + 3f51c5c commit fafc2c5

File tree

11 files changed

+381
-14
lines changed

11 files changed

+381
-14
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,18 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## 3.1.0 (05/10/2020)
6+
7+
### New Features
8+
- Add support for `/connectors/<connector-name>/topics` and `/connectors/<connector-name>/topics/reset` endpoints
9+
added in Kafka 2.5.0 via `getConnectorTopics()` and `resetConnectorTopics()` methods on KafkaConnectClient.
10+
11+
#### Internal Dependency Updates
12+
- com.google.guava:guava from 28.2-jre -> 29.0-jre
13+
- org.apache.httpcomponents from 4.5.11 -> 4.5.12
14+
- com.fasterxml.jackson.core from 2.10.2 -> 2.10.4
15+
- Checkstyle plugin from 8.24 -> 8.32
16+
517
## 3.0.0 (03/20/2020)
618

719
#### Possible Breaking Dependency Change

build/checkstyle-v1.5.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@
156156
<module name="JavadocMethod">
157157
<property name="scope" value="public"/>
158158
<property name="allowMissingParamTags" value="true"/>
159-
<property name="allowMissingThrowsTags" value="true"/>
160159
<property name="allowMissingReturnTag" value="true"/>
161-
<property name="minLineCount" value="2"/>
162160
<property name="allowedAnnotations" value="Override, Test"/>
163-
<property name="allowThrowsTagsForSubclasses" value="true"/>
161+
</module>
162+
<module name="MissingJavadocMethod">
163+
<property name="minLineCount" value="2"/>
164164
</module>
165165
<module name="JavadocStyle">
166166
<property name="scope" value="public"/>

pom.xml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>org.sourcelab</groupId>
88
<artifactId>kafka-connect-client</artifactId>
9-
<version>3.0.0</version>
9+
<version>3.1.0</version>
1010
<packaging>jar</packaging>
1111

1212
<!-- Require Maven 3.3.9 -->
@@ -47,24 +47,24 @@
4747
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4848

4949
<!-- guava version -->
50-
<guava.version>28.2-jre</guava.version>
50+
<guava.version>29.0-jre</guava.version>
5151

5252
<!-- Http Components version -->
53-
<http-components.version>4.5.11</http-components.version>
53+
<http-components.version>4.5.12</http-components.version>
5454

5555
<!-- Jackson version -->
56-
<jackson.version>2.10.2</jackson.version>
56+
<jackson.version>2.10.4</jackson.version>
5757

5858
<!-- Define which version of junit you'll be running -->
5959
<junit.version>4.12</junit.version>
6060

6161
<!-- Specify which Checkstyle ruleset to use -->
6262
<checkstyle.ruleset>build/checkstyle-v1.5.xml</checkstyle.ruleset>
63-
<checkstyle.plugin.version>3.0.0</checkstyle.plugin.version>
64-
<checkstyle.version>8.24</checkstyle.version>
63+
<checkstyle.plugin.version>3.1.1</checkstyle.plugin.version>
64+
<checkstyle.version>8.32</checkstyle.version>
6565

6666
<!-- Log4J Version -->
67-
<log4j2.version>2.12.1</log4j2.version>
67+
<log4j2.version>2.13.2</log4j2.version>
6868
<slf4j.version>1.7.30</slf4j.version>
6969

7070
<!-- test toggling -->
@@ -156,7 +156,7 @@
156156
<dependency>
157157
<groupId>org.eclipse.jetty</groupId>
158158
<artifactId>jetty-server</artifactId>
159-
<version>9.4.27.v20200227</version>
159+
<version>9.4.28.v20200408</version>
160160
<scope>test</scope>
161161
</dependency>
162162

@@ -187,7 +187,7 @@
187187
<plugin>
188188
<groupId>org.apache.maven.plugins</groupId>
189189
<artifactId>maven-compiler-plugin</artifactId>
190-
<version>3.6.1</version>
190+
<version>3.8.1</version>
191191
<configuration>
192192
<source>1.8</source>
193193
<target>1.8</target>
@@ -343,7 +343,7 @@
343343
<plugin>
344344
<groupId>org.apache.maven.plugins</groupId>
345345
<artifactId>maven-source-plugin</artifactId>
346-
<version>3.0.1</version>
346+
<version>3.2.1</version>
347347
<executions>
348348
<execution>
349349
<id>attach-sources</id>
@@ -358,7 +358,7 @@
358358
<plugin>
359359
<groupId>org.apache.maven.plugins</groupId>
360360
<artifactId>maven-javadoc-plugin</artifactId>
361-
<version>3.0.1</version>
361+
<version>3.2.0</version>
362362
<executions>
363363
<execution>
364364
<id>attach-javadocs</id>

src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
3333
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigValidationResults;
3434
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
35+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorTopics;
3536
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedInfo;
3637
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedMetadata;
3738
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorsWithExpandedStatus;
@@ -45,6 +46,7 @@
4546
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorStatus;
4647
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTaskStatus;
4748
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks;
49+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTopics;
4850
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectors;
4951
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorsExpandAllDetails;
5052
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorsExpandInfo;
@@ -56,6 +58,7 @@
5658
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPause;
5759
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPluginConfigValidate;
5860
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorResume;
61+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorTopicsReset;
5962
import org.sourcelab.kafka.connect.apiclient.rest.HttpClientRestClient;
6063
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
6164
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
@@ -112,6 +115,7 @@ public KafkaConnectClient(final Configuration configuration, final RestClient re
112115

113116
/**
114117
* Retrieve details about the Kafka-Connect service itself.
118+
* https://docs.confluent.io/current/connect/references/restapi.html#get--
115119
* @return ConnectServerVersion
116120
*/
117121
public ConnectServerVersion getConnectServerVersion() {
@@ -197,6 +201,30 @@ public ConnectorStatus getConnectorStatus(final String connectorName) {
197201
return submitRequest(new GetConnectorStatus(connectorName));
198202
}
199203

204+
/**
205+
* Get the set of topics that a specific connector is using since the connector was created or since a request
206+
* to reset its set of active topics was issued.
207+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-topics
208+
*
209+
* Requires Kafka-Connect 2.5.0+
210+
*
211+
* @param connectorName Name of connector.
212+
*/
213+
public ConnectorTopics getConnectorTopics(final String connectorName) {
214+
return submitRequest(new GetConnectorTopics(connectorName));
215+
}
216+
217+
/**
218+
* Send a request to empty the set of active topics of a connector.
219+
* https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-topics-reset
220+
* Requires Kafka-Connect 2.5.0+
221+
*
222+
* @param connectorName Name of connector.
223+
*/
224+
public boolean resetConnectorTopics(final String connectorName) {
225+
return submitRequest(new PutConnectorTopicsReset(connectorName));
226+
}
227+
200228
/**
201229
* Create a new connector, returning the current connector info if successful.
202230
* https://docs.confluent.io/current/connect/references/restapi.html#post--connectors
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.request.dto;
19+
20+
import com.fasterxml.jackson.core.JsonParseException;
21+
import com.fasterxml.jackson.core.JsonParser;
22+
import com.fasterxml.jackson.core.JsonProcessingException;
23+
import com.fasterxml.jackson.databind.DeserializationContext;
24+
import com.fasterxml.jackson.databind.JsonNode;
25+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
26+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
27+
import com.fasterxml.jackson.databind.node.JsonNodeType;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.Iterator;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Objects;
36+
import java.util.stream.Collectors;
37+
38+
/**
39+
* Represents result from /connectors/[topic-name]/topics REST end point.
40+
*/
41+
@JsonDeserialize(using = ConnectorTopics.Deserializer.class)
42+
public class ConnectorTopics {
43+
private final String name;
44+
private final List<String> topics;
45+
46+
/**
47+
* Constructor.
48+
* @param name Name of the connector.
49+
* @param topics List of topics.
50+
*/
51+
public ConnectorTopics(final String name, final List<String> topics) {
52+
this.name = Objects.requireNonNull(name);
53+
Objects.requireNonNull(topics);
54+
this.topics = Collections.unmodifiableList(topics.stream().sorted().collect(Collectors.toList()));
55+
}
56+
57+
public String getName() {
58+
return name;
59+
}
60+
61+
public List<String> getTopics() {
62+
return topics;
63+
}
64+
65+
@Override
66+
public String toString() {
67+
return "ConnectorTopics{"
68+
+ "name='" + name + '\''
69+
+ ", topics=" + topics
70+
+ '}';
71+
}
72+
73+
/**
74+
* Deserializer for ConnectorTopics.
75+
*/
76+
public static class Deserializer extends StdDeserializer<ConnectorTopics> {
77+
78+
/**
79+
* Constructor.
80+
*/
81+
public Deserializer() {
82+
this(null);
83+
}
84+
85+
/**
86+
* Constructor.
87+
*/
88+
public Deserializer(final Class<?> vc) {
89+
super(vc);
90+
}
91+
92+
@Override
93+
public ConnectorTopics deserialize(final JsonParser jsonParser, final DeserializationContext ctxt) throws IOException, JsonProcessingException {
94+
final JsonNode node = jsonParser.getCodec().readTree(jsonParser);
95+
final Iterator<Map.Entry<String, JsonNode>> children = node.fields();
96+
97+
while (children.hasNext()) {
98+
final Map.Entry<String, JsonNode> child = children.next();
99+
final String name = child.getKey();
100+
final JsonNode childNode = child.getValue();
101+
102+
// Skip unknown entries
103+
if (JsonNodeType.OBJECT != childNode.getNodeType()) {
104+
continue;
105+
}
106+
107+
// If has no topics key
108+
if (!childNode.has("topics") || JsonNodeType.ARRAY != childNode.get("topics").getNodeType()) {
109+
// Skip?
110+
continue;
111+
}
112+
final List<String> topicNames = new ArrayList<>();
113+
if (!childNode.get("topics").isEmpty()) {
114+
// Parse topic name values out
115+
childNode.get("topics").forEach((entry) -> topicNames.add(entry.textValue()));
116+
}
117+
return new ConnectorTopics(name, topicNames);
118+
}
119+
throw new JsonParseException(jsonParser, "Unable to parse response JSON");
120+
}
121+
}
122+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.request.get;
19+
20+
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
21+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorTopics;
22+
23+
import java.io.IOException;
24+
import java.util.Objects;
25+
26+
import static com.google.common.net.UrlEscapers.urlPathSegmentEscaper;
27+
28+
/**
29+
* Returns a list of connector topic names.
30+
* There is no defined order in which the topics are returned and consecutive calls may return the same topic names
31+
* but in different order. This request is independent of whether a connector is running, and will return an empty set
32+
* of topics, both for connectors that don’t have active topics as well as non-existent connectors.
33+
*
34+
* https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-topics
35+
*/
36+
public class GetConnectorTopics implements GetRequest<ConnectorTopics> {
37+
private final String connectorName;
38+
39+
/**
40+
* Constructor.
41+
* @param connectorName Name of connector.
42+
*/
43+
public GetConnectorTopics(final String connectorName) {
44+
Objects.requireNonNull(connectorName);
45+
this.connectorName = connectorName;
46+
}
47+
48+
@Override
49+
public String getApiEndpoint() {
50+
return "/connectors/" + urlPathSegmentEscaper().escape(connectorName) + "/topics";
51+
}
52+
53+
@Override
54+
public ConnectorTopics parseResponse(final String responseStr) throws IOException {
55+
return JacksonFactory.newInstance().readValue(responseStr, ConnectorTopics.class);
56+
}
57+
}

0 commit comments

Comments
 (0)