Skip to content

Commit d4677b1

Browse files
authored
Merge pull request #1207 from percona/PBM-1605-fix-incomplete-backups-gcs-hmac
PBM-1605: PBM marks incomplete backups as successful without logging errors
2 parents 92bfb82 + e5441de commit d4677b1

File tree

2 files changed

+196
-3
lines changed

2 files changed

+196
-3
lines changed

pbm/storage/gcs/hmac_client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package gcs
22

33
import (
44
"context"
5+
"encoding/base64"
6+
"encoding/binary"
7+
"hash/crc32"
58
"io"
69
"path"
710
"runtime"
@@ -73,22 +76,31 @@ func (h hmacClient) save(name string, data io.Reader, options ...storage.Option)
7376
partSize, storage.PrettySize(partSize))
7477
}
7578

79+
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
80+
dataWithCRC := io.TeeReader(data, crc)
81+
7682
putOpts := minio.PutObjectOptions{
7783
PartSize: uint64(partSize),
7884
NumThreads: uint(max(runtime.NumCPU()/2, 1)),
7985
}
80-
81-
_, err := h.client.PutObject(
86+
putInfo, err := h.client.PutObject(
8287
context.Background(),
8388
h.opts.Bucket,
8489
path.Join(h.opts.Prefix, name),
85-
data,
90+
dataWithCRC,
8691
-1,
8792
putOpts,
8893
)
8994
if err != nil {
9095
return errors.Wrap(err, "PutObject")
9196
}
97+
98+
localCRC := crcToBase64(crc.Sum32())
99+
if putInfo.ChecksumCRC32C != localCRC {
100+
return errors.Errorf("wrong CRC after uploading %s, GCS: %s, PBM: %s",
101+
name, putInfo.ChecksumCRC32C, localCRC)
102+
}
103+
92104
return nil
93105
}
94106

@@ -215,3 +227,9 @@ func (h hmacClient) getPartialObject(name string, buf *storage.Arena, start, len
215227

216228
return ch, nil
217229
}
230+
231+
func crcToBase64(v uint32) string {
232+
buf := make([]byte, 4)
233+
binary.BigEndian.PutUint32(buf, v)
234+
return base64.StdEncoding.EncodeToString(buf)
235+
}

pbm/storage/mio/minio_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package mio
22

