Skip to content

Commit

Permalink
[no-release-notes] eventscheduler: Improve lifecycle of session and t…
Browse files Browse the repository at this point in the history
…ransaction in eventscheduler.

This adds sql.SessionCommand{Begin,End} callbacks to the eventscheduler
database accesses. It also moves the responsibility for transaction management
explicitly to the event scheduler code, which is where it belongs.
  • Loading branch information
reltuk committed Feb 26, 2025
1 parent e410216 commit 4fe29a3
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 51 deletions.
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func (e *Engine) EngineEventScheduler() sql.EventScheduler {
// getter function, |ctxGetterFunc, the EventScheduler |status|, and the |period| for the event scheduler
// to check for events to execute. If |period| is less than 1, then it is ignored and the default period
// (30s currently) is used. This function also initializes the EventScheduler of the analyzer of this engine.
func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, func() error, error), status eventscheduler.SchedulerStatus, period int) error {
func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, error), status eventscheduler.SchedulerStatus, period int) error {
var err error
e.EventScheduler, err = eventscheduler.InitEventScheduler(e.Analyzer, e.BackgroundThreads, ctxGetterFunc, status, e.executeEvent, period)
if err != nil {
Expand Down
154 changes: 111 additions & 43 deletions eventscheduler/event_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type eventExecutor struct {
bThreads *sql.BackgroundThreads
list *enabledEventsList
runningEventsStatus *runningEventsStatus
ctxGetterFunc func() (*sql.Context, func() error, error)
ctxGetterFunc func() (*sql.Context, error)
queryRunFunc func(ctx *sql.Context, dbName, query, username, address string) error
stop atomic.Bool
catalog sql.Catalog
Expand All @@ -43,7 +43,7 @@ type eventExecutor struct {

// newEventExecutor returns a new eventExecutor instance with an empty enabled events list.
// The enabled events list is loaded only when the EventScheduler status is ENABLED.
func newEventExecutor(bgt *sql.BackgroundThreads, ctxFunc func() (*sql.Context, func() error, error), runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error, period int) *eventExecutor {
func newEventExecutor(bgt *sql.BackgroundThreads, ctxFunc func() (*sql.Context, error), runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error, period int) *eventExecutor {
return &eventExecutor{
bThreads: bgt,
list: newEnabledEventsList([]*enabledEvent{}),
Expand Down Expand Up @@ -75,35 +75,73 @@ func (ee *eventExecutor) start() {
for {
time.Sleep(pollingDuration)

ctx, _, err := ee.ctxGetterFunc()
if err != nil {
logrus.Errorf("unable to create context for event executor: %s", err)
continue
}
type res int
const (
res_fallthrough res = iota
res_continue
res_return
)

needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
if err != nil {
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
}
if needsToReloadEvents {
err := ee.loadAllEvents(ctx)
var timeNow, nextAt time.Time
var lgr *logrus.Entry

result := func() res {
ctx, err := ee.ctxGetterFunc()
if err != nil {
ctx.GetLogger().Errorf("unable to reload events: %s", err)
logrus.Errorf("unable to create context for event executor: %s", err)
return res_continue
}
}
lgr = ctx.GetLogger()

timeNow := time.Now()
if ee.stop.Load() {
logrus.Trace("Stopping eventExecutor")
return
} else if ee.list.len() == 0 {
continue
}
defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)

// safeguard list entry getting removed while in check
nextAt, ok := ee.list.getNextExecutionTime()
if !ok {
err = beginTx(ctx)
if err != nil {
lgr.Errorf("unable to begin transaction for event executor: %s", err)
return res_continue
}

needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
if err != nil {
lgr.Errorf("unable to determine if events need to be reloaded: %s", err)
}
if needsToReloadEvents {
err := ee.loadAllEvents(ctx)
if err != nil {
lgr.Errorf("unable to reload events: %s", err)
}
}

if ee.stop.Load() {
logrus.Trace("Stopping eventExecutor")
return res_return
} else if ee.list.len() == 0 {
rollbackTx(ctx)
return res_continue
}

// safeguard list entry getting removed while in check
timeNow = time.Now()
var ok bool
nextAt, ok = ee.list.getNextExecutionTime()
if !ok {
rollbackTx(ctx)
return res_continue
}

err = commitTx(ctx)
if err != nil {
lgr.Errorf("unable to commit transaction for reloading events: %s", err)
}
return res_fallthrough
}()

if result == res_continue {
continue
} else if result == res_return {
return
}

secondsUntilExecution := nextAt.Sub(timeNow).Seconds()
Expand All @@ -117,22 +155,33 @@ func (ee *eventExecutor) start() {
} else if secondsUntilExecution <= 0.0000001 {
curEvent := ee.list.pop()
if curEvent != nil {
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
ctx, commit, err := ee.ctxGetterFunc()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
}
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
err = commit()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
func() {
lgr.Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
ctx, err := ee.ctxGetterFunc()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
return
}
defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
err = beginTx(ctx)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' beginning transaction in event scheduler", err)
return
}
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
err = commitTx(ctx)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
}()
}
} else {
ctx.GetLogger().Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
lgr.Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
}
}
}
Expand Down Expand Up @@ -253,11 +302,19 @@ func (ee *eventExecutor) executeEvent(event *enabledEvent) (bool, error) {
return
default:
// get a new session sql.Context for each event definition execution
sqlCtx, commit, err := ee.ctxGetterFunc()
sqlCtx, err := ee.ctxGetterFunc()
if err != nil {
logrus.WithField("query", event.event.EventBody).Errorf("unable to get context for executed query: %v", err)
return
}
defer sql.SessionEnd(sqlCtx.Session)
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
err = beginTx(sqlCtx)
if err != nil {
logrus.WithField("query", event.event.EventBody).Errorf("unable to begin transaction on context for executed query: %v", err)
return
}

// Note that we pass in the full CREATE EVENT statement so that the engine can parse it
// and pull out the plan nodes for the event body, since the event body doesn't always
Expand All @@ -266,11 +323,12 @@ func (ee *eventExecutor) executeEvent(event *enabledEvent) (bool, error) {
err = ee.queryRunFunc(sqlCtx, event.edb.Name(), event.event.CreateEventStatement(), event.username, event.address)
if err != nil {
logrus.WithField("query", event.event.EventBody).Errorf("unable to execute query: %v", err)
rollbackTx(sqlCtx)
return
}

// must commit after done using the sql.Context
err = commit()
err = commitTx(sqlCtx)
if err != nil {
logrus.WithField("query", event.event.EventBody).Errorf("unable to commit transaction: %v", err)
return
Expand All @@ -290,9 +348,19 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
return
}

ctx, commit, err := ee.ctxGetterFunc()
ctx, err := ee.ctxGetterFunc()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
return
}
defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)

err = beginTx(ctx)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' beginning transaction on ctx in event scheduler", err)
return
}

newEvent, created, err := newEnabledEvent(ctx, edb, event, time.Now())
Expand All @@ -302,7 +370,7 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
ee.list.add(newEvent)
}

err = commit()
err = commitTx(ctx)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' re-evaluating event to scheduler: %s", err, event.Name)
}
Expand Down
56 changes: 49 additions & 7 deletions eventscheduler/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var _ sql.EventScheduler = (*EventScheduler)(nil)
type EventScheduler struct {
status SchedulerStatus
executor *eventExecutor
ctxGetterFunc func() (*sql.Context, func() error, error)
ctxGetterFunc func() (*sql.Context, error)
}

// InitEventScheduler is called at the start of the server. This function returns EventScheduler object
Expand All @@ -63,7 +63,7 @@ type EventScheduler struct {
func InitEventScheduler(
a *analyzer.Analyzer,
bgt *sql.BackgroundThreads,
getSqlCtxFunc func() (*sql.Context, func() error, error),
getSqlCtxFunc func() (*sql.Context, error),
status SchedulerStatus,
runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error,
period int,
Expand All @@ -81,21 +81,28 @@ func InitEventScheduler(
// If the EventSchedulerStatus is ON, then load enabled
// events and start executing events on schedule.
if es.status == SchedulerOn {
ctx, commit, err := getSqlCtxFunc()
ctx, err := getSqlCtxFunc()
if err != nil {
return nil, err
}
ctx.Session.SetClient(sql.Client{
User: eventSchedulerSuperUserName,
Address: "localhost",
Capabilities: 0,
})

defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
err = beginTx(ctx)
if err != nil {
return nil, err
}
err = es.loadEventsAndStartEventExecutor(ctx, a)
if err != nil {
rollbackTx(ctx)
return nil, err
}
err = commit()
err = commitTx(ctx)
if err != nil {
return nil, err
}
Expand All @@ -104,6 +111,33 @@ func InitEventScheduler(
return es, nil
}

func beginTx(ctx *sql.Context) error {
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
tr, err := ts.StartTransaction(ctx, sql.ReadWrite)
if err != nil {
return err
}
ts.SetTransaction(tr)
}
return nil
}

func commitTx(ctx *sql.Context) error {
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
defer ts.SetTransaction(nil)
return ts.CommitTransaction(ctx, ts.GetTransaction())
}
return nil
}

func rollbackTx(ctx *sql.Context) error {
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
defer ts.SetTransaction(nil)
return ts.Rollback(ctx, ts.GetTransaction())
}
return nil
}

// initializeEventSchedulerSuperUser ensures the event_scheduler superuser exists (as a locked
// account that cannot be directly used to log in) so that the event scheduler can read events
// from all databases.
Expand Down Expand Up @@ -147,15 +181,23 @@ func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {

es.status = SchedulerOn

ctx, commit, err := es.ctxGetterFunc()
ctx, err := es.ctxGetterFunc()
if err != nil {
return err
}
defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
err = beginTx(ctx)
if err != nil {
return err
}
err = es.loadEventsAndStartEventExecutor(ctx, a)
if err != nil {
rollbackTx(ctx)
return err
}
return commit()
return commitTx(ctx)
}

// TurnOffEventScheduler is called when user sets --event-scheduler system variable to OFF or 0.
Expand Down

0 comments on commit 4fe29a3

Please sign in to comment.