Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring configuration #91

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lomik/zapwriter"
)

// App is an application object used in main
type App struct {
sync.RWMutex
Config *Config
Expand All @@ -29,7 +30,7 @@ type App struct {
Pickle receiver.Receiver
Grpc receiver.Receiver
Prometheus receiver.Receiver
TelegrafHttpJson receiver.Receiver
TelegrafHTTPJSON receiver.Receiver
Collector *Collector // (!!!) Should be re-created on every change config/modules
writeChan chan *RowBinary.WriteBuffer
exit chan bool
Expand Down Expand Up @@ -129,9 +130,9 @@ func (app *App) stopListeners() {
logger.Debug("finished", zap.String("module", "prometheus"))
}

if app.TelegrafHttpJson != nil {
app.TelegrafHttpJson.Stop()
app.TelegrafHttpJson = nil
if app.TelegrafHTTPJSON != nil {
app.TelegrafHTTPJSON.Stop()
app.TelegrafHTTPJSON = nil
logger.Debug("finished", zap.String("module", "telegraf_http_json"))
}
}
Expand Down Expand Up @@ -246,16 +247,16 @@ func (app *App) Start() (err error) {
/* UPLOADER end */

/* RECEIVER start */
if conf.Tcp.Enabled {
if conf.TCP.Enabled {
app.TCP, err = receiver.New(
"tcp://"+conf.Tcp.Listen,
"tcp://"+conf.TCP.Listen,
app.Config.TagDesc,
receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2),
receiver.WriteChan(app.writeChan),
receiver.DropFuture(uint32(conf.Tcp.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Tcp.DropLongerThan),
receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())),
receiver.DropFuture(uint32(conf.TCP.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.TCP.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.TCP.DropLongerThan),
receiver.ReadTimeout(uint32(conf.TCP.ReadTimeout.Value().Seconds())),
)

if err != nil {
Expand All @@ -265,15 +266,15 @@ func (app *App) Start() (err error) {
http.HandleFunc("/debug/receive/tcp/dropped/", app.TCP.DroppedHandler)
}

if conf.Udp.Enabled {
if conf.UDP.Enabled {
app.UDP, err = receiver.New(
"udp://"+conf.Udp.Listen,
"udp://"+conf.UDP.Listen,
app.Config.TagDesc,
receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2),
receiver.WriteChan(app.writeChan),
receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Udp.DropLongerThan),
receiver.DropFuture(uint32(conf.UDP.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.UDP.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.UDP.DropLongerThan),
)

if err != nil {
Expand Down Expand Up @@ -335,22 +336,22 @@ func (app *App) Start() (err error) {
http.HandleFunc("/debug/receive/prometheus/dropped/", app.Prometheus.DroppedHandler)
}

if conf.TelegrafHttpJson.Enabled {
app.TelegrafHttpJson, err = receiver.New(
"telegraf+http+json://"+conf.TelegrafHttpJson.Listen,
if conf.TelegrafHTTPJSON.Enabled {
app.TelegrafHTTPJSON, err = receiver.New(
"telegraf+http+json://"+conf.TelegrafHTTPJSON.Listen,
app.Config.TagDesc,
receiver.WriteChan(app.writeChan),
receiver.DropFuture(uint32(conf.TelegrafHttpJson.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan),
receiver.ConcatChar(conf.TelegrafHttpJson.Concat),
receiver.DropFuture(uint32(conf.TelegrafHTTPJSON.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.TelegrafHTTPJSON.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.TelegrafHTTPJSON.DropLongerThan),
receiver.ConcatChar(conf.TelegrafHTTPJSON.Concat),
)

if err != nil {
return
}

http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHttpJson.DroppedHandler)
http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHTTPJSON.DroppedHandler)
}
/* RECEIVER end */

Expand Down
4 changes: 2 additions & 2 deletions carbon/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func NewCollector(app *App) *Collector {
c.stats = append(c.stats, moduleCallback("prometheus", app.Prometheus))
}

if app.TelegrafHttpJson != nil {
c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHttpJson))
if app.TelegrafHTTPJSON != nil {
c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHTTPJSON))
}

for n, u := range app.Uploaders {
Expand Down
26 changes: 14 additions & 12 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

const (
// MetricEndpointLocal used to send metrics in the carbon-clickhouse itself
MetricEndpointLocal = "local"
)

Expand All @@ -27,7 +28,7 @@ type commonConfig struct {
}

type clickhouseConfig struct {
Url string `toml:"url"`
URL string `toml:"url"`
}

type udpConfig struct {
Expand Down Expand Up @@ -72,7 +73,7 @@ type promConfig struct {
DropLongerThan uint16 `toml:"drop-longer-than"`
}

type telegrafHttpJsonConfig struct {
type telegrafHTTPJSONConfig struct {
Listen string `toml:"listen"`
Enabled bool `toml:"enabled"`
DropFuture *config.Duration `toml:"drop-future"`
Expand Down Expand Up @@ -100,12 +101,12 @@ type Config struct {
Common commonConfig `toml:"common"`
Data dataConfig `toml:"data"`
Upload map[string]*uploader.Config `toml:"upload"`
Udp udpConfig `toml:"udp"`
Tcp tcpConfig `toml:"tcp"`
UDP udpConfig `toml:"udp"`
TCP tcpConfig `toml:"tcp"`
Pickle pickleConfig `toml:"pickle"`
Grpc grpcConfig `toml:"grpc"`
Prometheus promConfig `toml:"prometheus"`
TelegrafHttpJson telegrafHttpJsonConfig `toml:"telegraf_http_json"`
TelegrafHTTPJSON telegrafHTTPJSONConfig `toml:"telegraf_http_json"`
Pprof pprofConfig `toml:"pprof"`
Logging []zapwriter.Config `toml:"logging"`
TagDesc tags.TagConfig `toml:"convert_to_tagged"`
Expand All @@ -125,23 +126,23 @@ func NewConfig() *Config {
},
Logging: nil,
Data: dataConfig{
Path: "/data/carbon-clickhouse/",
Path: "/var/lib/carbon-clickhouse/",
FileInterval: &config.Duration{
Duration: time.Second,
},
AutoInterval: config.NewChunkAutoInterval(),
CompAlgo: &config.Compression{CompAlgo: config.CompAlgoNone},
CompLevel: 0,
},
Udp: udpConfig{
UDP: udpConfig{
Listen: ":2003",
Enabled: true,
LogIncomplete: false,
DropFuture: &config.Duration{},
DropPast: &config.Duration{},
DropLongerThan: 0,
},
Tcp: tcpConfig{
TCP: tcpConfig{
Listen: ":2003",
Enabled: true,
DropFuture: &config.Duration{},
Expand Down Expand Up @@ -172,7 +173,7 @@ func NewConfig() *Config {
DropPast: &config.Duration{},
DropLongerThan: 0,
},
TelegrafHttpJson: telegrafHttpJsonConfig{
TelegrafHTTPJSON: telegrafHTTPJSONConfig{
Listen: ":2007",
Enabled: false,
DropFuture: &config.Duration{},
Expand All @@ -192,13 +193,14 @@ func NewConfig() *Config {
return cfg
}

// NewLoggingConfig returns the zapwriter.Config with logging into "/var/log/carbon-clickhouse/carbon-clickhouse.log"
func NewLoggingConfig() zapwriter.Config {
cfg := zapwriter.NewConfig()
cfg.File = "/var/log/carbon-clickhouse/carbon-clickhouse.log"
return cfg
}

// PrintConfig ...
// PrintDefaultConfig ...
func PrintDefaultConfig() error {
cfg := NewConfig()
buf := new(bytes.Buffer)
Expand All @@ -212,7 +214,7 @@ func PrintDefaultConfig() error {
}

cfg.Upload = map[string]*uploader.Config{
"graphite": &uploader.Config{
"graphite": {
Type: "points",
Timeout: &config.Duration{
Duration: time.Minute,
Expand All @@ -221,7 +223,7 @@ func PrintDefaultConfig() error {
TableName: "graphite",
URL: "http://localhost:8123/",
},
"graphite_tree": &uploader.Config{
"graphite_tree": {
Type: "tree",
Timeout: &config.Duration{
Duration: time.Minute,
Expand Down
28 changes: 17 additions & 11 deletions receiver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ type Base struct {
pastDropped uint64 // atomic
tooLongDropped uint64 // atomic
}
droppedList [droppedListSize]string
droppedListNext int
droppedListMu sync.Mutex
parseThreads int
dropFutureSeconds uint32
dropPastSeconds uint32
dropTooLongLimit uint16
droppedList [droppedListSize]string
droppedListNext int
droppedListMu sync.Mutex
parseThreads int
dropFutureSeconds uint32
dropPastSeconds uint32
dropTooLongLimit uint16
readTimeoutSeconds uint32
writeChan chan *RowBinary.WriteBuffer
logger *zap.Logger
Tags tags.TagConfig
concatCharacter string
writeChan chan *RowBinary.WriteBuffer
logger *zap.Logger
Tags tags.TagConfig
concatCharacter string
}

func NewBase(logger *zap.Logger, config tags.TagConfig) Base {
Expand All @@ -56,6 +56,12 @@ func sendInt64Gauge(send func(metric string, value float64), metric string, valu
send(metric, float64(atomic.LoadInt64(value)))
}

func (base *Base) applyOptions(opts ...Option) {
for _, applyOption := range opts {
applyOption(base)
}
}

func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool {
if base.dropFutureSeconds != 0 && (metricTime > (nowTime + base.dropFutureSeconds)) {
atomic.AddUint64(&base.stat.futureDropped, 1)
Expand Down
26 changes: 14 additions & 12 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

base := NewBase(zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)), config)

for _, optApply := range opts {
optApply(&base)
}
logger := zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1))

if u.Scheme == "tcp" {
addr, err := net.ResolveTCPAddr("tcp", u.Host)
Expand All @@ -110,9 +106,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &TCP{
Base: base,
Base: NewBase(logger, config),
parseChan: make(chan *Buffer),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -128,9 +125,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &Pickle{
Base: base,
Base: NewBase(logger, config),
parseChan: make(chan []byte),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -146,9 +144,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &UDP{
Base: base,
Base: NewBase(logger, config),
parseChan: make(chan *Buffer),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -164,8 +163,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &GRPC{
Base: base,
Base: NewBase(logger, config),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -181,8 +181,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
}

r := &PrometheusRemoteWrite{
Base: base,
Base: NewBase(logger, config),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand All @@ -197,9 +198,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
return nil, err
}

r := &TelegrafHttpJson{
Base: base,
r := &TelegrafHTTPJSON{
Base: NewBase(logger, config),
}
r.applyOptions(opts...)

if err = r.Listen(addr); err != nil {
return nil, err
Expand Down
Loading