forked from hashicorp/vault-plugin-secrets-kv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupgrade.go
310 lines (264 loc) · 8.13 KB
/
upgrade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package kv
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/sdk/framework"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/locksutil"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"github.com/hashicorp/vault/sdk/logical"
)
// perfSecondaryCheck reports whether this node must wait for the primary to
// perform the upgrade: true for a non-local mount on a performance
// replication secondary, or for any performance standby node.
func (b *versionedKVBackend) perfSecondaryCheck() bool {
	state := b.System().ReplicationState()
	if state.HasState(consts.ReplicationPerformanceStandby) {
		return true
	}
	return !b.System().LocalMount() && state.HasState(consts.ReplicationPerformanceSecondary)
}
// upgradeCheck wraps an operation handler so that requests arriving while
// the non-versioned-to-versioned upgrade is still in flight are rejected
// with an informative error instead of touching partially migrated data.
func (b *versionedKVBackend) upgradeCheck(next framework.OperationFunc) framework.OperationFunc {
	return func(ctx context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
		if atomic.LoadUint32(b.upgrading) != 1 {
			return next(ctx, req, data)
		}

		// Sleep for a very short time before returning. This helps clients
		// that are trying to access a mount immediately upon enabling be
		// more likely to behave correctly since the operation should take
		// almost no time.
		time.Sleep(15 * time.Millisecond)

		if atomic.LoadUint32(b.upgrading) != 1 {
			return next(ctx, req, data)
		}

		// Still upgrading: pick the message that matches where the work is
		// actually happening (locally, or on the replication primary).
		msg := "Upgrading from non-versioned to versioned data. This backend will be unavailable for a brief period and will resume service shortly."
		if b.perfSecondaryCheck() {
			msg = "Waiting for the primary to upgrade from non-versioned to versioned data. This backend will be unavailable for a brief period and will resume service when the primary is finished."
		}
		return logical.ErrorResponse(msg), logical.ErrInvalidRequest
	}
}
// upgradeDone reports whether the "upgrading" canary entry in storage is
// marked complete. A missing canary entry reads as not done.
func (b *versionedKVBackend) upgradeDone(ctx context.Context, s logical.Storage) (bool, error) {
	entry, err := s.Get(ctx, path.Join(b.storagePrefix, "upgrading"))
	if err != nil || entry == nil {
		return false, err
	}

	var info UpgradeInfo
	if err := proto.Unmarshal(entry.Value, &info); err != nil {
		return false, err
	}
	return info.Done, nil
}
// Upgrade migrates a mount's pre-versioned entries into the versioned
// storage layout. It writes an "upgrading" canary entry, converts each
// existing key into a version-1 Version plus KeyMetadata, deletes the old
// entry, and finally marks the canary Done. While b.upgrading is 1,
// upgradeCheck rejects client requests. The conversion normally runs in a
// goroutine so mount setup is not blocked; a mount with zero keys is
// handled synchronously. Returns an error only for failures detected
// before the background work is launched.
func (b *versionedKVBackend) Upgrade(ctx context.Context, s logical.Storage) error {
	replState := b.System().ReplicationState()

	// Don't run if the plugin is in metadata mode.
	if pluginutil.InMetadataMode() {
		b.Logger().Info("upgrade not running while plugin is in metadata mode")
		return nil
	}

	// Don't run while on a DR secondary.
	if replState.HasState(consts.ReplicationDRSecondary) {
		b.Logger().Info("upgrade not running on disaster recovery replication secondary")
		return nil
	}

	// Claim the upgrade flag; only one upgrade may run at a time.
	if !atomic.CompareAndSwapUint32(b.upgrading, 0, 1) {
		return errors.New("upgrade already in process")
	}

	// If we are a replication secondary or performance standby, wait until the primary has finished
	// upgrading.
	if b.perfSecondaryCheck() {
		b.Logger().Info("upgrade not running on performance replication secondary or performance standby")

		// Poll the replicated canary once per second until the primary
		// reports Done, then clear the local upgrading flag.
		go func() {
			for {
				time.Sleep(time.Second)

				// If we failed because the context is closed we are
				// shutting down. Close this go routine and set the upgrade
				// flag back to 0 for good measure.
				if ctx.Err() != nil {
					atomic.StoreUint32(b.upgrading, 0)
					return
				}

				done, err := b.upgradeDone(ctx, s)
				if err != nil {
					b.Logger().Error("upgrading resulted in error", "error", err)
				}

				if done {
					break
				}
			}

			atomic.StoreUint32(b.upgrading, 0)
		}()

		return nil
	}

	// If we have 0 keys, it's either a new mount or one that's trivial to upgrade,
	// so we should do the upgrade synchronously
	upgradeSynchronously := false
	keys, err := logical.CollectKeys(ctx, s)
	if err != nil {
		b.Logger().Error("upgrading resulted in error", "error", err)
		return err
	}
	if len(keys) == 0 {
		upgradeSynchronously = true
	}

	upgradeInfo := &UpgradeInfo{
		StartedTime: ptypes.TimestampNow(),
	}

	// Encode the canary
	info, err := proto.Marshal(upgradeInfo)
	if err != nil {
		return err
	}

	// Because this is a long-running process we need a new context.
	ctx = context.Background()

	// upgradeKey converts one legacy entry into version-1 data plus key
	// metadata and deletes the old entry. Keys already under the versioned
	// storage prefix are skipped.
	upgradeKey := func(key string) error {
		if strings.HasPrefix(key, b.storagePrefix) {
			return nil
		}

		// Read the old data
		data, err := s.Get(ctx, key)
		if err != nil {
			return err
		}

		// Hold this key's lock for the duration of the migration so other
		// writers to the same key are serialized against it.
		locksutil.LockForKey(b.locks, key).Lock()
		defer locksutil.LockForKey(b.locks, key).Unlock()

		meta := &KeyMetadata{
			Key:      key,
			Versions: map[uint64]*VersionMetadata{},
		}

		versionKey, err := b.getVersionKey(ctx, key, 1, s)
		if err != nil {
			return err
		}

		// The legacy value becomes version 1 of the key.
		version := &Version{
			Data:        data.Value,
			CreatedTime: ptypes.TimestampNow(),
		}

		buf, err := proto.Marshal(version)
		if err != nil {
			return err
		}

		// Store the version data
		if err := s.Put(ctx, &logical.StorageEntry{
			Key:   versionKey,
			Value: buf,
		}); err != nil {
			return err
		}

		// Store the metadata
		meta.AddVersion(version.CreatedTime, nil, 1)
		err = b.writeKeyMetadata(ctx, s, meta)
		if err != nil {
			return err
		}

		// delete the old key
		err = s.Delete(ctx, key)
		if err != nil {
			return err
		}

		return nil
	}

	// prepareUpgradeInfoDoneFunc marks the canary Done and re-encodes it.
	prepareUpgradeInfoDoneFunc := func() ([]byte, error) {
		upgradeInfo.Done = true
		info, err := proto.Marshal(upgradeInfo)
		if err != nil {
			b.Logger().Error("encoding upgrade info resulted in an error", "error", err)
			return nil, err
		}
		return info, nil
	}

	// writeUpgradeInfoDoneFunc persists the Done canary to storage,
	// retrying while the mount is still in its read-only setup phase.
	writeUpgradeInfoDoneFunc := func(info []byte) {
		for {
			err = s.Put(ctx, &logical.StorageEntry{
				Key:   path.Join(b.storagePrefix, "upgrading"),
				Value: info,
			})
			switch {
			case err == nil:
				return
			case err.Error() == logical.ErrSetupReadOnly.Error():
				time.Sleep(10 * time.Millisecond)
			default:
				b.Logger().Error("writing upgrade info resulted in an error, but all keys were successfully upgraded", "error", err)
				return
			}
		}
	}

	// upgradeFunc performs the full migration: write the in-progress canary,
	// convert every collected key, ensure the encryption policy exists, then
	// mark the canary Done and clear the upgrading flag.
	upgradeFunc := func() {
		// Write the canary value and if we are read only wait until the setup
		// process has finished.
	READONLY_LOOP:
		for {
			err := s.Put(ctx, &logical.StorageEntry{
				Key:   path.Join(b.storagePrefix, "upgrading"),
				Value: info,
			})
			switch {
			case err == nil:
				break READONLY_LOOP
			case err.Error() == logical.ErrSetupReadOnly.Error():
				time.Sleep(10 * time.Millisecond)
			default:
				b.Logger().Error("writing upgrade info resulted in an error", "error", err)
				return
			}
		}

		b.Logger().Info("collecting keys to upgrade")
		// Re-collect keys here: the earlier collection only decided whether
		// to run synchronously; entries may have appeared since.
		keys, err := logical.CollectKeys(ctx, s)
		if err != nil {
			b.Logger().Error("upgrading resulted in error", "error", err)
			return
		}

		b.Logger().Info("done collecting keys", "num_keys", len(keys))
		for i, key := range keys {
			// Log progress every 500 keys at debug level.
			if b.Logger().IsDebug() && i%500 == 0 {
				b.Logger().Debug("upgrading keys", "progress", fmt.Sprintf("%d/%d", i, len(keys)))
			}
			err := upgradeKey(key)
			if err != nil {
				b.Logger().Error("upgrading resulted in error", "error", err, "progress", fmt.Sprintf("%d/%d", i+1, len(keys)))
				return
			}
		}

		b.Logger().Info("upgrading keys finished")

		// We do this now so that we ensure it's written by the primary before
		// secondaries unblock
		b.l.Lock()
		if _, err = b.policy(ctx, s); err != nil {
			b.Logger().Error("error checking/creating policy after upgrade", "error", err)
		}
		b.l.Unlock()

		info, err := prepareUpgradeInfoDoneFunc()
		if err != nil {
			b.Logger().Error("error marshalling upgrade info after upgrade", "error", err)
			return
		}
		writeUpgradeInfoDoneFunc(info)

		atomic.StoreUint32(b.upgrading, 0)
	}

	if upgradeSynchronously {
		// Set us to having 'upgraded' before we insert the upgrade value, as the mount is ready to use now
		atomic.StoreUint32(b.upgrading, 0)
		info, err := prepareUpgradeInfoDoneFunc()
		if err != nil {
			return err
		}
		// We write the upgrade done info into storage in a goroutine, as a Vault mount is set to read only
		// during the mount process, so we cannot do it now
		go writeUpgradeInfoDoneFunc(info)
	} else {
		// We run the actual upgrade in a go routine, so we don't block the client on a
		// potentially long process.
		go upgradeFunc()
	}

	return nil
}