-
Notifications
You must be signed in to change notification settings - Fork 13
/
cmd_api_server.go
392 lines (343 loc) · 8.97 KB
/
cmd_api_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
//
// Launch our API-server.
//
package main
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
"sync"
"github.com/gorilla/mux"
"github.com/skx/sos/libconfig"
)
// OPTIONS holds the options passed to this sub-command.
//
// It is assigned by apiServer (after the optional server-dump has been
// handled, and before the HTTP servers are started) so that the request
// handlers can later test whether `-verbose` is in force.
var OPTIONS apiServerCmd
// apiServer registers the blob-servers, optionally dumps them, and then
// runs the upload and download HTTP servers until one of them fails.
//
// Blob-servers named on the command-line (comma-separated) are placed in
// the "default" group; when none are given the servers are initialized
// from the configuration file(s) via libconfig.InitServers().
//
// If the dump option is set the known servers are printed and the
// function returns without starting any listener.
func apiServer(options apiServerCmd) {
	// Row layout shared by the header and every entry of the server table.
	const rowFormat = "\t% 10s - %s\n"

	// showServers prints the group/location table of known blob-servers.
	showServers := func() {
		fmt.Printf(rowFormat, "group", "server")
		for _, srv := range libconfig.Servers() {
			fmt.Printf(rowFormat, srv.Group, srv.Location)
		}
	}

	//
	// Populate the list of blob-servers.
	//
	if options.blob == "" {
		libconfig.InitServers()
	} else {
		for _, location := range strings.Split(options.blob, ",") {
			libconfig.AddServer("default", location)
		}
	}

	//
	// Merely dumping the servers?  Then do so and stop.
	//
	if options.dump {
		showServers()
		return
	}

	// Make the options visible to the request handlers.
	OPTIONS = options

	//
	// Show a banner describing what is about to be launched.
	//
	fmt.Print("[Launching API-server]\n")
	fmt.Printf("\nUpload service\nhttp://%s:%d/upload\n", options.host, options.uport)
	fmt.Printf("\nDownload service\nhttp://%s:%d/fetch/:id\n", options.host, options.dport)

	fmt.Print("\nBlob-servers:\n")
	showServers()
	fmt.Print("\n")

	//
	// Routes for the upload service.
	//
	upRouter := mux.NewRouter()
	upRouter.HandleFunc("/upload", APIUploadHandler).Methods("POST")
	upRouter.PathPrefix("/").HandlerFunc(APIMissingHandler)

	//
	// Routes for the download service.
	//
	downRouter := mux.NewRouter()
	downRouter.HandleFunc("/fetch/{id}", APIDownloadHandler).Methods("GET")
	downRouter.HandleFunc("/fetch/{id}", APIDownloadHandler).Methods("HEAD")
	downRouter.PathPrefix("/").HandlerFunc(APIMissingHandler)

	//
	// Run the two HTTP-servers, on their distinct ports, each in its
	// own goroutine, and block until they are done.
	//
	var wg sync.WaitGroup

	// launch serves handler on addr in the background; a listener
	// failure is fatal and panics.
	launch := func(addr string, handler http.Handler) {
		go func() {
			if err := http.ListenAndServe(addr, handler); err != nil {
				panic(err)
			}
			wg.Done()
		}()
	}

	wg.Add(2)
	launch(fmt.Sprintf("%s:%d", options.host, options.uport), upRouter)
	launch(fmt.Sprintf("%s:%d", options.host, options.dport), downRouter)
	wg.Wait()
}
// myReader adapts an in-memory *bytes.Buffer into an io.ReadCloser, so
// that a buffered copy of an HTTP body can be handed to code which
// expects a closable body.
//
// All of the reading behaviour comes from the embedded buffer.
type myReader struct {
	*bytes.Buffer
}

