-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.go
156 lines (119 loc) · 3.18 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Package handler defines the Handler type, which implements the http.Handler interface.
// Handler accepts POST requests only.
// The request body should contain a list of URLs, one URL per line.
// Once a POST request is received, Handler reads its body, splits it into lines, and fetches the URLs.
// The response consists of the fetched documents' lengths, one per line. The results are not guaranteed to be in any particular order.
// All errors (non-2XX response codes, timeouts, etc.) are logged.
// While creating a Handler, additional options can be provided to change its default behaviour.
// See: WithClient, WithLogger.
package handler
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
)
// defaultMaxIncomingRequests is the concurrent-request limit NewHandler
// uses when no positive limit has been configured.
const defaultMaxIncomingRequests = 100

// Fallbacks NewHandler installs when the options leave logger or client unset.
var (
	// defaultLogger is the standard library's default logger.
	defaultLogger = log.Default()

	// defaultClient is the shared http.DefaultClient.
	// NOTE(review): http.DefaultClient has no timeout, so a hanging upstream
	// keeps its fetch (and the incoming request holding a semaphore slot)
	// busy indefinitely. Consider a client with an explicit Timeout.
	defaultClient = http.DefaultClient
)
// semaphore limits the number of concurrent incoming requests.
// It is backed by a buffered channel: each successful acquire occupies one
// buffer slot and each release frees one.
type semaphore struct {
	ch chan struct{}
}

// newSemaphore creates a semaphore that admits at most size concurrent holders.
// size must not be negative: make panics on a negative channel buffer size.
// A size of zero yields a semaphore whose acquire always fails.
func newSemaphore(size int) *semaphore {
	return &semaphore{
		ch: make(chan struct{}, size),
	}
}

// acquire tries to increment the semaphore counter without blocking.
// It returns true if a slot was taken and false if the semaphore is full.
func (s *semaphore) acquire() bool {
	select {
	case s.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// release decrements the semaphore counter, freeing one slot.
// It must only be called after a successful acquire; otherwise it blocks
// until some other caller acquires.
func (s *semaphore) release() {
	<-s.ch
}
// Handler fetches the URLs listed in a POST request body and responds with
// the fetched documents' lengths. Create it with NewHandler: in the zero
// value sem, client and logger are nil, and ServeHTTP dereferences them.
type Handler struct {
	sem         *semaphore   // limits concurrent ServeHTTP calls; capacity is maxRequests
	logger      *log.Logger  // receives fetch errors
	client      *http.Client // performs the outbound GET requests
	maxRequests int          // maximum number of concurrently served requests
}
// NewHandler creates a Handler and applies the provided options in order.
// Anything the options leave unset falls back to a package default:
// defaultMaxIncomingRequests for the concurrent-request limit,
// defaultClient for outbound requests, and defaultLogger for errors.
func NewHandler(opts ...Option) *Handler {
	h := &Handler{}
	for _, opt := range opts {
		opt.apply(h)
	}
	// Zero means "not configured". A negative limit would make newSemaphore
	// panic (make rejects a negative buffer size), so treat it the same way.
	if h.maxRequests <= 0 {
		h.maxRequests = defaultMaxIncomingRequests
	}
	if h.client == nil {
		h.client = defaultClient
	}
	if h.logger == nil {
		h.logger = defaultLogger
	}
	h.sem = newSemaphore(h.maxRequests)
	return h
}
// ServeHTTP implements http.Handler.
//
// Only POST is accepted; other methods get 405 Method Not Allowed.
// If the concurrent-request limit is reached, the response is 503 Service Unavailable.
// The request body is read in full. Each line is treated as one URL. Surrounding
// whitespace, including the '\r' of CRLF line endings, is trimmed, and blank
// lines are skipped. The URLs are handed to h.fetch, and every length it
// produces is written on its own line in the order it arrives.
func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	if request.Method != http.MethodPost {
		// RFC 7231 §6.5.5: a 405 response must list the allowed methods.
		writer.Header().Set("Allow", http.MethodPost)
		http.Error(writer, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}
	if !h.sem.acquire() {
		http.Error(writer, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
		return
	}
	defer h.sem.release()

	data, err := ioutil.ReadAll(request.Body)
	if err != nil {
		http.Error(writer, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
		return
	}

	writer.Header().Set("Content-Type", "text/plain")
	// Drain the channel completely, even if writing to the client fails,
	// so that every fetching goroutine can deliver its result and exit.
	for size := range h.fetch(splitURLs(string(data))) {
		fmt.Fprintln(writer, size)
	}
}

// splitURLs splits body into lines and returns the non-blank ones with
// surrounding whitespace removed. Without this, a trailing newline would
// produce an empty "URL", and CRLF bodies would leave '\r' on every URL.
func splitURLs(body string) []string {
	lines := strings.Split(body, "\n")
	urls := make([]string, 0, len(lines))
	for _, line := range lines {
		if line = strings.TrimSpace(line); line != "" {
			urls = append(urls, line)
		}
	}
	return urls
}
// fetch concurrently fetches the provided URLs, one goroutine per URL.
// It returns a channel that receives the length in bytes of each successfully
// fetched document, in completion order. The channel is closed once every
// fetch has finished. Transport errors, body read errors and non-2XX
// responses are logged to h.logger and send nothing on the channel.
// The caller must drain the channel; otherwise the fetching goroutines block
// forever on send.
func (h *Handler) fetch(urls []string) <-chan int {
	ch := make(chan int)
	go func() {
		var wg sync.WaitGroup
		for _, url := range urls {
			wg.Add(1)
			go func(url string) {
				defer wg.Done()
				size, err := h.fetchOne(url)
				if err != nil {
					h.logger.Println(err)
					return
				}
				ch <- size
			}(url)
		}
		wg.Wait()
		close(ch)
	}()
	return ch
}

// fetchOne performs a GET request for url and returns the length of the
// response body. A non-2XX status is reported as an error.
func (h *Handler) fetchOne(url string) (int, error) {
	resp, err := h.client.Get(url)
	if err != nil {
		return 0, err
	}
	// The body must be closed, or the underlying connection (and its file
	// descriptor) leaks instead of returning to the transport's pool.
	defer resp.Body.Close()

	// Read the body before checking the status so the connection is drained
	// and can be reused even for error responses.
	content, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("reading body of %q: %w", url, err)
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return 0, fmt.Errorf("get %q: unexpected status %q", url, resp.Status)
	}
	return len(content), nil
}