Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle notifyClose #152

Open
wants to merge 2 commits into
base: toronto-main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ build:
-X 'github.com/0xPolygon/polygon-edge/versioning.BuildTime=$(TIME)'" \
main.go

.PHONY: build-amd
build-amd:
$(eval LATEST_VERSION = $(shell git describe --tags --abbrev=0))
$(eval COMMIT_HASH = $(shell git rev-parse HEAD))
$(eval BRANCH = $(shell git rev-parse --abbrev-ref HEAD | tr -d '\040\011\012\015\n'))
$(eval TIME = $(shell date))
GOOS=linux GOARCH=amd64 go build -o main -ldflags="\
-X 'github.com/0xPolygon/polygon-edge/versioning.Version=$(LATEST_VERSION)' \
-X 'github.com/0xPolygon/polygon-edge/versioning.Commit=$(COMMIT_HASH)'\
-X 'github.com/0xPolygon/polygon-edge/versioning.Branch=$(BRANCH)'\
-X 'github.com/0xPolygon/polygon-edge/versioning.BuildTime=$(TIME)'" \
main.go

.PHONY: lint
lint:
golangci-lint run --config .golangci.yml
Expand Down
12 changes: 12 additions & 0 deletions command/ibft/propose/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
voteRemoveSCFunction = "function voteDrop(address oldValidator)"
)

const (
txGasPriceWei = 1000000000
txGasLimitWei = 1000000
)

type proposeParams struct {
addressRaw string
rawBLSPublicKey string
Expand Down Expand Up @@ -207,6 +212,13 @@ func (p *proposeParams) ibftSetVotingStationValidators(grpcAddress string, jsonr
functionArgs...,
)

txn.WithOpts(
&contract.TxnOpts{
GasPrice: txGasPriceWei + (4 * txGasPriceWei),
GasLimit: txGasLimitWei,
},
)

if txnErr != nil {
fmt.Println(fmt.Errorf("failed to initiate voting-station txn %w", txnErr))
return txnErr
Expand Down
37 changes: 29 additions & 8 deletions datafeed/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const (
mqConsumerConcurrency = 1
)

const (
consumerRetries = 10
)

// MQService
type MQService struct {
logger hclog.Logger
Expand Down Expand Up @@ -61,6 +65,19 @@ func newMQService(logger hclog.Logger, config *MQConfig, datafeedService *DataFe
return mq, nil
}

func (mq *MQService) restartWithRetries(ctx context.Context) (<-chan *proto.DataFeedReport, <-chan error, error) {
for i := 0; i < consumerRetries; i++ {
mq.logger.Debug("Restarting consumer with try", i)
time.Sleep(5 * time.Second)
reports, errors, err := mq.startConsumer(ctx, mqConsumerConcurrency)
if err == nil {
return reports, errors, err
}
}

return nil, nil, fmt.Errorf("failed to restart consumer after %d retries", consumerRetries)
}

// startConsumeLoop
func (mq *MQService) startConsumeLoop() {
mq.logger.Debug("listening for MQ messages...")
Expand All @@ -80,19 +97,15 @@ func (mq *MQService) startConsumeLoop() {
mq.datafeedService.queueReportingTx(ProposeOutcome, report.MarketHash, report.Outcome)
case err = <-errors:
mq.logger.Error("error while consuming from message queue", "err", err)
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
reports, errors, err = mq.restartWithRetries(ctx)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
mq.logger.Error("failed to start consumer - errors chan", err)
}
case <-common.GetTerminationSignalCh():
mq.logger.Debug("got sigterm, shuttown down mq consumer")
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
reports, errors, err = mq.restartWithRetries(ctx)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
mq.logger.Error("failed to start consumer - sigterm", err)
}

}
Expand Down Expand Up @@ -175,6 +188,14 @@ func (mq *MQService) startConsumer(
}()
}

go func() {
notifyCloseError := <-mq.connection.Channel.NotifyClose(make(chan *amqp.Error))
if notifyCloseError != nil {
mq.logger.Debug("Got notifyCloseError error")
errors <- fmt.Errorf("Connection closed: %v", notifyCloseError)
}
}()

// stop the consumer upon sigterm
go func() {
<-ctx.Done()
Expand Down