-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_backend.go
123 lines (101 loc) · 2.42 KB
/
s3_backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"io"
"log"
"math/rand"
"time"
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3"
)
// s3Backend is a storage backend that keeps its data in one S3 bucket.
type s3Backend struct {
	// AccessKey and SecretKey are the AWS credentials the backend was
	// constructed with.
	AccessKey, SecretKey string
	// BucketName is the name of the bucket handed to the constructor.
	BucketName string

	// bucket is the goamz handle that every S3 operation goes through.
	bucket *s3.Bucket
}
// newS3Backend builds an s3Backend for the named bucket using the given
// access/secret key pair. The connection is always made against the
// aws.USEast region.
func newS3Backend(accessKey, secretKey, bucket string) *s3Backend {
	credentials := aws.Auth{AccessKey: accessKey, SecretKey: secretKey}
	// TODO: allow configuration of buckets in other regions
	conn := s3.New(credentials, aws.USEast)
	return &s3Backend{
		AccessKey:  accessKey,
		SecretKey:  secretKey,
		BucketName: bucket,
		bucket:     conn.Bucket(bucket),
	}
}
// String returns the short display name of this backend type.
func (s s3Backend) String() string {
	const backendName = "S3"
	return backendName
}
// Write reads r to completion and stores the bytes in the bucket under
// key.String(). The reader is closed before Write returns, whether or not
// the upload succeeded.
//
// The whole payload is buffered in memory because Bucket.Put takes a byte
// slice rather than a stream.
//
// It returns the read error or the Put error unchanged; both are also
// logged.
func (s *s3Backend) Write(key key, r io.ReadCloser) error {
	// We were handed a ReadCloser, so release it here; otherwise the
	// underlying file/connection is leaked. The data has either been fully
	// read or the read already failed, so a Close error carries no extra
	// information and is deliberately ignored.
	defer r.Close()

	b, err := io.ReadAll(r)
	if err != nil {
		log.Println("error writing into buffer")
		log.Println(err)
		return err
	}
	// "application/octet-stream" is the registered media type for opaque
	// binary data (the previous "application/octet" is not a valid type).
	err = s.bucket.Put(key.String(), b, "application/octet-stream", s3.BucketOwnerFull)
	if err != nil {
		log.Println("uh oh. couldn't write to bucket")
		log.Println(err)
		return err
	}
	return nil
}
// Read fetches the object stored under key.String() and returns its bytes,
// passing through whatever error the bucket's Get call reports.
func (s s3Backend) Read(key key) ([]byte, error) {
	name := key.String()
	data, err := s.bucket.Get(name)
	return data, err
}
// Exists reports whether an object named exactly key.String() is present in
// the bucket. A failed listing is reported as false.
func (s s3Backend) Exists(key key) bool {
	name := key.String()
	// List treats its first argument as a prefix, so a single hit only
	// proves that *some* key starts with name.
	ls, err := s.bucket.List(name, "", "", 1)
	if err != nil {
		return false
	}
	// S3 returns keys in lexicographic order, and the exact name sorts
	// before any longer key sharing it as a prefix. If the object exists it
	// is therefore the one result returned; anything else is a mere prefix
	// match and must not count.
	return len(ls.Contents) == 1 && ls.Contents[0].Key == name
}
// Delete removes the object stored under key.String() from the bucket and
// returns the result of the bucket's Del call.
func (s *s3Backend) Delete(key key) error {
	name := key.String()
	err := s.bucket.Del(name)
	return err
}
// ActiveAntiEntropy loops forever, walking the bucket's keys and asking site
// to rebalance each one. It never returns, so callers are expected to run it
// in its own goroutine.
//
// The bucket is listed one page (up to 1000 keys) at a time. Keys within a
// page are visited in random order, and before each key the loop sleeps for
// interval seconds plus 0-4 seconds of random jitter. After the last page
// the walk starts over from the beginning of the bucket.
//
// A failed listing is logged and retried after a pause instead of
// terminating the process, and an empty bucket also triggers a pause so the
// loop does not spin against S3.
//
// The cluster argument is not used by this backend.
func (s *s3Backend) ActiveAntiEntropy(cluster *cluster, site site, interval int) {
	log.Println("S3 AAE starting")
	// S3 backend doesn't need verification, just rebalancing
	rand.Seed(time.Now().UnixNano())

	// pause sleeps for interval+extra seconds plus 0-4 seconds of jitter.
	pause := func(extra int) {
		jitter := rand.Intn(5)
		time.Sleep(time.Duration(interval+extra+jitter) * time.Second)
	}

	// marker is the last key of the previous page; "" means start of bucket.
	marker := ""
	for {
		if marker == "" {
			log.Println("AAE starting at the top")
		}
		res, err := s.bucket.List("", "", marker, 1000)
		if err != nil {
			// Listing errors are typically transient; log, back off (at
			// least one second even when interval is 0) and retry the page.
			log.Println(err)
			pause(1)
			continue
		}
		n := len(res.Contents)
		if n == 0 {
			// Nothing to rebalance. Without this pause the loop would
			// re-list the bucket continuously with no delay.
			marker = ""
			pause(1)
			continue
		}
		for _, i := range rand.Perm(n) {
			v := res.Contents[i]
			pause(0)
			key, err := keyFromString(v.Key)
			if err != nil {
				// Not one of our keys; skip it.
				continue
			}
			if err := site.Rebalance(*key); err != nil {
				log.Println(err)
			}
		}
		if res.IsTruncated {
			// More keys remain: resume the listing after the last key of
			// this page (Contents is in listing order).
			marker = res.Contents[n-1].Key
		} else {
			marker = ""
		}
	}
}
// s3Verifier is the verifier used for the S3 backend. It carries no state;
// both of its methods are no-ops that always return nil.
type s3Verifier struct{}
// Verify is a no-op for S3: all arguments are ignored and the result is
// always nil.
func (*s3Verifier) Verify(path string, key key, h string) error {
	// S3 doesn't need verification
	return nil
}
// VerifyKey is a no-op for S3: the key is ignored and the result is always
// nil.
func (*s3Verifier) VerifyKey(key key) error {
	// S3 doesn't need verification
	return nil
}
// NewVerifier returns a fresh s3Verifier. The cluster argument is not used.
func (s s3Backend) NewVerifier(c *cluster) verifier {
	return new(s3Verifier)
}
// FreeSpace returns a fixed placeholder value; no real capacity is
// measured.
func (s s3Backend) FreeSpace() uint64 {
	// TODO: this is just dummied out for now
	const placeholder uint64 = 1_000_000_000
	return placeholder
}