Skip to content

Commit

Permalink
PMM-12896 Add limit for actions/jobs executed on the same DB at the s…
Browse files Browse the repository at this point in the history
…ame time (#2898)

* Add limit for actions/jobs executed on the same DB at the same time

* PMM-12896 Fix

* PMM-12896 Fixes

* PMM-12896 Fix

* PMM-12896 Fix tests

* PMM-12896 Improvements, fixes, comments, tests

* PMM-12896 Make per DB capacity configurable

* PMM-12896 Fix

* PMM-12896 Improve tests

* PMM-12896 Linter fixes

* PMM-12896 Refactoring

* PMM-12896 Fix comment

* PMM-12896 Fix DSN method for PT mysql summary action

* PMM-12896 Fix bug in local semaphores releasing

* PMM-12896 Refactoring

* PMM-12896 Refactoring

* PMM-12896 Refactoring

* PMM-12896 Refactoring

* PMM-12896 Fix test

* PMM-12896 Fix tests

* Revert "PMM-12896 Fix tests"

This reverts commit 477790b.

* PMM-12896 Fix tests

* PMM-12896 Use timeout only for job/action exectuion, not for resources awaiting

* PMM-12896 Refactoring

* Update agent/config/config.go

Co-authored-by: Alex Demidoff <[email protected]>

* Update agent/runner/actions/mongodb_explain_action.go

Co-authored-by: Alex Demidoff <[email protected]>

* Update agent/runner/actions/postgresql_query_select_action.go

Co-authored-by: Alex Demidoff <[email protected]>

* PMM-12896 Refactoring

* PMM-12896 Mute linter

---------

Co-authored-by: Alex Demidoff <[email protected]>
Co-authored-by: Vasyl Yurkovych <[email protected]>
  • Loading branch information
3 people authored May 22, 2024
1 parent 6200529 commit 09f3823
Show file tree
Hide file tree
Showing 38 changed files with 753 additions and 316 deletions.
4 changes: 3 additions & 1 deletion agent/agents/mongodb/internal/profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func testProfiler(t *testing.T, url string) {
Query: findBucket.Common.Example,
}

ex := actions.NewMongoDBExplainAction(id, 5*time.Second, params, os.TempDir())
ex, err := actions.NewMongoDBExplainAction(id, 5*time.Second, params, os.TempDir())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), ex.Timeout())
defer cancel()
res, err := ex.Run(ctx)
Expand Down
31 changes: 18 additions & 13 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,10 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {

cfg := c.cfg.Get()
var action actions.Action
var err error
switch params := p.Params.(type) {
case *agentpb.StartActionRequest_MysqlExplainParams:
action = actions.NewMySQLExplainAction(p.ActionId, timeout, params.MysqlExplainParams)
action, err = actions.NewMySQLExplainAction(p.ActionId, timeout, params.MysqlExplainParams)

case *agentpb.StartActionRequest_MysqlShowCreateTableParams:
action = actions.NewMySQLShowCreateTableAction(p.ActionId, timeout, params.MysqlShowCreateTableParams)
Expand All @@ -468,13 +469,13 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
action = actions.NewMySQLShowIndexAction(p.ActionId, timeout, params.MysqlShowIndexParams)

case *agentpb.StartActionRequest_PostgresqlShowCreateTableParams:
action = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, timeout, params.PostgresqlShowCreateTableParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, timeout, params.PostgresqlShowCreateTableParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_PostgresqlShowIndexParams:
action = actions.NewPostgreSQLShowIndexAction(p.ActionId, timeout, params.PostgresqlShowIndexParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLShowIndexAction(p.ActionId, timeout, params.PostgresqlShowIndexParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbExplainParams:
action = actions.NewMongoDBExplainAction(p.ActionId, timeout, params.MongodbExplainParams, cfg.Paths.TempDir)
action, err = actions.NewMongoDBExplainAction(p.ActionId, timeout, params.MongodbExplainParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MysqlQueryShowParams:
action = actions.NewMySQLQueryShowAction(p.ActionId, timeout, params.MysqlQueryShowParams)
Expand All @@ -483,13 +484,13 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
action = actions.NewMySQLQuerySelectAction(p.ActionId, timeout, params.MysqlQuerySelectParams)

case *agentpb.StartActionRequest_PostgresqlQueryShowParams:
action = actions.NewPostgreSQLQueryShowAction(p.ActionId, timeout, params.PostgresqlQueryShowParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLQueryShowAction(p.ActionId, timeout, params.PostgresqlQueryShowParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_PostgresqlQuerySelectParams:
action = actions.NewPostgreSQLQuerySelectAction(p.ActionId, timeout, params.PostgresqlQuerySelectParams, cfg.Paths.TempDir)
action, err = actions.NewPostgreSQLQuerySelectAction(p.ActionId, timeout, params.PostgresqlQuerySelectParams, cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetparameterParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetparameterParams.Dsn,
Expand All @@ -499,7 +500,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryBuildinfoParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryBuildinfoParams.Dsn,
Expand All @@ -509,7 +510,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetcmdlineoptsParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetcmdlineoptsParams.Dsn,
Expand All @@ -519,7 +520,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryReplsetgetstatusParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryReplsetgetstatusParams.Dsn,
Expand All @@ -529,7 +530,7 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
cfg.Paths.TempDir)

case *agentpb.StartActionRequest_MongodbQueryGetdiagnosticdataParams:
action = actions.NewMongoDBQueryAdmincommandAction(
action, err = actions.NewMongoDBQueryAdmincommandAction(
p.ActionId,
timeout,
params.MongodbQueryGetdiagnosticdataParams.Dsn,
Expand Down Expand Up @@ -565,6 +566,10 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {
return errors.Wrapf(agenterrors.ErrInvalidArgument, "invalid action type request: %T", params)
}

if err != nil {
return errors.Wrap(err, "failed to create action")
}

return c.runner.StartAction(action)
}

Expand Down Expand Up @@ -645,7 +650,7 @@ func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error {
return errors.WithStack(err)
}

job, err = jobs.NewMongoDBBackupJob(p.JobId, timeout, j.MongodbBackup.Name, &dsn, locationConfig,
job, err = jobs.NewMongoDBBackupJob(p.JobId, timeout, j.MongodbBackup.Name, dsn, locationConfig,
j.MongodbBackup.EnablePitr, j.MongodbBackup.DataModel, j.MongodbBackup.Folder)
if err != nil {
return err
Expand Down Expand Up @@ -678,7 +683,7 @@ func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error {
}

job = jobs.NewMongoDBRestoreJob(p.JobId, timeout, j.MongodbRestoreBackup.Name,
j.MongodbRestoreBackup.PitrTimestamp.AsTime(), &dsn, locationConfig,
j.MongodbRestoreBackup.PitrTimestamp.AsTime(), dsn, locationConfig,
c.supervisor, j.MongodbRestoreBackup.Folder, j.MongodbRestoreBackup.PbmMetadata.Name)
default:
return errors.Errorf("unknown job type: %T", j)
Expand Down
4 changes: 2 additions & 2 deletions agent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestClient(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, &s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestUnexpectedActionType(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Run() {
supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
serviceInfoBroker := serviceinfobroker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
r := runner.New(cfg.RunnerCapacity, cfg.RunnerMaxConnectionsPerService)
client := client.New(configStorage, supervisor, r, connectionChecker, v, serviceInfoBroker, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

Expand Down
11 changes: 7 additions & 4 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ type Setup struct {
type Config struct { //nolint:musttag
// no config file there

ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
RunnerMaxConnectionsPerService uint16 `yaml:"runner-max-connections-per-service,omitempty"`

Server Server `yaml:"server"`
Paths Paths `yaml:"paths"`
Expand Down Expand Up @@ -352,6 +353,8 @@ func Application(cfg *Config) (*kingpin.Application, *string) {
Envar("PMM_AGENT_LISTEN_PORT").Uint16Var(&cfg.ListenPort)
app.Flag("runner-capacity", "Agent internal actions/jobs runner capacity [PMM_AGENT_RUNNER_CAPACITY]").
Envar("PMM_AGENT_RUNNER_CAPACITY").Uint16Var(&cfg.RunnerCapacity)
app.Flag("runner-max-connections-per-service", "Agent internal action/job runner connection limit per DB instance").
Envar("PMM_AGENT_RUNNER_MAX_CONNECTIONS_PER_SERVICE").Uint16Var(&cfg.RunnerMaxConnectionsPerService)

app.Flag("server-address", "PMM Server address [PMM_AGENT_SERVER_ADDRESS]").
Envar("PMM_AGENT_SERVER_ADDRESS").PlaceHolder("<host:port>").StringVar(&cfg.Server.Address)
Expand Down
2 changes: 2 additions & 0 deletions agent/runner/actions/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Action interface {
Type() string
// Timeout returns Job timeout.
Timeout() time.Duration
// DSN returns Data Source Name required for the Action.
DSN() string
// Run runs an Action and returns output and error.
Run(ctx context.Context) ([]byte, error)

Expand Down
30 changes: 18 additions & 12 deletions agent/runner/actions/mongodb_explain_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/percona/percona-toolkit/src/go/mongolib/proto"
Expand All @@ -31,23 +30,30 @@ import (
"github.com/percona/pmm/api/agentpb"
)

const mongoDBExplainActionType = "mongodb-explain"

type mongodbExplainAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_MongoDBExplainParams
tempDir string
dsn string
}

var errCannotExplain = fmt.Errorf("cannot explain this type of query")

// NewMongoDBExplainAction creates a MongoDB EXPLAIN query Action.
func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) Action {
func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) (Action, error) {
dsn, err := templates.RenderDSN(params.Dsn, params.TextFiles, filepath.Join(tempDir, mongoDBExplainActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}

return &mongodbExplainAction{
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
}
dsn: dsn,
}, nil
}

// ID returns an Action ID.
Expand All @@ -62,17 +68,17 @@ func (a *mongodbExplainAction) Timeout() time.Duration {

// Type returns an Action type.
func (a *mongodbExplainAction) Type() string {
return "mongodb-explain"
return mongoDBExplainActionType
}

// DSN returns the DSN for the Action.
func (a *mongodbExplainAction) DSN() string {
return a.dsn
}

// Run runs an action and returns output and error.
func (a *mongodbExplainAction) Run(ctx context.Context) ([]byte, error) {
dsn, err := templates.RenderDSN(a.params.Dsn, a.params.TextFiles, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id))
if err != nil {
return nil, errors.WithStack(err)
}

opts, err := mongo_fix.ClientOptionsForDSN(dsn)
opts, err := mongo_fix.ClientOptionsForDSN(a.dsn)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
8 changes: 6 additions & 2 deletions agent/runner/actions/mongodb_explain_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestMongoDBExplain(t *testing.T) {
Query: `{"ns":"test.coll","op":"query","query":{"k":{"$lte":{"$numberInt":"1"}}}}`,
}

ex := NewMongoDBExplainAction(id, 0, params, os.TempDir())
ex, err := NewMongoDBExplainAction(id, 0, params, os.TempDir())
require.NoError(t, err)

res, err := ex.Run(ctx)
assert.Nil(t, err)

Expand Down Expand Up @@ -130,7 +132,9 @@ func TestNewMongoDBExplain(t *testing.T) {
Query: string(query),
}

ex := NewMongoDBExplainAction(id, 0, params, os.TempDir())
ex, err := NewMongoDBExplainAction(id, 0, params, os.TempDir())
require.NoError(t, err)

res, err := ex.Run(ctx)
assert.NoError(t, err)

Expand Down
38 changes: 24 additions & 14 deletions agent/runner/actions/mongodb_query_admincommand_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package actions
import (
"context"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -29,27 +28,38 @@ import (
"github.com/percona/pmm/api/agentpb"
)

const mongoDBQueryAdminCommandActionType = "mongodb-query-admincommand"

type mongodbQueryAdmincommandAction struct {
id string
timeout time.Duration
dsn string
files *agentpb.TextFiles
command string
arg interface{}
tempDir string
}

// NewMongoDBQueryAdmincommandAction creates a MongoDB adminCommand query action.
func NewMongoDBQueryAdmincommandAction(id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) Action {
func NewMongoDBQueryAdmincommandAction(
id string,
timeout time.Duration,
dsn string,
files *agentpb.TextFiles,
command string,
arg interface{},
tempDir string,
) (Action, error) {
dsn, err := templates.RenderDSN(dsn, files, filepath.Join(tempDir, mongoDBQueryAdminCommandActionType, id))
if err != nil {
return nil, errors.WithStack(err)
}

return &mongodbQueryAdmincommandAction{
id: id,
timeout: timeout,
dsn: dsn,
files: files,
command: command,
arg: arg,
tempDir: tempDir,
}
}, nil
}

// ID returns an action ID.
Expand All @@ -64,17 +74,17 @@ func (a *mongodbQueryAdmincommandAction) Timeout() time.Duration {

// Type returns an action type.
func (a *mongodbQueryAdmincommandAction) Type() string {
return "mongodb-query-admincommand"
return mongoDBQueryAdminCommandActionType
}

// DSN returns a DSN for the Action.
func (a *mongodbQueryAdmincommandAction) DSN() string {
return a.dsn
}

// Run runs an action and returns output and error.
func (a *mongodbQueryAdmincommandAction) Run(ctx context.Context) ([]byte, error) {
dsn, err := templates.RenderDSN(a.dsn, a.files, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id))
if err != nil {
return nil, errors.WithStack(err)
}

opts, err := mongo_fix.ClientOptionsForDSN(dsn)
opts, err := mongo_fix.ClientOptionsForDSN(a.dsn)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func TestMongoDBActionsReplWithSSL(t *testing.T) {

func runAction(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) []byte { //nolint:unparam
t.Helper()
a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
a, err := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b, err := a.Run(ctx)
Expand Down Expand Up @@ -227,7 +229,9 @@ func replSetGetStatusAssertionsReplicated(t *testing.T, b []byte) { //nolint:the
}

func replSetGetStatusAssertionsStandalone(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) { //nolint:thelper
a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
a, err := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
b, err := a.Run(ctx)
Expand Down
Loading

0 comments on commit 09f3823

Please sign in to comment.