From 9e87fd4ac04ea1d1a00a6efc081beae17d335afd Mon Sep 17 00:00:00 2001 From: Peyman Mohtashami Date: Thu, 7 Dec 2023 13:09:47 +0100 Subject: [PATCH 01/16] define hrv, br, skin-temp in config --- .DS_Store | Bin 0 -> 8196 bytes .../FitbitRestSourceConnectorConfig.java | 33 ++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..1765e0788397301b4252f190d8c349b07f56cc5e GIT binary patch literal 8196 zcmeHMyKWOf6g?A6us{hZjgVlGA`nPMq)EL*i)bTRf+9`g2Tp{|THd(dhB^g5Km*kL z0Yc&vQX=sWC_QH$wr6&;E*u4ra>trEtGRdXoINw1^<4u{JCp7f&;U?p)wpqs%{fKe zbuE<@*Ao#H$79gnYIV}#sN^XfqJStM3Wx%tfGF_4D8Og7F7=A%zO_m#3Wx$%QUQHF zM64QPhq*;_bg(fg0I|kyQ+U0hFNl%D*kNvw78YfP675jsQVeB>vp;fvvBTV=9S-G^ z59MZ7E<;gfcE*p4J5+2@T2Vk0C@T=LtA?lO;|Rw={{Ha#E8jLulcYUJ+w7}1c5lD` z{QAf9(B55WA02w{1|B`*P~4c#i$hG%4ra%LFJ%v(!}i(QltUrTFr720_6qpG@TsFs z9!D6Uix$S{1dLWszmZYN$LF-UwtI52XLLEoZ_a9wU%>08;k8OWDLQzI9>xJD-1|gM zf!*h`zPUctsNgf_Qsooyz&@wpJbkKiKsE#PLLR^OigusJ_s1R&FNVKTRXza^d%EAH z5(ju=+9q{L@s6J!ua41VKX)=$QnyWHv`+L|78tL5{6NM_M)L11OJ?`BbhPE8t_lXPT1~QQ)!_xMnA4==}dh_51(JrXle~0a4%|Dxhjf zE7>6dms^*p=v+Hu-DlNB`{fqR5H=n{TQJVc)V LW(}mddJ6mjk?CWq literal 0 HcmV?d00001 diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index 36017b23..ee978b10 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -331,6 +331,39 @@ public String toString() { Width.SHORT, FITBIT_INTRADAY_HEART_RATE_TOPIC_DISPLAY) + .define(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG, + Type.STRING, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY) + + .define(FITBIT_BREATHING_RATE_TOPIC_CONFIG, + Type.STRING, + FITBIT_BREATHING_RATE_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_BREATHING_RATE_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_BREATHING_RATE_TOPIC_DISPLAY) + + .define(FITBIT_SKIN_TEMPERATURE_TOPIC_CONFIG, + Type.STRING, + FITBIT_SKIN_TEMPERATURE_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_SKIN_TEMPERATURE_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_SKIN_TEMPERATURE_TOPIC_DISPLAY) + .define(FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG, Type.STRING, FITBIT_RESTING_HEART_RATE_TOPIC_DEFAULT, From fc3b97952e77905fdf0ed5bf27ef9e0d83cfd501 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 13 Dec 2023 04:09:09 +0800 Subject: [PATCH 02/16] Add backoff time when request successful to space out requests --- .../kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 9a74ef4e..84bceba9 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -111,6 +111,7 @@ constructor( request.user, Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), ) + userNextRequest[request.user.versionedId] = Instant.now().plus(SUCCESS_BACK_OFF_TIME) } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { ouraOffsetManager.updateOffsets( @@ -196,6 +197,7 @@ constructor( private val ONE_DAY = 1L private val TIME_AFTER_REQUEST = Duration.ofDays(30) private val USER_BACK_OFF_TIME = Duration.ofMinutes(2L) + private val SUCCESS_BACK_OFF_TIME = Duration.ofSeconds(3L) private val USER_MAX_REQUESTS = 20 val JSON_FACTORY = JsonFactory() val JSON_READER = ObjectMapper(JSON_FACTORY).registerModule(JavaTimeModule()).reader() From 5750bc05a1b8b965b212963a10f905ad53285168 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 13 Dec 2023 04:11:13 +0800 Subject: [PATCH 03/16] Fix null handling in OuraRingConfigurationConverter --- .../OuraRingConfigurationConverter.kt | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt index 53d321fc..242bd60e 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt @@ -17,12 +17,18 @@ class OuraRingConfigurationConverter( root: JsonNode, user: User, ): Sequence> { - val array = root.get("data") - ?: return emptySequence() + val array = + root.get("data") + ?: return emptySequence() return array.asSequence() .mapCatching { - val setupTime = OffsetDateTime.parse(it["set_up_at"].textValue()) - val setupTimeInstant = setupTime.toInstant() + val setUpAt = it["set_up_at"] + val setupTimeInstant = + setUpAt?.textValue()?.let { + OffsetDateTime.parse( + it, + ) + }?.toInstant() TopicData( key = user.observationKey, topic = topic, @@ -32,9 +38,7 @@ class OuraRingConfigurationConverter( } } - private fun JsonNode.toRingConfiguration( - setupTime: Instant, - ): OuraRingConfiguration { + private fun JsonNode.toRingConfiguration(setupTime: Instant?): OuraRingConfiguration { val data = this return OuraRingConfiguration.newBuilder().apply { time = System.currentTimeMillis() / 1000.0 @@ -44,7 +48,7 @@ class OuraRingConfigurationConverter( design = data.get("design").textValue()?.classifyDesign() firmwareVersion = data.get("firmware_version").textValue() hardwareType = data.get("hardware_type").textValue()?.classifyHardware() - setUpAt = setupTime.toEpochMilli() / 1000.0 + setUpAt = if (setupTime == null) null else setupTime.toEpochMilli() / 1000.0 size = data.get("size").intValue() }.build() } @@ -77,6 +81,7 @@ class OuraRingConfigurationConverter( else -> OuraRingHardwareType.UNKNOWN } } + companion object { val logger = LoggerFactory.getLogger(OuraRingConfigurationConverter::class.java) } From 57e02978e0180b1593a98f84f4e51bd15c804322 Mon Sep 17 00:00:00 2001 From: Pauline Date: Thu, 18 Jan 2024 11:31:41 +0000 Subject: [PATCH 04/16] Add back previous version of OuraServiceRepository --- buildSrc/src/main/kotlin/Versions.kt | 12 + kafka-connect-oura-source/build.gradle.kts | 3 +- .../oura/OuraRestSourceConnectorConfig.java | 25 +- .../rest/oura/offset/KafkaOffsetManager.java | 6 +- .../oura/user/OuraServiceUserRepository.java | 257 ++++++++++++++++++ ...ry.kt => OuraServiceUserRepositoryKtor.kt} | 209 +++++++------- .../oura/request/OuraRequestGenerator.kt | 5 +- 7 files changed, 398 insertions(+), 119 deletions(-) create mode 100644 kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java rename kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/{OuraServiceUserRepository.kt => OuraServiceUserRepositoryKtor.kt} (57%) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index ed0e7679..1cd990fa 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,5 +1,15 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { + + object Plugins { + const val kotlin = "1.9.10" + const val kotlinSerialization = kotlin + const val kotlinAllOpen = kotlin + const val avro = "1.8.0" + const val gradle = "8.3" + } + + const val project = "0.4.2-SNAPSHOT" const val java = 11 @@ -11,6 +21,8 @@ object Versions { const val kafka = "$confluent-ce" const val avro = "1.11.0" + const val managementPortal = "2.0.0" + // From image const val jackson = "2.14.2" diff --git a/kafka-connect-oura-source/build.gradle.kts b/kafka-connect-oura-source/build.gradle.kts index b80d4801..e0acc8d7 100644 --- a/kafka-connect-oura-source/build.gradle.kts +++ b/kafka-connect-oura-source/build.gradle.kts @@ -4,7 +4,8 @@ dependencies { api(project(":oura-library")) api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}") api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") - implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}") + implementation("org.radarbase:radar-commons-kotlin:1.1.1") + implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}") api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}")) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java index 41ae3f0e..0acf9e89 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java @@ -19,7 +19,6 @@ import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; -import static io.ktor.http.URLUtilsKt.URLBuilder; import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; @@ -45,8 +44,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; -import io.ktor.http.URLParserException; -import io.ktor.http.Url; public class OuraRestSourceConnectorConfig extends AbstractConfig { public static final Pattern COLON_PATTERN = Pattern.compile(":"); @@ -283,18 +280,18 @@ public OuraServiceUserRepository createUserRepository() { } } - public Url getOuraUserRepositoryUrl() { + public HttpUrl getOuraUserRepositoryUrl() { String urlString = getString(OURA_USER_REPOSITORY_URL_CONFIG).trim(); if (urlString.charAt(urlString.length() - 1) != '/') { urlString += '/'; } - try { - return URLBuilder(urlString).build(); - } catch (URLParserException ex) { + HttpUrl url = HttpUrl.parse(urlString); + if (url == null) { throw new ConfigException(OURA_USER_REPOSITORY_URL_CONFIG, getString(OURA_USER_REPOSITORY_URL_CONFIG), - "User repository URL " + urlString + " cannot be parsed as URL: " + ex); + "User repository URL " + urlString + " cannot be parsed as URL."); } + return url; } public Headers getClientCredentials() { @@ -317,18 +314,16 @@ public String getOuraUserRepositoryClientSecret() { return getPassword(OURA_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value(); } - public Url getOuraUserRepositoryTokenUrl() { + public URL getOuraUserRepositoryTokenUrl() { String value = getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG); if (value == null || value.isEmpty()) { return null; } else { try { - return URLBuilder(value).build(); - } catch (URLParserException ex) { - throw new ConfigException(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG, - getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG), - "Oura user repository token URL " + value + " cannot be parsed as URL: " + ex); + return new URL(getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG)); + } catch (MalformedURLException e) { + throw new ConfigException("Oura user repository token URL is invalid."); } } } -} +} \ No newline at end of file diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java index 360d5f82..fbb4329b 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java @@ -31,11 +31,15 @@ public KafkaOffsetManager( } public void initialize(List> partitions) { - this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream() + if (this.offsetStorageReader != null) { + this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream() .filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY)) .collect(Collectors.toMap( e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"), e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); + } else { + logger.warn("Offset storage reader is null, will resume from an empty state."); + } } @Override diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java new file mode 100644 index 00000000..aa6f17ba --- /dev/null +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java @@ -0,0 +1,257 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + package org.radarbase.connect.rest.oura.user; + + import com.fasterxml.jackson.core.JsonFactory; + import com.fasterxml.jackson.core.JsonProcessingException; + import com.fasterxml.jackson.databind.ObjectMapper; + import com.fasterxml.jackson.databind.ObjectReader; + import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + + import kotlin.sequences.*; + + import java.io.IOException; + import java.net.ProtocolException; + import java.net.URL; + import java.time.Duration; + import java.time.Instant; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Set; + import java.util.List; + import java.util.concurrent.atomic.AtomicReference; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import okhttp3.Credentials; + import okhttp3.HttpUrl; + import okhttp3.MediaType; + import okhttp3.OkHttpClient; + import okhttp3.Request; + import okhttp3.RequestBody; + import okhttp3.Response; + import okhttp3.ResponseBody; + import org.apache.kafka.common.config.ConfigException; + import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig; + import org.radarbase.exception.TokenException; + import org.radarbase.oauth.OAuth2Client; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.radarbase.oura.user.UserRepository; + import org.radarbase.oura.user.User; + import org.radarbase.oura.user.OuraUser; + import org.radarbase.oura.user.UserNotAuthorizedException; + import static kotlin.sequences.SequencesKt.*; + + @SuppressWarnings("unused") + public class OuraServiceUserRepository implements UserRepository { + Instant MIN_INSTANT = Instant.EPOCH; + + public static final JsonFactory JSON_FACTORY = new JsonFactory(); + public static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY) + .registerModule(new JavaTimeModule()) + .reader(); + private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepository.class); + + private static final ObjectReader USER_LIST_READER = JSON_READER.forType(OuraUsers.class); + private static final ObjectReader USER_READER = JSON_READER.forType(User.class); + private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class); + private static final RequestBody EMPTY_BODY = + RequestBody.create("", MediaType.parse("application/json; charset=utf-8")); + private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60); + private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90); + + private final OkHttpClient client; + private final Map cachedCredentials; + private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); + + private HttpUrl baseUrl; + private final HashSet containedUsers; + private Set timedCachedUsers = new HashSet<>(); + private OAuth2Client repositoryClient; + private String basicCredentials; + + public OuraServiceUserRepository() { + this.client = new OkHttpClient.Builder() + .connectTimeout(CONNECTION_TIMEOUT) + .readTimeout(CONNECTION_READ_TIMEOUT) + .build(); + this.cachedCredentials = new HashMap<>(); + this.containedUsers = new HashSet<>(); + } + + @Override + public User get(String key) throws IOException { + Request request = requestFor("users/" + key).build(); + return makeRequest(request, USER_READER); + } + + public void initialize(OuraRestSourceConnectorConfig config) { + OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config; + this.baseUrl = ouraConfig.getOuraUserRepositoryUrl(); + this.containedUsers.addAll(ouraConfig.getOuraUsers()); + + URL tokenUrl = ouraConfig.getOuraUserRepositoryTokenUrl(); + String clientId = ouraConfig.getOuraUserRepositoryClientId(); + String clientSecret = ouraConfig.getOuraUserRepositoryClientSecret(); + + if (tokenUrl != null) { + if (clientId.isEmpty()) { + throw new ConfigException("Client ID for user repository is not set."); + } + this.repositoryClient = new OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ MEASUREMENT.CREATE") + .httpClient(client) + .build(); + } else if (clientId != null) { + basicCredentials = Credentials.basic(clientId, clientSecret); + } + } + + @Override + public Sequence stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + try { + applyPendingUpdates(); + } catch (IOException ex) { + logger.error("Failed to initially get users from repository", ex); + } + } + return SequencesKt.asSequence(this.timedCachedUsers.stream().iterator()); + } + + @Override + public String getAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + OAuth2UserCredentials credentials = cachedCredentials.get(user.getId()); + if (credentials != null && !credentials.isAccessTokenExpired()) { + return credentials.getAccessToken(); + } else { + Request request = requestFor("users/" + user.getId() + "/token").build(); + return requestAccessToken(user, request); + } + } + + public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + Request request = requestFor("users/" + user.getId() + "/token") + .post(EMPTY_BODY) + .build(); + return requestAccessToken(user, request); + } + + private String requestAccessToken(User user, Request request) + throws UserNotAuthorizedException, IOException { + try { + OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + return credentials.getAccessToken(); + } catch (HttpResponseException ex) { + if (ex.getStatusCode() == 407) { + cachedCredentials.remove(user.getId()); + if (user instanceof User) { + // ((User) user).setIsAuthorized(false); + } + throw new UserNotAuthorizedException(ex.getMessage()); + } + throw ex; + } + } + + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + public void applyPendingUpdates() throws IOException { + logger.info("Requesting user information from webservice"); + Request request = requestFor("users?source-type=Oura").build(); + this.timedCachedUsers = + this.makeRequest(request, USER_LIST_READER).getUsers().stream() + .filter(u -> u.isComplete() && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) + .collect(Collectors.toSet()); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + + private Request.Builder requestFor(String relativeUrl) throws IOException { + HttpUrl url = baseUrl.resolve(relativeUrl); + if (url == null) { + throw new IllegalArgumentException("Relative URL is invalid"); + } + Request.Builder builder = new Request.Builder().url(url); + String authorization = requestAuthorization(); + if (authorization != null) { + builder.addHeader("Authorization", authorization); + } + + return builder; + } + + private String requestAuthorization() throws IOException { + if (repositoryClient != null) { + try { + return "Bearer " + repositoryClient.getValidToken().getAccessToken(); + } catch (TokenException ex) { + throw new IOException(ex); + } + } else if (basicCredentials != null) { + return basicCredentials; + } else { + return null; + } + } + + private T makeRequest(Request request, ObjectReader reader) throws IOException { + logger.info("Requesting info from {}", request.url()); + try (Response response = client.newCall(request).execute()) { + ResponseBody body = response.body(); + + if (response.code() == 404) { + throw new NoSuchElementException("URL " + request.url() + " does not exist"); + } else if (!response.isSuccessful() || body == null) { + String message = "Failed to make request"; + if (response.code() > 0) { + message += " (HTTP status code " + response.code() + ')'; + } + if (body != null) { + message += body.string(); + } + throw new HttpResponseException(message, response.code()); + } + String bodyString = body.string(); + try { + return reader.readValue(bodyString); + } catch (JsonProcessingException ex) { + logger.error("Failed to parse JSON: {}\n{}", ex, bodyString); + throw ex; + } + } catch (ProtocolException ex) { + throw new IOException("Failed to make request to user repository", ex); + } + } + } \ No newline at end of file diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt similarity index 57% rename from kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt rename to kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt index d22ad285..f2c50b41 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt @@ -37,6 +37,7 @@ import io.ktor.client.statement.request import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode +import io.ktor.http.URLBuilder import io.ktor.http.Url import io.ktor.http.contentLength import io.ktor.http.contentType @@ -67,7 +68,7 @@ import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @Suppress("unused") -class OuraServiceUserRepository : UserRepository { +class OuraServiceUserRepositoryKtor : UserRepository { private lateinit var userCache: CachedSet private lateinit var client: HttpClient private val credentialCaches = ConcurrentHashMap>() @@ -76,33 +77,34 @@ class OuraServiceUserRepository : UserRepository { private val mapper = ObjectMapper().registerKotlinModule().registerModule(JavaTimeModule()) @Throws(IOException::class) - override fun get(key: String): User = runBlocking(Dispatchers.Default) { - makeRequest { url("users/$key") } - } + override fun get(key: String): User = + runBlocking(Dispatchers.Default) { + makeRequest { url("users/$key") } + } - fun initialize( - config: OuraRestSourceConnectorConfig, - ) { + fun initialize(config: OuraRestSourceConnectorConfig) { val containedUsers = config.ouraUsers.toHashSet() - client = createClient( - baseUrl = config.ouraUserRepositoryUrl, - tokenUrl = config.ouraUserRepositoryTokenUrl, - clientId = config.ouraUserRepositoryClientId, - clientSecret = config.ouraUserRepositoryClientSecret, - ) + client = + createClient( + baseUrl = URLBuilder(config.ouraUserRepositoryUrl.toString()).build(), + tokenUrl = URLBuilder(config.ouraUserRepositoryTokenUrl.toString()).build(), + clientId = config.ouraUserRepositoryClientId, + clientSecret = config.ouraUserRepositoryClientSecret, + ) - userCache = CachedSet( - CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes), - ) { - makeRequest { url("users?source-type=Oura") } - .users - .toHashSet() - .filterTo(HashSet()) { u -> - u.isComplete() && - (containedUsers.isEmpty() || u.versionedId in containedUsers) - } - } + userCache = + CachedSet( + CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes), + ) { + makeRequest { url("users?source-type=Oura") } + .users + .toHashSet() + .filterTo(HashSet()) { u -> + u.isComplete() && + (containedUsers.isEmpty() || u.versionedId in containedUsers) + } + } } private fun createClient( @@ -110,65 +112,68 @@ class OuraServiceUserRepository : UserRepository { tokenUrl: Url?, clientId: String?, clientSecret: String?, - ): HttpClient = HttpClient(CIO) { - if (tokenUrl != null) { - install(Auth) { - clientCredentials( - ClientCredentialsConfig( - tokenUrl.toString(), - clientId, - clientSecret, - ).copyWithEnv("MANAGEMENT_PORTAL"), - baseUrl.host, - ) - } - install(ContentNegotiation) { - json( - Json { - ignoreUnknownKeys = true - }, - ) - } - } else if (clientId != null && clientSecret != null) { - install(Auth) { - basic { - credentials { - BasicAuthCredentials(username = clientId, password = clientSecret) - } - realm = "Access to the '/' path" - sendWithoutRequest { - it.url.host == baseUrl.host + ): HttpClient = + HttpClient(CIO) { + if (tokenUrl != null) { + install(Auth) { + clientCredentials( + ClientCredentialsConfig( + tokenUrl.toString(), + clientId, + clientSecret, + ).copyWithEnv("MANAGEMENT_PORTAL"), + baseUrl.host, + ) + } + install(ContentNegotiation) { + json( + Json { + ignoreUnknownKeys = true + }, + ) + } + } else if (clientId != null && clientSecret != null) { + install(Auth) { + basic { + credentials { + BasicAuthCredentials(username = clientId, password = clientSecret) + } + realm = "Access to the '/' path" + sendWithoutRequest { + it.url.host == baseUrl.host + } } } } - } - defaultRequest { - url.takeFrom(baseUrl) - } + defaultRequest { + url.takeFrom(baseUrl) + } - install(ContentNegotiation) { - jackson { - registerModule(JavaTimeModule()) // support java.time.* types + install(ContentNegotiation) { + jackson { + registerModule(JavaTimeModule()) // support java.time.* types + } } - } - install(HttpTimeout) { - connectTimeoutMillis = 60.seconds.inWholeMilliseconds - requestTimeoutMillis = 90.seconds.inWholeMilliseconds + install(HttpTimeout) { + connectTimeoutMillis = 60.seconds.inWholeMilliseconds + requestTimeoutMillis = 90.seconds.inWholeMilliseconds + } } - } - override fun stream(): Sequence = runBlocking(Dispatchers.Default) { - val valueInCache = userCache.getFromCache() - .takeIf { it is CachedValue.CacheValue } - ?.getOrThrow() + override fun stream(): Sequence = + runBlocking(Dispatchers.Default) { + val valueInCache = + userCache.getFromCache() + .takeIf { it is CachedValue.CacheValue } + ?.getOrThrow() - (valueInCache ?: userCache.get()) - .stream() - .filter { it.isComplete() } - .asSequence() - } + (valueInCache ?: userCache.get()) + .stream() + .filter { it.isComplete() } + .asSequence() + } @Throws(IOException::class, UserNotAuthorizedException::class) override fun getAccessToken(user: User): String { @@ -189,12 +194,13 @@ class OuraServiceUserRepository : UserRepository { throw UserNotAuthorizedException("User is not authorized") } return runBlocking(Dispatchers.Default) { - val token = requestAccessToken(user) { - url("users/${user.id}/token") - method = HttpMethod.Post - setBody("{}") - contentType(ContentType.Application.Json) - } + val token = + requestAccessToken(user) { + url("users/${user.id}/token") + method = HttpMethod.Post + setBody("{}") + contentType(ContentType.Application.Json) + } credentialCache(user).set(token) token.accessToken } @@ -222,9 +228,10 @@ class OuraServiceUserRepository : UserRepository { throw ex } - fun hasPendingUpdates(): Boolean = runBlocking(Dispatchers.Default) { - userCache.isStale() - } + fun hasPendingUpdates(): Boolean = + runBlocking(Dispatchers.Default) { + userCache.isStale() + } @Throws(IOException::class) fun applyPendingUpdates() { @@ -237,26 +244,28 @@ class OuraServiceUserRepository : UserRepository { private suspend inline fun makeRequest( crossinline builder: HttpRequestBuilder.() -> Unit, - ): T = withContext(Dispatchers.IO) { - val response = client.request(builder) - val contentLength = response.contentLength() - val hasBody = contentLength != null && contentLength > 0 - if (response.status == HttpStatusCode.NotFound) { - throw NoSuchElementException("URL " + response.request.url + " does not exist") - } else if (!response.status.isSuccess() || !hasBody) { - val message = buildString { - append("Failed to make request (HTTP status code ") - append(response.status) - append(')') - if (hasBody) { - append(": ") - append(response.bodyAsText()) - } + ): T = + withContext(Dispatchers.IO) { + val response = client.request(builder) + val contentLength = response.contentLength() + val hasBody = contentLength != null && contentLength > 0 + if (response.status == HttpStatusCode.NotFound) { + throw NoSuchElementException("URL " + response.request.url + " does not exist") + } else if (!response.status.isSuccess() || !hasBody) { + val message = + buildString { + append("Failed to make request (HTTP status code ") + append(response.status) + append(')') + if (hasBody) { + append(": ") + append(response.bodyAsText()) + } + } + throw HttpResponseException(message, response.status.value) } - throw HttpResponseException(message, response.status.value) + mapper.readValue(response.bodyAsText()) } - mapper.readValue(response.bodyAsText()) - } companion object { private val logger = LoggerFactory.getLogger(OuraServiceUserRepository::class.java) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 84bceba9..071865d2 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -68,10 +68,11 @@ constructor( val offset = ouraOffsetManager.getOffset(route, user) val startDate = user.startDate val startOffset: Instant = if (offset == null) { - logger.debug("No offsets found for $user, using the start date.") + logger.info("No offsets found for $user, using the start date.") startDate } else { - logger.debug("Offsets found in persistence.") + logger.info("Offsets found in persistence: " + offset.offset.toString()) + logger.info(offset.offset.coerceAtLeast(startDate).toString()) offset.offset.coerceAtLeast(startDate) } val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate From 20e389dd5c0ebba0f777eea1d1c91cc53948d9df Mon Sep 17 00:00:00 2001 From: Pauline Date: Thu, 18 Jan 2024 11:32:49 +0000 Subject: [PATCH 05/16] Update versions --- buildSrc/src/main/kotlin/Versions.kt | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 1cd990fa..a8f5df3d 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,15 +1,5 @@ @Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - - object Plugins { - const val kotlin = "1.9.10" - const val kotlinSerialization = kotlin - const val kotlinAllOpen = kotlin - const val avro = "1.8.0" - const val gradle = "8.3" - } - - const val project = "0.4.2-SNAPSHOT" const val java = 11 From ab6ef4da9cf25609830d745c97d57f6b010bd878 Mon Sep 17 00:00:00 2001 From: Pauline Date: Thu, 18 Jan 2024 11:49:50 +0000 Subject: [PATCH 06/16] Rename user repositories --- ...RepositoryKtor.kt => OuraServiceUserRepository.kt} | 11 +++++------ ...tory.java => OuraServiceUserRepositoryLegacy.java} | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) rename kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/{OuraServiceUserRepositoryKtor.kt => OuraServiceUserRepository.kt} (97%) rename kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/{OuraServiceUserRepository.java => OuraServiceUserRepositoryLegacy.java} (98%) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt similarity index 97% rename from kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt rename to kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt index f2c50b41..a1492196 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryKtor.kt +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt @@ -57,7 +57,6 @@ import org.radarbase.ktor.auth.ClientCredentialsConfig import org.radarbase.ktor.auth.clientCredentials import org.radarbase.oura.user.OuraUser import org.radarbase.oura.user.User -import org.radarbase.oura.user.UserRepository import org.slf4j.LoggerFactory import java.io.IOException import java.util.concurrent.ConcurrentHashMap @@ -68,7 +67,7 @@ import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @Suppress("unused") -class OuraServiceUserRepositoryKtor : UserRepository { +class OuraServiceUserRepository : OuraServiceUserRepositoryLegacy() { private lateinit var userCache: CachedSet private lateinit var client: HttpClient private val credentialCaches = ConcurrentHashMap>() @@ -82,7 +81,7 @@ class OuraServiceUserRepositoryKtor : UserRepository { makeRequest { url("users/$key") } } - fun initialize(config: OuraRestSourceConnectorConfig) { + override fun initialize(config: OuraRestSourceConnectorConfig) { val containedUsers = config.ouraUsers.toHashSet() client = @@ -189,7 +188,7 @@ class OuraServiceUserRepositoryKtor : UserRepository { } @Throws(IOException::class, UserNotAuthorizedException::class) - fun refreshAccessToken(user: User): String { + override fun refreshAccessToken(user: User): String { if (!user.isAuthorized) { throw UserNotAuthorizedException("User is not authorized") } @@ -228,13 +227,13 @@ class OuraServiceUserRepositoryKtor : UserRepository { throw ex } - fun hasPendingUpdates(): Boolean = + override fun hasPendingUpdates(): Boolean = runBlocking(Dispatchers.Default) { userCache.isStale() } @Throws(IOException::class) - fun applyPendingUpdates() { + override fun applyPendingUpdates() { logger.info("Requesting user information from webservice") runBlocking(Dispatchers.Default) { diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java similarity index 98% rename from kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java rename to kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java index aa6f17ba..d267dbea 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java @@ -60,7 +60,7 @@ import static kotlin.sequences.SequencesKt.*; @SuppressWarnings("unused") - public class OuraServiceUserRepository implements UserRepository { + public class OuraServiceUserRepositoryLegacy implements UserRepository { Instant MIN_INSTANT = Instant.EPOCH; public static final JsonFactory JSON_FACTORY = new JsonFactory(); @@ -88,7 +88,7 @@ public class OuraServiceUserRepository implements UserRepository { private OAuth2Client repositoryClient; private String basicCredentials; - public OuraServiceUserRepository() { + public OuraServiceUserRepositoryLegacy() { this.client = new OkHttpClient.Builder() .connectTimeout(CONNECTION_TIMEOUT) .readTimeout(CONNECTION_READ_TIMEOUT) From fec341c52169ff35478d14cd36e2f1cc39387680 Mon Sep 17 00:00:00 2001 From: Pauline Date: Thu, 18 Jan 2024 11:52:21 +0000 Subject: [PATCH 07/16] Use OuraServiceUserRepositoryLegacy in the meantime --- .../connect/rest/oura/OuraRestSourceConnectorConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java index 0acf9e89..873f58c9 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraServiceUserRepositoryLegacy; public class OuraRestSourceConnectorConfig extends AbstractConfig { public static final Pattern COLON_PATTERN = Pattern.compile(":"); @@ -192,7 +193,7 @@ public String toString() { .define(OURA_USER_REPOSITORY_CONFIG, Type.CLASS, - OuraServiceUserRepository.class, + OuraServiceUserRepositoryLegacy.class, Importance.MEDIUM, OURA_USER_REPOSITORY_DOC, group, From 111eb9295c43474556645a029a813c6e83ca0d35 Mon Sep 17 00:00:00 2001 From: Pauline Date: Fri, 19 Jan 2024 15:22:53 +0000 Subject: [PATCH 08/16] Add OuraUserRepository abstract class and extend from this --- .../oura/OuraRestSourceConnectorConfig.java | 14 +++---- .../rest/oura/OuraSourceConnector.java | 4 +- .../connect/rest/oura/OuraSourceTask.java | 4 +- .../oura/user/OuraServiceUserRepository.kt | 2 +- .../user/OuraServiceUserRepositoryLegacy.java | 5 ++- .../rest/oura/user/OuraUserRepository.kt | 41 +++++++++++++++++++ .../oura/user/UserNotAuthorizedException.java | 2 +- 7 files changed, 57 insertions(+), 15 deletions(-) create mode 100644 kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java index 873f58c9..97f6ebc4 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java @@ -43,8 +43,8 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepositoryLegacy; public class OuraRestSourceConnectorConfig extends AbstractConfig { public static final Pattern COLON_PATTERN = Pattern.compile(":"); @@ -102,7 +102,7 @@ public class OuraRestSourceConnectorConfig extends AbstractConfig { private static final String OURA_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials."; private static final String OURA_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL."; - private OuraServiceUserRepository userRepository; + private OuraUserRepository userRepository; private final Headers clientCredentials; public OuraRestSourceConnectorConfig(ConfigDef config, Map parsedConfig, boolean doLog) { @@ -193,7 +193,7 @@ public String toString() { .define(OURA_USER_REPOSITORY_CONFIG, Type.CLASS, - OuraServiceUserRepositoryLegacy.class, + OuraServiceUserRepository.class, Importance.MEDIUM, OURA_USER_REPOSITORY_DOC, group, @@ -255,7 +255,7 @@ public String getOuraClientSecret() { return getPassword(OURA_API_SECRET_CONFIG).value(); } - public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reuse) { + public OuraUserRepository getUserRepository(OuraUserRepository reuse) { if (reuse != null && reuse.getClass().equals(getClass(OURA_USER_REPOSITORY_CONFIG))) { userRepository = reuse; } else { @@ -265,15 +265,15 @@ public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reu return userRepository; } - public OuraServiceUserRepository getUserRepository() { + public OuraUserRepository getUserRepository() { userRepository.initialize(this); return userRepository; } @SuppressWarnings("unchecked") - public OuraServiceUserRepository createUserRepository() { + public OuraUserRepository createUserRepository() { try { - return ((Class) + return ((Class) getClass(OURA_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance(); } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java index f6959ee6..9bf6b865 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.radarbase.oura.user.User; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class OuraSourceConnector extends AbstractRestSourceConnector { private static final Logger logger = LoggerFactory.getLogger(OuraSourceConnector.class); private ScheduledExecutorService executor; private Set configuredUsers; - private OuraServiceUserRepository repository; + private OuraUserRepository repository; @Override public void start(Map props) { diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java index 3cd47a93..293a792f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java @@ -35,7 +35,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.radarbase.connect.rest.oura.offset.KafkaOffsetManager; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraUserRepository; import org.radarbase.connect.rest.oura.util.VersionUtil; import org.radarbase.oura.converter.TopicData; import org.radarbase.oura.request.OuraRequestGenerator; @@ -57,7 +57,7 @@ public class OuraSourceTask extends SourceTask { private static final Logger logger = LoggerFactory.getLogger(OuraSourceTask.class); private OkHttpClient baseClient; - private OuraServiceUserRepository userRepository; + private OuraUserRepository userRepository; private List routes; private OuraRequestGenerator ouraRequestGenerator; private AvroData avroData = new AvroData(20); diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt index a1492196..ef20736f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt @@ -67,7 +67,7 @@ import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @Suppress("unused") -class OuraServiceUserRepository : OuraServiceUserRepositoryLegacy() { +class OuraServiceUserRepository : OuraUserRepository() { private lateinit var userCache: CachedSet private lateinit var client: HttpClient private val credentialCaches = ConcurrentHashMap>() diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java index d267dbea..3b8511f2 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java @@ -60,14 +60,14 @@ import static kotlin.sequences.SequencesKt.*; @SuppressWarnings("unused") - public class OuraServiceUserRepositoryLegacy implements UserRepository { + public class OuraServiceUserRepositoryLegacy extends OuraUserRepository { Instant MIN_INSTANT = Instant.EPOCH; public static final JsonFactory JSON_FACTORY = new JsonFactory(); public static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY) .registerModule(new JavaTimeModule()) .reader(); - private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepository.class); + private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepositoryLegacy.class); private static final ObjectReader USER_LIST_READER = JSON_READER.forType(OuraUsers.class); private static final ObjectReader USER_READER = JSON_READER.forType(User.class); @@ -153,6 +153,7 @@ public String getAccessToken(User user) throws IOException, UserNotAuthorizedExc } } + @Override public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException { if (!user.isAuthorized()) { throw new UserNotAuthorizedException("User is not authorized"); diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt new file mode 100644 index 00000000..3a21cd5d --- /dev/null +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraUserRepository.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.connect.rest.oura.user + +import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig +import org.radarbase.oura.user.User +import org.radarbase.oura.user.UserNotAuthorizedException +import org.radarbase.oura.user.UserRepository +import org.slf4j.LoggerFactory +import java.io.IOException + +@Suppress("unused") +abstract class OuraUserRepository : UserRepository { + abstract fun initialize(config: OuraRestSourceConnectorConfig) + + @Throws(IOException::class, UserNotAuthorizedException::class) + abstract fun refreshAccessToken(user: User): String + + @Throws(IOException::class) + abstract fun applyPendingUpdates() + + abstract fun hasPendingUpdates(): Boolean + + companion object { + private val logger = LoggerFactory.getLogger(OuraUserRepository::class.java) + } +} diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java index ce705e7f..7832cf0b 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java @@ -17,7 +17,7 @@ package org.radarbase.connect.rest.oura.user; -public class UserNotAuthorizedException extends Exception { +public class UserNotAuthorizedException extends RuntimeException { public UserNotAuthorizedException(String message) { super(message); } From d7a0e9c170414ac3e03b6b5b0f586164d77d1c93 Mon Sep 17 00:00:00 2001 From: Pauline Date: Fri, 19 Jan 2024 15:23:47 +0000 Subject: [PATCH 09/16] Update default userrepository in conifg --- .../connect/rest/oura/OuraRestSourceConnectorConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java index 97f6ebc4..c21c038f 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java @@ -44,7 +44,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; import org.radarbase.connect.rest.oura.user.OuraUserRepository; -import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository; +import org.radarbase.connect.rest.oura.user.OuraServiceUserRepositoryLegacy; public class OuraRestSourceConnectorConfig extends AbstractConfig { public static final Pattern COLON_PATTERN = Pattern.compile(":"); @@ -193,7 +193,7 @@ public String toString() { .define(OURA_USER_REPOSITORY_CONFIG, Type.CLASS, - OuraServiceUserRepository.class, + OuraServiceUserRepositoryLegacy.class, Importance.MEDIUM, OURA_USER_REPOSITORY_DOC, group, From 985d433e467347286c0775ba54e703dfafa6d0c1 Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 13:10:54 +0000 Subject: [PATCH 10/16] Fix versions --- buildSrc/src/main/kotlin/Versions.kt | 2 +- kafka-connect-oura-source/build.gradle.kts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index a8f5df3d..f47e49e7 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -22,7 +22,7 @@ object Versions { const val okhttp = "4.11.0" const val firebaseAdmin = "9.1.0" - const val radarSchemas = "0.8.6-SNAPSHOT" + const val radarSchemas = "0.8.6" const val ktor = "2.3.5" const val junit = "5.9.3" diff --git a/kafka-connect-oura-source/build.gradle.kts b/kafka-connect-oura-source/build.gradle.kts index e0acc8d7..78149a71 100644 --- a/kafka-connect-oura-source/build.gradle.kts +++ b/kafka-connect-oura-source/build.gradle.kts @@ -4,7 +4,7 @@ dependencies { api(project(":oura-library")) api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}") api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") - implementation("org.radarbase:radar-commons-kotlin:1.1.1") + implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}") implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}") api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") From c76c8ad4e648c9032b797b0e63e3f7fe01b6d582 Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 13:11:34 +0000 Subject: [PATCH 11/16] Fix OuraServiceUserRepositoryLegacy and OuraRingConfigurationConverter --- .../rest/oura/user/OuraServiceUserRepositoryLegacy.java | 4 ++-- .../oura/converter/OuraRingConfigurationConverter.kt | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java index 3b8511f2..6cf055fe 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java @@ -63,8 +63,8 @@ public class OuraServiceUserRepositoryLegacy extends OuraUserRepository { Instant MIN_INSTANT = Instant.EPOCH; - public static final JsonFactory JSON_FACTORY = new JsonFactory(); - public static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY) + protected static final JsonFactory JSON_FACTORY = new JsonFactory(); + protected static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY) .registerModule(new JavaTimeModule()) .reader(); private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepositoryLegacy.class); diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt index 242bd60e..302ef722 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraRingConfigurationConverter.kt @@ -48,7 +48,10 @@ class OuraRingConfigurationConverter( design = data.get("design").textValue()?.classifyDesign() firmwareVersion = data.get("firmware_version").textValue() hardwareType = data.get("hardware_type").textValue()?.classifyHardware() - setUpAt = if (setupTime == null) null else setupTime.toEpochMilli() / 1000.0 + setUpAt = + setupTime?.toEpochMilli()?.let { + it / 1000.0 + } size = data.get("size").intValue() }.build() } From ffcb2b48861355819776561559b6b43e13ff6bfa Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 13:42:03 +0000 Subject: [PATCH 12/16] Fix OuraRequestGenerator --- .../oura/request/OuraRequestGenerator.kt | 100 +++++++++++++----- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 071865d2..5e1089a5 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -16,14 +16,14 @@ import java.time.Duration import java.time.Instant import kotlin.streams.asSequence -class OuraRequestGenerator @JvmOverloads +class OuraRequestGenerator +@JvmOverloads constructor( private val userRepository: UserRepository, private val defaultQueryRange: Duration = Duration.ofDays(15), private val ouraOffsetManager: OuraOffsetManager, public val routes: List = OuraRouteFactory.getRoutes(userRepository), ) : RequestGenerator { - private val userNextRequest: MutableMap = mutableMapOf() public var nextRequestTime: Instant = Instant.MIN @@ -31,7 +31,10 @@ constructor( private val shouldBackoff: Boolean get() = Instant.now() < nextRequestTime - override fun requests(user: User, max: Int): Sequence { + override fun requests( + user: User, + max: Int, + ): Sequence { return if (user.ready()) { routes.asSequence() .flatMap { route -> @@ -43,7 +46,10 @@ constructor( } } - override fun requests(route: Route, max: Int): Sequence { + override fun requests( + route: Route, + max: Int, + ): Sequence { return userRepository .stream() .flatMap { user -> @@ -56,7 +62,11 @@ constructor( .takeWhile { !shouldBackoff } } - override fun requests(route: Route, user: User, max: Int): Sequence { + override fun requests( + route: Route, + user: User, + max: Int, + ): Sequence { return if (user.ready()) { return generateRequests(route, user).takeWhile { !shouldBackoff } } else { @@ -64,17 +74,21 @@ constructor( } } - fun generateRequests(route: Route, user: User): Sequence { + fun generateRequests( + route: Route, + user: User, + ): Sequence { val offset = ouraOffsetManager.getOffset(route, user) val startDate = user.startDate - val startOffset: Instant = if (offset == null) { - logger.info("No offsets found for $user, using the start date.") - startDate - } else { - logger.info("Offsets found in persistence: " + offset.offset.toString()) - logger.info(offset.offset.coerceAtLeast(startDate).toString()) - offset.offset.coerceAtLeast(startDate) - } + val startOffset: Instant = + if (offset == null) { + logger.info("No offsets found for $user, using the start date.") + startDate + } else { + val offsetTime = offset.offset + logger.info("Offsets found in persistence: " + offsetTime.toString()) + offsetTime.coerceAtLeast(startDate) + } val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") @@ -85,25 +99,31 @@ constructor( return route.generateRequests(user, startOffset, endTime, USER_MAX_REQUESTS) } - fun handleResponse(req: RestRequest, response: Response): OuraResult> { + fun handleResponse( + req: RestRequest, + response: Response, + ): OuraResult> { if (response.isSuccessful) { return OuraResult.Success>(requestSuccessful(req, response)) } else { try { OuraResult.Error(requestFailed(req, response)) - } catch (e: TooManyRequestsException) {} finally { + } catch (e: TooManyRequestsException) { + } finally { return OuraResult.Success(listOf()) } } } - override fun requestSuccessful(request: RestRequest, response: Response): List { + override fun requestSuccessful( + request: RestRequest, + response: Response, + ): List { logger.debug("Request successful: {}..", request.request) val body: ResponseBody? = response.body val data = body?.bytes()!! - val records = request.route.converters.flatMap { - it.convert(request, response.headers, data) - } + val records = + request.route.converters.flatMap { it.convert(request, response.headers, data) } val offset = records.maxByOrNull { it -> it.offset }?.offset if (offset != null) { logger.info("Writing ${records.size} records to offsets...") @@ -112,7 +132,16 @@ constructor( request.user, Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), ) - userNextRequest[request.user.versionedId] = Instant.now().plus(SUCCESS_BACK_OFF_TIME) + val nextRequestTime = userNextRequest[request.user.versionedId] + userNextRequest[request.user.versionedId] = + nextRequestTime?.let { + if (nextRequestTime > Instant.now()) { + nextRequestTime + } else { + Instant.now().plus(SUCCESS_BACK_OFF_TIME) + } + } + ?: Instant.now().plus(SUCCESS_BACK_OFF_TIME) } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { ouraOffsetManager.updateOffsets( @@ -125,7 +154,10 @@ constructor( return records } - override fun requestFailed(request: RestRequest, response: Response): OuraError { + override fun requestFailed( + request: RestRequest, + response: Response, + ): OuraError { return when (response.code) { 429 -> { logger.info("Too many requests, rate limit reached. Backing off...") @@ -134,10 +166,13 @@ constructor( } 403 -> { logger.warn( - "User ${request.user} has expired." + - "Please renew the subscription...", + "User ${request.user} has expired." + "Please renew the subscription...", ) - userNextRequest[request.user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = + Instant.now() + .plus( + USER_BACK_OFF_TIME, + ) OuraAccessForbiddenError( "Oura subscription has expired or API data not available..", IOException("Unauthorized"), @@ -147,9 +182,14 @@ constructor( 401 -> { logger.warn( "User ${request.user} access token is" + - " expired, malformed, or revoked. " + response.body?.string(), + " expired, malformed, or revoked. " + + response.body?.string(), ) - userNextRequest[request.user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) + userNextRequest[request.user.versionedId] = + Instant.now() + .plus( + USER_BACK_OFF_TIME, + ) OuraUnauthorizedAccessError( "Access token expired or revoked..", IOException("Unauthorized"), @@ -175,7 +215,11 @@ constructor( } 404 -> { logger.warn("Not found..") - OuraNotFoundError(response.body!!.string(), IOException("Data not found"), "404") + OuraNotFoundError( + response.body!!.string(), + IOException("Data not found"), + "404", + ) } else -> { logger.warn("Request Failed: {}, {}", request, response) From e40bdf3e8acbb3dabab0e4c5f3305e4689796f76 Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 14:59:17 +0000 Subject: [PATCH 13/16] Add back legacy UserRepository --- buildSrc/src/main/kotlin/Versions.kt | 2 + kafka-connect-fitbit-source/build.gradle.kts | 2 + .../FitbitRestSourceConnectorConfig.java | 24 +- .../rest/fitbit/user/ServiceUserRepository.kt | 5 +- .../user/ServiceUserRepositoryLegacy.java | 249 ++++++++++++++++++ 5 files changed, 269 insertions(+), 13 deletions(-) create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index ae89e8ef..8766a71e 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -11,6 +11,8 @@ object Versions { const val kafka = "$confluent-ce" const val avro = "1.11.0" + const val managementPortal = "2.0.0" + // From image const val jackson = "2.14.2" diff --git a/kafka-connect-fitbit-source/build.gradle.kts b/kafka-connect-fitbit-source/build.gradle.kts index 932ccfef..f6994a25 100644 --- a/kafka-connect-fitbit-source/build.gradle.kts +++ b/kafka-connect-fitbit-source/build.gradle.kts @@ -6,7 +6,9 @@ dependencies { api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}") api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}") + implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}") + api("com.squareup.okhttp3:okhttp:${Versions.okhttp}") implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}")) implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index ee978b10..edf00503 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -21,6 +21,8 @@ import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; @@ -33,6 +35,8 @@ import io.ktor.http.URLParserException; import io.ktor.http.Url; import okhttp3.Headers; +import okhttp3.HttpUrl; + import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.NonEmptyString; @@ -530,18 +534,18 @@ public Path getFitbitUserCredentialsPath() { return Paths.get(getString(FITBIT_USER_CREDENTIALS_DIR_CONFIG)); } - public Url getFitbitUserRepositoryUrl() { + public HttpUrl getFitbitUserRepositoryUrl() { String urlString = getString(FITBIT_USER_REPOSITORY_URL_CONFIG).trim(); if (urlString.charAt(urlString.length() - 1) != '/') { urlString += '/'; } - try { - return URLBuilder(urlString).build(); - } catch (URLParserException ex) { + HttpUrl url = HttpUrl.parse(urlString); + if (url == null) { throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG, getString(FITBIT_USER_REPOSITORY_URL_CONFIG), - "User repository URL " + urlString + " cannot be parsed as URL: " + ex); + "User repository URL " + urlString + " cannot be parsed as URL."); } + return url; } public Headers getClientCredentials() { @@ -584,17 +588,15 @@ public String getFitbitUserRepositoryClientSecret() { return getPassword(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value(); } - public Url getFitbitUserRepositoryTokenUrl() { + public URL getFitbitUserRepositoryTokenUrl() { String value = getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG); if (value == null || value.isEmpty()) { return null; } else { try { - return URLBuilder(value).build(); - } catch (URLParserException ex) { - throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG, - getString(FITBIT_USER_REPOSITORY_URL_CONFIG), - "Fitbit user repository token URL " + value + " cannot be parsed as URL: " + ex); + return new URL(getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG)); + } catch (MalformedURLException ex) { + throw new ConfigException("Fitbit user repository token URL is invalid."); } } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt index d89a7dc3..c26c2af5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt @@ -37,6 +37,7 @@ import io.ktor.client.statement.request import io.ktor.http.ContentType import io.ktor.http.HttpMethod import io.ktor.http.HttpStatusCode +import io.ktor.http.URLBuilder import io.ktor.http.Url import io.ktor.http.contentLength import io.ktor.http.contentType @@ -85,8 +86,8 @@ class ServiceUserRepository : UserRepository { val containedUsers = config.fitbitUsers.toHashSet() client = createClient( - baseUrl = config.fitbitUserRepositoryUrl, - tokenUrl = config.fitbitUserRepositoryTokenUrl, + baseUrl = URLBuilder(config.fitbitUserRepositoryUrl.toString()).build(), + tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(), clientId = config.fitbitUserRepositoryClientId, clientSecret = config.fitbitUserRepositoryClientSecret, ) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java new file mode 100644 index 00000000..2f5086d4 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java @@ -0,0 +1,249 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + package org.radarbase.connect.rest.fitbit.user; + + import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT; + import static org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator.JSON_READER; + + import com.fasterxml.jackson.core.JsonProcessingException; + import com.fasterxml.jackson.databind.ObjectReader; + import java.io.IOException; + import java.net.ProtocolException; + import java.net.URL; + import java.time.Duration; + import java.time.Instant; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Set; + import java.util.concurrent.atomic.AtomicReference; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import okhttp3.Credentials; + import okhttp3.HttpUrl; + import okhttp3.MediaType; + import okhttp3.OkHttpClient; + import okhttp3.Request; + import okhttp3.RequestBody; + import okhttp3.Response; + import okhttp3.ResponseBody; + import org.apache.kafka.common.config.ConfigException; + import org.radarbase.connect.rest.RestSourceConnectorConfig; + import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; + import org.radarbase.exception.TokenException; + import org.radarbase.oauth.OAuth2Client; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + @SuppressWarnings("unused") + public class ServiceUserRepositoryLegacy implements UserRepository { + private static final Logger logger = LoggerFactory.getLogger(ServiceUserRepositoryLegacy.class); + + private static final ObjectReader USER_LIST_READER = JSON_READER.forType(Users.class); + private static final ObjectReader USER_READER = JSON_READER.forType(User.class); + private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class); + private static final RequestBody EMPTY_BODY = + RequestBody.create("", MediaType.parse("application/json; charset=utf-8")); + private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L); + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60); + private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90); + + private final OkHttpClient client; + private final Map cachedCredentials; + private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); + + private HttpUrl baseUrl; + private final HashSet containedUsers; + private Set timedCachedUsers = new HashSet<>(); + private OAuth2Client repositoryClient; + private String basicCredentials; + + public ServiceUserRepositoryLegacy() { + this.client = new OkHttpClient.Builder() + .connectTimeout(CONNECTION_TIMEOUT) + .readTimeout(CONNECTION_READ_TIMEOUT) + .build(); + this.cachedCredentials = new HashMap<>(); + this.containedUsers = new HashSet<>(); + } + + @Override + public User get(String key) throws IOException { + Request request = requestFor("users/" + key).build(); + return makeRequest(request, USER_READER); + } + + @Override + public void initialize(RestSourceConnectorConfig config) { + FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config; + this.baseUrl = fitbitConfig.getFitbitUserRepositoryUrl(); + this.containedUsers.addAll(fitbitConfig.getFitbitUsers()); + + URL tokenUrl = fitbitConfig.getFitbitUserRepositoryTokenUrl(); + String clientId = fitbitConfig.getFitbitUserRepositoryClientId(); + String clientSecret = fitbitConfig.getFitbitUserRepositoryClientSecret(); + + if (tokenUrl != null) { + if (clientId.isEmpty()) { + throw new ConfigException("Client ID for user repository is not set."); + } + this.repositoryClient = new OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ MEASUREMENT.CREATE") + .httpClient(client) + .build(); + } else if (clientId != null) { + basicCredentials = Credentials.basic(clientId, clientSecret); + } + } + + @Override + public Stream stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + try { + applyPendingUpdates(); + } catch (IOException ex) { + logger.error("Failed to initially get users from repository", ex); + } + } + return this.timedCachedUsers.stream() + .filter(User::isComplete); + } + + @Override + public String getAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + OAuth2UserCredentials credentials = cachedCredentials.get(user.getId()); + if (credentials != null && !credentials.isAccessTokenExpired()) { + return credentials.getAccessToken(); + } else { + Request request = requestFor("users/" + user.getId() + "/token").build(); + return requestAccessToken(user, request); + } + } + + @Override + public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException { + if (!user.isAuthorized()) { + throw new UserNotAuthorizedException("User is not authorized"); + } + Request request = requestFor("users/" + user.getId() + "/token") + .post(EMPTY_BODY) + .build(); + return requestAccessToken(user, request); + } + + private String requestAccessToken(User user, Request request) + throws UserNotAuthorizedException, IOException { + try { + OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER); + cachedCredentials.put(user.getId(), credentials); + return credentials.getAccessToken(); + } catch (HttpResponseException ex) { + if (ex.getStatusCode() == 407) { + cachedCredentials.remove(user.getId()); + if (user instanceof LocalUser) { + ((LocalUser) user).setIsAuthorized(false); + } + throw new UserNotAuthorizedException(ex.getMessage()); + } + throw ex; + } + } + + @Override + public boolean hasPendingUpdates() { + Instant nextFetchTime = nextFetch.get(); + Instant now = Instant.now(); + return now.isAfter(nextFetchTime); + } + + @Override + public void applyPendingUpdates() throws IOException { + logger.info("Requesting user information from webservice"); + Request request = requestFor("users?source-type=FitBit").build(); + this.timedCachedUsers = + this.makeRequest(request, USER_LIST_READER).getUsers().stream() + .filter(u -> u.isComplete() + && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) + .collect(Collectors.toSet()); + nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); + } + + private Request.Builder requestFor(String relativeUrl) throws IOException { + HttpUrl url = baseUrl.resolve(relativeUrl); + if (url == null) { + throw new IllegalArgumentException("Relative URL is invalid"); + } + Request.Builder builder = new Request.Builder().url(url); + String authorization = requestAuthorization(); + if (authorization != null) { + builder.addHeader("Authorization", authorization); + } + + return builder; + } + + private String requestAuthorization() throws IOException { + if (repositoryClient != null) { + try { + return "Bearer " + repositoryClient.getValidToken().getAccessToken(); + } catch (TokenException ex) { + throw new IOException(ex); + } + } else if (basicCredentials != null) { + return basicCredentials; + } else { + return null; + } + } + + private T makeRequest(Request request, ObjectReader reader) throws IOException { + logger.info("Requesting info from {}", request.url()); + try (Response response = client.newCall(request).execute()) { + ResponseBody body = response.body(); + + if (response.code() == 404) { + throw new NoSuchElementException("URL " + request.url() + " does not exist"); + } else if (!response.isSuccessful() || body == null) { + String message = "Failed to make request"; + if (response.code() > 0) { + message += " (HTTP status code " + response.code() + ')'; + } + if (body != null) { + message += body.string(); + } + throw new HttpResponseException(message, response.code()); + } + String bodyString = body.string(); + try { + return reader.readValue(bodyString); + } catch (JsonProcessingException ex) { + logger.error("Failed to parse JSON: {}\n{}", ex, bodyString); + throw ex; + } + } catch (ProtocolException ex) { + throw new IOException("Failed to make request to user repository", ex); + } + } + } \ No newline at end of file From 6b6055be784a136114f6d1052e4aa3b2a5faa61c Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 15:23:37 +0000 Subject: [PATCH 14/16] Remove unused imports --- .../connect/rest/fitbit/FitbitRestSourceConnectorConfig.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index edf00503..ceafad5a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -17,7 +17,6 @@ package org.radarbase.connect.rest.fitbit; -import static io.ktor.http.URLUtilsKt.URLBuilder; import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; import java.lang.reflect.InvocationTargetException; @@ -32,8 +31,6 @@ import java.util.List; import java.util.Map; -import io.ktor.http.URLParserException; -import io.ktor.http.Url; import okhttp3.Headers; import okhttp3.HttpUrl; From e60ddd04dd1a4129f0b44b4f3686b7fdf352360a Mon Sep 17 00:00:00 2001 From: Pauline Date: Mon, 22 Jan 2024 15:29:57 +0000 Subject: [PATCH 15/16] Fix OuraRequestGenerator --- .../radarbase/oura/request/OuraRequestGenerator.kt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index 5e1089a5..d7729270 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -132,16 +132,17 @@ constructor( request.user, Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)), ) - val nextRequestTime = userNextRequest[request.user.versionedId] + val currentNextRequestTime = userNextRequest[request.user.versionedId] + val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME) userNextRequest[request.user.versionedId] = - nextRequestTime?.let { - if (nextRequestTime > Instant.now()) { - nextRequestTime + currentNextRequestTime?.let { + if (currentNextRequestTime > nextRequestTime) { + currentNextRequestTime } else { - Instant.now().plus(SUCCESS_BACK_OFF_TIME) + nextRequestTime } } - ?: Instant.now().plus(SUCCESS_BACK_OFF_TIME) + ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { ouraOffsetManager.updateOffsets( From 7d9711b72cb4f2a38b09d8ac474bdde6ea09ee7e Mon Sep 17 00:00:00 2001 From: Pauline Date: Tue, 23 Jan 2024 12:30:49 +0000 Subject: [PATCH 16/16] Delete unnecessary files --- .DS_Store | Bin 8196 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 1765e0788397301b4252f190d8c349b07f56cc5e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8196 zcmeHMyKWOf6g?A6us{hZjgVlGA`nPMq)EL*i)bTRf+9`g2Tp{|THd(dhB^g5Km*kL z0Yc&vQX=sWC_QH$wr6&;E*u4ra>trEtGRdXoINw1^<4u{JCp7f&;U?p)wpqs%{fKe zbuE<@*Ao#H$79gnYIV}#sN^XfqJStM3Wx%tfGF_4D8Og7F7=A%zO_m#3Wx$%QUQHF zM64QPhq*;_bg(fg0I|kyQ+U0hFNl%D*kNvw78YfP675jsQVeB>vp;fvvBTV=9S-G^ z59MZ7E<;gfcE*p4J5+2@T2Vk0C@T=LtA?lO;|Rw={{Ha#E8jLulcYUJ+w7}1c5lD` z{QAf9(B55WA02w{1|B`*P~4c#i$hG%4ra%LFJ%v(!}i(QltUrTFr720_6qpG@TsFs z9!D6Uix$S{1dLWszmZYN$LF-UwtI52XLLEoZ_a9wU%>08;k8OWDLQzI9>xJD-1|gM zf!*h`zPUctsNgf_Qsooyz&@wpJbkKiKsE#PLLR^OigusJ_s1R&FNVKTRXza^d%EAH z5(ju=+9q{L@s6J!ua41VKX)=$QnyWHv`+L|78tL5{6NM_M)L11OJ?`BbhPE8t_lXPT1~QQ)!_xMnA4==}dh_51(JrXle~0a4%|Dxhjf zE7>6dms^*p=v+Hu-DlNB`{fqR5H=n{TQJVc)V LW(}mddJ6mjk?CWq