forked from skeema/tengo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
docker.go
390 lines (361 loc) · 12.6 KB
/
docker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
package tengo
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"strings"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/go-sql-driver/mysql"
)
// DockerClientOptions specifies options when instantiating a Docker client.
// No options are currently supported, but this may change in the future.
// The struct has no fields, so its zero value is what callers pass to
// NewDockerClient.
type DockerClientOptions struct{}
// DockerClient manages lifecycle of local Docker containers for sandbox
// database instances. It wraps and hides the implementation of a specific
// Docker client implementation. (This package currently uses
// github.com/fsouza/go-dockerclient, but may later switch to the official
// Docker Golang client.)
type DockerClient struct {
// client is the underlying go-dockerclient handle; it is unexported so that
// callers do not depend on the specific client library.
client *docker.Client
// Options holds the options that were supplied to NewDockerClient.
Options DockerClientOptions
}
// NewDockerClient is a constructor for DockerClient. The underlying Docker
// client is configured from the environment. If that client cannot be
// created, a nil *DockerClient is returned along with the error.
func NewDockerClient(opts DockerClientOptions) (*DockerClient, error) {
	envClient, err := docker.NewClientFromEnv()
	if err != nil {
		return nil, err
	}
	return &DockerClient{client: envClient, Options: opts}, nil
}
// DockerizedInstanceOptions specifies options for creating or finding a
// sandboxed database instance inside a Docker container.
type DockerizedInstanceOptions struct {
// Name is the container name to create or look up.
Name string
// Image is the Docker image reference, such as "mysql:5.6".
Image string
// RootPassword is the password for the database's root user. If blank, the
// container is created with MYSQL_ALLOW_EMPTY_PASSWORD=1 instead.
RootPassword string
// DefaultConnParams is appended as the query-string portion of the DSN.
DefaultConnParams string
}
// CreateInstance attempts to create a Docker container with the supplied name
// (any arbitrary name, or blank to assign random) and image (such as
// "mysql:5.6", or just "mysql" to indicate latest). A connection pool will be
// established for the instance.
//
// The image is pulled from its remote repository if it cannot be inspected
// locally. Image references that include a registry host and port, such as
// "localhost:5000/mysql" or "localhost:5000/mysql:5.7", are supported.
//
// An error is returned if opts.Image is blank, or if the image cannot be
// pulled or the container cannot be created; in these cases the returned
// instance is nil. If the container was created but could not be started or
// connected to, the non-nil instance is returned alongside the error.
func (dc *DockerClient) CreateInstance(opts DockerizedInstanceOptions) (*DockerizedInstance, error) {
	if opts.Image == "" {
		return nil, errors.New("CreateInstance: image cannot be empty string")
	}

	// Split the image reference into repository and tag. Only a colon located
	// after the final slash separates the tag; an earlier colon belongs to a
	// registry host:port prefix and must stay part of the repository.
	repository, tag := opts.Image, "latest"
	if pos := strings.LastIndex(opts.Image, ":"); pos > strings.LastIndex(opts.Image, "/") {
		repository, tag = opts.Image[:pos], opts.Image[pos+1:]
	}

	// Pull image from remote if missing
	if _, err := dc.client.InspectImage(opts.Image); err != nil {
		pullOpts := docker.PullImageOptions{
			Repository: repository,
			Tag:        tag,
		}
		if err := dc.client.PullImage(pullOpts, docker.AuthConfiguration{}); err != nil {
			return nil, err
		}
	}

	// Create and start container
	var env []string
	if opts.RootPassword == "" {
		env = append(env, "MYSQL_ALLOW_EMPTY_PASSWORD=1")
	} else {
		env = append(env, fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", opts.RootPassword))
	}
	ccopts := docker.CreateContainerOptions{
		Name: opts.Name,
		Config: &docker.Config{
			Image: opts.Image,
			Env:   env,
		},
		HostConfig: &docker.HostConfig{
			// No HostPort is specified here; the port actually assigned is
			// read back from the container's network settings by Port().
			PortBindings: map[docker.Port][]docker.PortBinding{
				"3306/tcp": {
					{HostIP: "127.0.0.1"},
				},
			},
		},
	}
	di := &DockerizedInstance{
		DockerizedInstanceOptions: opts,
		Manager:                   dc,
	}
	var err error
	if di.container, err = dc.client.CreateContainer(ccopts); err != nil {
		return nil, err
	} else if err = di.Start(); err != nil {
		return di, err
	}

	// Confirm containerized database is reachable, and create Tengo instance
	if err := di.TryConnect(); err != nil {
		return di, err
	}
	return di, nil
}
// GetInstance looks up an existing container whose name equals opts.Name. When
// found, the container is started (if it is not running already) and a
// connection pool is established. If the container does not exist, or cannot
// be started or connected to, a nil *DockerizedInstance and a non-nil error
// are returned.
// If opts.Image is non-blank and differs from the container's image, the
// instance's flavor is consulted as a fallback; if that does not match the
// requested image either, an error is returned.
func (dc *DockerClient) GetInstance(opts DockerizedInstanceOptions) (*DockerizedInstance, error) {
	container, err := dc.client.InspectContainer(opts.Name)
	if err != nil {
		return nil, err
	}
	di := &DockerizedInstance{
		DockerizedInstanceOptions: opts,
		Manager:                   dc,
		container:                 container,
	}

	// The container may report its image as a "sha256:" identifier. In that
	// case, try to map it back to one of the image's repo tags: the requested
	// one if present, or simply the first one if no image was requested.
	actualImage := container.Image
	if strings.HasPrefix(actualImage, "sha256:") {
		imageInfo, inspectErr := dc.client.InspectImage(actualImage[len("sha256:"):])
		if inspectErr == nil {
			for _, repoTag := range imageInfo.RepoTags {
				if opts.Image == "" || repoTag == opts.Image {
					actualImage = repoTag
					break
				}
			}
		}
	}
	if opts.Image == "" {
		di.Image = actualImage
	}

	if err = di.Start(); err != nil {
		return nil, err
	}
	if err = di.TryConnect(); err != nil {
		return nil, err
	}

	// The actual image may not match the requested one if, for example, the tag
	// for version a.b previously pointed to a.b.c but now points to a.b.d. The
	// instance's flavor, and then its flavor family, serve as fallbacks.
	if requested := opts.Image; requested != "" {
		switch requested {
		case actualImage, di.Flavor().String(), di.Flavor().Family().String():
			// Acceptable match; nothing further to check.
		default:
			return nil, fmt.Errorf("Container %s based on unexpected image: expected %s, found %s", opts.Name, opts.Image, actualImage)
		}
	}
	return di, nil
}
// GetOrCreateInstance first tries to fetch an existing Docker container named
// opts.Name via GetInstance. If that succeeds, the instance is returned. If it
// fails because no container with that name exists, a new one is created via
// CreateInstance. Any other failure (for example an image/flavor mismatch, or
// an inability to start or connect to the container) is returned as an error
// along with a nil instance.
func (dc *DockerClient) GetOrCreateInstance(opts DockerizedInstanceOptions) (*DockerizedInstance, error) {
	existing, err := dc.GetInstance(opts)
	switch err.(type) {
	case nil:
		return existing, nil
	case *docker.NoSuchContainer:
		return dc.CreateInstance(opts)
	default:
		return nil, err
	}
}
// DockerizedInstance is a database instance running in a local Docker
// container.
type DockerizedInstance struct {
// Instance is the embedded database instance; it is assigned by TryConnect
// and is nil until then.
*Instance
// DockerizedInstanceOptions holds the options this instance was created or
// looked up with.
DockerizedInstanceOptions
// Manager is the DockerClient used for all operations on the container.
Manager *DockerClient
// container is the Docker client library's description of the container.
container *docker.Container
}
// Start starts the corresponding containerized mysql-server. If it is not
// already running, an error will be returned if it cannot be started. If it is
// already running, nil will be returned.
//
// After starting, the container is re-inspected so that its port mapping is
// known. An error is returned if an inspection fails or if no port mapping
// appears after several attempts. The instance's previously known container
// information is never discarded when an inspection fails, so that Stop and
// Destroy remain usable afterwards.
func (di *DockerizedInstance) Start() error {
	client := di.Manager.client
	containerID := di.container.ID

	err := client.StartContainer(containerID, nil)
	if _, alreadyRunning := err.(*docker.ContainerAlreadyRunning); err != nil && !alreadyRunning {
		return err
	}

	// In some cases it appears StartContainer returns prior to the port mapping
	// being in place. Inspect once, then retry up to 5 more times, sleeping a
	// little longer before each retry.
	for attempt := 0; ; attempt++ {
		inspected, err := client.InspectContainer(containerID)
		if err != nil {
			return err
		}
		di.container = inspected
		if di.Port() != 0 {
			return nil
		}
		if attempt >= 5 {
			return fmt.Errorf("Unable to find port mapping for container %s", di.Name)
		}
		time.Sleep(time.Duration(attempt+1) * time.Millisecond)
	}
}
// Stop halts the corresponding containerized mysql-server, but does not
// destroy the container. The connection pool will be removed. If the container
// was not already running, nil will be returned.
func (di *DockerizedInstance) Stop() error {
	// The embedded Instance is only assigned by TryConnect; skip closing
	// connections if none were ever set up.
	if di.Instance != nil {
		di.CloseAll()
	}
	// The second argument is the stop timeout passed to the Docker client.
	err := di.Manager.client.StopContainer(di.container.ID, 10)
	if _, notRunning := err.(*docker.ContainerNotRunning); err != nil && !notRunning {
		return err
	}
	return nil
}
// Destroy stops and deletes the corresponding containerized mysql-server,
// including its volumes. If the container no longer exists, nil is returned.
// It may be called on an instance that was never successfully connected to,
// such as one returned alongside an error by DockerClient.CreateInstance.
func (di *DockerizedInstance) Destroy() error {
	// The embedded Instance is only assigned by TryConnect; skip closing
	// connections if none were ever set up.
	if di.Instance != nil {
		di.CloseAll()
	}
	rcopts := docker.RemoveContainerOptions{
		ID:            di.container.ID,
		Force:         true,
		RemoveVolumes: true,
	}
	err := di.Manager.client.RemoveContainer(rcopts)
	if _, alreadyGone := err.(*docker.NoSuchContainer); alreadyGone {
		return nil
	}
	return err
}
// TryConnect creates the instance's connection pool for the containerized
// mysql-server and then polls until a connection succeeds. Up to 120 attempts
// are made, 250 milliseconds apart (30 seconds in total); if none succeeds,
// the error from the last attempt is returned.
func (di *DockerizedInstance) TryConnect() (err error) {
	const (
		maxAttempts = 120
		retryDelay  = 250 * time.Millisecond
	)
	if di.Instance, err = NewInstance("mysql", di.DSN()); err != nil {
		return err
	}
	for remaining := maxAttempts; remaining > 0; remaining-- {
		var connected bool
		if connected, err = di.Instance.CanConnect(); connected {
			return err
		}
		time.Sleep(retryDelay)
	}
	return err
}
// Port returns the actual port number on localhost that maps to the container's
// internal port 3306. It returns 0 if the mapping is not known: for example if
// the container has not been inspected yet, has no network settings, has no
// binding for 3306/tcp, or reports a host port that is not a valid integer.
func (di *DockerizedInstance) Port() int {
	// Network settings are only populated once the container has been
	// inspected, so guard against nil rather than panicking in callers such
	// as String() and DSN().
	if di.container == nil || di.container.NetworkSettings == nil {
		return 0
	}
	bindings := di.container.NetworkSettings.Ports[docker.Port("3306/tcp")]
	if len(bindings) == 0 {
		return 0
	}
	port, err := strconv.Atoi(bindings[0].HostPort)
	if err != nil {
		return 0
	}
	return port
}
// DSN builds a DSN, in the format used by github.com/go-sql-driver/mysql, for
// connecting to the containerized mysql-server as root over TCP on 127.0.0.1.
// The root password is included only if one is set, and DefaultConnParams is
// appended as the query string.
func (di *DockerizedInstance) DSN() string {
	credentials := "root"
	if di.RootPassword != "" {
		credentials += ":" + di.RootPassword
	}
	return fmt.Sprintf("%s@tcp(127.0.0.1:%d)/?%s", credentials, di.Port(), di.DefaultConnParams)
}
// String returns a short description of the instance, consisting of the text
// "DockerizedInstance:" followed by the port number returned by Port.
func (di *DockerizedInstance) String() string {
	return "DockerizedInstance:" + strconv.Itoa(di.Port())
}
// NukeData drops every schema reported by the instance's SchemaNames method,
// skipping the binlog for each drop. It stops at, and returns, the first
// error encountered. It is useful as a per-test cleanup method in
// implementations of IntegrationTestSuite.BeforeTest.
func (di *DockerizedInstance) NukeData() error {
	names, err := di.Instance.SchemaNames()
	if err != nil {
		return err
	}
	dropOpts := BulkDropOptions{SkipBinlog: true}
	for _, name := range names {
		if dropErr := di.Instance.DropSchema(name, dropOpts); dropErr != nil {
			return dropErr
		}
	}
	return nil
}
// SourceSQL opens the file at filePath and passes its contents to Source for
// execution against the containerized mysql-server. The file should contain
// one or more valid SQL instructions, typically a mix of DML and/or DDL
// statements. The output returned by Source is passed through; errors are
// annotated with the instance and the file path. It is useful as a per-test
// setup method in implementations of IntegrationTestSuite.BeforeTest.
func (di *DockerizedInstance) SourceSQL(filePath string) (string, error) {
	sqlFile, openErr := os.Open(filePath)
	if openErr != nil {
		return "", fmt.Errorf("SourceSQL %s: Unable to open setup file %s: %s", di, filePath, openErr)
	}
	defer sqlFile.Close()

	output, sourceErr := di.Source(sqlFile)
	if sourceErr == nil {
		return output, nil
	}
	return output, fmt.Errorf("SourceSQL %s: Error sourcing file %s: %v", di, filePath, sourceErr)
}
// SourceString passes the supplied string to Source for execution against the
// containerized mysql-server. The string should contain one or more valid SQL
// instructions, typically a mix of DML and/or DDL statements. The output
// returned by Source is passed through; errors are annotated with the
// instance and the full input string. It is useful as a per-test setup method
// in implementations of IntegrationTestSuite.BeforeTest.
func (di *DockerizedInstance) SourceString(str string) (string, error) {
	output, err := di.Source(strings.NewReader(str))
	if err == nil {
		return output, nil
	}
	return output, fmt.Errorf("SourceString %s: Error sourcing string %s: %v", di, str, err)
}
// Source runs the mysql command-line client inside the container as root,
// feeding it the contents of reader on standard input. The reader should
// supply one or more valid SQL instructions, typically a mix of DML and/or DDL
// statements.
//
// The client's standard output is returned. If its standard error output
// contains the text "ERROR" (after the client's command-line password warning
// has been removed), that output is returned as the error. If the exec
// session cannot be created or started, an empty string and the error are
// returned. It is useful as a per-test setup method in implementations of
// IntegrationTestSuite.BeforeTest.
func (di *DockerizedInstance) Source(reader io.Reader) (string, error) {
	// The mysql client prints this to stderr whenever -p is given a value.
	const passwordWarning = "Warning: Using a password on the command line interface can be insecure.\n"

	args := []string{"mysql", "-tvvv", "-u", "root"}
	if pw := di.RootPassword; pw != "" {
		args = append(args, "-p"+pw)
	}

	client := di.Manager.client
	exec, err := client.CreateExec(docker.CreateExecOptions{
		AttachStdout: true,
		AttachStderr: true,
		AttachStdin:  true,
		Cmd:          args,
		Container:    di.container.ID,
	})
	if err != nil {
		return "", err
	}

	var outBuf, errBuf bytes.Buffer
	err = client.StartExec(exec.ID, docker.StartExecOptions{
		OutputStream: &outBuf,
		ErrorStream:  &errBuf,
		InputStream:  reader,
	})
	if err != nil {
		return "", err
	}

	output := outBuf.String()
	problems := strings.Replace(errBuf.String(), passwordWarning, "", 1)
	if strings.Contains(problems, "ERROR") {
		return output, errors.New(problems)
	}
	return output, nil
}
// filteredLogger wraps a *log.Logger and drops messages that carry certain
// connection-related errors.
type filteredLogger struct {
	logger *log.Logger
}

// Print forwards its arguments to the wrapped logger, unless any argument is
// an error whose message contains "EOF" or "unexpected read", in which case
// the entire message is discarded. Arguments that are not errors (including
// plain strings containing those words) never cause suppression.
func (fl filteredLogger) Print(v ...interface{}) {
	for i := range v {
		asErr, isErr := v[i].(error)
		if !isErr {
			continue
		}
		text := asErr.Error()
		if strings.Contains(text, "EOF") || strings.Contains(text, "unexpected read") {
			return
		}
	}
	fl.logger.Print(v...)
}
// UseFilteredDriverLogger overrides the mysql driver's logger to avoid excessive
// messages. This suppresses the driver's "unexpected EOF" output, which occurs
// when an initial connection is refused or a connection drops early. This
// excessive logging can occur whenever DockerClient.CreateInstance() or
// DockerClient.GetInstance() is waiting for the instance to finish starting.
func UseFilteredDriverLogger() {
fl := filteredLogger{
logger: log.New(os.Stderr, "[mysql] ", log.Ldate|log.Ltime|log.Lshortfile),
}
mysql.SetLogger(fl)
}