Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Sep 25, 2024
1 parent eba9ae9 commit bf316e0
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 40 deletions.
5 changes: 0 additions & 5 deletions src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStreamApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
Expand All @@ -34,10 +33,6 @@ import com.amplitude.experiment.util.wrapMetrics
import okhttp3.HttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources

class LocalEvaluationClient internal constructor(
apiKey: String,
Expand Down
15 changes: 7 additions & 8 deletions src/main/kotlin/flag/FlagConfigStreamApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

internal open class FlagConfigStreamApiError(message: String?, cause: Throwable?): Exception(message, cause) {
internal open class FlagConfigStreamApiError(message: String?, cause: Throwable?) : Exception(message, cause) {
constructor(message: String?) : this(message, null)
constructor(cause: Throwable?) : this(cause?.toString(), cause)
}
internal class FlagConfigStreamApiConnTimeoutError: FlagConfigStreamApiError("Initial connection timed out")
internal class FlagConfigStreamApiDataCorruptError: FlagConfigStreamApiError("Stream data corrupted")
internal class FlagConfigStreamApiStreamError(cause: Throwable?): FlagConfigStreamApiError("Stream error", cause)
internal class FlagConfigStreamApiConnTimeoutError : FlagConfigStreamApiError("Initial connection timed out")
internal class FlagConfigStreamApiDataCorruptError : FlagConfigStreamApiError("Stream data corrupted")
internal class FlagConfigStreamApiStreamError(cause: Throwable?) : FlagConfigStreamApiError("Stream error", cause)

private const val CONNECTION_TIMEOUT_MILLIS_DEFAULT = 1500L
private const val KEEP_ALIVE_TIMEOUT_MILLIS_DEFAULT = 17000L // keep alive sends at 15s interval. 2s grace period
private const val RECONN_INTERVAL_MILLIS_DEFAULT = 15 * 60 * 1000L
internal class FlagConfigStreamApi (
internal class FlagConfigStreamApi(
deploymentKey: String,
serverUrl: HttpUrl,
httpClient: OkHttpClient = OkHttpClient(),
Expand All @@ -42,7 +42,8 @@ internal class FlagConfigStreamApi (
httpClient,
connectionTimeoutMillis,
keepaliveTimeoutMillis,
reconnIntervalMillis)
reconnIntervalMillis
)

/**
* Connects to flag configs stream.
Expand Down Expand Up @@ -82,7 +83,6 @@ internal class FlagConfigStreamApi (
} catch (_: Throwable) {
updateTimeoutFuture.completeExceptionally(FlagConfigStreamApiDataCorruptError())
}

} else {
// Stream has already established.
// Make sure valid data.
Expand All @@ -98,7 +98,6 @@ internal class FlagConfigStreamApi (
// Stream corrupted. Reconnect.
handleError(onError, FlagConfigStreamApiDataCorruptError())
}

}
}
val onSseError: ((Throwable?) -> Unit) = { t ->
Expand Down
9 changes: 5 additions & 4 deletions src/main/kotlin/flag/FlagConfigUpdater.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ internal class FlagConfigPoller(
cohortStorage: CohortStorage?,
private val config: LocalEvaluationConfig,
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(),
): FlagConfigUpdater, FlagConfigUpdaterBase(
) : FlagConfigUpdater, FlagConfigUpdaterBase(
storage, cohortLoader, cohortStorage
) {
private val lock: ReentrantLock = ReentrantLock()
Expand Down Expand Up @@ -191,7 +191,7 @@ internal class FlagConfigStreamer(
cohortLoader: CohortLoader?,
cohortStorage: CohortStorage?,
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper()
): FlagConfigUpdater, FlagConfigUpdaterBase(
) : FlagConfigUpdater, FlagConfigUpdaterBase(
storage, cohortLoader, cohortStorage
) {
private val lock: ReentrantLock = ReentrantLock()
Expand Down Expand Up @@ -240,7 +240,7 @@ internal class FlagConfigFallbackRetryWrapper(
private val fallbackUpdater: FlagConfigUpdater?,
retryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
): FlagConfigUpdater {
) : FlagConfigUpdater {
private val lock: ReentrantLock = ReentrantLock()
private val reconnIntervalRange = max(0, retryDelayMillis - maxJitterMillis)..(min(retryDelayMillis, Long.MAX_VALUE - maxJitterMillis) + maxJitterMillis)
private val executor = Executors.newScheduledThreadPool(1, daemonFactory)
Expand Down Expand Up @@ -320,5 +320,6 @@ internal class FlagConfigFallbackRetryWrapper(
scheduleRetry()
}
}, reconnIntervalRange.random(), TimeUnit.MILLISECONDS)
}
}
}
8 changes: 4 additions & 4 deletions src/main/kotlin/util/SseStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.concurrent.withLock
import kotlin.math.max
import kotlin.math.min

internal class StreamException(error: String): Throwable(error)
internal class StreamException(error: String) : Throwable(error)

private const val KEEP_ALIVE_TIMEOUT_MILLIS_DEFAULT = 0L // no timeout
private const val RECONN_INTERVAL_MILLIS_DEFAULT = 30 * 60 * 1000L
Expand All @@ -26,7 +26,7 @@ private const val KEEP_ALIVE_DATA = " "
/**
* For establishing an SSE stream.
*/
internal class SseStream (
internal class SseStream(
authToken: String, // Will be used in header as Authorization: <authToken>
url: HttpUrl, // The full url to connect to.
httpClient: OkHttpClient = OkHttpClient(),
Expand Down Expand Up @@ -128,7 +128,7 @@ internal class SseStream (
this.onUpdate = onUpdate
this.onError = onError
es = client.newEventSource(request, eventSourceListener)
reconnectTimerTask = Timer().schedule(reconnIntervalRange.random()) {// Timer for a new event source.
reconnectTimerTask = Timer().schedule(reconnIntervalRange.random()) { // Timer for a new event source.
// This forces client side reconnection after interval.
this@SseStream.cancel()
connect(onUpdate, onError)
Expand All @@ -155,4 +155,4 @@ internal class SseStream (
this.onError = null
}
}
}
}
1 change: 0 additions & 1 deletion src/test/kotlin/LocalEvaluationClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import kotlin.system.measureNanoTime
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test

private const val API_KEY = "server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz"
Expand Down
4 changes: 2 additions & 2 deletions src/test/kotlin/flag/FlagConfigStreamApiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ class FlagConfigStreamApiTest {
var err: Array<Throwable?> = arrayOf()

val run = async {
api.connect({d ->
api.connect({ d ->
assertEquals(listOf(), d)
}, {d ->
}, { d ->
assertEquals(listOf(), d)
}, { t ->
err += t
Expand Down
24 changes: 16 additions & 8 deletions src/test/kotlin/flag/FlagConfigUpdaterTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package com.amplitude.experiment.flag

import com.amplitude.experiment.LocalEvaluationConfig
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.util.SseStream
import io.mockk.*
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.justRun
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import org.junit.Assert.assertEquals
import java.lang.Exception
import kotlin.test.*
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.fail

private val FLAG1 = EvaluationFlag("key1", emptyMap(), emptyList())
private val FLAG2 = EvaluationFlag("key2", emptyMap(), emptyList())
Expand Down Expand Up @@ -65,7 +73,7 @@ class FlagConfigPollerTest {
}

@Test
fun `Test Poller start fails`(){
fun `Test Poller start fails`() {
every { fetchApi.getFlagConfigs() } answers { throw Error("Haha error") }
val poller = FlagConfigPoller(fetchApi, storage, null, null, LocalEvaluationConfig(flagConfigPollerIntervalMillis = 1000))
var errorCount = 0
Expand All @@ -85,7 +93,7 @@ class FlagConfigPollerTest {
}

@Test
fun `Test Poller poll fails`(){
fun `Test Poller poll fails`() {
every { fetchApi.getFlagConfigs() } returns emptyList()
val poller = FlagConfigPoller(fetchApi, storage, null, null, LocalEvaluationConfig(flagConfigPollerIntervalMillis = 1000))
var errorCount = 0
Expand Down Expand Up @@ -154,7 +162,7 @@ class FlagConfigStreamerTest {
}

@Test
fun `Test Streamer start fails`(){
fun `Test Streamer start fails`() {
every { streamApi.connect(capture(onUpdateCapture), capture(onUpdateCapture), capture(onErrorCapture)) } answers { throw Error("Haha error") }
val streamer = FlagConfigStreamer(streamApi, storage, null, null)
var errorCount = 0
Expand All @@ -168,7 +176,7 @@ class FlagConfigStreamerTest {
}

@Test
fun `Test Streamer stream fails`(){
fun `Test Streamer stream fails`() {
justRun { streamApi.connect(capture(onUpdateCapture), capture(onUpdateCapture), capture(onErrorCapture)) }
val streamer = FlagConfigStreamer(streamApi, storage, null, null)
var errorCount = 0
Expand Down Expand Up @@ -398,4 +406,4 @@ class FlagConfigFallbackRetryWrapperTest {
}
verify(exactly = 0) { mainUpdater.start() }
}
}
}
16 changes: 8 additions & 8 deletions src/test/kotlin/util/SseStreamTest.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package com.amplitude.experiment.util

import com.amplitude.experiment.ExperimentUser
import com.amplitude.experiment.RemoteEvaluationClient
import io.mockk.*
import okhttp3.HttpUrl
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.justRun
import io.mockk.mockk
import io.mockk.mockkConstructor
import io.mockk.mockkStatic
import io.mockk.slot
import io.mockk.verify
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import org.mockito.Mockito
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals


class SseStreamTest {
private val listenerCapture = slot<EventSourceListener>()
private val clientMock = mockk<OkHttpClient>()
Expand Down

0 comments on commit bf316e0

Please sign in to comment.