Skip to content

Commit

Permalink
Merge pull request #152 from RADAR-base/feature/configure-cache-durat…
Browse files Browse the repository at this point in the history
…ions

Impl. confiration of application loop and user cache refresh durations
  • Loading branch information
pvannierop authored Nov 20, 2024
2 parents 39ca6dc + 95fb4ef commit 701409c
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 8 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ your Fitbit App client ID and client secret. The following tables shows the poss
<th>Importance</th>
</tr>
<tr>
<td>application.loop.interval.ms</td><td>How often to perform the main application loop (only controls how often to poll for new user registrations).></td><td>long</td><td>300000</td><td></td><td></td></tr>
<tr>
<td>user.cache.refresh.interval.ms</td><td>How often to invalidate the cache and poll for new user registrations.</td><td>long</td><td>3600000</td><td></td><td></td></tr>
<tr>
<td>rest.source.poll.interval.ms</td><td>How often to poll the source URL.</td><td>long</td><td>60000</td><td></td><td>low</td></tr>
<tr>
<td>rest.source.base.url</td><td>Base URL for REST source connector.</td><td>string</td><td></td><td></td><td>high</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {

@Override
public void start(Map<String, String> props) {
logger.info("Starting Fitbit source connector");
super.start(props);
executor = Executors.newSingleThreadScheduledExecutor();

Duration applicationLoopInterval = config.getApplicationLoopInterval();

executor.scheduleAtFixedRate(() -> {
if (repository.hasPendingUpdates()) {
try {
Expand All @@ -66,7 +70,7 @@ public void start(Map<String, String> props) {
} else {
logger.info("No pending updates found. Not attempting to refresh users.");
}
}, 0, 5, TimeUnit.MINUTES);
}, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration();
protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();

private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class);
private static final Logger logger = LoggerFactory.getLogger(FitbitPollingRoute.class);

/** Committed offsets. */
private Map<String, Instant> offsets;
Expand Down Expand Up @@ -164,7 +164,7 @@ public void requestEmpty(RestRequest request) {
@Override
public void requestFailed(RestRequest request, Response response) {
if (response != null && response.code() == 429) {
User user = ((FitbitRestRequest)request).getUser();
User user = ((FitbitRestRequest) request).getUser();
tooManyRequestsForUser.add(user);
String cooldownString = response.header("Retry-After");
Duration cooldown = getTooManyRequestsCooldown();
Expand All @@ -179,6 +179,8 @@ public void requestFailed(RestRequest request, Response response) {
lastPollPerUser.put(user.getId(), backOff);
logger.info("Too many requests for user {}. Backing off until {}",
user, backOff.plus(getPollIntervalPerUser()));
} else if (response != null) {
logger.warn("Failed to make request {}. Response is: {}", request, response);
} else {
logger.warn("Failed to make request {}", request);
}
Expand All @@ -197,8 +199,11 @@ public Stream<FitbitRestRequest> requests() {
lastPoll = Instant.now();
try {
return userRepository.stream()
// Collect Instant of nextPoll for each user
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
// Keep users where the lastPoll is later than the nextPoll for the user (i.e., user needs to be polled)
.filter(u -> lastPoll.isAfter(u.getValue()))
// Sort users by nextPoll (old to new?)
.sorted(Map.Entry.comparingByValue())
.flatMap(u -> this.createRequests(u.getKey()))
.filter(Objects::nonNull);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration

@Suppress("unused")
class ServiceUserRepository : UserRepository {
Expand Down Expand Up @@ -92,8 +92,12 @@ class ServiceUserRepository : UserRepository {
clientSecret = config.fitbitUserRepositoryClientSecret,
)

val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
userCache = CachedSet(
CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
CacheConfig(
refreshDuration = refreshDuration,
retryDuration = if (refreshDuration > 1.minutes) 1.minutes else refreshDuration,
),
) {
makeRequest<Users> { url("users?source-type=FitBit") }
.users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,4 @@ private <T> T makeRequest(Request request, ObjectReader reader) throws IOExcepti
throw new IOException("Failed to make request to user repository", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@
public class RestSourceConnectorConfig extends AbstractConfig {
public static final Pattern COLON_PATTERN = Pattern.compile(":");

public static final String APPLICATION_LOOP_INTERVAL_CONFIG = "application.loop.interval.ms";
private static final String APPLICATION_LOOP_INTERVAL_DOC = "How often to perform the main application loop.";
private static final String APPLICATION_LOOP_INTERVAL_DISPLAY = "Application loop interval";
private static final Long APPLICATION_LOOP_INTERVAL_DEFAULT = 300000L; // 5 minutes

private static final String SOURCE_POLL_INTERVAL_CONFIG = "rest.source.poll.interval.ms";
private static final String SOURCE_POLL_INTERVAL_DOC = "How often to poll the source URL.";
private static final String SOURCE_POLL_INTERVAL_DISPLAY = "Polling interval";
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L;
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L; // 1 minute

static final String SOURCE_URL_CONFIG = "rest.source.base.url";
private static final String SOURCE_URL_DOC = "Base URL for REST source connector.";
Expand Down Expand Up @@ -81,6 +86,11 @@ public class RestSourceConnectorConfig extends AbstractConfig {
"Class to be used to generate REST requests";
private static final String REQUEST_GENERATOR_DISPLAY = "Request generator class";

public static final String USER_CACHE_REFRESH_INTERVAL_CONFIG = "user.cache.refresh.interval.ms";
private static final String USER_CACHE_REFRESH_INTERVAL_DOC = "How often to poll for new user registrations.";
private static final String USER_CACHE_REFRESH_INTERVAL_DISPLAY = "Refresh interval";
private static final Long USER_CACHE_REFRESH_INTERVAL_DEFAULT = 3600000L; // 1 hour

private final TopicSelector topicSelector;
private final PayloadToSourceRecordConverter payloadToSourceRecordConverter;
private final RequestGenerator requestGenerator;
Expand Down Expand Up @@ -171,7 +181,36 @@ public static ConfigDef conf() {
++orderInGroup,
Width.SHORT,
REQUEST_GENERATOR_DISPLAY)
;

.define(APPLICATION_LOOP_INTERVAL_CONFIG,
Type.LONG,
APPLICATION_LOOP_INTERVAL_DEFAULT,
Importance.LOW,
APPLICATION_LOOP_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
APPLICATION_LOOP_INTERVAL_DISPLAY)

.define(USER_CACHE_REFRESH_INTERVAL_CONFIG,
Type.LONG,
USER_CACHE_REFRESH_INTERVAL_DEFAULT,
Importance.LOW,
USER_CACHE_REFRESH_INTERVAL_DOC,
group,
++orderInGroup,
Width.SHORT,
USER_CACHE_REFRESH_INTERVAL_DISPLAY

);
}

public Duration getApplicationLoopInterval() {
return Duration.ofMillis(this.getLong(APPLICATION_LOOP_INTERVAL_CONFIG));
}

public Duration getUserCacheRefreshInterval() {
return Duration.ofMillis(this.getLong(USER_CACHE_REFRESH_INTERVAL_CONFIG));
}

public Duration getPollInterval() {
Expand Down

0 comments on commit 701409c

Please sign in to comment.