diff --git a/README.md b/README.md index f307424..c39c74d 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,12 @@ ![Latest release](https://img.shields.io/github/v/release/QuokkaStake/cosmos-transactions-bot) [![Actions Status](https://github.com/QuokkaStake/cosmos-transactions-bot/workflows/test/badge.svg)](https://github.com/QuokkaStake/cosmos-transactions-bot/actions) -cosmos-transactions-bot is a tool that listens to transactions with a specific filter on multiple chains and reports them to a Telegram channel. +cosmos-transactions-bot is a tool that listens to transactions with a specific filter on multiple chains +and reports them to a Telegram channel. Here's how it may look like: -![Telegram](https://raw.githubusercontent.com/QuokkaStake/cosmos-transactions-bot/master/images/telegram.png) +![Telegram](https://raw.githubusercontent.com/QuokkaStake/cosmos-transactions-bot/main/images/telegram.png) ## How can I set it up? @@ -31,7 +32,8 @@ Then we need to create a systemd service for our app: sudo nano /etc/systemd/system/cosmos-transactions-bot.service ``` -You can use this template (change the user to whatever user you want this to be executed from. It's advised to create a separate user for that instead of running it from root): +You can use this template (change the user to whatever user you want this to be executed from. It's advised +to create a separate user for that instead of running it from root): ``` [Unit] @@ -69,29 +71,100 @@ sudo journalctl -u cosmos-transactions-bot -f --output cat ## How does it work? -There are multiple nodes this app is connecting to via Websockets (see [this](https://docs.tendermint.com/master/rpc/#/Websocket/subscribe) for more details) and subscribing to the queries that are set in config. When a new transaction matching the filters is found, it's put through a deduplication filter first, to make sure we don't send the same transaction twice. Then each message in transaction is enriched (for example, if someone claims rewards, the app fetches Coingecko price and validator rewards are claimed from). Lastly, each of these transactions are sent to a reporter (currently Telegram only) to notify those who need it. +There are multiple nodes this app is connecting to via Websockets (see [this](https://docs.tendermint.com/master/rpc/#/Websocket/subscribe) for more details) and subscribing +to the queries that are set in config. When a new transaction matching the filters is found, it's put through +a deduplication filter first, to make sure we don't send the same transaction twice. Then each message +in transaction is enriched (for example, if someone claims rewards, the app fetches Coingecko price +and validator rewards are claimed from). Lastly, each of these transactions are sent to a reporter +(currently Telegram only) to notify those who need it. ## How can I configure it? -All configuration is done with a `.toml` file, which is passed to an app through a `--config` flag. See `config.example.toml` for reference. +All configuration is done with a `.toml` file, which is passed to an app through a `--config` flag. +See `config.example.toml` for reference. + +### Chains, subscriptions, chain subscriptions and reporters + +This app's design is quite complex to allow it to be as flexible as possible. +There are the main objects that this app has: + +- reporter - something that acts as a destination point (e.g. Telegram bot) and maybe allows you as a user +to interact with it in a special way (like, setting aliases etc.) +- chain - info about chain itself, its denoms, queries (see below), nodes used to receive data from, etc. +- subscription - info about which set of chains and their events to send to which reporter, +has many chain subscriptions +- chain subscription - info about which chain to receive data from, filters on which events to match +(see below) and how to process errors/unparsed/unsupported messages, if any. + +Each chain has many chain subscriptions, each subscription has one reporter, each chain subscription +has one chain and many filters. + +Generally speaking, the workflow of the app looks something like this: + +![Schema](https://raw.githubusercontent.com/QuokkaStake/cosmos-transactions-bot/main/images/schema.png) + +This allows to build very flexible setups. Here's the example of the easy and the more difficult setup. + +1) "I want to receive all transactions sent from my wallet on chain A, B and C to my Telegram channel" + +You can do it the following way: +- have 1 reporter, a Telegram channel +- have 3 chains, A, B and C, and their configs +- have 1 subscription, with Telegram reporter and 3 chain subscriptions inside (one for chain A, B and C +with 1 filter each matching transfers from wallets on these chains) + +2) "I want to receive all transactions sent from my wallet on chains A, B and C to one Telegram chat, +all transactions that are votes on chains A and B to another Telegram chat, and all transactions that are delegations +with amount more than 10M $TOKEN on chain C to another Telegram chat" + +That's also manageable. You can do the following: +- reporter 1, "first", a bot that sends messages to Telegram channel 1 +- reporter 2, "second", a bot that sends messages to Telegram channel 2 +- reporter 3, "third", a bot that sends messages to Telegram channel 3 +- chain A and its config +- chain B and its config +- chain C and its config +- subscription 1, let's call it "my-wallet-sends", with reporter "first" and the following chain subscriptions +- - chain subscription 1, chain A, 1 filter matching transfers from my wallet on chain A +- - chain subscription 2, chain B, 1 filter matching transfers from my wallet on chain B +- - chain subscription 3, chain C, 1 filter matching transfers from my wallet on chain C +- subscription 2, let's call it "all-wallet-votes", with reporter "second" and the following chain subscriptions +- - chain subscription 1, chain A, 1 filter matching any vote on chain A +- - chain subscription 2, chain B, 1 filter matching any vote on chain B +- subscription 3, let's call it "whale-votes", with reporter "third" and the following chain subscription +- - chain subscription 1, chain C, 1 filter matching any delegations with amount more than 10M $TOKEN on chain C + +See config.example.toml for real-life examples. ### Queries and filters -This is quite complex and deserves a special explanation. +This is another quite complex topic and deserves a special explanation. + +When a node starts, it connects to a Websocket of the fullnode and subscribes to queries (`queries` in `.toml` config). +If there's a transaction that does not match these filters, a fullnode won't emit the event for it +and this transaction won't reach the app. -When a node starts, it connects to a Websocket of the fullnode and subscribes to queries (`queries` in `.toml` config). If there's a transaction that does not match these filters, a fullnode won't emit the event for it and this transaction won't reach the app. +If using filters (`filters` in `.toml` config), when a transaction is received, all messages in the transaction +are checked whether they match these filters, and can be filtered out (and the transaction itself would be filtered out +if there are 0 non-filtered messages left). -If using filters (`filters` in `.toml` config), when a transaction is received, all messages in the transaction are checked whether they match these filters, and can be filtered out (and the transaction itself would be filtered out if there are 0 non filtered messages left). +Using filters can be useful is you have transactions with multiple messages, where you only need to know about one +(for example, someone claiming rewards from your validator and other ones, when you need to know only about claiming +from your validator). -Using filters can be useful is you have transactions with multiple messages, where you only need to know about one (for example, someone claiming rewards from your validator and other ones, when you need to know only about claiming from your validator). +Keep in mind that queries is set on the app level, while filters are set on a chain subscription level, +so you can have some generic query on a chain, and more granular filter on each of your chain subscriptions. Filters should follow the same pattern as queries, but they can only match the following pattern (so no AND/OR support): - `xxx = yyy` (which would filter the transaction if key doesn't match value) - `xxx! = yyy` (which would filter the transaction if key does match value) -Please note that the message would not be filtered out if it matches at least one filter. Example: you have a message that has `xxx = yyy` as events, and if using `xxx != yyy` and `xxx != zzz` as filters, it won't get filtered out (as it would not match the first filter but would match the second one). +Please note that the message would not be filtered out if it matches at least one filter. +Example: you have a message that has `xxx = yyy` as events, and if using `xxx != yyy` and `xxx != zzz` as filters, +it won't get filtered out (as it would not match the first filter but would match the second one). -You can always use `tx.height > 0`, which will send you the information on all transactions in chain, or check out something we have: +You can always use `tx.height > 0`, which will send you the information on all transactions in chain, +or check out something we have: ``` @@ -135,7 +208,10 @@ filters = [ See [the documentation](https://docs.tendermint.com/master/rpc/#/Websocket/subscribe) for more information on queries. -One important thing to keep in mind: by default, Tendermint RPC now only allows 5 connections per client, so if you have more than 5 filters specified, this will fail when subscribing to 6th one. If you own the node you are subscribing to, o fix this, change this parameter to something that suits your needs in `/config/config.toml`: +One important thing to keep in mind: by default, Tendermint RPC now only allows 5 connections per client, +so if you have more than 5 filters specified, this will fail when subscribing to 6th one. +If you own the node you are subscribing to, o fix this, change this parameter to something that suits your needs +in `/config/config.toml`: ``` max_subscriptions_per_client = 5 @@ -144,12 +220,16 @@ max_subscriptions_per_client = 5 ## Notifications channels Go to [@BotFather](https://t.me/BotFather) in Telegram and create a bot. After that, there are two options: -- you want to send messages to a user. This user should write a message to [@getmyid_bot](https://t.me/getmyid_bot), then copy the `Your user ID` number. Also keep in mind that the bot won't be able to send messages unless you contact it first, so write a message to a bot before proceeding. -- you want to send messages to a channel. Write something to a channel, then forward it to [@getmyid_bot](https://t.me/getmyid_bot) and copy the `Forwarded from chat` number. Then add the bot as an admin. +- you want to send messages to a user. This user should write a message to [@getmyid_bot](https://t.me/getmyid_bot), +then copy the `Your user ID` number. Also keep in mind that the bot won't be able to send messages +unless you contact it first, so write a message to a bot before proceeding. +- you want to send messages to a channel. Write something to a channel, then forward it to [@getmyid_bot](https://t.me/getmyid_bot) +and copy the `Forwarded from chat` number. Then add the bot as an admin. Then run a program with Telegram config (see `config.example.toml` as example). -You would likely want to also put only the IDs of trusted people to admins list in Telegram config, so the bot won't react to anyone writing messages to it except these users. +You would likely want to also put only the IDs of trusted people to admins list in Telegram config, so the bot +won't react to anyone writing messages to it except these users. Additionally, for the ease of using commands, you can put the following list as bot commands in @BotFather settings: diff --git a/config.example.toml b/config.example.toml index 15f0fff..cd5d760 100644 --- a/config.example.toml +++ b/config.example.toml @@ -1,11 +1,14 @@ -# Path to where aliases in .toml will be stored. If omitted, no aliases setting/displaying would work. -aliases = "/home/monitoring/config/cosmos-transactions-bot-aliases.toml" -# Timezone in which time (like undelegation finish time) will be displayed. Defaults to "Etc/GMT", so UTC+0 +# Path to where aliases in .toml will be stored. +# If omitted, no aliases setting/displaying would work. +aliases = "cosmos-transactions-bot-aliases.toml" +# Timezone in which time (like undelegation finish time) will be displayed. +# Defaults to "Etc/GMT", so UTC+0 timezone = "Europe/Moscow" # Logging configuration [log] -# Log level. Set to "debug" or "trace" to make it more verbose, or to "warn"/"error" to make it less verbose. +# Log level. Set to "debug" or "trace" to make it more verbose, or to "warn"/"error" +# to make it less verbose. # Defaults to "info" level = "info" # If true, all logs would be displayed in JSON. Useful if you are using centralized logging @@ -14,47 +17,42 @@ json = false # Reporters configuration. [[reporters]] +# Reporter name. Should be unique. +name = "telegram-1" # Reporter type. Currently, the only supported type is "telegram", which is the default. type = "telegram" # Telegram config configuration. Required if the type is "telegram". # See README.md for more details. +# Has 3 params: +# - token - bot token +# - chat - a chat/channel to post messages to +# - admins - a whitelist of user IDs allowed to send commands to the bot, optional but recommended. telegram-config = { token = "xxx:yyy", chat = 12345, admins = [67890] } -# Per-chain configuration. There can be multiple chains. -[[chains]] -# Chain codename, required. +# There can be multiple reporters. +[[reporters]] +name = "telegram-2" +type = "telegram" +telegram-config = { token = "zzz:aaa", chat = 98765, admins = [43210] } + +# Subscriptions config. See README.md on how this schema works. +[[subscriptions]] +# Reporter name to send events matching this subscription to. +# Should be one of the names of the reporters declared above, or the app won't start +# with the config validation error +reporter = "telegram-1" +# Subscription name, for metrics. Should be unique. +name = "subscription-1" + +# Chain subscriptions for this subscription. +[[subscriptions.chains]] +# Chain name. Should be one of the names declared below in chains section, +# or the app won't start with the config validation error. name = "cosmos" -# Chain pretty name, optional. If provided, would be used in reports, if not, -# codename would be used. -pretty-name = "Cosmos Hub" -# Tendermint RPC nodes to subscribe to. At least one is required, better to have multiple ones -# as a failover. -tendermint-nodes = [ - "https://rpc.cosmos.quokkastake.io:443", -] -# API nodes to get blockchain data (validators, proposals etc.) from. -api-nodes = [ - "https://api.cosmos.quokkastake.io", -] -# Queries, see README.md for details. -queries = [ - "tx.height > 0" -] # Filter, see README.md for details. filters = [ "message.action = '/cosmos.gov.v1beta1.MsgVote'", ] -# Denoms list. -denoms = [ - # Each denom inside must have "denom" and "display-denom" fields and additionaly - # denom-coefficient (set to 1000000 by default) and coingecko-currency. - # Example: if there's a transfer transaction for 10,000,000 uatom, - # and the coingecko price for $ATOM is 10$ and if all fields are set, - # instead of displaying amount as `10000000.000000uatom` it would be displayed - # as `10.000000atom ($100.00)`. - # If coingecko-currency is omitted, no price would be displayed. - { denom = "uatom", display-denom = "atom", denom-coefficient = 1000000, coingecko-currency = "cosmos" } -] # If set to true and there is a message not supported by this app, # it would post a message about that, otherwise it would ignore such a message. # Defaults to false. @@ -84,6 +82,58 @@ filter-internal-messages = true # - `Error: RPC error -32000 - Server error: subscription was cancelled (reason: Tendermint exited)` # If this is set to true (default), such messages would be displayed, if not, they will be skipped. log-node-errors = true + +# There can be multiple chain subscriptions per subscription. +[[subscriptions.chains]] +name = "sentinel" +filters = ["message.action = '/cosmos.staking.v1beta1.MsgDelegate'"] + +# There can also be multiple subscriptions. This one, for example, +# sends everything to a different reporter. +[[subscriptions]] +name = "subscription-2" +reporter = "telegram-2" +[[subscriptions.chains]] +name = "cosmos" +filters = ["message.action = '/cosmos.staking.v1beta1.MsgUndelegate'"] +[[subscriptions.chains]] +name = "sentinel" +filters = ["message.action = '/cosmos.staking.v1beta1.MsgBeginRedelegate'"] + + + +# Per-chain configuration. +[[chains]] +# Chain codename, required. +name = "cosmos" +# Chain pretty name, optional. If provided, would be used in reports, if not, +# codename would be used. +pretty-name = "Cosmos Hub" +# Tendermint RPC nodes to subscribe to. At least one is required, better to have multiple ones +# as a failover. +tendermint-nodes = [ + "https://rpc.cosmos.quokkastake.io:443", +] +# API nodes to get blockchain data (validators, proposals etc.) from. +api-nodes = [ + "https://api.cosmos.quokkastake.io", +] +# Queries, see README.md for details. +# Defaults to ["tx.height > 0"], so basically all transactions on chain. +queries = [ + "tx.height > 0" +] +# Denoms list. +denoms = [ + # Each denom inside must have "denom" and "display-denom" fields and additionaly + # denom-coefficient (set to 1000000 by default) and coingecko-currency. + # Example: if there's a transfer transaction for 10,000,000 uatom, + # and the coingecko price for $ATOM is 10$ and if all fields are set, + # instead of displaying amount as `10000000.000000uatom` it would be displayed + # as `10.000000atom ($100.00)`. + # If coingecko-currency is omitted, no price would be displayed. + { denom = "uatom", display-denom = "atom", denom-coefficient = 1000000, coingecko-currency = "cosmos" } +] # Explorer configuration. # Priorities: # 1) ping.pub @@ -111,3 +161,14 @@ block-link-pattern = "https://mintscan.io/cosmos/blocks/%s" # A pattern for validator links for the explorer. validator-link-pattern = "https://mintscan.io/cosmos/validators/%s" + +# There can be multiple chains. +[[chains]] +name = "sentinel" +pretty-name = "Sentinel" +tendermint-nodes = ["https://rpc.sentinel.quokkastake.io:443"] +api-nodes = ["https://api.sentinel.quokkastake.io"] +denoms = [ + { denom = "udvpn", display-denom = "dvpn", coingecko-currency = "sentinel" } +] +mintscan-prefix = "sentinel" diff --git a/images/schema.png b/images/schema.png new file mode 100644 index 0000000..4274734 Binary files /dev/null and b/images/schema.png differ diff --git a/pkg/alias_manager/alias_manager.go b/pkg/alias_manager/alias_manager.go index e164607..1c4f241 100644 --- a/pkg/alias_manager/alias_manager.go +++ b/pkg/alias_manager/alias_manager.go @@ -4,7 +4,7 @@ import ( "os" "main/pkg/config" - "main/pkg/config/types" + configTypes "main/pkg/config/types" "github.com/BurntSushi/toml" "github.com/rs/zerolog" @@ -14,7 +14,7 @@ type Aliases *map[string]string type TomlAliases map[string]Aliases type ChainAliases struct { - Chain *types.Chain + Chain *configTypes.Chain Aliases Aliases } type AllChainAliases map[string]*ChainAliases @@ -22,7 +22,7 @@ type AllChainAliases map[string]*ChainAliases type AliasManager struct { Logger zerolog.Logger Path string - Chains config.Chains + Chains configTypes.Chains Aliases AllChainAliases } @@ -36,15 +36,15 @@ func (a AllChainAliases) ToTomlAliases() TomlAliases { } type ChainAliasesLinks struct { - Chain *types.Chain - Links map[string]types.Link + Chain *configTypes.Chain + Links map[string]configTypes.Link } func (a AllChainAliases) ToAliasesLinks() []ChainAliasesLinks { aliasesLinks := make([]ChainAliasesLinks, 0) for _, chainAliases := range a { - links := make(map[string]types.Link) + links := make(map[string]configTypes.Link) if chainAliases.Aliases == nil { continue diff --git a/pkg/app.go b/pkg/app.go index 3618edf..2a19b3e 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -2,7 +2,6 @@ package pkg import ( configTypes "main/pkg/config/types" - "main/pkg/types" "os" "os/signal" "syscall" @@ -10,7 +9,7 @@ import ( "main/pkg/alias_manager" "main/pkg/config" "main/pkg/data_fetcher" - "main/pkg/filterer" + filtererPkg "main/pkg/filterer" loggerPkg "main/pkg/logger" metricsPkg "main/pkg/metrics" nodesManagerPkg "main/pkg/nodes_manager" @@ -23,9 +22,9 @@ type App struct { Logger zerolog.Logger Chains []*configTypes.Chain NodesManager *nodesManagerPkg.NodesManager - Reporters []reportersPkg.Reporter + Reporters reportersPkg.Reporters DataFetchers map[string]*data_fetcher.DataFetcher - Filterers map[string]*filterer.Filterer + Filterer *filtererPkg.Filterer MetricsManager *metricsPkg.Manager Version string @@ -57,10 +56,7 @@ func NewApp(config *config.AppConfig, version string) *App { dataFetchers[chain.Name] = data_fetcher.NewDataFetcher(logger, chain, aliasManager, metricsManager) } - filterers := make(map[string]*filterer.Filterer, len(config.Chains)) - for _, chain := range config.Chains { - filterers[chain.Name] = filterer.NewFilterer(logger, chain, metricsManager) - } + filterer := filtererPkg.NewFilterer(logger, config, metricsManager) return &App{ Logger: logger.With().Str("component", "app").Logger(), @@ -68,7 +64,7 @@ func NewApp(config *config.AppConfig, version string) *App { Reporters: reporters, NodesManager: nodesManager, DataFetchers: dataFetchers, - Filterers: filterers, + Filterer: filterer, MetricsManager: metricsManager, Version: version, } @@ -81,13 +77,11 @@ func (a *App) Start() { for _, reporter := range a.Reporters { reporter.Init() - a.MetricsManager.LogReporterEnabled(reporter.Name(), reporter.Enabled()) - if reporter.Enabled() { - a.Logger.Info(). - Str("name", reporter.Name()). - Str("type", reporter.Type()). - Msg("Init reporter") - } + a.MetricsManager.LogReporterEnabled(reporter.Name(), reporter.Type()) + a.Logger.Info(). + Str("name", reporter.Name()). + Str("type", reporter.Type()). + Msg("Init reporter") } a.NodesManager.Listen() @@ -98,40 +92,38 @@ func (a *App) Start() { for { select { case rawReport := <-a.NodesManager.Channel: - chainFilterer, _ := a.Filterers[rawReport.Chain.Name] fetcher, _ := a.DataFetchers[rawReport.Chain.Name] - reportableFiltered := chainFilterer.Filter(rawReport.Reportable) - if reportableFiltered == nil { + reportablesForReporters := a.Filterer.GetReportableForReporters(rawReport) + + if len(reportablesForReporters) == 0 { a.Logger.Debug(). Str("node", rawReport.Node). Str("chain", rawReport.Chain.Name). Str("hash", rawReport.Reportable.GetHash()). - Msg("Got report") + Msg("Got report which is nowhere to send") continue } - report := types.Report{ - Node: rawReport.Node, - Chain: rawReport.Chain, - Reportable: reportableFiltered, - } - - a.Logger.Info(). - Str("node", report.Node). - Str("chain", report.Chain.Name). - Str("hash", report.Reportable.GetHash()). - Msg("Got report") + for reporterName, report := range reportablesForReporters { + a.Logger.Info(). + Str("node", report.Node). + Str("chain", report.Chain.Name). + Str("reporter", reporterName). + Str("hash", report.Reportable.GetHash()). + Msg("Got report") - a.MetricsManager.LogReport(report) + rawReport.Reportable.GetAdditionalData(fetcher) - rawReport.Reportable.GetAdditionalData(fetcher) + reporter := a.Reporters.FindByName(reporterName) - for _, reporter := range a.Reporters { if err := reporter.Send(report); err != nil { a.Logger.Error(). Err(err). Msg("Error sending report") + a.MetricsManager.LogReport(report, reporterName, false) + } else { + a.MetricsManager.LogReport(report, reporterName, true) } } case <-quit: diff --git a/pkg/config/config.go b/pkg/config/config.go index 0254378..bc3bfef 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -16,28 +16,15 @@ import ( "github.com/rs/zerolog" ) -type Chains []*types.Chain - -func (c Chains) FindByName(name string) *types.Chain { - for _, chain := range c { - if chain.Name == name { - return chain - } - } - - return nil -} - -type Reporters []*types.Reporter - type AppConfig struct { - Path string - AliasesPath string - LogConfig LogConfig - Chains Chains - Reporters Reporters - Metrics MetricsConfig - Timezone *time.Location + Path string + AliasesPath string + LogConfig LogConfig + Chains types.Chains + Subscriptions types.Subscriptions + Reporters types.Reporters + Metrics MetricsConfig + Timezone *time.Location } type LogConfig struct { @@ -94,6 +81,9 @@ func FromTomlConfig(c *tomlConfig.TomlConfig, path string) *AppConfig { Reporters: utils.Map(c.Reporters, func(r *tomlConfig.Reporter) *types.Reporter { return r.ToAppConfigReporter() }), + Subscriptions: utils.Map(c.Subscriptions, func(s *tomlConfig.Subscription) *types.Subscription { + return s.ToAppConfigSubscription() + }), Timezone: timezone, } } @@ -109,9 +99,10 @@ func (c *AppConfig) ToTomlConfig() *tomlConfig.TomlConfig { ListenAddr: c.Metrics.ListenAddr, Enabled: null.BoolFrom(c.Metrics.Enabled), }, - Chains: utils.Map(c.Chains, tomlConfig.FromAppConfigChain), - Reporters: utils.Map(c.Reporters, tomlConfig.FromAppConfigReporter), - Timezone: c.Timezone.String(), + Chains: utils.Map(c.Chains, tomlConfig.FromAppConfigChain), + Reporters: utils.Map(c.Reporters, tomlConfig.FromAppConfigReporter), + Subscriptions: utils.Map(c.Subscriptions, tomlConfig.FromAppConfigSubscription), + Timezone: c.Timezone.String(), } } diff --git a/pkg/config/toml_config/chain.go b/pkg/config/toml_config/chain.go index 76bc13e..b1628e7 100644 --- a/pkg/config/toml_config/chain.go +++ b/pkg/config/toml_config/chain.go @@ -5,26 +5,19 @@ import ( "main/pkg/config/types" "github.com/cometbft/cometbft/libs/pubsub/query" - "gopkg.in/guregu/null.v4" ) type Chain struct { - Name string `toml:"name"` - PrettyName string `toml:"pretty-name"` - TendermintNodes []string `toml:"tendermint-nodes"` - APINodes []string `toml:"api-nodes"` - Queries []string `toml:"queries"` - Filters []string `toml:"filters"` - MintscanPrefix string `toml:"mintscan-prefix"` - PingPrefix string `toml:"ping-prefix"` - PingBaseUrl string `default:"https://ping.pub" toml:"ping-base-url"` - Explorer *Explorer `toml:"explorer"` - LogUnknownMessages null.Bool `default:"false" toml:"log-unknown-messages"` - LogUnparsedMessages null.Bool `default:"true" toml:"log-unparsed-messages"` - LogFailedTransactions null.Bool `default:"true" toml:"log-failed-transactions"` - LogNodeErrors null.Bool `default:"true" toml:"log-node-errors"` - FilterInternalMessages null.Bool `default:"true" toml:"filter-internal-messages"` - Denoms DenomInfos `toml:"denoms"` + Name string `toml:"name"` + PrettyName string `toml:"pretty-name"` + TendermintNodes []string `toml:"tendermint-nodes"` + APINodes []string `toml:"api-nodes"` + Queries []string `default:"[\"tx.height > 1\"]" toml:"queries"` + MintscanPrefix string `toml:"mintscan-prefix"` + PingPrefix string `toml:"ping-prefix"` + PingBaseUrl string `default:"https://ping.pub" toml:"ping-base-url"` + Explorer *Explorer `toml:"explorer"` + Denoms DenomInfos `toml:"denoms"` } func (c *Chain) Validate() error { @@ -50,12 +43,6 @@ func (c *Chain) Validate() error { } } - for index, filter := range c.Filters { - if _, err := query.New(filter); err != nil { - return fmt.Errorf("error in filter %d: %s", index, err) - } - } - for index, denom := range c.Denoms { if err := denom.Validate(); err != nil { return fmt.Errorf("error in denom %d: %s", index, err) @@ -81,46 +68,30 @@ func (c *Chain) ToAppConfigChain() *types.Chain { explorer = c.Explorer.ToAppConfigExplorer() } - filters := make([]query.Query, len(c.Filters)) - for index, filter := range c.Filters { - filters[index] = *query.MustParse(filter) - } - queries := make([]query.Query, len(c.Queries)) for index, q := range c.Queries { queries[index] = *query.MustParse(q) } return &types.Chain{ - Name: c.Name, - PrettyName: c.PrettyName, - TendermintNodes: c.TendermintNodes, - APINodes: c.APINodes, - Queries: queries, - Filters: filters, - Explorer: explorer, - SupportedExplorer: supportedExplorer, - LogUnknownMessages: c.LogUnknownMessages.Bool, - LogUnparsedMessages: c.LogUnparsedMessages.Bool, - LogFailedTransactions: c.LogFailedTransactions.Bool, - LogNodeErrors: c.LogNodeErrors.Bool, - FilterInternalMessages: c.FilterInternalMessages.Bool, - Denoms: c.Denoms.ToAppConfigDenomInfos(), + Name: c.Name, + PrettyName: c.PrettyName, + TendermintNodes: c.TendermintNodes, + APINodes: c.APINodes, + Queries: queries, + Explorer: explorer, + SupportedExplorer: supportedExplorer, + Denoms: c.Denoms.ToAppConfigDenomInfos(), } } func FromAppConfigChain(c *types.Chain) *Chain { chain := &Chain{ - Name: c.Name, - PrettyName: c.PrettyName, - TendermintNodes: c.TendermintNodes, - APINodes: c.APINodes, - LogUnknownMessages: null.BoolFrom(c.LogUnknownMessages), - LogUnparsedMessages: null.BoolFrom(c.LogUnparsedMessages), - LogFailedTransactions: null.BoolFrom(c.LogFailedTransactions), - LogNodeErrors: null.BoolFrom(c.LogNodeErrors), - FilterInternalMessages: null.BoolFrom(c.FilterInternalMessages), - Denoms: TomlConfigDenomsFrom(c.Denoms), + Name: c.Name, + PrettyName: c.PrettyName, + TendermintNodes: c.TendermintNodes, + APINodes: c.APINodes, + Denoms: TomlConfigDenomsFrom(c.Denoms), } if c.SupportedExplorer == nil && c.Explorer != nil { @@ -138,11 +109,6 @@ func FromAppConfigChain(c *types.Chain) *Chain { chain.PingBaseUrl = ping.BaseUrl } - chain.Filters = make([]string, len(c.Filters)) - for index, filter := range c.Filters { - chain.Filters[index] = filter.String() - } - chain.Queries = make([]string, len(c.Queries)) for index, q := range c.Queries { chain.Queries[index] = q.String() @@ -173,3 +139,13 @@ func (chains Chains) Validate() error { return nil } + +func (chains Chains) HasChainByName(name string) bool { + for _, chain := range chains { + if chain.Name == name { + return true + } + } + + return false +} diff --git a/pkg/config/toml_config/reporter.go b/pkg/config/toml_config/reporter.go index 80b5592..7079528 100644 --- a/pkg/config/toml_config/reporter.go +++ b/pkg/config/toml_config/reporter.go @@ -66,6 +66,16 @@ func (reporters Reporters) Validate() error { return nil } +func (reporters Reporters) HasReporterByName(name string) bool { + for _, reporter := range reporters { + if reporter.Name == name { + return true + } + } + + return false +} + func FromAppConfigReporter(reporter *types.Reporter) *Reporter { var telegramConfig *TelegramConfig diff --git a/pkg/config/toml_config/subscription.go b/pkg/config/toml_config/subscription.go new file mode 100644 index 0000000..3d91598 --- /dev/null +++ b/pkg/config/toml_config/subscription.go @@ -0,0 +1,144 @@ +package toml_config + +import ( + "fmt" + "main/pkg/config/types" + + "github.com/cometbft/cometbft/libs/pubsub/query" + "gopkg.in/guregu/null.v4" +) + +type Subscriptions []*Subscription + +type Subscription struct { + Name string `toml:"name"` + Reporter string `toml:"reporter"` + ChainSubscription ChainSubscriptions `toml:"chains"` +} + +type ChainSubscriptions []*ChainSubscription + +type ChainSubscription struct { + Chain string `toml:"name"` + Filters []string `toml:"filters"` + LogUnknownMessages null.Bool `default:"false" toml:"log-unknown-messages"` + LogUnparsedMessages null.Bool `default:"true" toml:"log-unparsed-messages"` + LogFailedTransactions null.Bool `default:"true" toml:"log-failed-transactions"` + LogNodeErrors null.Bool `default:"true" toml:"log-node-errors"` + FilterInternalMessages null.Bool `default:"true" toml:"filter-internal-messages"` +} + +func (subscriptions Subscriptions) Validate() error { + for index, subscription := range subscriptions { + if err := subscription.Validate(); err != nil { + return fmt.Errorf("error in subscription %d: %s", index, err) + } + } + + // checking names uniqueness + names := map[string]bool{} + + for _, subscription := range subscriptions { + if _, ok := names[subscription.Name]; ok { + return fmt.Errorf("duplicate subscription name: %s", subscription.Name) + } + + names[subscription.Name] = true + } + + return nil +} + +func (s *Subscription) Validate() error { + if s.Name == "" { + return fmt.Errorf("empty subscription name") + } + + if s.Reporter == "" { + return fmt.Errorf("empty reporter name") + } + + for index, subscription := range s.ChainSubscription { + if err := subscription.Validate(); err != nil { + return fmt.Errorf("error in subscription %d: %s", index, err) + } + } + + return nil +} + +func (s *ChainSubscription) Validate() error { + if s.Chain == "" { + return fmt.Errorf("empty chain name") + } + + for index, filter := range s.Filters { + if _, err := query.New(filter); err != nil { + return fmt.Errorf("error in filter %d: %s", index, err) + } + } + + return nil +} + +func (s *ChainSubscription) ToAppConfigChainSubscription() *types.ChainSubscription { + filters := make([]query.Query, len(s.Filters)) + for index, filter := range s.Filters { + filters[index] = *query.MustParse(filter) + } + + return &types.ChainSubscription{ + Chain: s.Chain, + Filters: filters, + LogUnknownMessages: s.LogUnknownMessages.Bool, + LogUnparsedMessages: s.LogUnparsedMessages.Bool, + LogFailedTransactions: s.LogFailedTransactions.Bool, + LogNodeErrors: s.LogNodeErrors.Bool, + FilterInternalMessages: s.FilterInternalMessages.Bool, + } +} + +func (s *Subscription) ToAppConfigSubscription() *types.Subscription { + chainSubscriptions := make(types.ChainSubscriptions, len(s.ChainSubscription)) + for index, chainSubscription := range s.ChainSubscription { + chainSubscriptions[index] = chainSubscription.ToAppConfigChainSubscription() + } + + return &types.Subscription{ + Name: s.Name, + Reporter: s.Reporter, + ChainSubscriptions: chainSubscriptions, + } +} + +func FromAppConfigChainSubscription(s *types.ChainSubscription) *ChainSubscription { + subscription := &ChainSubscription{ + Chain: s.Chain, + LogUnknownMessages: null.BoolFrom(s.LogUnknownMessages), + LogUnparsedMessages: null.BoolFrom(s.LogUnparsedMessages), + LogFailedTransactions: null.BoolFrom(s.LogFailedTransactions), + LogNodeErrors: null.BoolFrom(s.LogNodeErrors), + FilterInternalMessages: null.BoolFrom(s.FilterInternalMessages), + } + + subscription.Filters = make([]string, len(s.Filters)) + for index, filter := range s.Filters { + subscription.Filters[index] = filter.String() + } + + return subscription +} + +func FromAppConfigSubscription(s *types.Subscription) *Subscription { + subscription := &Subscription{ + Name: s.Name, + Reporter: s.Reporter, + ChainSubscription: make(ChainSubscriptions, len(s.ChainSubscriptions)), + } + + for index, chainSubscription := range s.ChainSubscriptions { + subscription.ChainSubscription[index] = FromAppConfigChainSubscription(chainSubscription) + } + + return subscription +} diff --git a/pkg/config/toml_config/toml_config.go b/pkg/config/toml_config/toml_config.go index 7dfb80a..b075985 100644 --- a/pkg/config/toml_config/toml_config.go +++ b/pkg/config/toml_config/toml_config.go @@ -12,7 +12,8 @@ type TomlConfig struct { LogConfig LogConfig `toml:"log"` MetricsConfig MetricsConfig `toml:"metrics"` Chains Chains `toml:"chains"` - Timezone string `default:"Etc/GMT" toml:"timezone"` + Subscriptions Subscriptions `toml:"subscriptions"` + Timezone string `default:"Etc/GMT" toml:"timezone"` Reporters Reporters `toml:"reporters"` } @@ -39,5 +40,30 @@ func (c *TomlConfig) Validate() error { return fmt.Errorf("error in reporters: %s", err) } + if err := c.Subscriptions.Validate(); err != nil { + return fmt.Errorf("error in subscriptions: %s", err) + } + + for index, subscription := range c.Subscriptions { + for chainSubscriptionIndex, chainSubscription := range subscription.ChainSubscription { + if !c.Chains.HasChainByName(chainSubscription.Chain) { + return fmt.Errorf( + "error in subscription %d: error in chain %d: no such chain '%s'", + index, + chainSubscriptionIndex, + chainSubscription.Chain, + ) + } + } + + if !c.Reporters.HasReporterByName(subscription.Reporter) { + return fmt.Errorf( + "error in subscription %d: no such reporter '%s'", + index, + subscription.Reporter, + ) + } + } + return nil } diff --git a/pkg/config/types/chain.go b/pkg/config/types/chain.go index a277e7e..c3d0a33 100644 --- a/pkg/config/types/chain.go +++ b/pkg/config/types/chain.go @@ -8,6 +8,18 @@ import ( "github.com/rs/zerolog" ) +type Chains []*Chain + +func (c Chains) FindByName(name string) *Chain { + for _, chain := range c { + if chain.Name == name { + return chain + } + } + + return nil +} + type Chain struct { Name string PrettyName string @@ -17,14 +29,6 @@ type Chain struct { Explorer *Explorer SupportedExplorer SupportedExplorer Denoms DenomInfos - - LogUnknownMessages bool - LogUnparsedMessages bool - LogFailedTransactions bool - LogNodeErrors bool - FilterInternalMessages bool - - Filters Filters } func (c Chain) GetName() string { diff --git a/pkg/config/types/reporter.go b/pkg/config/types/reporter.go index f930388..2aaf6fd 100644 --- a/pkg/config/types/reporter.go +++ b/pkg/config/types/reporter.go @@ -1,5 +1,7 @@ package types +type Reporters []*Reporter + type TelegramConfig struct { Chat int64 Token string diff --git a/pkg/config/types/subscription.go b/pkg/config/types/subscription.go new file mode 100644 index 0000000..787f2fe --- /dev/null +++ b/pkg/config/types/subscription.go @@ -0,0 +1,22 @@ +package types + +type Subscriptions []*Subscription + +type Subscription struct { + Name string + Reporter string + ChainSubscriptions ChainSubscriptions +} + +type ChainSubscriptions []*ChainSubscription + +type ChainSubscription struct { + Chain string + Filters Filters + + LogUnknownMessages bool + LogUnparsedMessages bool + LogFailedTransactions bool + LogNodeErrors bool + FilterInternalMessages bool +} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 6d85dc4..2bcf1f5 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -1,9 +1,17 @@ package constants +type EventFilterReason string + const ( - PrometheusMetricsPrefix = "cosmos_transactions_bot_" + PrometheusMetricsPrefix string = "cosmos_transactions_bot_" ReporterTypeTelegram string = "telegram" + + EventFilterReasonTxErrorNotLogged EventFilterReason = "tx_error_not_logged" + EventFilterReasonNodeErrorNotLogged EventFilterReason = "node_error_not_logged" + EventFilterReasonUnsupportedMsgTypeNotLogged EventFilterReason = "unsupported_msg_type_not_logged" + EventFilterReasonFailedTxNotLogged EventFilterReason = "failed_tx_not_logged" + EventFilterReasonEmptyTxNotLogged EventFilterReason = "empty_tx_not_logged" ) func GetReporterTypes() []string { diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index d5e2cdc..f88317f 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -84,10 +84,6 @@ func (c *Converter) ParseEvent(event jsonRpcTypes.RPCResponse, nodeURL string) t return nil } - c.Logger.Trace(). - Str("values", fmt.Sprintf("%+v", resultEvent.Events)). - Msg("Event values") - eventDataTx, ok := resultEvent.Data.(tendermintTypes.EventDataTx) if !ok { c.Logger.Debug().Msg("Could not convert tx result to EventDataTx.") diff --git a/pkg/filterer/filterer.go b/pkg/filterer/filterer.go index cf8bd66..d5ff6cf 100644 --- a/pkg/filterer/filterer.go +++ b/pkg/filterer/filterer.go @@ -2,7 +2,9 @@ package filterer import ( "fmt" + configPkg "main/pkg/config" configTypes "main/pkg/config/types" + "main/pkg/constants" messagesPkg "main/pkg/messages" metricsPkg "main/pkg/metrics" "main/pkg/types" @@ -12,64 +14,125 @@ import ( ) type Filterer struct { - Logger zerolog.Logger - MetricsManager *metricsPkg.Manager - Chain *configTypes.Chain - lastBlockHeight int64 + Logger zerolog.Logger + MetricsManager *metricsPkg.Manager + Config *configPkg.AppConfig + lastBlockHeights map[string]int64 } func NewFilterer( logger *zerolog.Logger, - chain *configTypes.Chain, + config *configPkg.AppConfig, metricsManager *metricsPkg.Manager, ) *Filterer { return &Filterer{ Logger: logger.With(). Str("component", "filterer"). - Str("chain", chain.Name). Logger(), - MetricsManager: metricsManager, - Chain: chain, - lastBlockHeight: 0, + MetricsManager: metricsManager, + Config: config, + lastBlockHeights: map[string]int64{}, } } -func (f *Filterer) Filter(reportable types.Reportable) types.Reportable { +func (f *Filterer) GetReportableForReporters( + report types.Report, +) map[string]types.Report { + reportables := make(map[string]types.Report) + + for _, subscription := range f.Config.Subscriptions { + for _, chainSubscription := range subscription.ChainSubscriptions { + if chainSubscription.Chain != report.Chain.Name { + continue + } + + chain := f.Config.Chains.FindByName(chainSubscription.Chain) + + reportableFiltered := f.FilterForChainAndSubscription( + report.Reportable, + chain, + subscription, + chainSubscription, + ) + + if reportableFiltered != nil { + f.Logger.Info(). + Str("type", report.Reportable.Type()). + Str("chain", chain.Name). + Str("hash", report.Reportable.GetHash()). + Str("subscription_name", subscription.Name). + Msg("Got report for subscription") + reportables[subscription.Reporter] = types.Report{ + Chain: report.Chain, + Node: report.Node, + Reportable: reportableFiltered, + Subscription: subscription, + ChainSubscription: chainSubscription, + } + } + } + } + + return reportables +} + +func (f *Filterer) FilterForChainAndSubscription( + reportable types.Reportable, + chain *configTypes.Chain, + subscription *configTypes.Subscription, + chainSubscription *configTypes.ChainSubscription, +) types.Reportable { // Filtering out TxError only if chain's log-node-errors = true. if _, ok := reportable.(*types.TxError); ok { - if !f.Chain.LogNodeErrors { - f.MetricsManager.LogFilteredEvent(f.Chain.Name, reportable.Type()) + if !chainSubscription.LogNodeErrors { + f.MetricsManager.LogFilteredEvent( + chainSubscription.Chain, + reportable.Type(), + constants.EventFilterReasonTxErrorNotLogged, + ) f.Logger.Debug().Msg("Got transaction error, skipping as node errors logging is disabled") return nil } - f.MetricsManager.LogMatchedEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogMatchedEvent(chainSubscription.Chain, reportable.Type(), subscription.Name) return reportable } if _, ok := reportable.(*types.NodeConnectError); ok { - if !f.Chain.LogNodeErrors { - f.MetricsManager.LogFilteredEvent(f.Chain.Name, reportable.Type()) + if !chainSubscription.LogNodeErrors { + f.MetricsManager.LogFilteredEvent( + chainSubscription.Chain, + reportable.Type(), + constants.EventFilterReasonNodeErrorNotLogged, + ) f.Logger.Debug().Msg("Got node error, skipping as node errors logging is disabled") return nil } - f.MetricsManager.LogMatchedEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogMatchedEvent(chainSubscription.Chain, reportable.Type(), subscription.Name) return reportable } tx, ok := reportable.(*types.Tx) if !ok { f.Logger.Error().Str("type", reportable.Type()).Msg("Unsupported reportable type, ignoring.") - f.MetricsManager.LogFilteredEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogFilteredEvent( + chainSubscription.Chain, + reportable.Type(), + constants.EventFilterReasonUnsupportedMsgTypeNotLogged, + ) return nil } - if !f.Chain.LogFailedTransactions && tx.Code > 0 { + if !chainSubscription.LogFailedTransactions && tx.Code > 0 { f.Logger.Debug(). Str("hash", tx.GetHash()). Msg("Transaction is failed, skipping") - f.MetricsManager.LogFilteredEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogFilteredEvent( + chainSubscription.Chain, + reportable.Type(), + constants.EventFilterReasonFailedTxNotLogged, + ) return nil } @@ -78,23 +141,25 @@ func (f *Filterer) Filter(reportable types.Reportable) types.Reportable { f.Logger.Fatal().Err(err).Msg("Error converting height to int64") } - if f.lastBlockHeight != 0 && f.lastBlockHeight > txHeight { + chainLastBlockHeight, ok := f.lastBlockHeights[chain.Name] + if ok && chainLastBlockHeight > txHeight { f.Logger.Debug(). + Str("chain", chainSubscription.Chain). Str("hash", tx.GetHash()). Int64("height", txHeight). - Int64("last_height", f.lastBlockHeight). + Int64("last_height", chainLastBlockHeight). Msg("Transaction height is less than the last one received, skipping") return nil } - if f.lastBlockHeight == 0 || f.lastBlockHeight < txHeight { - f.lastBlockHeight = txHeight + if !ok || chainLastBlockHeight < txHeight { + f.lastBlockHeights[chain.Name] = txHeight } messages := make([]types.Message, 0) for _, message := range tx.Messages { - filteredMessage := f.FilterMessage(message, false) + filteredMessage := f.FilterMessage(message, subscription, chainSubscription, false) if filteredMessage != nil { messages = append(messages, filteredMessage) } @@ -104,18 +169,27 @@ func (f *Filterer) Filter(reportable types.Reportable) types.Reportable { f.Logger.Debug(). Str("hash", tx.GetHash()). Msg("All messages in transaction were filtered out, skipping.") - f.MetricsManager.LogFilteredEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogFilteredEvent( + chainSubscription.Chain, + reportable.Type(), + constants.EventFilterReasonEmptyTxNotLogged, + ) return nil } tx.Messages = messages - f.MetricsManager.LogMatchedEvent(f.Chain.Name, reportable.Type()) + f.MetricsManager.LogMatchedEvent(chainSubscription.Chain, reportable.Type(), subscription.Name) return tx } -func (f *Filterer) FilterMessage(message types.Message, internal bool) types.Message { +func (f *Filterer) FilterMessage( + message types.Message, + subscription *configTypes.Subscription, + chainSubscription *configTypes.ChainSubscription, + internal bool, +) types.Message { if unsupportedMsg, ok := message.(*messagesPkg.MsgUnsupportedMessage); ok { - if f.Chain.LogUnknownMessages { + if chainSubscription.LogUnknownMessages { f.Logger.Error().Str("type", unsupportedMsg.MsgType).Msg("Unsupported message type") return message } else { @@ -125,7 +199,7 @@ func (f *Filterer) FilterMessage(message types.Message, internal bool) types.Mes } if unparsedMsg, ok := message.(*messagesPkg.MsgUnparsedMessage); ok { - if f.Chain.LogUnparsedMessages { + if chainSubscription.LogUnparsedMessages { f.Logger.Error().Err(unparsedMsg.Error).Str("type", unparsedMsg.MsgType).Msg("Error parsing message") return message } @@ -137,15 +211,15 @@ func (f *Filterer) FilterMessage(message types.Message, internal bool) types.Mes return nil } - // internal -> filter only if f.Chain.FilterInternalMessages is true + // internal -> filter only if subscription.FilterInternalMessages is true // !internal -> filter regardless - if !internal || f.Chain.FilterInternalMessages { - matches, err := f.Chain.Filters.Matches(message.GetValues()) + if !internal || chainSubscription.FilterInternalMessages { + matches, err := chainSubscription.Filters.Matches(message.GetValues()) f.Logger.Trace(). Str("type", message.Type()). Str("values", fmt.Sprintf("%+v", message.GetValues().ToMap())). - Str("filters", fmt.Sprintf("%+v", f.Chain.Filters)). + Str("filters", fmt.Sprintf("%+v", chainSubscription.Filters)). Bool("matches", matches). Msg("Result of matching message events against filters") @@ -170,7 +244,7 @@ func (f *Filterer) FilterMessage(message types.Message, internal bool) types.Mes // Processing internal messages (such as ones in MsgExec) for _, internalMessage := range message.GetParsedMessages() { - if internalMessageParsed := f.FilterMessage(internalMessage, true); internalMessageParsed != nil { + if internalMessageParsed := f.FilterMessage(internalMessage, subscription, chainSubscription, true); internalMessageParsed != nil { parsedInternalMessages = append(parsedInternalMessages, internalMessageParsed) } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index b4e58f6..4c66002 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -21,33 +21,39 @@ type Manager struct { logger zerolog.Logger config configPkg.MetricsConfig - lastBlockHeightCollector *prometheus.GaugeVec - lastBlockTimeCollector *prometheus.GaugeVec - nodeConnectedCollector *prometheus.GaugeVec - reconnectsCounter *prometheus.CounterVec - + // Chains metrics + lastBlockHeightCollector *prometheus.GaugeVec + lastBlockTimeCollector *prometheus.GaugeVec + chainInfoGauge *prometheus.GaugeVec successfulQueriesCollector *prometheus.CounterVec failedQueriesCollector *prometheus.CounterVec + eventsTotalCounter *prometheus.CounterVec + eventsFilteredCounter *prometheus.CounterVec - eventsTotalCounter *prometheus.CounterVec - eventsFilteredCounter *prometheus.CounterVec - eventsMatchedCounter *prometheus.CounterVec - - reportsCounter *prometheus.CounterVec - reportEntriesCounter *prometheus.CounterVec + // Node metrics + nodeConnectedCollector *prometheus.GaugeVec + reconnectsCounter *prometheus.CounterVec + // Reporters metrics + reporterReportsCounter *prometheus.CounterVec + reporterErrorsCounter *prometheus.CounterVec + reportEntriesCounter *prometheus.CounterVec reporterEnabledGauge *prometheus.GaugeVec - reporterQueriesCounter *prometheus.CounterVec + // Subscriptions metrics + eventsMatchedCounter *prometheus.CounterVec + + // App metrics appVersionGauge *prometheus.GaugeVec startTimeGauge *prometheus.GaugeVec - chainInfoGauge *prometheus.GaugeVec } func NewManager(logger *zerolog.Logger, config configPkg.MetricsConfig) *Manager { return &Manager{ logger: logger.With().Str("component", "metrics").Logger(), config: config, + + // Chain metrics lastBlockHeightCollector: promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: constants.PrometheusMetricsPrefix + "last_height", Help: "Height of the last block processed", @@ -56,10 +62,10 @@ func NewManager(logger *zerolog.Logger, config configPkg.MetricsConfig) *Manager Name: constants.PrometheusMetricsPrefix + "last_time", Help: "Time of the last block processed", }, []string{"chain"}), - nodeConnectedCollector: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: constants.PrometheusMetricsPrefix + "node_connected", - Help: "Whether the node is successfully connected (1 if yes, 0 if no)", - }, []string{"chain", "node"}), + chainInfoGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: constants.PrometheusMetricsPrefix + "chain_info", + Help: "Chain info, with constant 1 as value and pretty_name and chain as labels", + }, []string{"chain", "pretty_name"}), successfulQueriesCollector: promauto.NewCounterVec(prometheus.CounterOpts{ Name: constants.PrometheusMetricsPrefix + "node_successful_queries_total", Help: "Counter of successful node queries", @@ -68,22 +74,50 @@ func NewManager(logger *zerolog.Logger, config configPkg.MetricsConfig) *Manager Name: constants.PrometheusMetricsPrefix + "node_failed_queries_total", Help: "Counter of failed node queries", }, []string{"chain", "node", "type"}), - reportsCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "node_reports", - Help: "Counter of reports send", - }, []string{"chain"}), - reportEntriesCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "node_report_entries_total", - Help: "Counter of report entries send", - }, []string{"chain", "type"}), + eventsTotalCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "events_total", + Help: "WebSocket events received by node", + }, []string{"chain", "node"}), + eventsFilteredCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "events_filtered", + Help: "WebSocket events filtered out by chain, type and reason", + }, []string{"chain", "type", "reason"}), + + // Node metrics + nodeConnectedCollector: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: constants.PrometheusMetricsPrefix + "node_connected", + Help: "Whether the node is successfully connected (1 if yes, 0 if no)", + }, []string{"chain", "node"}), + reconnectsCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "reconnects_total", + Help: "Node reconnects count", + }, []string{"chain", "node"}), + + // Reporter metrics reporterEnabledGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: constants.PrometheusMetricsPrefix + "reporter_enabled", Help: "Whether the reporter is enabled (1 if yes, 0 if no)", - }, []string{"name"}), - reporterQueriesCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "reporter_queries", - Help: "Reporters' queries count ", - }, []string{"chain", "name", "query"}), + }, []string{"name", "type"}), + reporterReportsCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "reporter_reports", + Help: "Counter of reports sent successfully", + }, []string{"chain", "reporter", "type", "subscription"}), + reporterErrorsCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "reporter_errors", + Help: "Counter of failed reports sends", + }, []string{"chain", "reporter", "type", "subscription"}), + reportEntriesCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "report_entries_total", + Help: "Counter of messages types per each successfully sent report", + }, []string{"chain", "reporter", "type", "subscription"}), + + // Subscription metrics + eventsMatchedCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: constants.PrometheusMetricsPrefix + "events_matched", + Help: "WebSocket events matching filters by chain", + }, []string{"chain", "type", "subscription"}), + + // App metrics appVersionGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: constants.PrometheusMetricsPrefix + "version", Help: "App version", @@ -92,26 +126,6 @@ func NewManager(logger *zerolog.Logger, config configPkg.MetricsConfig) *Manager Name: constants.PrometheusMetricsPrefix + "start_time", Help: "Unix timestamp on when the app was started. Useful for annotations.", }, []string{}), - eventsTotalCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "events_total", - Help: "WebSocket events received by node", - }, []string{"chain", "node"}), - eventsFilteredCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "events_filtered", - Help: "WebSocket events filtered out by chain", - }, []string{"chain", "type"}), - eventsMatchedCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "events_matched", - Help: "WebSocket events matching filters by chain", - }, []string{"chain", "type"}), - reconnectsCounter: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: constants.PrometheusMetricsPrefix + "reconnects_total", - Help: "Node reconnects count", - }, []string{"chain", "node"}), - chainInfoGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: constants.PrometheusMetricsPrefix + "chain_info", - Help: "Chain info, with constant 1 as value and pretty_name and chain as labels", - }, []string{"chain", "pretty_name"}), } } @@ -125,10 +139,6 @@ func (m *Manager) SetAllDefaultMetrics(chains []*configTypes.Chain) { } } func (m *Manager) SetDefaultMetrics(chain *configTypes.Chain) { - m.reportsCounter. - With(prometheus.Labels{"chain": chain.Name}). - Add(0) - m.chainInfoGauge. With(prometheus.Labels{"chain": chain.Name, "pretty_name": chain.PrettyName}). Set(1) @@ -217,25 +227,47 @@ func (m *Manager) LogTendermintQuery(chain string, query queryInfo.QueryInfo, qu } } -func (m *Manager) LogReport(report types.Report) { - m.reportsCounter. - With(prometheus.Labels{"chain": report.Chain.Name}). +func (m *Manager) LogReport(report types.Report, reporterName string, success bool) { + if !success { + m.reporterErrorsCounter. + With(prometheus.Labels{ + "chain": report.Chain.Name, + "reporter": reporterName, + "type": report.Reportable.Type(), + "subscription": report.Subscription.Name, + }). + Inc() + return + } + + m.reporterReportsCounter. + With(prometheus.Labels{ + "chain": report.Chain.Name, + "reporter": reporterName, + "type": report.Reportable.Type(), + "subscription": report.Subscription.Name, + }). Inc() for _, entry := range report.Reportable.GetMessages() { m.reportEntriesCounter. With(prometheus.Labels{ - "chain": report.Chain.Name, - "type": entry.Type(), + "chain": report.Chain.Name, + "reporter": reporterName, + "type": entry.Type(), + "subscription": report.Subscription.Name, }). Inc() } } -func (m *Manager) LogReporterEnabled(name string, enabled bool) { +func (m *Manager) LogReporterEnabled(name, reporterType string) { m.reporterEnabledGauge. - With(prometheus.Labels{"name": name}). - Set(utils.BoolToFloat64(enabled)) + With(prometheus.Labels{ + "name": name, + "type": reporterType, + }). + Set(1) } func (m *Manager) LogAppVersion(version string) { @@ -250,15 +282,23 @@ func (m *Manager) LogWSEvent(chain string, node string) { Inc() } -func (m *Manager) LogFilteredEvent(chain string, eventType string) { +func (m *Manager) LogFilteredEvent(chain string, eventType string, reason constants.EventFilterReason) { m.eventsFilteredCounter. - With(prometheus.Labels{"chain": chain, "type": eventType}). + With(prometheus.Labels{ + "chain": chain, + "type": eventType, + "reason": string(reason), + }). Inc() } -func (m *Manager) LogMatchedEvent(chain string, eventType string) { +func (m *Manager) LogMatchedEvent(chain string, eventType string, subscription string) { m.eventsMatchedCounter. - With(prometheus.Labels{"chain": chain, "type": eventType}). + With(prometheus.Labels{ + "chain": chain, + "type": eventType, + "subscription": subscription, + }). Inc() } diff --git a/pkg/reporters/reporter.go b/pkg/reporters/reporter.go index f271f3c..1cbdb2c 100644 --- a/pkg/reporters/reporter.go +++ b/pkg/reporters/reporter.go @@ -16,10 +16,21 @@ type Reporter interface { Init() Name() string Type() string - Enabled() bool Send(report types.Report) error } +type Reporters []Reporter + +func (r Reporters) FindByName(name string) Reporter { + for _, reporter := range r { + if reporter.Name() == name { + return reporter + } + } + + return nil +} + func GetReporter( reporterConfig *configTypes.Reporter, appConfig *config.AppConfig, diff --git a/pkg/reporters/telegram/telegram.go b/pkg/reporters/telegram/telegram.go index a908c5c..2931d7b 100644 --- a/pkg/reporters/telegram/telegram.go +++ b/pkg/reporters/telegram/telegram.go @@ -96,11 +96,6 @@ func (reporter *Reporter) Init() { reporter.TelegramBot = bot go reporter.TelegramBot.Start() } - -func (reporter *Reporter) Enabled() bool { - return reporter.Token != "" && reporter.Chat != 0 -} - func (reporter *Reporter) GetTemplate(name string) (*template.Template, error) { if cachedTemplate, ok := reporter.Templates[name]; ok { reporter.Logger.Trace().Str("type", name).Msg("Using cached template") @@ -127,14 +122,14 @@ func (reporter *Reporter) GetTemplate(name string) (*template.Template, error) { } func (reporter *Reporter) Render(templateName string, data interface{}) (string, error) { - template, err := reporter.GetTemplate(templateName) + reportTemplate, err := reporter.GetTemplate(templateName) if err != nil { reporter.Logger.Error().Err(err).Str("type", templateName).Msg("Error loading template") return "", err } var buffer bytes.Buffer - err = template.Execute(&buffer, data) + err = reportTemplate.Execute(&buffer, data) if err != nil { reporter.Logger.Error().Err(err).Str("type", templateName).Msg("Error rendering template") return "", err diff --git a/pkg/types/report.go b/pkg/types/report.go index ca46e81..6862ea3 100644 --- a/pkg/types/report.go +++ b/pkg/types/report.go @@ -5,7 +5,9 @@ import ( ) type Report struct { - Chain types.Chain - Node string - Reportable Reportable + Chain types.Chain + Subscription *types.Subscription + ChainSubscription *types.ChainSubscription + Node string + Reportable Reportable } diff --git a/templates/telegram/Tx.html b/templates/telegram/Tx.html index 63c8ea5..ea6af40 100644 --- a/templates/telegram/Tx.html +++ b/templates/telegram/Tx.html @@ -1,7 +1,7 @@ 💸 New transaction on chain {{ .Chain.GetName }} Hash: {{ SerializeLink .Reportable.Hash }} Height: {{ SerializeLink .Reportable.Height }} -{{- if .Chain.LogFailedTransactions }} +{{- if .ChainSubscription.LogFailedTransactions }} {{- if not .Reportable.Code }} Status: 👌 Success {{- else }}