Skip to content

MINOR: Cleanup Connect Module (3/n) #20156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -32,7 +31,7 @@ public class NoneConnectorClientConfigOverridePolicyTest extends BaseConnectorCl

@Test
public void testNoOverrides() {
testValidOverride(Collections.emptyMap());
testValidOverride(Map.of());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -32,7 +31,7 @@ public class PrincipalConnectorClientConfigOverridePolicyTest extends BaseConnec

@Test
public void testPrincipalOnly() {
Map<String, Object> clientConfig = Collections.singletonMap(SaslConfigs.SASL_JAAS_CONFIG, "test");
Map<String, Object> clientConfig = Map.of(SaslConfigs.SASL_JAAS_CONFIG, "test");
testValidOverride(clientConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -42,7 +42,7 @@ public class BooleanConverterTest {

@BeforeEach
public void setUp() {
converter.configure(Collections.emptyMap(), false);
converter.configure(Map.of(), false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -42,7 +42,7 @@ public class ByteArrayConverterTest {

@BeforeEach
public void setUp() {
converter.configure(Collections.emptyMap(), false);
converter.configure(Map.of(), false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -577,7 +576,7 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
block.maybeBlockOn(CONNECTOR_TASK_CONFIGS);
return Collections.singletonList(Collections.emptyMap());
return List.of(Map.of());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.io.FileOutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -229,7 +228,7 @@ public void testRestartFailedTask() throws Exception {
// Restart the failed task
String taskRestartEndpoint = connect.endpointForResource(
String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME));
connect.requestPost(taskRestartEndpoint, "", Collections.emptyMap());
connect.requestPost(taskRestartEndpoint, "", Map.of());

// Ensure the task started successfully this time
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
Expand Down Expand Up @@ -374,7 +373,7 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
NUM_TASKS, "Connector tasks did not start in time");
connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));

// Then if we delete the connector, it and each of its tasks should be stopped by the framework
// Then, if we delete the connector, it and each of its tasks should be stopped by the framework
// even though the producer is blocked because there is no topic
StartAndStopLatch stopCounter = connector.expectedStops(1);
connect.deleteConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -434,8 +433,8 @@ public void testPauseStopResume() throws Exception {
"Connector did not stop in time"
);
// If the connector is truly stopped, we should also see an empty set of tasks and task configs
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Transition to RUNNING
connect.resumeConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -463,8 +462,8 @@ public void testPauseStopResume() throws Exception {
CONNECTOR_NAME,
"Connector did not stop in time"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Transition to PAUSED
connect.pauseConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -520,8 +519,8 @@ public void testStoppedState() throws Exception {
"Connector did not stop in time"
);
// If the connector is truly stopped, we should also see an empty set of tasks and task configs
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Can resume a connector after its Connector has failed before shutdown after receiving a stop request
props.remove("connector.start.inject.error");
Expand All @@ -542,8 +541,8 @@ public void testStoppedState() throws Exception {
CONNECTOR_NAME,
"Connector did not stop in time"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Can resume a connector after its Connector has failed during shutdown after receiving a stop request
connect.resumeConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -580,8 +579,8 @@ public void testCreateConnectorWithPausedInitialState() throws Exception {
0,
"Connector was not created in a paused state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Verify that a connector created in the PAUSED state can be resumed successfully
connect.resumeConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -615,16 +614,16 @@ public void testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() t
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Verify that the offsets can be modified for a source connector created in the STOPPED state

// Alter the offsets so that only 5 messages are produced
connect.alterSourceConnectorOffset(
CONNECTOR_NAME,
Collections.singletonMap("task.id", CONNECTOR_NAME + "-0"),
Collections.singletonMap("saved", 5L)
Map.of("task.id", CONNECTOR_NAME + "-0"),
Map.of("saved", 5L)
);

// Verify that a connector created in the STOPPED state can be resumed successfully
Expand Down Expand Up @@ -669,8 +668,8 @@ public void testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() thr
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Verify that the offsets can be modified for a sink connector created in the STOPPED state

Expand Down Expand Up @@ -726,8 +725,8 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
0,
"Connector was not created in a paused state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Verify that a connector created in the PAUSED state can be deleted successfully
connect.deleteConnector(CONNECTOR_NAME);
Expand All @@ -747,8 +746,8 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
assertEquals(List.of(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(List.of(), connect.taskConfigs(CONNECTOR_NAME));

// Verify that a connector created in the STOPPED state can be deleted successfully
connect.deleteConnector(CONNECTOR_NAME);
Expand Down Expand Up @@ -1014,7 +1013,7 @@ public void testTasksMaxEnforcement() throws Exception {
// an existing set of task configs that was written before the cluster was upgraded
try (JsonConverter converter = new JsonConverter()) {
converter.configure(
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
false
);

Expand Down Expand Up @@ -1326,7 +1325,7 @@ public void testRuntimePropertyReconfiguration() throws Exception {
"Connector did not start or task did not fail in time"
);
assertEquals(
new ConnectorOffsets(Collections.emptyList()),
new ConnectorOffsets(List.of()),
connect.connectorOffsets(CONNECTOR_NAME),
"Connector should not have any committed offsets when only task fails on first record"
);
Expand All @@ -1346,9 +1345,9 @@ public void testRuntimePropertyReconfiguration() throws Exception {
Map<String, Object> expectedOffsetKey = new HashMap<>();
expectedOffsetKey.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
expectedOffsetKey.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
Map<String, Object> expectedOffsetValue = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 1);
Map<String, Object> expectedOffsetValue = Map.of(SinkUtils.KAFKA_OFFSET_KEY, 1);
ConnectorOffset expectedOffset = new ConnectorOffset(expectedOffsetKey, expectedOffsetValue);
ConnectorOffsets expectedOffsets = new ConnectorOffsets(Collections.singletonList(expectedOffset));
ConnectorOffsets expectedOffsets = new ConnectorOffsets(List.of(expectedOffset));

// Wait for it to commit offsets, signaling that it has successfully processed the record we produced earlier
waitForCondition(
Expand Down Expand Up @@ -1443,7 +1442,7 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return IntStream.range(0, maxTasks)
.mapToObj(i -> Collections.<String, String>emptyMap())
.mapToObj(i -> Map.<String, String>of())
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
Expand Down Expand Up @@ -283,17 +281,17 @@ public StartAndStopLatch expectedStarts(int expectedStarts, boolean includeTasks
List<StartAndStopLatch> taskLatches = includeTasks
? taskHandles.values().stream()
.map(task -> task.expectedStarts(expectedStarts))
.collect(Collectors.toList())
: Collections.emptyList();
.toList()
: List.of();
return startAndStopCounter.expectedStarts(expectedStarts, taskLatches);
}

public StartAndStopLatch expectedStarts(int expectedStarts, Map<String, Integer> expectedTasksStarts, boolean includeTasks) {
List<StartAndStopLatch> taskLatches = includeTasks
? taskHandles.values().stream()
.map(task -> task.expectedStarts(expectedTasksStarts.get(task.taskId())))
.collect(Collectors.toList())
: Collections.emptyList();
.toList()
: List.of();
return startAndStopCounter.expectedStarts(expectedStarts, taskLatches);
}

Expand Down Expand Up @@ -345,17 +343,17 @@ public StartAndStopLatch expectedStops(int expectedStops, boolean includeTasks)
List<StartAndStopLatch> taskLatches = includeTasks
? taskHandles.values().stream()
.map(task -> task.expectedStops(expectedStops))
.collect(Collectors.toList())
: Collections.emptyList();
.toList()
: List.of();
return startAndStopCounter.expectedStops(expectedStops, taskLatches);
}

public StartAndStopLatch expectedStops(int expectedStops, Map<String, Integer> expectedTasksStops, boolean includeTasks) {
List<StartAndStopLatch> taskLatches = includeTasks
? taskHandles.values().stream()
.map(task -> task.expectedStops(expectedTasksStops.get(task.taskId())))
.collect(Collectors.toList())
: Collections.emptyList();
.toList()
: List.of();
return startAndStopCounter.expectedStops(expectedStops, taskLatches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -131,7 +130,7 @@ public void testRestartUnknownConnectorNoParams() {
// Call the Restart API
String restartEndpoint = connect.endpointForResource(
String.format("connectors/%s/restart", connectorName));
Response response = connect.requestPost(restartEndpoint, "", Collections.emptyMap());
Response response = connect.requestPost(restartEndpoint, "", Map.of());
assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());

}
Expand All @@ -152,7 +151,7 @@ private void restartUnknownConnector(boolean onlyFailed, boolean includeTasks) {
// Call the Restart API
String restartEndpoint = connect.endpointForResource(
String.format("connectors/%s/restart?onlyFailed=" + onlyFailed + "&includeTasks=" + includeTasks, connectorName));
Response response = connect.requestPost(restartEndpoint, "", Collections.emptyMap());
Response response = connect.requestPost(restartEndpoint, "", Map.of());
assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
}

Expand Down Expand Up @@ -213,7 +212,7 @@ public void testFailedTasksRestartBothConnectorAndTasks() throws Exception {

@Test
public void testOneFailedTasksRestartOnlyOneTasks() throws Exception {
Set<String> tasksToFail = Collections.singleton(taskId(1));
Set<String> tasksToFail = Set.of(taskId(1));
failedTasksRestart(true, true, 0, buildExpectedTasksRestarts(tasksToFail), tasksToFail, false);
}

Expand Down
Loading