Skip to content

Commit

Permalink
Use Postgres Cursors in selects for better memory management (#3249)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Feb 12, 2025
1 parent b623378 commit 93f2f37
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 74 deletions.
4 changes: 2 additions & 2 deletions backend/pkg/sqlmanager/postgres/postgres-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,12 +998,12 @@ func (p *PostgresManager) GetTableRowCount(
if whereClause != nil && *whereClause != "" {
query = query.Where(goqu.L(*whereClause))
}
sql, _, err := query.ToSQL()
compiledSql, _, err := query.ToSQL()
if err != nil {
return 0, err
}
var count int64
err = p.db.QueryRowContext(ctx, sql).Scan(&count)
err = p.db.QueryRowContext(ctx, compiledSql).Scan(&count)
if err != nil {
return 0, err
}
Expand Down
112 changes: 56 additions & 56 deletions backend/services/mgmt/v1alpha1/job-service/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (s *Service) GetJobs(

jobIds := []pgtype.UUID{}
for idx := range jobs {
job := jobs[idx]
jobIds = append(jobIds, job.ID)
dbJob := jobs[idx]
jobIds = append(jobIds, dbJob.ID)
}

var destinationAssociations []db_queries.NeosyncApiJobDestinationConnectionAssociation
Expand All @@ -79,8 +79,8 @@ func (s *Service) GetJobs(

jobMap := map[pgtype.UUID]*db_queries.NeosyncApiJob{}
for idx := range jobs {
job := jobs[idx]
jobMap[job.ID] = &job
dbJob := jobs[idx]
jobMap[dbJob.ID] = &dbJob
}

associationMap := map[pgtype.UUID][]db_queries.NeosyncApiJobDestinationConnectionAssociation{}
Expand All @@ -96,8 +96,8 @@ func (s *Service) GetJobs(
dtos := []*mgmtv1alpha1.Job{}
// Use jobIds to retain original query order
for _, jobId := range jobIds {
job := jobMap[jobId]
dtos = append(dtos, dtomaps.ToJobDto(job, associationMap[job.ID]))
dbJob := jobMap[jobId]
dtos = append(dtos, dtomaps.ToJobDto(dbJob, associationMap[dbJob.ID]))
}

return connect.NewResponse(&mgmtv1alpha1.GetJobsResponse{
Expand All @@ -120,15 +120,15 @@ func (s *Service) GetJob(

errgrp, errctx := errgroup.WithContext(ctx)

var job db_queries.NeosyncApiJob
var dbJob db_queries.NeosyncApiJob
errgrp.Go(func() error {
j, err := s.db.Q.GetJobById(errctx, s.db.Db, jobUuid)
if err != nil && !neosyncdb.IsNoRows(err) {
return fmt.Errorf("unable to get job by id: %w", err)
} else if err != nil && neosyncdb.IsNoRows(err) {
return nucleuserrors.NewNotFound("job with that id does not exist")
}
job = j
dbJob = j
return nil
})
var destConnections []db_queries.NeosyncApiJobDestinationConnectionAssociation
Expand All @@ -146,12 +146,12 @@ func (s *Service) GetJob(
return nil, err
}

if err := user.EnforceJob(ctx, userdata.NewDbDomainEntity(job.AccountID, job.ID), rbac.JobAction_View); err != nil {
if err := user.EnforceJob(ctx, userdata.NewDbDomainEntity(dbJob.AccountID, dbJob.ID), rbac.JobAction_View); err != nil {
return nil, err
}

return connect.NewResponse(&mgmtv1alpha1.GetJobResponse{
Job: dtomaps.ToJobDto(&job, destConnections),
Job: dtomaps.ToJobDto(&dbJob, destConnections),
}), nil
}

Expand Down Expand Up @@ -541,7 +541,7 @@ func (s *Service) DeleteJob(
return nil, err
}

job, err := s.db.Q.GetJobById(ctx, s.db.Db, idUuid)
dbJob, err := s.db.Q.GetJobById(ctx, s.db.Db, idUuid)
if err != nil && !neosyncdb.IsNoRows(err) {
return nil, err
} else if err != nil && neosyncdb.IsNoRows(err) {
Expand All @@ -552,24 +552,24 @@ func (s *Service) DeleteJob(
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, userdata.NewDbDomainEntity(job.AccountID, job.ID), rbac.JobAction_Delete)
err = user.EnforceJob(ctx, userdata.NewDbDomainEntity(dbJob.AccountID, dbJob.ID), rbac.JobAction_Delete)
if err != nil {
return nil, err
}

logger.Debug("deleting temporal schedule")
err = s.temporalmgr.DeleteSchedule(
ctx,
neosyncdb.UUIDString(job.AccountID),
neosyncdb.UUIDString(job.ID),
neosyncdb.UUIDString(dbJob.AccountID),
neosyncdb.UUIDString(dbJob.ID),
logger,
)
if err != nil {
return nil, fmt.Errorf("unable to remove schedule when deleting job")
}

logger.Debug("deleting job")
err = s.db.Q.RemoveJobById(ctx, s.db.Db, job.ID)
err = s.db.Q.RemoveJobById(ctx, s.db.Db, dbJob.ID)
if err != nil {
return nil, err
}
Expand All @@ -587,7 +587,7 @@ func (s *Service) CreateJobDestinationConnections(
if err != nil {
return nil, err
}
job, err := s.GetJob(ctx, connect.NewRequest(&mgmtv1alpha1.GetJobRequest{
jobResp, err := s.GetJob(ctx, connect.NewRequest(&mgmtv1alpha1.GetJobRequest{
Id: req.Msg.JobId,
}))
if err != nil {
Expand All @@ -597,11 +597,11 @@ func (s *Service) CreateJobDestinationConnections(
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job.Msg.GetJob(), rbac.JobAction_Create)
err = user.EnforceJob(ctx, jobResp.Msg.GetJob(), rbac.JobAction_Create)
if err != nil {
return nil, err
}
accountUuid, err := neosyncdb.ToUuid(job.Msg.GetJob().GetAccountId())
accountUuid, err := neosyncdb.ToUuid(jobResp.Msg.GetJob().GetAccountId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -678,13 +678,13 @@ func (s *Service) UpdateJobSchedule(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobDto := jobResp.Msg.GetJob()

user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand All @@ -699,7 +699,7 @@ func (s *Service) UpdateJobSchedule(
return nil, err
}

jobUuid, err := neosyncdb.ToUuid(job.GetId())
jobUuid, err := neosyncdb.ToUuid(jobDto.GetId())
if err != nil {
return nil, err
}
Expand All @@ -720,8 +720,8 @@ func (s *Service) UpdateJobSchedule(
// update temporal scheduled job
err = s.temporalmgr.UpdateSchedule(
ctx,
job.GetAccountId(),
job.GetId(),
jobDto.GetAccountId(),
jobDto.GetId(),
&temporalclient.ScheduleUpdateOptions{
DoUpdate: func(schedule temporalclient.ScheduleUpdateInput) (*temporalclient.ScheduleUpdate, error) {
schedule.Description.Schedule.Spec = spec
Expand Down Expand Up @@ -764,13 +764,13 @@ func (s *Service) PauseJob(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobDto := jobResp.Msg.GetJob()

user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand All @@ -779,8 +779,8 @@ func (s *Service) PauseJob(
logger.Debug("pausing job")
err = s.temporalmgr.PauseSchedule(
ctx,
job.GetAccountId(),
job.GetId(),
jobDto.GetAccountId(),
jobDto.GetId(),
&temporalclient.SchedulePauseOptions{Note: req.Msg.GetNote()},
logger,
)
Expand All @@ -791,8 +791,8 @@ func (s *Service) PauseJob(
logger.Debug("unpausing job")
err = s.temporalmgr.UnpauseSchedule(
ctx,
job.GetAccountId(),
job.GetId(),
jobDto.GetAccountId(),
jobDto.GetId(),
&temporalclient.ScheduleUnpauseOptions{Note: req.Msg.GetNote()},
logger,
)
Expand Down Expand Up @@ -827,13 +827,13 @@ func (s *Service) UpdateJobSourceConnection(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobDto := jobResp.Msg.GetJob()

user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -865,7 +865,7 @@ func (s *Service) UpdateJobSourceConnection(
}

// verifies that the account has access to that connection id
if err := s.verifyConnectionInAccount(ctx, connectionIdToVerify, job.GetAccountId()); err != nil {
if err := s.verifyConnectionInAccount(ctx, connectionIdToVerify, jobDto.GetAccountId()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -955,7 +955,7 @@ func (s *Service) UpdateJobSourceConnection(
vfkKeys[key] = struct{}{}
}

jobUuid, err := neosyncdb.ToUuid(job.GetId())
jobUuid, err := neosyncdb.ToUuid(jobDto.GetId())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1021,30 +1021,30 @@ func (s *Service) SetJobSourceSqlConnectionSubsets(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobUuid, err := neosyncdb.ToUuid(job.GetId())
jobDto := jobResp.Msg.GetJob()
jobUuid, err := neosyncdb.ToUuid(jobDto.GetId())
if err != nil {
return nil, err
}
user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}

var connectionId *string
if job.GetSource().GetOptions() != nil {
if job.GetSource().GetOptions().GetMysql() != nil {
connectionId = &job.GetSource().GetOptions().GetMysql().ConnectionId
} else if job.GetSource().GetOptions().GetPostgres() != nil {
connectionId = &job.GetSource().GetOptions().GetPostgres().ConnectionId
} else if job.GetSource().GetOptions().GetDynamodb() != nil {
connectionId = &job.GetSource().GetOptions().GetDynamodb().ConnectionId
} else if job.GetSource().GetOptions().GetMssql() != nil {
connectionId = &job.GetSource().GetOptions().GetMssql().ConnectionId
if jobDto.GetSource().GetOptions() != nil {
if jobDto.GetSource().GetOptions().GetMysql() != nil {
connectionId = &jobDto.GetSource().GetOptions().GetMysql().ConnectionId
} else if jobDto.GetSource().GetOptions().GetPostgres() != nil {
connectionId = &jobDto.GetSource().GetOptions().GetPostgres().ConnectionId
} else if jobDto.GetSource().GetOptions().GetDynamodb() != nil {
connectionId = &jobDto.GetSource().GetOptions().GetDynamodb().ConnectionId
} else if jobDto.GetSource().GetOptions().GetMssql() != nil {
connectionId = &jobDto.GetSource().GetOptions().GetMssql().ConnectionId
} else {
return nil, nucleuserrors.NewBadRequest("only jobs with a valid source connection id may be subset")
}
Expand Down Expand Up @@ -1109,12 +1109,12 @@ func (s *Service) UpdateJobDestinationConnection(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobDto := jobResp.Msg.GetJob()
user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand All @@ -1123,7 +1123,7 @@ func (s *Service) UpdateJobDestinationConnection(
if err != nil {
return nil, err
}
if err := s.verifyConnectionInAccount(ctx, req.Msg.GetConnectionId(), job.GetAccountId()); err != nil {
if err := s.verifyConnectionInAccount(ctx, req.Msg.GetConnectionId(), jobDto.GetAccountId()); err != nil {
return nil, err
}
options := &pg_models.JobDestinationOptions{}
Expand Down Expand Up @@ -1155,7 +1155,7 @@ func (s *Service) UpdateJobDestinationConnection(
}

updatedJob, err := s.GetJob(ctx, connect.NewRequest(&mgmtv1alpha1.GetJobRequest{
Id: job.GetId(),
Id: jobDto.GetId(),
}))
if err != nil {
return nil, err
Expand Down Expand Up @@ -1193,15 +1193,15 @@ func (s *Service) DeleteJobDestinationConnection(
if err != nil {
return nil, err
}
job := jobResp.Msg.GetJob()
jobDto := jobResp.Msg.GetJob()

logger = logger.With("jobId", job.GetId())
logger = logger.With("jobId", jobDto.GetId())

user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job, rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobDto, rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1369,7 +1369,7 @@ func (s *Service) SetJobWorkflowOptions(
logger := logger_interceptor.GetLoggerFromContextOrDefault(ctx)
logger = logger.With("jobId", req.Msg.Id)

job, err := s.GetJob(ctx, connect.NewRequest(&mgmtv1alpha1.GetJobRequest{
jobResp, err := s.GetJob(ctx, connect.NewRequest(&mgmtv1alpha1.GetJobRequest{
Id: req.Msg.Id,
}))
if err != nil {
Expand All @@ -1380,7 +1380,7 @@ func (s *Service) SetJobWorkflowOptions(
if err != nil {
return nil, err
}
err = user.EnforceJob(ctx, job.Msg.GetJob(), rbac.JobAction_Edit)
err = user.EnforceJob(ctx, jobResp.Msg.GetJob(), rbac.JobAction_Edit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1408,8 +1408,8 @@ func (s *Service) SetJobWorkflowOptions(

err = s.temporalmgr.UpdateSchedule(
ctx,
job.Msg.GetJob().GetAccountId(),
job.Msg.GetJob().GetId(),
jobResp.Msg.GetJob().GetAccountId(),
jobResp.Msg.GetJob().GetId(),
&temporalclient.ScheduleUpdateOptions{
DoUpdate: func(schedule temporalclient.ScheduleUpdateInput) (*temporalclient.ScheduleUpdate, error) {
action, ok := schedule.Description.Schedule.Action.(*temporalclient.ScheduleWorkflowAction)
Expand Down
Loading

0 comments on commit 93f2f37

Please sign in to comment.