// Close does nothing and always returns nil; it exists only so that
// myReader satisfies the io.ReadCloser interface.
func (r myReader) Close() error {
	return nil
}
// APIUploadHandler handles uploads to the API server.
//
// This should attempt to upload against the blob-servers and return
// when that is complete. If there is a failure then it should
// repeat the process until all known servers are exhausted.
//
// The retry logic is described in the file `SCALING.md` in the
// repository, but in brief there are two cases:
//
// * All the servers are in the group `default`.
//
// * There are N defined groups.
//
// Both cases are handled by the call to OrderedServers() which
// returns the known blob-servers in a suitable order to minimize
// lookups. See `SCALING.md` for more details.
//
//
func APIUploadHandler(res http.ResponseWriter, req *http.Request) {
//
// We create a new buffer to hold the request-body.
//
buf, _ := ioutil.ReadAll(req.Body)
//
// Create a copy of the buffer, so that we can consume
// it initially to hash the data.
//
rdr1 := myReader{bytes.NewBuffer(buf)}
//
// Get the SHA1 hash of the uploaded data.
//
hasher := sha1.New()
b, _ := ioutil.ReadAll(rdr1)
hasher.Write([]byte(b))
hash := hasher.Sum(nil)
//
// Now we're going to attempt to re-POST the uploaded
// content to one of our blob-servers.
//
// We try each blob-server in turn, and if/when we receive
// a successful result we'll return it to the caller.
//
for _, s := range libconfig.OrderedServers() {
//
// Replace the request body with the (second) copy we made.
//
rdr2 := myReader{bytes.NewBuffer(buf)}
req.Body = rdr2
//
// This is where we'll POST to.
//
url := fmt.Sprintf("%s%s%x", s.Location, "/blob/", hash)
//
// Build up a new request.
//
child, _ := http.NewRequest("POST", url, req.Body)
//
// Propagate any incoming X-headers
//
for header, value := range req.Header {
if strings.HasPrefix(header, "X-") {
child.Header.Set(header, value[0])
}
}
//
// Send the request.
//
client := &http.Client{}
r, err := client.Do(child)
//
// If there was no error we're good.
//
if err == nil {
//
// We read the reply we received from the
// blob-server and return it to the caller.
//
response, _ := ioutil.ReadAll(r.Body)
if response != nil {
fmt.Fprintf(res, string(response))
return
}
}
}
//
// If we reach here we've attempted our upload on every
// known blob-server and none accepted it.
//
// Let the caller know.
//
res.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(res, "{\"error\":\"upload failed\"}")
return
}
// APIDownloadHandler handles downloads from the API server.
//
// The `id` route variable, with any file extension stripped, names the
// blob to retrieve.  Each blob-server is asked for `<server>/blob/<id>`
// in turn, and the first one that answers `200 OK` with a readable body
// supplies the result.  If a server cannot be contacted, answers with
// another status, or its body cannot be read, the next one is tried
// until all known servers are exhausted.
//
// The retry logic is described in the file `SCALING.md` in the
// repository, but in brief there are two cases:
//
//   - All the servers are in the group `default`.
//
//   - There are N defined groups.
//
// Both cases are handled by the call to OrderedServers() which
// returns the known blob-servers in a suitable order to minimize
// lookups.  See `SCALING.md` for more details.
//
// Responses:
//
//   - GET:  the blob content, plus any `X-` headers (first value only)
//     the blob-server returned.
//   - HEAD: an empty `200 OK` with `Connection: close`.
//   - 404 with `Connection: close` if no blob-server had the blob.
func APIDownloadHandler(res http.ResponseWriter, req *http.Request) {

	//
	// The ID of the file we're to retrieve, with any extension
	// which might be present removed.
	//
	id := mux.Vars(req)["id"]
	id = strings.TrimSuffix(id, filepath.Ext(id))

	//
	// We try each blob-server in turn, and if/when we receive
	// a successful result we'll return it to the caller.
	//
	for _, s := range libconfig.OrderedServers() {

		url := fmt.Sprintf("%s%s%s", s.Location, "/blob/", id)

		//
		// Show which back-end we're going to use.
		//
		if OPTIONS.verbose {
			fmt.Printf("Attempting retrieval from %s\n", url)
		}

		//
		// Make the request; if the server could not be reached
		// then we skip it.
		//
		response, err := http.Get(url)
		if err != nil {
			if OPTIONS.verbose {
				fmt.Printf("\tError fetching: %s\n", err.Error())
			}
			continue
		}

		//
		// The HTTP-connection to the back-end succeeded, but that
		// didn't return a 200 OK.
		//
		// This might happen if a file was uploaded to only one
		// host, but we've hit another.
		//
		// (i.e. Replication is pending.)
		//
		if response.StatusCode != http.StatusOK {
			if OPTIONS.verbose {
				fmt.Printf("\tStatus Code : %d\n", response.StatusCode)
			}
			response.Body.Close()
			continue
		}

		//
		// Read the reply we received from the blob-server, and
		// always release the connection.
		//
		// The body is closed explicitly rather than deferred because
		// we are inside a loop.
		//
		body, err := ioutil.ReadAll(response.Body)
		response.Body.Close()
		if err != nil {
			// Don't serve a truncated blob; try the next server.
			if OPTIONS.verbose {
				fmt.Printf("\tError reading body: %s\n", err.Error())
			}
			continue
		}

		if OPTIONS.verbose {
			fmt.Printf("\tFound, read %d bytes\n", len(body))
		}

		//
		// For a HEAD request it is sufficient to report that the
		// file exists.
		//
		// If the file isn't found on any server then the 404-result
		// at the foot of this function will ensure that a negative
		// response is sent.
		//
		if req.Method == "HEAD" {
			res.Header().Set("Connection", "close")
			res.WriteHeader(http.StatusOK)
			return
		}

		//
		// Copy any X-Header which was present into the reply too.
		//
		for header, value := range response.Header {
			if strings.HasPrefix(header, "X-") && len(value) > 0 {
				res.Header().Set(header, value[0])
			}
		}

		//
		// Now send back the body.
		//
		// A failed write means the caller has gone away; there is
		// nobody left to report that to.
		//
		_, _ = io.Copy(res, bytes.NewReader(body))
		return
	}

	//
	// If we reach here we've attempted our download on every
	// known blob-server and none succeeded.
	//
	// Let the caller know.
	//
	res.Header().Set("Connection", "close")
	res.WriteHeader(http.StatusNotFound)
}
// APIMissingHandler is the fall-back handler for every request which is
// neither a valid upload nor a valid download.
//
// It replies with `404 Not Found` and a short plain-text explanation.
func APIMissingHandler(res http.ResponseWriter, req *http.Request) {
	const message = "Invalid method or location."

	res.WriteHeader(http.StatusNotFound)
	fmt.Fprint(res, message)
}