Skip to content

Commit

Permalink
general sync high load improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
chriku committed Sep 25, 2024
1 parent b1ebf2b commit 1304630
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.apollographql.apollo3.ApolloClient
import com.apollographql.apollo3.api.ApolloResponse
import com.apollographql.apollo3.api.Mutation
import com.apollographql.apollo3.api.Query
import com.apollographql.apollo3.network.http.HttpInfo
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import gropius.model.architecture.IMSProject
Expand Down Expand Up @@ -36,6 +37,9 @@ import org.springframework.data.neo4j.core.findById
import org.springframework.stereotype.Component
import java.time.OffsetDateTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap

class GitHubResponseException(val errors: List<com.apollographql.apollo3.api.Error>) : Exception(errors.toString())

/**
* Service to handle data from GitHub
Expand Down Expand Up @@ -63,6 +67,7 @@ class GithubDataService(

companion object {
const val FALLBACK_USER_NAME = "github"
val TOKEN_WOUND_UP_IN_JAIL = ConcurrentHashMap<String, OffsetDateTime>()
}

/**
Expand Down Expand Up @@ -206,6 +211,27 @@ class GithubDataService(
return label
}

/**
* Check windup timer for token
* @param token the token to check
* @return true if the token is in timeout
*/
suspend fun tokenInTimeout(token: String): Boolean {
if (TOKEN_WOUND_UP_IN_JAIL.contains(token)) {
return TOKEN_WOUND_UP_IN_JAIL[token]!! > OffsetDateTime.now()
}
return false
}

/**
* Windup request blocking timer
* @param token the token to windup
* @param seconds the time to windup
*/
suspend fun windToken(token: String, seconds: Int) {
TOKEN_WOUND_UP_IN_JAIL[token] = OffsetDateTime.now().plusSeconds(seconds.toLong())
}

