Skip to content

Commit

Permalink
Emit events on action success and failure
Browse files Browse the repository at this point in the history
  • Loading branch information
snowteamer committed Oct 30, 2023
1 parent d357522 commit 6192b3a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 55 deletions.
3 changes: 3 additions & 0 deletions shared/domains/chelonia/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ export const CONTRACTS_MODIFIED = 'contracts-modified'
export const EVENT_HANDLED = 'event-handled'
export const CONTRACT_REGISTERED = 'contract-registered'
export const CONTRACT_UNREGISTERED = 'contract-unregistered'
export const PERSISTENT_ACTION_FAILURE = 'persistent-action-failure'
export const PERSISTENT_ACTION_SUCCESS = 'persistent-action-success'
export const PERSISTENT_ACTION_TOTAL_FAILURE = 'persistent-action-total_failure'
126 changes: 71 additions & 55 deletions shared/domains/chelonia/persistent-actions.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

import sbp from '@sbp/sbp'
import '@sbp/okturtles.events'
import { PERSISTENT_ACTION_FAILURE, PERSISTENT_ACTION_SUCCESS, PERSISTENT_ACTION_TOTAL_FAILURE } from './events.js'

type SbpInvocation = any[]
type UUIDV4 = string

type PersistentActionOptions = {
errorInvocation?: SbpInvocation,
Expand All @@ -11,91 +14,98 @@ type PersistentActionOptions = {
// How many seconds to wait between retries.
retrySeconds: number,
skipCondition?: SbpInvocation,
// SBP selector to call on success with the received value.
successInvocationSelector?: string,
totalFailureInvocation?: SbpInvocation
}

type PersistentActionStatus = {
cancelled: boolean,
type PersistentActionStatus = {|
attempting: boolean,
failedAttemptsSoFar: number,
lastError: string,
nextRetry: string,
pending: boolean
}
resolved: boolean
|}

const defaultOptions: PersistentActionOptions = {
const defaultOptions = {
maxAttempts: Number.POSITIVE_INFINITY,
retrySeconds: 30
}
const tag = '[chelonia.persistentActions]'

class PersistentAction {
id: UUIDV4
invocation: SbpInvocation
options: PersistentActionOptions
status: PersistentActionStatus
timer: TimeoutID | void

constructor (invocation: SbpInvocation, options: PersistentActionOptions = {}) {
// $FlowFixMe: Cannot resolve name `crypto`.
this.id = crypto.randomUUID()
this.invocation = invocation
this.options = { ...defaultOptions, ...options }
this.status = {
cancelled: false,
attempting: false,
failedAttemptsSoFar: 0,
lastError: '',
nextRetry: '',
pending: false
resolved: false
}
}

// Do not call if the action is pending or cancelled!
async attempt (): Promise<void> {
// Bail out if the action is already attempting or resolved.
if (this.status.attempting || this.status.resolved) return
if (await this.trySBP(this.options.skipCondition)) {
this.cancel()
return
}
try {
this.status.pending = true
this.status.attempting = true
const result = await sbp(...this.invocation)
this.status.attempting = false
this.handleSuccess(result)
} catch (error) {
this.status.attempting = false
this.handleError(error)
}
}

cancel (): void {
this.timer && clearTimeout(this.timer)
this.status.cancelled = true
this.status.nextRetry = ''
this.status.resolved = true
}

async handleError (error: Error): Promise<void> {
const { options, status } = this
// Update relevant status fields before calling any optional selector.
const { id, options, status } = this
// Update relevant status fields before calling any optional code.
status.failedAttemptsSoFar++
status.lastError = error.message
const anyAttemptLeft = options.maxAttempts > status.failedAttemptsSoFar
status.nextRetry = anyAttemptLeft && !status.cancelled
if (!anyAttemptLeft) status.resolved = true
status.nextRetry = anyAttemptLeft && !status.resolved
? new Date(Date.now() + options.retrySeconds * 1e3).toISOString()
: ''
// Perform any optional SBP invocation.
sbp('okTurtles.events/emit', PERSISTENT_ACTION_FAILURE, { error, id })
await this.trySBP(options.errorInvocation)
!anyAttemptLeft && await this.trySBP(options.totalFailureInvocation)
if (!anyAttemptLeft) {
sbp('okTurtles.events/emit', PERSISTENT_ACTION_TOTAL_FAILURE, { error, id })
await this.trySBP(options.totalFailureInvocation)
}
// Schedule a retry if appropriate.
if (status.nextRetry) {
// Note: there should be no older active timeout to clear.
this.timer = setTimeout(() => this.attempt(), this.options.retrySeconds * 1e3)
}
status.pending = false
}

async handleSuccess (result: any): Promise<void> {
const { status } = this
handleSuccess (result: any): void {
const { id, status } = this
status.lastError = ''
status.nextRetry = ''
status.pending = false
this.options.successInvocationSelector &&
await this.trySBP([this.options.successInvocationSelector, result])
status.resolved = true
sbp('okTurtles.events/emit', PERSISTENT_ACTION_SUCCESS, { id, result })
}

async trySBP (invocation: SbpInvocation | void): Promise<any> {
Expand All @@ -112,14 +122,19 @@ class PersistentAction {
sbp('sbp/selectors/register', {
'chelonia.persistentActions/_init' (): void {
this.actionsByID = Object.create(null)
this.nextID = 0
// Necessary for now as _init cannot be async. Becomes true when 'configure' has been called.
this.ready = false
sbp('okTurtles.events/on', PERSISTENT_ACTION_SUCCESS, ({ id }) => {
sbp('chelonia.persistentActions/cancel', id)
})
sbp('okTurtles.events/on', PERSISTENT_ACTION_TOTAL_FAILURE, ({ id }) => {
sbp('chelonia.persistentActions/cancel', id)
})
},

// Cancels a specific action by its ID.
// The action won't be retried again, but an async action cannot be aborted if its promise is stil pending.
'chelonia.persistentActions/cancel' (id: number): void {
// The action won't be retried again, but an async action cannot be aborted if its promise is stil attempting.
'chelonia.persistentActions/cancel' (id: UUIDV4): void {
if (id in this.actionsByID) {
this.actionsByID[id].cancel()
delete this.actionsByID[id]
Expand All @@ -128,19 +143,28 @@ sbp('sbp/selectors/register', {
}
},

'chelonia.persistentActions/configure' (options: { databaseKey: string }): void {
this.databaseKey = options.databaseKey
// TODO: validation
'chelonia.persistentActions/configure' ({ databaseKey, options = {} }: { databaseKey: string; options: Object }): void {
if (!databaseKey) throw new TypeError(`${tag} 'databaseKey' is required`)
this.databaseKey = databaseKey
for (const key in options) {
if (key in defaultOptions) {
defaultOptions[key] = options[key]
} else {
throw new TypeError(`${tag} Unknown option: ${key}`)
}
}
},

'chelonia.persistentActions/enqueue' (...args): number[] {
'chelonia.persistentActions/enqueue' (...args): UUIDV4[] {
if (!this.ready) throw new Error(`${tag} Not ready yet.`)
const ids: number[] = []
const ids: UUIDV4[] = []
for (const arg of args) {
const id = this.nextID++
this.actionsByID[id] = Array.isArray(arg)
const action = Array.isArray(arg)
? new PersistentAction(arg)
: new PersistentAction(arg.invocation, arg)
ids.push(id)
this.actionsByID[action.id] = action
ids.push(action.id)
}
// Likely no need to await this call.
sbp('chelonia.persistentActions/save')
Expand All @@ -150,29 +174,21 @@ sbp('sbp/selectors/register', {

// Forces retrying an existing persisted action given its ID.
// Note: 'failedAttemptsSoFar' will still be increased upon failure.
async 'chelonia.persistentActions/forceRetry' (id: number): Promise<void> {
'chelonia.persistentActions/forceRetry' (id: UUIDV4): void {
if (id in this.actionsByID) {
const action = this.actionsByID[id]
// Bail out if the action is already pending or cancelled.
if (action.status.pending || action.status.cancelled) return
try {
await action.attempt()
// If the action succeded, delete it and update the DB.
delete this.actionsByID[id]
sbp('chelonia.persistentActions/save')
} catch {
// Do nothing.
}
this.actionsByID[id].attempt()
}
},

// Called on login to load the correct set of actions for the current user.
// Loads and tries every stored persistent action under the configured database key.
async 'chelonia.persistentActions/load' (): Promise<void> {
const { actionsByID = {}, nextID = 0 } = (await sbp('chelonia/db/get', this.databaseKey)) ?? {}
for (const id in actionsByID) {
this.actionsByID[id] = new PersistentAction(actionsByID[id].invocation, actionsByID[id].options)
const storedActions = JSON.parse((await sbp('chelonia/db/get', this.databaseKey)) ?? '[]')
for (const { id, invocation, options } of storedActions) {
this.actionsByID[id] = new PersistentAction(invocation, options)
// Use the stored ID instead of the autogenerated one.
// TODO: find a cleaner alternative.
this.actionsByID[id].id = id
}
this.nextID = nextID
this.ready = true
sbp('chelonia.persistentActions/retryAll')
},
Expand All @@ -186,18 +202,18 @@ sbp('sbp/selectors/register', {
}
},

// Updates the database version of the pending action list.
// Updates the database version of the attempting action list.
'chelonia.persistentActions/save' (): Promise<Error | void> {
return sbp(
'chelonia/db/set',
this.databaseKey,
{ actionsByID: JSON.stringify(this.actionsByID), nextID: this.nextID }
JSON.stringify(Object.values(this.actionsByID))
)
},

'chelonia.persistentActions/status' (id: number) {
if (id in this.actionsByID) {
return JSON.stringify(this.actionsByID[id])
}
'chelonia.persistentActions/status' () {
return Object.values(this.actionsByID)
// $FlowFixMe: `PersistentAction` is incompatible with mixed
.map((action: PersistentAction) => ({ id: action.id, invocation: action.invocation, ...action.status }))
}
})

0 comments on commit 6192b3a

Please sign in to comment.