-
Notifications
You must be signed in to change notification settings - Fork 35
/
service.go
131 lines (122 loc) · 3.41 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package http
import (
"fmt"
"github.com/viant/endly"
"github.com/viant/endly/model/location"
"strconv"
)
const (
	// ServiceID is the identifier of the HTTP endpoint service. It is passed to
	// endly.NewAbstractService in New and is also used as the prefix of the
	// per-port key under which listen caches its response in the service state.
	ServiceID = "http/endpoint"
)
// service represents http endpoint service, that has ability to replay HTTP trips
type service struct {
	// AbstractService is the embedded endly base service; New points its
	// Service field back at this value.
	*endly.AbstractService
	// servers holds the started endpoint servers keyed by the port they were
	// requested on; it is read and written while holding s.Mutex().
	servers map[int]*Server
}
// shutdown stops the HTTP endpoint server that was started on req.Port.
//
// It returns an error when no server is registered for the port, or when the
// server reports a shutdown failure. After a successful shutdown the server is
// removed from the registry and the listen response cached for that port is
// dropped, so a subsequent listen request on the same port starts a new server
// instead of returning the stale cached response.
func (s *service) shutdown(context *endly.Context, req *ShutdownRequest) (interface{}, error) {
	s.Mutex().Lock()
	defer s.Mutex().Unlock()
	server, ok := s.servers[req.Port]
	if !ok {
		return nil, fmt.Errorf("endpoint at %v, not found", req.Port)
	}
	// Background is invoked on the endly context parameter, which shadows any
	// package named context inside this function.
	if err := server.Shutdown(context.Background()); err != nil {
		// Keep the registration so the caller can retry the shutdown.
		return &struct{}{}, err
	}
	delete(s.servers, req.Port)
	// Remove the entry written by listen under the same key; the built-in
	// delete is used so the key is matched literally.
	serviceState := s.State()
	delete(serviceState, ServiceID+":"+strconv.Itoa(req.Port))
	return &struct{}{}, nil
}
// listen starts an HTTP endpoint on request.Port that serves the trips built
// from the request, and returns the trips being served.
//
// A non-empty request.BaseDirectory is first expanded against the context
// state and replaced with the resulting resource path. The response is cached
// in the service state under a per-port key; when a cached response exists for
// the port it is returned as is and no new server is started.
func (s *service) listen(context *endly.Context, request *ListenRequest) (*ListenResponse, error) {
	vars := context.State()
	if dir := request.BaseDirectory; dir != "" {
		resource := location.NewResource(vars.ExpandAsText(dir))
		request.BaseDirectory = resource.Path()
	}

	s.Mutex().Lock()
	defer s.Mutex().Unlock()

	cache := s.State()
	cacheKey := ServiceID + ":" + strconv.Itoa(request.Port)
	if cached := cache.Get(cacheKey); cached != nil {
		if previous := cached.(*ListenResponse); previous != nil {
			return previous, nil
		}
	}

	serverTrips := request.AsHTTPServerTrips()
	server, err := StartServer(request.Port, serverTrips, request.RequestTemplate, request.ResponseTemplate)
	if err != nil {
		return nil, err
	}
	s.servers[request.Port] = server

	result := &ListenResponse{Trips: serverTrips.Trips}
	cache.Put(cacheKey, result)
	return result, nil
}
// registerRoutes registers the three actions exposed by this service:
// "listen", "append" and "shutdown". Each route supplies empty request and
// response values and a handler that dispatches to the matching method, or
// fails with an "unsupported request type" error when the request has a
// different type.
func (s *service) registerRoutes() {
	listenRoute := &endly.Route{
		Action:      "listen",
		RequestInfo: &endly.ActionInfo{Description: "start HTTP endpoint"},
		RequestProvider: func() interface{} {
			return &ListenRequest{}
		},
		ResponseProvider: func() interface{} {
			return &ListenResponse{}
		},
		Handler: func(context *endly.Context, request interface{}) (interface{}, error) {
			req, ok := request.(*ListenRequest)
			if !ok {
				return nil, fmt.Errorf("unsupported request type: %T", request)
			}
			return s.listen(context, req)
		},
	}

	appendRoute := &endly.Route{
		Action:      "append",
		RequestInfo: &endly.ActionInfo{Description: "append http trips"},
		RequestProvider: func() interface{} {
			return &AppendRequest{}
		},
		ResponseProvider: func() interface{} {
			return &AppendResponse{}
		},
		Handler: func(context *endly.Context, request interface{}) (interface{}, error) {
			req, ok := request.(*AppendRequest)
			if !ok {
				return nil, fmt.Errorf("unsupported request type: %T", request)
			}
			return s.append(context, req)
		},
	}

	shutdownRoute := &endly.Route{
		Action:      "shutdown",
		RequestInfo: &endly.ActionInfo{Description: "stop HTTP endpoint"},
		RequestProvider: func() interface{} {
			return &ShutdownRequest{}
		},
		ResponseProvider: func() interface{} {
			return &struct{}{}
		},
		Handler: func(context *endly.Context, request interface{}) (interface{}, error) {
			req, ok := request.(*ShutdownRequest)
			if !ok {
				return nil, fmt.Errorf("unsupported request type: %T", request)
			}
			return s.shutdown(context, req)
		},
	}

	s.Register(listenRoute, appendRoute, shutdownRoute)
}
// New creates a new HTTP endpoint service, to replay previously recorded HTTP trips.
// The returned service has an empty server registry and its routes registered.
func New() endly.Service {
	svc := &service{
		AbstractService: endly.NewAbstractService(ServiceID),
		servers:         map[int]*Server{},
	}
	// Point the abstract service back at the concrete implementation.
	svc.AbstractService.Service = svc
	svc.registerRoutes()
	return svc
}