Commit 9d5197b

Backport WAL config fix (#2071) (#2079)

Authored by pracucci and Wing924

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Wei He <[email protected]>

1 parent f69ccee · commit 9d5197b

4 files changed: +34 −30 lines

CHANGELOG.md
Lines changed: 4 additions & 0 deletions

@@ -3,6 +3,10 @@
 ## master / unreleased
 
 
+## 0.6.1 / 2020-02-05
+
+* [BUGFIX] Fixed parsing of the WAL configuration when specified in the YAML config file. #2071
+
 ## 0.6.0 / 2020-01-28
 
 Note that the ruler flags need to be changed in this upgrade. You're moving from a single node ruler to something that might need to be sharded.
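
For context on the bugfix entry above: the YAML package Cortex used for its config file at the time (gopkg.in/yaml.v2) silently skips unexported struct fields, so the lowercase WALConfig fields could never be populated from a config file even though they carried yaml tags. A minimal sketch of that behaviour, with hypothetical stand-in types rather than the real Cortex structs:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Hypothetical stand-ins for the pre- and post-fix config shapes.
type brokenConfig struct {
	walEnabled bool `yaml:"wal_enabled,omitempty"` // unexported: the decoder cannot set it
}

type fixedConfig struct {
	WALEnabled bool `yaml:"wal_enabled,omitempty"` // exported: decoded as expected
}

func main() {
	doc := []byte("wal_enabled: true\n")

	var broken brokenConfig
	_ = yaml.Unmarshal(doc, &broken) // no error: the key is simply ignored
	fmt.Println(broken.walEnabled)   // false

	var fixed fixedConfig
	if err := yaml.Unmarshal(doc, &fixed); err != nil {
		panic(err)
	}
	fmt.Println(fixed.WALEnabled) // true
}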

pkg/ingester/ingester.go
Lines changed: 6 additions & 6 deletions

@@ -42,7 +42,7 @@ var (
 
 // Config for an Ingester.
 type Config struct {
-	WALConfig        WALConfig
+	WALConfig        WALConfig             `yaml:"walconfig,omitempty"`
 	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
 
 	// Config for transferring chunks. Zero or negative = no retries.
@@ -141,7 +141,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		return NewV2(cfg, clientConfig, limits, registerer)
 	}
 
-	if cfg.WALConfig.walEnabled {
+	if cfg.WALConfig.WALEnabled {
 		// If WAL is enabled, we don't transfer out the data to any ingester.
 		// Either the next ingester which takes it's place should recover from WAL
 		// or the data has to be flushed during scaledown.
@@ -167,14 +167,14 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	var err error
 	// During WAL recovery, it will create new user states which requires the limiter.
 	// Hence initialise the limiter before creating the WAL.
-	// The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
+	// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
 	if err != nil {
 		return nil, err
 	}
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
 
-	if cfg.WALConfig.recover {
+	if cfg.WALConfig.Recover {
 		level.Info(util.Logger).Log("msg", "recovering from WAL")
 		start := time.Now()
 		if err := recoverFromWAL(i); err != nil {
@@ -286,7 +286,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 
 	var lastPartialErr *validationError
 	var record *Record
-	if i.cfg.WALConfig.walEnabled {
+	if i.cfg.WALConfig.WALEnabled {
		record = recordPool.Get().(*Record)
		record.UserId = userID
		// Assuming there is not much churn in most cases, there is no use
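
With the yaml:"walconfig,omitempty" tag added above, the WAL settings become reachable from the YAML config file. A hedged sketch of the resulting key path, using trimmed stand-in structs (only the tags shown in this commit are taken as given; the top-level ingester key is assumed from the Cortex config layout):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Trimmed mirrors of the relevant structs; the field sets are reduced for
// illustration, but the yaml tags match the ones used in this commit.
type walConfig struct {
	WALEnabled        bool   `yaml:"wal_enabled,omitempty"`
	CheckpointEnabled bool   `yaml:"checkpoint_enabled,omitempty"`
	Dir               string `yaml:"wal_dir,omitempty"`
}

type ingesterConfig struct {
	WALConfig walConfig `yaml:"walconfig,omitempty"`
}

type rootConfig struct {
	Ingester ingesterConfig `yaml:"ingester,omitempty"` // top-level key assumed, not shown in this diff
}

func main() {
	doc := []byte(`
ingester:
  walconfig:
    wal_enabled: true
    checkpoint_enabled: true
    wal_dir: /data/wal
`)

	var cfg rootConfig
	if err := yaml.Unmarshal(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg.Ingester.WALConfig)
	// Output: {WALEnabled:true CheckpointEnabled:true Dir:/data/wal}
}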

pkg/ingester/wal.go
Lines changed: 17 additions & 17 deletions

@@ -29,21 +29,21 @@ import (
 
 // WALConfig is config for the Write Ahead Log.
 type WALConfig struct {
-	walEnabled         bool          `yaml:"wal_enabled,omitempty"`
-	checkpointEnabled  bool          `yaml:"checkpoint_enabled,omitempty"`
-	recover            bool          `yaml:"recover_from_wal,omitempty"`
-	dir                string        `yaml:"wal_dir,omitempty"`
-	checkpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
-	metricsRegisterer  prometheus.Registerer
+	WALEnabled         bool                  `yaml:"wal_enabled,omitempty"`
+	CheckpointEnabled  bool                  `yaml:"checkpoint_enabled,omitempty"`
+	Recover            bool                  `yaml:"recover_from_wal,omitempty"`
+	Dir                string                `yaml:"wal_dir,omitempty"`
+	CheckpointDuration time.Duration         `yaml:"checkpoint_duration,omitempty"`
+	metricsRegisterer  prometheus.Registerer `yaml:"-"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
 func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
-	f.StringVar(&cfg.dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
-	f.BoolVar(&cfg.recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
-	f.BoolVar(&cfg.walEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
-	f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
-	f.DurationVar(&cfg.checkpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
+	f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
+	f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
+	f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
+	f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
+	f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
 }
 
 // WAL interface allows us to have a no-op WAL when the WAL is disabled.
@@ -77,15 +77,15 @@ type walWrapper struct {
 
 // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
 func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) {
-	if !cfg.walEnabled {
+	if !cfg.WALEnabled {
 		return &noopWAL{}, nil
 	}
 
 	var walRegistry prometheus.Registerer
 	if cfg.metricsRegisterer != nil {
 		walRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "wal"}, cfg.metricsRegisterer)
 	}
-	tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.dir, wal.DefaultSegmentSize/4, true)
+	tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.Dir, wal.DefaultSegmentSize/4, true)
 	if err != nil {
 		return nil, err
 	}
@@ -158,11 +158,11 @@ func (w *walWrapper) Log(record *Record) error {
 func (w *walWrapper) run() {
 	defer w.wait.Done()
 
-	if !w.cfg.checkpointEnabled {
+	if !w.cfg.CheckpointEnabled {
 		return
 	}
 
-	ticker := time.NewTicker(w.cfg.checkpointDuration)
+	ticker := time.NewTicker(w.cfg.CheckpointDuration)
 	defer ticker.Stop()
 
 	for {
@@ -190,7 +190,7 @@ func (w *walWrapper) run() {
 const checkpointPrefix = "checkpoint."
 
 func (w *walWrapper) performCheckpoint() (err error) {
-	if !w.cfg.checkpointEnabled {
+	if !w.cfg.CheckpointEnabled {
 		return nil
 	}
 
@@ -359,7 +359,7 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge
 }
 
 func recoverFromWAL(ingester *Ingester) (err error) {
-	walDir := ingester.cfg.WALConfig.dir
+	walDir := ingester.cfg.WALConfig.Dir
	// Use a local userStates, so we don't need to worry about locking.
	userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics)
 
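
Note that the command-line flag path registered in RegisterFlags above was never affected by this bug: each flag is bound to a field through a pointer, so field visibility is irrelevant there, while YAML decoding goes through reflection and only sees exported fields. A standard-library-only sketch of the flag path, using a hypothetical trimmed struct rather than the real WALConfig:

package main

import (
	"flag"
	"fmt"
	"time"
)

// Hypothetical trimmed copy of the WAL config, just enough to show the flag path.
type walConfig struct {
	WALEnabled         bool
	Dir                string
	CheckpointDuration time.Duration
}

func (cfg *walConfig) registerFlags(f *flag.FlagSet) {
	// Same flag names and defaults as the RegisterFlags method in the diff above.
	f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
	f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
	f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
}

func main() {
	var cfg walConfig
	fs := flag.NewFlagSet("ingester", flag.ExitOnError)
	cfg.registerFlags(fs)

	// Flags write through the pointers handed to BoolVar/StringVar/DurationVar,
	// so they worked even when the fields were unexported within the package.
	_ = fs.Parse([]string{"-ingester.wal-enabled=true", "-ingester.wal-dir=/data/wal"})

	fmt.Printf("%+v\n", cfg) // {WALEnabled:true Dir:/data/wal CheckpointDuration:30m0s}
}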

pkg/ingester/wal_test.go
Lines changed: 7 additions & 7 deletions

@@ -20,11 +20,11 @@ func TestWAL(t *testing.T) {
 	require.NoError(t, err)
 
 	cfg := defaultIngesterTestConfig()
-	cfg.WALConfig.walEnabled = true
-	cfg.WALConfig.checkpointEnabled = true
-	cfg.WALConfig.recover = true
-	cfg.WALConfig.dir = dirname
-	cfg.WALConfig.checkpointDuration = 100 * time.Millisecond
+	cfg.WALConfig.WALEnabled = true
+	cfg.WALConfig.CheckpointEnabled = true
+	cfg.WALConfig.Recover = true
+	cfg.WALConfig.Dir = dirname
+	cfg.WALConfig.CheckpointDuration = 100 * time.Millisecond
 
 	numSeries := 100
 	numSamplesPerSeriesPerPush := 10
@@ -37,8 +37,8 @@ func TestWAL(t *testing.T) {
 
 	for r := 0; r < numRestarts; r++ {
 		if r == numRestarts-1 {
-			cfg.WALConfig.walEnabled = false
-			cfg.WALConfig.checkpointEnabled = false
+			cfg.WALConfig.WALEnabled = false
+			cfg.WALConfig.CheckpointEnabled = false
 		}
 		// Start a new ingester and recover the WAL.
 		_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
