Skip to content

Commit

Permalink
Merge pull request #13 from infrawatch/connector-spli
Browse files Browse the repository at this point in the history
Connector split
  • Loading branch information
vyzigold authored Feb 23, 2021
2 parents 31d051e + a728e01 commit 28412c8
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 149 deletions.
147 changes: 86 additions & 61 deletions connector/amqp10.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"github.com/apache/qpid-proton/go/pkg/electron"
)

const (
defaultSendTimeout = 2
defaultListenPrefetch = -1
defaultClientName = "localhost"
)

//AMQP10Receiver is tagged electron receiver
type AMQP10Receiver struct {
Receiver electron.Receiver
Tags []string
}

//LokiConnector is the object to be used for communication with AMQP-1.0 entity
//AMQP10Connector is the object to be used for communication with AMQP-1.0 entity
type AMQP10Connector struct {
Address string
ClientName string
Expand All @@ -36,96 +42,115 @@ type AMQP10Message struct {
Tags []string
}

//CreateAMQP10Connector creates the connector and connects to given AMQP1.0 service
func CreateAMQP10Connector(logger *logging.Logger, address string, clientName string, sendTimeout int64, listenPrefetch int64, listenChannels []string) (*AMQP10Connector, error) {
connector := AMQP10Connector{
Address: address,
ClientName: clientName,
SendTimeout: sendTimeout,
logger: logger,
receivers: make([]AMQP10Receiver, 0),
}

// connect
if err := connector.Connect(); err != nil {
return &connector, fmt.Errorf("Error while connecting to AMQP")
}
// bind to channels
for _, channel := range listenChannels {
if len(channel) < 1 {
continue
}
logger.Metadata(map[string]interface{}{
"channel": channel,
"prefetch": listenPrefetch,
})
logger.Debug("Creating AMQP receiver for channel")
if err := connector.CreateReceiver(channel, int(listenPrefetch)); err != nil {
return &connector, fmt.Errorf("Failed to create receiver: %s", err)
}
}
return &connector, nil
}

//ConnectAMQP10 creates new AMQP1.0 connector from the given configuration file
func ConnectAMQP10(cfg config.Config, logger *logging.Logger) (*AMQP10Connector, error) {
connector := AMQP10Connector{}
connector.receivers = make([]AMQP10Receiver, 0)
connector.logger = logger

var err error
// pre-connect initialization
var addr *config.Option
var opt *config.Option

switch conf := cfg.(type) {
case *config.INIConfig:
addr, err = conf.GetOption("amqp1/connection")
opt, err = conf.GetOption("amqp1/connection")
case *config.JSONConfig:
addr, err = conf.GetOption("Amqp1.Connection.Address")
opt, err = conf.GetOption("Amqp1.Connection.Address")
default:
return &connector, fmt.Errorf("Unknown Config type")
return nil, fmt.Errorf("Unknown Config type")
}
if err == nil && addr != nil {
connector.Address = addr.GetString()
} else {
return &connector, fmt.Errorf("Failed to get connection URL from configuration file: %s", err)
if err != nil {
return nil, err
}
if opt == nil {
return nil, fmt.Errorf("Failed to get connection URL from configuration file")
}
addr := opt.GetString()

var sendTimeout *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
sendTimeout, err = conf.GetOption("amqp1/send_timeout")
opt, err = conf.GetOption("amqp1/send_timeout")
case *config.JSONConfig:
sendTimeout, err = conf.GetOption("Amqp1.Connection.SendTimeout")
opt, err = conf.GetOption("Amqp1.Connection.SendTimeout")
}
if err == nil && sendTimeout != nil {
connector.SendTimeout = sendTimeout.GetInt()
} else {
return &connector, fmt.Errorf("Failed to get send timeout from configuration file: %s", err)
if err != nil {
return nil, err
}
sendTimeout := int64(defaultSendTimeout)
if opt != nil {
sendTimeout = opt.GetInt()
}

var clientName *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
clientName, err = conf.GetOption("amqp1/client_name")
opt, err = conf.GetOption("amqp1/client_name")
case *config.JSONConfig:
clientName, err = conf.GetOption("Amqp1.Client.Name")
opt, err = conf.GetOption("Amqp1.Client.Name")
}
if err == nil && clientName != nil {
connector.ClientName = clientName.GetString()
} else {
return &connector, fmt.Errorf("Failed to get client name from configuration file: %s", err)
if err != nil {
return nil, err
}
clientName := defaultClientName
if opt != nil {
clientName = opt.GetString()
}

// connect
if err := connector.Connect(); err != nil {
return &connector, fmt.Errorf("Error while connecting to AMQP")
switch conf := cfg.(type) {
case *config.INIConfig:
opt, err = conf.GetOption("amqp1/listen_channels")
case *config.JSONConfig:
opt, err = conf.GetOption("Amqp1.Connection.ListenChannels")
}
if err != nil {
return nil, err
}
listen := []string{}
if opt != nil {
listen = opt.GetStrings(",")
}

// post-connect initialization
var listen *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
listen, err = conf.GetOption("amqp1/listen_channels")
opt, err = conf.GetOption("amqp1/listen_prefetch")
case *config.JSONConfig:
listen, err = conf.GetOption("Amqp1.Connection.ListenChannels")
opt, err = conf.GetOption("Amqp1.Connection.ListenPrefetch")
}
if err == nil && listen != nil {
var prf *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
prf, err = conf.GetOption("amqp1/listen_prefetch")
case *config.JSONConfig:
prf, err = conf.GetOption("Amqp1.Connection.ListenPrefetch")
}
prefetch := int64(-1)
if err == nil && prf != nil {
prefetch = prf.GetInt()
}
for _, channel := range listen.GetStrings(",") {
if len(channel) < 1 {
continue
}
logger.Metadata(map[string]interface{}{
"channel": channel,
"prefetch": prefetch,
})
logger.Debug("Creating AMQP receiver for channel")
if err := connector.CreateReceiver(channel, int(prefetch)); err != nil {
return &connector, fmt.Errorf("Failed to create receiver: %s", err)
}
}
if err != nil {
return nil, err
}
prf := int64(defaultListenPrefetch)
if opt != nil {
prf = opt.GetInt()
}

return &connector, nil
return CreateAMQP10Connector(logger, addr, clientName, sendTimeout, prf, listen)
}

//Connect creates input and output connection to configured AMQP1.0 node
Expand Down
111 changes: 67 additions & 44 deletions connector/sensu.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ const (
//QueueNameKeepAlives is the name of queue used by Sensu server for receiving keepalive messages
QueueNameKeepAlives = "keepalives"
//QueueNameResults is the name of queue used by Sensu server for receiving check result messages
QueueNameResults = "results"
QueueNameResults = "results"
defaultClientAddress = "127.0.0.1"
defaultInterval = 30
)

//Result contains data about check execution
Expand Down Expand Up @@ -69,83 +71,104 @@ type SensuConnector struct {
consumer <-chan amqp.Delivery
}

//CreateSensuConnector creates the connector and connects on given RabbitMQ service with Sensu server on appropriate channels
func CreateSensuConnector(logger *logging.Logger, address string, clientName string, clientAddress string, keepaliveInterval int64, subscriptions []string) (*SensuConnector, error) {
connector := SensuConnector{
Address: address,
Subscription: subscriptions,
ClientName: clientName,
exchangeName: fmt.Sprintf("client:%s", clientName),
queueName: fmt.Sprintf("%s-infrawatch-%d", clientName, time.Now().Unix()),
ClientAddress: clientAddress,
KeepaliveInterval: keepaliveInterval,
logger: logger,
}

if err := connector.Connect(); err != nil {
return &connector, fmt.Errorf("Error while connecting to RabbitMQ")
}

return &connector, nil
}

//ConnectSensu creates new Sensu connector from the given configuration file
func ConnectSensu(cfg config.Config, logger *logging.Logger) (*SensuConnector, error) {
connector := SensuConnector{}
connector.logger = logger

var err error
var addr *config.Option
var opt *config.Option

switch conf := cfg.(type) {
case *config.INIConfig:
addr, err = conf.GetOption("sensu/connection")
opt, err = conf.GetOption("sensu/connection")
case *config.JSONConfig:
addr, err = conf.GetOption("Sensu.Connection.Address")
opt, err = conf.GetOption("Sensu.Connection.Address")
default:
return &connector, fmt.Errorf("Unknown Config type")
return nil, fmt.Errorf("Unknown Config type")
}
if err != nil {
return nil, err
}
if err == nil && addr != nil {
connector.Address = addr.GetString()
} else {
return &connector, fmt.Errorf("Failed to get connection URL from configuration file")
if opt == nil {
return nil, fmt.Errorf("Failed to get connection URL from configuration file")
}
addr := opt.GetString()

var subs *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
subs, err = conf.GetOption("sensu/subscriptions")
opt, err = conf.GetOption("sensu/subscriptions")
case *config.JSONConfig:
subs, err = conf.GetOption("Sensu.Connection.Subscriptions")
opt, err = conf.GetOption("Sensu.Connection.Subscriptions")
}
if err != nil {
return nil, err
}
if err == nil && subs != nil {
connector.Subscription = subs.GetStrings(",")
} else {
return &connector, fmt.Errorf("Failed to get subscription channels from configuration file")
subs := []string{"all"}
if opt != nil {
subs = opt.GetStrings(",")
}

var clientName *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
clientName, err = conf.GetOption("sensu/client_name")
opt, err = conf.GetOption("sensu/client_name")
case *config.JSONConfig:
clientName, err = conf.GetOption("Sensu.Client.Name")
opt, err = conf.GetOption("Sensu.Client.Name")
}
if err != nil {
return nil, err
}
if err == nil && clientName != nil {
connector.ClientName = clientName.GetString()
connector.exchangeName = fmt.Sprintf("client:%s", clientName)
connector.queueName = fmt.Sprintf("%s-infrawatch-%d", clientName, time.Now().Unix())
} else {
return &connector, fmt.Errorf("Failed to get client name from configuration file")
clientName := defaultClientName
if opt != nil {
clientName = opt.GetString()
}

var clientAddr *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
clientAddr, err = conf.GetOption("sensu/client_address")
opt, err = conf.GetOption("sensu/client_address")
case *config.JSONConfig:
clientAddr, err = conf.GetOption("Sensu.Client.Address")
opt, err = conf.GetOption("Sensu.Client.Address")
}
if err == nil && clientAddr != nil {
connector.ClientAddress = clientAddr.GetString()
} else {
return &connector, fmt.Errorf("Failed to get client address from configuration file")
if err != nil {
return nil, err
}
clientAddr := defaultClientAddress
if opt != nil {
clientAddr = opt.GetString()
}

var interval *config.Option
switch conf := cfg.(type) {
case *config.INIConfig:
interval, err = conf.GetOption("sensu/keepalive_interval")
opt, err = conf.GetOption("sensu/keepalive_interval")
case *config.JSONConfig:
interval, err = conf.GetOption("Sensu.Connection.KeepaliveInterval")
opt, err = conf.GetOption("Sensu.Connection.KeepaliveInterval")
}
if err != nil {
return nil, err
}
if err == nil && interval != nil {
connector.KeepaliveInterval = interval.GetInt()
} else {
return &connector, fmt.Errorf("Failed to get keepalive interval from configuration file")
interval := int64(defaultInterval)
if opt != nil {
interval = opt.GetInt()
}

err = connector.Connect()
return &connector, err
return CreateSensuConnector(logger, addr, clientName, clientAddr, interval, subs)
}

//Connect connects to RabbitMQ server and
Expand Down
Loading

0 comments on commit 28412c8

Please sign in to comment.