diff --git a/config/instance.go b/config/instance.go index c907b0c..41640ad 100644 --- a/config/instance.go +++ b/config/instance.go @@ -55,6 +55,16 @@ func initInstanceConfig(file *os.File, cfgInstance *Instance) error { return fmt.Errorf("unknown config format type: %s. Use .toml, .yaml or .json suffix in filename", file.Name()) } +const ( + DefaultStorageConcurrency = 100 +) + +func EmbeddDafults(cfgInstance *Instance) { + if cfgInstance.StorageCnf.StorageConcurrency == 0 { + cfgInstance.StorageCnf.StorageConcurrency = DefaultStorageConcurrency + } +} + func LoadInstanceConfig(cfgPath string) error { var cfg Instance file, err := os.Open(cfgPath) diff --git a/config/proxy.go b/config/proxy.go index 39a384c..e99d811 100644 --- a/config/proxy.go +++ b/config/proxy.go @@ -1,4 +1,5 @@ package config type Proxy struct { + ConsolePort string `json:"console_port" toml:"console+port" yaml:"console_port"` } diff --git a/config/storage.go b/config/storage.go index 76092ba..c46a5ff 100644 --- a/config/storage.go +++ b/config/storage.go @@ -12,5 +12,8 @@ type Storage struct { StoragePrefix string `json:"storage_prefix" toml:"storage_prefix" yaml:"storage_prefix"` StorageBucket string `json:"storage_bucket" toml:"storage_bucket" yaml:"storage_bucket"` + // how many concurrrent connection acquire allowed + StorageConcurrency int64 `json:"storage_concurrency" toml:"storage_concurrency" yaml:"storage_concurrency"` + StorageRegion string `json:"storage_region" toml:"storage_region" yaml:"storage_region"` } diff --git a/go.mod b/go.mod index 5f3f707..5265168 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,11 @@ go 1.21.1 require ( github.com/BurntSushi/toml v1.3.2 github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c - github.com/aws/aws-sdk-go v1.47.9 + github.com/aws/aws-sdk-go v1.47.10 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.31.0 github.com/spf13/cobra v1.8.0 + golang.org/x/sync v0.5.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index bc49c42..f9e9d45 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8 github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c h1:kMFnB0vCcX7IL/m9Y5LO+KQYv+t1CQOiFe6+SV2J7bE= github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= -github.com/aws/aws-sdk-go v1.47.9 h1:rarTsos0mA16q+huicGx0e560aYRtOucV5z2Mw23JRY= -github.com/aws/aws-sdk-go v1.47.9/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.47.10 h1:cvufN7WkD1nlOgpRopsmxKQlFp5X1MfyAw4r7BBORQc= +github.com/aws/aws-sdk-go v1.47.10/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= @@ -55,6 +55,8 @@ golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/storage/sessionpool.go b/pkg/storage/sessionpool.go index f9dd1af..3a36f9c 100644 --- a/pkg/storage/sessionpool.go +++ b/pkg/storage/sessionpool.go @@ -1,6 +1,8 @@ package storage import ( + "context" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/defaults" @@ -9,19 +11,25 @@ import ( "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/ylogger" + + "golang.org/x/sync/semaphore" ) type SessionPool interface { - GetSession() (*s3.S3, error) + GetSession(ctx context.Context) (*s3.S3, error) } type S3SessionPool struct { cnf *config.Storage + + sem *semaphore.Weighted } func NewSessionPool(cnf *config.Storage) SessionPool { + return &S3SessionPool{ cnf: cnf, + sem: semaphore.NewWeighted(cnf.StorageConcurrency), } } @@ -55,7 +63,10 @@ func (sp *S3SessionPool) createSession() (*session.Session, error) { return s, err } -func (s *S3SessionPool) GetSession() (*s3.S3, error) { +func (s *S3SessionPool) GetSession(ctx context.Context) (*s3.S3, error) { + s.sem.Acquire(ctx, 1) + defer s.sem.Release(1) + sess, err := s.createSession() if err != nil { return nil, errors.Wrap(err, "failed to create new session") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index b00ecc8..a496b27 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "context" "io" "path" @@ -27,7 +28,8 @@ func NewStorage(cnf *config.Storage) StorageReader { } func (s *S3StorageReader) CatFileFromStorage(name string) (io.Reader, error) { - sess, err := s.pool.GetSession() + // XXX: fix this + sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") return nil, err