Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 5-second throttling to all subscriptions #4559

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import clue.StreamingClient
import crystal.Pot
import explore.common.UserPreferencesQueries.GlobalUserPreferences
import explore.common.UserPreferencesQueries.GridLayouts
import explore.model.Constants
import explore.model.ExploreGridLayouts
import explore.model.GlobalPreferences
import explore.model.UserPreferences
import explore.model.enums.GridLayoutSection
import explore.model.layout
import explore.model.layout.LayoutsMap
import explore.utils.*
import japgolly.scalajs.react.*
import lucuma.core.model.User
import lucuma.react.common.ReactFnProps
Expand Down Expand Up @@ -64,17 +66,19 @@ object PreferencesCacheController
.subscribe[IO](props.userId.show)
.ignoreGraphQLErrors
.map:
_.map: data =>
UserPreferences.gridLayouts
.modify(GridLayouts.updateLayouts(data.lucumaGridLayoutPositions))
_.throttle(Constants.SubscriptionThrottle)
.map: data =>
UserPreferences.gridLayouts
.modify(GridLayouts.updateLayouts(data.lucumaGridLayoutPositions))

val updateGlobalPreferences: Resource[IO, fs2.Stream[IO, UserPreferences => UserPreferences]] =
UserPreferencesUpdates
.subscribe[IO](props.userId.show)
.ignoreGraphQLErrors
.map:
_.map: data =>
UserPreferences.globalPreferences
.modify(_ => data.lucumaUserPreferencesByPk.getOrElse(GlobalPreferences.Default))
_.throttle(Constants.SubscriptionThrottle)
.map: data =>
UserPreferences.globalPreferences
.modify(_ => data.lucumaUserPreferencesByPk.getOrElse(GlobalPreferences.Default))

List(updateLayouts, updateGlobalPreferences).sequence.map(_.reduceLeft(_.merge(_)))
50 changes: 30 additions & 20 deletions explore/src/main/scala/explore/cache/ProgramCacheController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import crystal.syntax.*
import explore.givens.given
import explore.model.Attachment
import explore.model.ConfigurationRequestWithObsIds
import explore.model.Constants
import explore.model.Execution
import explore.model.Group
import explore.model.Observation
Expand Down Expand Up @@ -292,17 +293,20 @@ object ProgramCacheController
.subscribe[IO](props.programId.toProgramEditInput)
.logGraphQLErrors(_ => "Error in ProgramEditDetailsSubscription subscription")
.map:
_.broadcastThrough(
_.map: data => // Replace program.
ProgramSummaries.optProgramDetails.replace(data.programEdit.value.some),
allObservationsValidationsUpdate
)
_.throttle(Constants.SubscriptionThrottle)
.broadcastThrough(
_.map: data => // Replace program.
ProgramSummaries.optProgramDetails.replace(data.programEdit.value.some),
allObservationsValidationsUpdate
)

val updateTargets: Resource[IO, Stream[IO, ProgramSummaries => ProgramSummaries]] =
TargetQueriesGQL.ProgramTargetsDelta
.subscribe[IO](props.programId.toTargetEditInput)
.logGraphQLErrors(_ => "Error in ProgramTargetsDelta subscription")
.map(_.map(data => modifyTargets(data.targetEdit)))
.map:
_.throttle(Constants.SubscriptionThrottle)
.map(data => modifyTargets(data.targetEdit))