33
import (
44
"context"
5+
"io"
56
"net/url"
7+
"path"
8+
"runtime"
69
"testing"
10+
"time"
711

812
"github.com/minio/minio-go/v7"
913
"github.com/minio/minio-go/v7/pkg/credentials"
@@ -80,3 +84,174 @@ func TestMinio(t *testing.T) {
8084
storage.RunSplitMergeMWTests(t, stg)
8185
})
8286
}
87+
88+
// TestUploadGCS shows how it's possible to upload corrupted file
89+
// without getting any error from the minio library.
90+
//
91+
// To simulate network interruption use:
92+
// tc qdisc add dev eth0 root netem loss 100%
93+
//
94+
// To revert it to normal use:
95+
// tc qdisc del dev eth0 root netem
96+
func TestUploadGCS(t *testing.T) {
97+
t.Skip("for manual invocation, it will be deleted after GCS HMAC is deprecated")
98+
99+
ep := "storage.googleapis.com"
100+
bucket := "gcs-bucket"
101+
prefix := "test-prefix"
102+
accessKeyID := "key-id"
103+
secretAccessKey := "secret-key"
104+
105+
fname := time.Now().Format("2006-01-02T15:04:05")
106+
107+
mc, err := minio.New(ep, &minio.Options{
108+
Creds: credentials.NewStaticV2(accessKeyID, secretAccessKey, ""),
109+
Secure: true,
110+
})
111+
if err != nil {
112+
t.Fatalf("minio client creation for GCS: %v", err)
113+
}
114+
t.Log("minio client created")
115+
116+
t.Logf("uploading file: %s", fname)
117+
118+
infR := NewInfiniteCustomReader()
119+
r := io.LimitReader(infR, targetSizeBytes)
120+
121+
putOpts := minio.PutObjectOptions{
122+
PartSize: uint64(defaultPartSize),
123+
NumThreads: uint(max(runtime.NumCPU()/2, 1)),
124+
}
125+
info, err := mc.PutObject(
126+
context.Background(),
127+
bucket,
128+
path.Join(prefix, fname),
129+
r,
130+
-1,
131+
putOpts,
132+
)
133+
if err != nil {
134+
t.Fatalf("put object: %v", err)
135+
}
136+
137+
t.Logf("upload info: %#v", info)
138+
}
139+
140+
func TestUploadAWSSigV2(t *testing.T) {
141+
t.Skip("for manual invocation, it will be deleted after GCS HMAC is deprecated")
142+
143+
ep := "s3.amazonaws.com"
144+
region := "eu-central-1"
145+
bucket := "aws-bucket"
146+
prefix := "test-prefix"
147+
accessKeyID := "key-id"
148+
secretAccessKey := "secret-key"
149+
150+
fname := time.Now().Format("2006-01-02T15:04:05")
151+
152+
mc, err := minio.New(ep, &minio.Options{
153+
Region: region,
154+
Creds: credentials.NewStaticV2(accessKeyID, secretAccessKey, ""),
155+
Secure: true,
156+
})
157+
if err != nil {
158+
t.Fatalf("minio client creation for aws: %v", err)
159+
}
160+
t.Log("minio client created for aws with sigV2")
161+
162+
t.Logf("uploading file: %s", fname)
163+
164+
infR := NewInfiniteCustomReader()
165+
r := io.LimitReader(infR, targetSizeBytes)
166+
167+
putOpts := minio.PutObjectOptions{
168+
PartSize: uint64(defaultPartSize),
169+
NumThreads: uint(max(runtime.NumCPU()/2, 1)),
170+
}
171+
info, err := mc.PutObject(
172+
context.Background(),
173+
bucket,
174+
path.Join(prefix, fname),
175+
r,
176+
-1,
177+
putOpts,
178+
)
179+
if err != nil {
180+
t.Fatalf("put object: %v", err)
181+
}
182+
183+
t.Logf("upload info: %#v", info)
184+
}
185+
186+
func TestUploadAWSSigV4(t *testing.T) {
187+
t.Skip("for manual invocation, it will be deleted after GCS HMAC is deprecated")
188+
189+
ep := "s3.amazonaws.com"
190+
region := "eu-central-1"
191+
bucket := "aws-bucket"
192+
prefix := "test-prefix"
193+
accessKeyID := "key-id"
194+
secretAccessKey := "secret-key"
195+
196+
fname := time.Now().Format("2006-01-02T15:04:05")
197+
198+
mc, err := minio.New(ep, &minio.Options{
199+
Region: region,
200+
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
201+
Secure: true,
202+
})
203+
if err != nil {
204+
t.Fatalf("minio client creation for aws: %v", err)
205+
}
206+
t.Log("minio client created for aws with sigV4")
207+
208+
t.Logf("uploading file: %s ....", fname)
209+
210+
infR := NewInfiniteCustomReader()
211+
r := io.LimitReader(infR, targetSizeBytes)
212+
213+
putOpts := minio.PutObjectOptions{
214+
PartSize: uint64(defaultPartSize),
215+
NumThreads: uint(max(runtime.NumCPU()/2, 1)),
216+
}
217+
info, err := mc.PutObject(
218+
context.Background(),
219+
bucket,
220+
path.Join(prefix, fname),
221+
r,
222+
-1,
223+
putOpts,
224+
)
225+
if err != nil {
226+
t.Fatalf("put object: %v", err)
227+
}
228+
229+
t.Logf("upload info: %#v", info)
230+
}
231+
232+
const targetSizeBytes = 1000 * 1024 * 1024
233+
234+
type InfiniteCustomReader struct {
235+
pattern []byte
236+
patternIndex int
237+
}
238+
239+
func NewInfiniteCustomReader() *InfiniteCustomReader {
240+
pattern := []byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22}
241+
242+
return &InfiniteCustomReader{
243+
pattern: pattern,
244+
patternIndex: 0,
245+
}
246+
}
247+
248+
func (r *InfiniteCustomReader) Read(p []byte) (int, error) {
249+
readLen := len(p)
250+
251+
for i := range readLen {
252+
p[i] = r.pattern[r.patternIndex]
253+
r.patternIndex = (r.patternIndex + 1) % len(r.pattern)
254+
}
255+
256+
return readLen, nil
257+
}

0 commit comments

Comments
 (0)