Skip to content

Commit

Permalink
s/paralel/parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
paramite committed Apr 30, 2024
1 parent 07303b4 commit ed39d5d
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions connector/amqp10/amqp10.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ const (
defaultListenPrefetch = 1
defaultClientName = "localhost"
defaultLinkFailureLimit = 20
defaultMaxParalelSend = 128
defaultMaxParallelSend = 128
)

// AMQP10Connector is the object to be used for communication with AMQP-1.0 entity
type AMQP10Connector struct {
Address string
ClientName string
SendTimeout int64
ListenPrefetch int64
LinkFailureLimit int64
MaxParalelSendLimit int64
appName string
inConnection *amqp.Session
outConnection *amqp.Session
receivers map[string]*amqp.Receiver
senders map[string]*amqp.Sender
logger *logging.Logger
interrupt chan bool
Address string
ClientName string
SendTimeout int64
ListenPrefetch int64
LinkFailureLimit int64
MaxParallelSendLimit int64
appName string
inConnection *amqp.Session
outConnection *amqp.Session
receivers map[string]*amqp.Receiver
senders map[string]*amqp.Sender
logger *logging.Logger
interrupt chan bool
}

// AMQP10Message holds received (or to be sent) messages from (to) AMQP-1.0 entity
Expand Down Expand Up @@ -80,22 +80,22 @@ func CreateAMQP10Connector(
appName string,
sendTimeout int64,
linkFailureLimit int64,
maxParalelLimit int64,
maxParallelLimit int64,
listenPrefetch int64,
listenChannels []string,
) (*AMQP10Connector, error) {
connector := AMQP10Connector{
Address: address,
ClientName: clientName,
SendTimeout: sendTimeout,
ListenPrefetch: listenPrefetch,
LinkFailureLimit: linkFailureLimit,
MaxParalelSendLimit: maxParalelLimit,
appName: appName,
logger: logger,
receivers: make(map[string]*amqp.Receiver),
senders: make(map[string]*amqp.Sender),
interrupt: make(chan bool),
Address: address,
ClientName: clientName,
SendTimeout: sendTimeout,
ListenPrefetch: listenPrefetch,
LinkFailureLimit: linkFailureLimit,
MaxParallelSendLimit: maxParallelLimit,
appName: appName,
logger: logger,
receivers: make(map[string]*amqp.Receiver),
senders: make(map[string]*amqp.Sender),
interrupt: make(chan bool),
}

// connect
Expand Down Expand Up @@ -154,13 +154,13 @@ func ConnectAMQP10(appName string, cfg config.Config, logger *logging.Logger) (*

switch conf := cfg.(type) {
case *config.INIConfig:
opt, err = conf.GetOption("amqp1/send_max_in_paralel")
opt, err = conf.GetOption("amqp1/send_max_in_parallel")
case *config.JSONConfig:
opt, err = conf.GetOption("Amqp1.Connection.SendMaxInParalel")
opt, err = conf.GetOption("Amqp1.Connection.SendMaxInParallel")
}
maxParalelLimit := int64(defaultMaxParalelSend)
maxParallelLimit := int64(defaultMaxParallelSend)
if opt != nil && err == nil {
maxParalelLimit = opt.GetInt()
maxParallelLimit = opt.GetInt()
}

switch conf := cfg.(type) {
Expand Down Expand Up @@ -196,7 +196,7 @@ func ConnectAMQP10(appName string, cfg config.Config, logger *logging.Logger) (*
prf = opt.GetInt()
}

return CreateAMQP10Connector(logger, addr, clientName, appName, sendTimeout, linkLimit, maxParalelLimit, prf, listen)
return CreateAMQP10Connector(logger, addr, clientName, appName, sendTimeout, linkLimit, maxParallelLimit, prf, listen)
}

//---------------------------- connect helpers and method ----------------------------
Expand Down Expand Up @@ -379,7 +379,7 @@ func (conn *AMQP10Connector) startSenders(inchan chan interface{}, wg *sync.Wait

// block if we have too much sending goroutines
sndLock.RLock()
for activeSend > conn.MaxParalelSendLimit {
for activeSend > conn.MaxParallelSendLimit {
time.Sleep(time.Second)
}
sndLock.RUnlock()
Expand Down

0 comments on commit ed39d5d

Please sign in to comment.