val onlyExistingObs: Pipe[
IO,
Expand Down Expand Up @@ -367,31 +371,34 @@ object ProgramCacheController
.subscribe[IO](props.programId.toObservationEditInput)
.handleGraphQLErrors(IO.println(_))
.map:
_.broadcastThrough(
_.map(data => modifyObservations(data.observationEdit)),
onlyExistingObs.andThen(obsTimesUpdates)
)
_.throttle(Constants.SubscriptionThrottle)
.broadcastThrough(
_.map(data => modifyObservations(data.observationEdit)),
onlyExistingObs.andThen(obsTimesUpdates)
)

val updateGroups: Resource[IO, Stream[IO, ProgramSummaries => ProgramSummaries]] =
ProgramQueriesGQL.GroupEditSubscription
.subscribe[IO](props.programId.toProgramEditInput)
.logGraphQLErrors(_ => "Error in GroupEditSubscription subscription")
.map:
_.broadcastThrough(
_.map(data => modifyGroups(data.groupEdit)),
onlyExistingGroups.andThen(groupTimeRangeUpdate)
)
_.throttle(Constants.SubscriptionThrottle)
.broadcastThrough(
_.map(data => modifyGroups(data.groupEdit)),
onlyExistingGroups.andThen(groupTimeRangeUpdate)
)

val updateConfigurationRequests
: Resource[IO, Stream[IO, ProgramSummaries => ProgramSummaries]] =
ProgramQueriesGQL.ConfigurationRequestSubscription
.subscribe[IO](ConfigurationRequestEditInput(props.programId.assign))
.logGraphQLErrors(_ => "Error in ConfigurationRequestSubscription subscription")
.map:
_.broadcastThrough(
_.map(data => modifyConfigurationRequests(data.configurationRequestEdit)),
obsWorkflowUpdates
)
_.throttle(Constants.SubscriptionThrottle)
.broadcastThrough(
_.map(data => modifyConfigurationRequests(data.configurationRequestEdit)),
obsWorkflowUpdates
)

// Right now the programEdit subsription isn't fine grained enough to
// differentiate what got updated, so we alway update all the attachments.
Expand All @@ -400,14 +407,17 @@ object ProgramCacheController
ProgramQueriesGQL.ProgramEditAttachmentSubscription
.subscribe[IO](props.programId.toProgramEditInput)
.logGraphQLErrors(_ => "Error in ProgramEditAttachmentSubscription subscription")
.map(_.map(data => modifyAttachments(data.programEdit)))
.map:
_.throttle(Constants.SubscriptionThrottle)
.map(data => modifyAttachments(data.programEdit))

val updatePrograms: Resource[IO, Stream[IO, ProgramSummaries => ProgramSummaries]] =
ProgramQueriesGQL.ProgramInfoDelta
.subscribe[IO]()
.logGraphQLErrors(_ => "Error in ProgramInfoDelta subscription")
.map:
_.map(data => modifyPrograms(data.programEdit))
_.throttle(Constants.SubscriptionThrottle)
.map(data => modifyPrograms(data.programEdit))

// TODO Handle errors, disable transparent resubscription upon connection loss.
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ trait SequenceTileHelper:
refreshVisits <- useCallback(visitThrottler.submit(visits.refresh.to[IO]))
sequenceThrottler = Throttler.unsafe[IO](7.seconds)
refreshSequence <- useCallback(sequenceThrottler.submit(sequenceData.refresh.to[IO]))
// Here we don't throttle the subscriptions since we are throttling the refreshes.
_ <-
useEffectStreamResourceOnMount: // Subscribe to observation edits
ObsQueriesGQL.ObservationEditSubscription
Expand Down
12 changes: 8 additions & 4 deletions explore/src/main/scala/explore/tabs/ObsTabTiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import explore.schedulingWindows.SchedulingWindowsTile
import explore.syntax.ui.*
import explore.targeteditor.AsterismEditorTile
import explore.undo.UndoSetter
import explore.utils.*
import japgolly.scalajs.react.*
import japgolly.scalajs.react.extra.router.SetRouteVia
import japgolly.scalajs.react.vdom.html_<^.*
Expand Down Expand Up @@ -149,9 +150,10 @@ case class ObsTabTiles(

def obsIQLikelihood(obsTime: Instant): Option[IntCentiPercent] =
(centralWavelength, targetCoords(obsTime).map(_.value.dec), site).mapN((cw, dec, site) =>
percentileImageQuality(constraintSet.get.imageQuality.toArcSeconds.toValue[BigDecimal],
cw.value,
minimumAirmass(dec, site)
percentileImageQuality(
constraintSet.get.imageQuality.toArcSeconds.toValue[BigDecimal],
cw.value,
minimumAirmass(dec, site)
)
)

Expand Down Expand Up @@ -243,7 +245,9 @@ object ObsTabTiles:
)
// TODO Could we get the edit signal from ProgramCache instead of doing another subscritpion??
.reRunOnResourceSignals:
ObservationEditSubscription.subscribe[IO](props.obsId.toObservationEditInput)
ObservationEditSubscription
.subscribe[IO](props.obsId.toObservationEditInput)
.map(_.throttle(Constants.SubscriptionThrottle))
// Ags state
.useStateView[AgsState](AgsState.Idle)
// the configuration the user has selected from the spectroscopy modes table, if any
Expand Down
3 changes: 3 additions & 0 deletions model/shared/src/main/scala/explore/model/Constants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import lucuma.core.math.Angle
import java.time.Duration
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import scala.concurrent.duration.*

trait Constants:
val UTC = ZoneOffset.UTC
Expand Down Expand Up @@ -69,4 +70,6 @@ trait Constants:
val MissingInfoMsg = "Not enough information to call ITC"
val P1TemplatesUrl = "https://www.gemini.edu/observing/phase-i/pit/pit-description#PDF"

val SubscriptionThrottle: FiniteDuration = 5.seconds

object Constants extends Constants
Loading