/**
* Send a mutation to the IMS
*
Expand All @@ -230,14 +256,24 @@ class GithubDataService(
}
logger.info("Requesting with users: $userList")
return tokenManager.executeUntilWorking(imsProject, userList, owner) { token ->
if (tokenInTimeout(token.token!!)) {
return@executeUntilWorking Optional.empty()
}
val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString())
.addHttpHeader("Authorization", "Bearer ${token.token}").build()
val res = apolloClient.mutation(body).execute()
logger.info("Response Code for request with token $token is ${res.data} ${res.errors}")
val headers = res.executionContext[HttpInfo]?.headers
if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) {
windToken(token.token!!, 3600)
}
if (res.errors?.isNotEmpty() != true) {
Optional.of(res)
} else {
} else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) {
windToken(token.token!!, 10800)
Optional.empty()
} else {
throw GitHubResponseException(res.errors!!)
}
}
}
Expand Down Expand Up @@ -270,14 +306,24 @@ class GithubDataService(
}
logger.info("Requesting with users: $userList ")
return tokenManager.executeUntilWorking(imsProject, userList, listOf()) { token ->
if (tokenInTimeout(token.token!!)) {
return@executeUntilWorking Optional.empty()
}
val apolloClient = ApolloClient.Builder().serverUrl(imsConfig.graphQLUrl.toString())
.addHttpHeader("Authorization", "Bearer ${token.token}").build()
val res = apolloClient.query(body).execute()
logger.info("Response Code for request with token $token is ${res.data} ${res.errors}")
val headers = res.executionContext[HttpInfo]?.headers
if ((headers?.firstOrNull { it.name == "x-ratelimit-remaining" }?.value?.toInt() ?: 0) < 100) {
windToken(token.token!!, 3600)
}
if (res.errors?.isNotEmpty() != true) {
Optional.of(res)
} else {
} else if (res.errors?.all { it.nonStandardFields?.get("type") == "RATE_LIMITED" } == true) {
windToken(token.token!!, 10800)
Optional.empty()
} else {
throw GitHubResponseException(res.errors!!)
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions sync-github/src/main/kotlin/gropius/sync/github/GithubSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import gropius.model.issue.Label
import gropius.model.issue.timeline.IssueComment
import gropius.model.template.IMSTemplate
import gropius.model.template.IssueState
import gropius.model.user.GropiusUser
import gropius.model.user.IMSUser
import gropius.model.user.User
import gropius.sync.*
import gropius.sync.github.config.IMSConfigManager
Expand Down Expand Up @@ -127,7 +125,7 @@ final class GithubSync(
imsProject, it.id!!, GitHubResourceWalkerConfig(
CursorResourceWalkerConfig<GithubGithubResourceWalkerBudgetUsageType, GithubGithubResourceWalkerEstimatedBudgetUsageType>(
1.0,
0.1,
0.001,
GithubGithubResourceWalkerEstimatedBudgetUsageType(),
GithubGithubResourceWalkerBudgetUsageType()
), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100
Expand All @@ -142,7 +140,7 @@ final class GithubSync(
imsProject, dirtyIssue.id!!, comment.githubId, GitHubResourceWalkerConfig(
CursorResourceWalkerConfig<GithubGithubResourceWalkerBudgetUsageType, GithubGithubResourceWalkerEstimatedBudgetUsageType>(
1.0,
0.1,
0.001,
GithubGithubResourceWalkerEstimatedBudgetUsageType(),
GithubGithubResourceWalkerBudgetUsageType()
), imsProjectConfig.repo.owner, imsProjectConfig.repo.repo, 100
Expand All @@ -156,7 +154,7 @@ final class GithubSync(
override suspend fun findUnsyncedIssues(imsProject: IMSProject): List<IncomingIssue> {
return issuePileService.findByImsProjectAndHasUnsyncedData(imsProject.rawId!!, true)
}

override suspend fun syncComment(
imsProject: IMSProject, issueId: String, issueComment: IssueComment, users: List<User>
): TimelineItemConversionInformation? {
Expand Down
19 changes: 10 additions & 9 deletions sync/src/main/kotlin/gropius/sync/AbstractSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ abstract class AbstractSync(
* Sync Incoming Part
* @param imsProject IMS project to sync
*/
suspend fun doIncoming(imsProject: IMSProject) {
private suspend fun doIncoming(imsProject: IMSProject) {
val dereplicatorRequest = SimpleIssueDereplicatorRequest(
collectedSyncInfo.neoOperations.findAll<GropiusUser>().filter { it.username == "gropius" }.firstOrNull()
?: collectedSyncInfo.neoOperations.save(
Expand All @@ -239,14 +239,15 @@ abstract class AbstractSync(
)
try {
findUnsyncedIssues(imsProject).forEach {
syncIncomingIssue(imsProject, it, dereplicatorRequest)
//} catch (e: SyncNotificator.NotificatedError) {
// syncNotificator.sendNotification(
// imsIssue, SyncNotificator.NotificationDummy(e)
// )
//} catch (e: Exception) {
// logger.warn("Error in issue sync", e)
//}
try {
syncIncomingIssue(imsProject, it, dereplicatorRequest)/*
} catch (e: SyncNotificator.NotificatedError) {
syncNotificator.sendNotification(
imsIssue, SyncNotificator.NotificationDummy(e)
)*/
} catch (e: Exception) {
logger.warn("Exception in issue sync", e)
}
}
} catch (e: SyncNotificator.NotificatedError) {
logger.warn("Error in IMSProject sync", e)
Expand Down
5 changes: 4 additions & 1 deletion sync/src/main/kotlin/gropius/sync/CursorResourceWalker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ abstract class CursorResourceWalker<BudgetUsageType, EstimatedBudgetUsageType, B
}
} else {
cursorResourceWalkerDataService.changePriority(
imsProject, resource, { it + resourceWalkerConfig.priorityIncrease }, resourceWalkerConfig.basePriority
imsProject,
resource,
{ it + resourceWalkerConfig.priorityIncrease + resourceWalkerConfig.priorityIncrease * Math.random() },
resourceWalkerConfig.basePriority
);
}
}
Expand Down
8 changes: 6 additions & 2 deletions sync/src/main/kotlin/gropius/sync/LoadBalancedDataFetcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ class LoadBalancedDataFetcher() : DataFetcher {
val walkers = walkerPairs.sortedBy { it.first }.map { it.second }
for ((walker, imsProject) in walkers) {
logger.info("Executing walker for ${imsProject.rawId!!}")
walker.process()
logger.info("Executed walker for ${imsProject.rawId!!}")
try {
walker.process()
logger.info("Executed walker for ${imsProject.rawId!!}")
} catch (e: Exception) {
logger.warn("Exception in walker for ${imsProject.rawId!!}", e)
}
}
}
}
7 changes: 6 additions & 1 deletion sync/src/main/kotlin/gropius/sync/TokenManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ interface BaseResponseType {
@Serializable
data class LinkImsUserQuery(val imsUserIds: List<String>)

/**
* Exception thrown when no valid token is available
*/
class NoTokenValidException : Exception()

/**
* Manager for token from login service
* @param neoOperations Reference for the spring instance of ReactiveNeo4jOperations
Expand Down Expand Up @@ -209,7 +214,7 @@ abstract class TokenManager<ResponseType : BaseResponseType>(
logger.trace("User $user does not allow sync from $owner")
}
}
TODO("Error Message for no working users")
throw NoTokenValidException()
}

/**
Expand Down

0 comments on commit 1304630

Please sign in to comment.