diff --git a/README.md b/README.md
index c5bb33b1..32d8b4be 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,10 @@ your Fitbit App client ID and client secret. The following tables shows the poss
Importance |
+application.loop.interval.ms | How often to perform the main application loop (only controls how often to poll for new user registrations).> | long | 300000 | | |
+
+user.cache.refresh.interval.ms | How often to invalidate the cache and poll for new user registrations. | long | 3600000 | | |
+
rest.source.poll.interval.ms | How often to poll the source URL. | long | 60000 | | low |
rest.source.base.url | Base URL for REST source connector. | string | | | high |
diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java
index 3b7a6edb..30e14c28 100644
--- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java
+++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java
@@ -20,6 +20,7 @@
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {
@Override
public void start(Map props) {
+ logger.info("Starting Fitbit source connector");
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();
+ Duration applicationLoopInterval = config.getApplicationLoopInterval();
+
executor.scheduleAtFixedRate(() -> {
if (repository.hasPendingUpdates()) {
try {
@@ -66,7 +70,7 @@ public void start(Map props) {
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
- }, 0, 5, TimeUnit.MINUTES);
+ }, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
}
@Override
diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java
index 53e17b97..e2573143 100644
--- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java
+++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java
@@ -100,7 +100,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration();
protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();
- private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class);
+ private static final Logger logger = LoggerFactory.getLogger(FitbitPollingRoute.class);
/** Committed offsets. */
private Map offsets;
@@ -164,7 +164,7 @@ public void requestEmpty(RestRequest request) {
@Override
public void requestFailed(RestRequest request, Response response) {
if (response != null && response.code() == 429) {
- User user = ((FitbitRestRequest)request).getUser();
+ User user = ((FitbitRestRequest) request).getUser();
tooManyRequestsForUser.add(user);
String cooldownString = response.header("Retry-After");
Duration cooldown = getTooManyRequestsCooldown();
@@ -179,6 +179,8 @@ public void requestFailed(RestRequest request, Response response) {
lastPollPerUser.put(user.getId(), backOff);
logger.info("Too many requests for user {}. Backing off until {}",
user, backOff.plus(getPollIntervalPerUser()));
+ } else if (response != null) {
+ logger.warn("Failed to make request {}. Response is: {}", request, response);
} else {
logger.warn("Failed to make request {}", request);
}
@@ -197,8 +199,11 @@ public Stream requests() {
lastPoll = Instant.now();
try {
return userRepository.stream()
+ // Collect Instant of nextPoll for each user
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
+ // Keep users where the lastPoll is later than the nextPoll for the user (i.e., user needs to be polled)
.filter(u -> lastPoll.isAfter(u.getValue()))
+ // Sort users by nextPoll (old to new?)
.sorted(Map.Entry.comparingByValue())
.flatMap(u -> this.createRequests(u.getKey()))
.filter(Objects::nonNull);
diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt
index 9c93a576..a29a810f 100644
--- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt
+++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt
@@ -61,9 +61,9 @@ import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream
import kotlin.time.Duration.Companion.days
-import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
+import kotlin.time.toKotlinDuration
@Suppress("unused")
class ServiceUserRepository : UserRepository {
@@ -92,8 +92,12 @@ class ServiceUserRepository : UserRepository {
clientSecret = config.fitbitUserRepositoryClientSecret,
)
+ val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
userCache = CachedSet(
- CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
+ CacheConfig(
+ refreshDuration = refreshDuration,
+ retryDuration = if (refreshDuration > 1.minutes) 1.minutes else refreshDuration,
+ ),
) {
makeRequest { url("users?source-type=FitBit") }
.users
diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java
index 2f5086d4..b06dcce8 100644
--- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java
+++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java
@@ -246,4 +246,4 @@ private T makeRequest(Request request, ObjectReader reader) throws IOExcepti
throw new IOException("Failed to make request to user repository", ex);
}
}
- }
\ No newline at end of file
+ }
diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java
index 6b9d7c70..67eadb31 100644
--- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java
+++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java
@@ -45,10 +45,15 @@
public class RestSourceConnectorConfig extends AbstractConfig {
public static final Pattern COLON_PATTERN = Pattern.compile(":");
+ public static final String APPLICATION_LOOP_INTERVAL_CONFIG = "application.loop.interval.ms";
+ private static final String APPLICATION_LOOP_INTERVAL_DOC = "How often to perform the main application loop.";
+ private static final String APPLICATION_LOOP_INTERVAL_DISPLAY = "Application loop interval";
+ private static final Long APPLICATION_LOOP_INTERVAL_DEFAULT = 300000L; // 5 minutes
+
private static final String SOURCE_POLL_INTERVAL_CONFIG = "rest.source.poll.interval.ms";
private static final String SOURCE_POLL_INTERVAL_DOC = "How often to poll the source URL.";
private static final String SOURCE_POLL_INTERVAL_DISPLAY = "Polling interval";
- private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L;
+ private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L; // 1 minute
static final String SOURCE_URL_CONFIG = "rest.source.base.url";
private static final String SOURCE_URL_DOC = "Base URL for REST source connector.";
@@ -81,6 +86,11 @@ public class RestSourceConnectorConfig extends AbstractConfig {
"Class to be used to generate REST requests";
private static final String REQUEST_GENERATOR_DISPLAY = "Request generator class";
+ public static final String USER_CACHE_REFRESH_INTERVAL_CONFIG = "user.cache.refresh.interval.ms";
+ private static final String USER_CACHE_REFRESH_INTERVAL_DOC = "How often to poll for new user registrations.";
+ private static final String USER_CACHE_REFRESH_INTERVAL_DISPLAY = "Refresh interval";
+ private static final Long USER_CACHE_REFRESH_INTERVAL_DEFAULT = 3600000L; // 1 hour
+
private final TopicSelector topicSelector;
private final PayloadToSourceRecordConverter payloadToSourceRecordConverter;
private final RequestGenerator requestGenerator;
@@ -171,7 +181,36 @@ public static ConfigDef conf() {
++orderInGroup,
Width.SHORT,
REQUEST_GENERATOR_DISPLAY)
- ;
+
+ .define(APPLICATION_LOOP_INTERVAL_CONFIG,
+ Type.LONG,
+ APPLICATION_LOOP_INTERVAL_DEFAULT,
+ Importance.LOW,
+ APPLICATION_LOOP_INTERVAL_DOC,
+ group,
+ ++orderInGroup,
+ Width.SHORT,
+ APPLICATION_LOOP_INTERVAL_DISPLAY)
+
+ .define(USER_CACHE_REFRESH_INTERVAL_CONFIG,
+ Type.LONG,
+ USER_CACHE_REFRESH_INTERVAL_DEFAULT,
+ Importance.LOW,
+ USER_CACHE_REFRESH_INTERVAL_DOC,
+ group,
+ ++orderInGroup,
+ Width.SHORT,
+ USER_CACHE_REFRESH_INTERVAL_DISPLAY
+
+ );
+ }
+
+ public Duration getApplicationLoopInterval() {
+ return Duration.ofMillis(this.getLong(APPLICATION_LOOP_INTERVAL_CONFIG));
+ }
+
+ public Duration getUserCacheRefreshInterval() {
+ return Duration.ofMillis(this.getLong(USER_CACHE_REFRESH_INTERVAL_CONFIG));
}
public Duration getPollInterval() {