# feat(logfwd): log forwarding implementation II (#256)
This PR contains the second part of #165, covering the actual mechanics of
log forwarding in Pebble. It builds upon #209 and #252. It includes an
abstract `logClient` interface, but not the specific Loki/syslog
implementations - these will come in later PRs.

*This is a modification of #218, designed to fix fundamental issues identified
 with that PR.*

### Current design

For each log target, there is a single "gatherer", which receives logs from a
bunch of services and writes them to a "client". The gatherer runs a control
loop in a separate goroutine, which waits to receive logs on a channel, and
writes these to the client.

For each service, the gatherer spawns a "log puller". Each puller runs in a
separate goroutine, pulling logs from an iterator on the service's
ringbuffer. Pulled logs are sent to the gatherer's control loop on the shared
channel.

The `logClient` interface represents a client to a specific type of backend.
In future PRs, we will add `lokiClient` and `syslogClient` types which
implement `logClient`.

`logClient` includes two methods:

```go
type logClient interface {
	Write(context.Context, servicelog.Entry) error
	Flush(context.Context) error
}
```

Client implementations have some freedom in the semantics of these methods;
a sketch of both styles follows the list below.

- For a buffering client (e.g. HTTP), `Write` could add the log to the
  client's internal buffer, while `Flush` would prepare and send an HTTP
  request with the buffered logs.
- For a non-buffering client (e.g. TCP), `Write` could serialise the log
  directly to the open connection, while `Flush` would be a no-op.
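
To make this concrete, here is a minimal sketch of both styles, in the spirit
of the fake clients used in the QA section below. The type names, the
`threshold` field, and the assumption that `servicelog.Entry` exposes `Time`,
`Service` and `Message` fields are illustrative only; the real Loki/syslog
clients will come in later PRs.

```go
package logstate

import (
	"context"
	"fmt"

	"github.com/canonical/pebble/internals/servicelog"
)

// nonBufferingClient writes each entry out as soon as Write is called.
type nonBufferingClient struct{}

func (c *nonBufferingClient) Write(_ context.Context, entry servicelog.Entry) error {
	// A real TCP client would serialise to the open connection here.
	fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message)
	return nil
}

func (c *nonBufferingClient) Flush(_ context.Context) error {
	return nil // nothing is buffered, so Flush is a no-op
}

// bufferingClient holds entries until Flush is called, or until the buffer
// reaches an optional threshold.
type bufferingClient struct {
	entries   []servicelog.Entry
	threshold int // 0 means no size limit: flush on timeout only
}

func (c *bufferingClient) Write(ctx context.Context, entry servicelog.Entry) error {
	c.entries = append(c.entries, entry)
	if c.threshold > 0 && len(c.entries) >= c.threshold {
		return c.Flush(ctx)
	}
	return nil
}

func (c *bufferingClient) Flush(_ context.Context) error {
	// A real HTTP client would build and send a request with the buffered logs.
	for _, entry := range c.entries {
		fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message)
	}
	c.entries = c.entries[:0]
	return nil
}
```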

### Teardown

Gracefully stopping a log gatherer is a multi-step process:

- At the instant we attempt to stop the gatherer, it may be in the middle of
  flushing its client. So, wait a short time for the client to finish
  flushing, but cancel the flush if this takes too long.
- The service may have emitted some final logs on shutdown. Give each puller
  some time to pull the final logs from its iterator - but again, force-kill
  it if this takes too long.
- Once the pullers have all finished, we must have received all the logs we
  are going to get, so we can safely shut down the main loop.
- Do a final flush of the client to send off any remaining buffered logs.

All this logic is encapsulated in the gatherer's `Stop()` method; a condensed
sketch follows below.
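
Condensed from the `Stop()` implementation in `gatherer.go` below (with the
debug logging omitted), the teardown sequencing looks roughly like this:

```go
func (g *logGatherer) Stop() {
	// Cancel the client context if the current flush takes too long.
	time.AfterFunc(timeoutCurrentFlush, g.clientCancel)

	// Force-kill the pullers if draining their iterators takes too long.
	time.AfterFunc(timeoutPullers, g.pullers.KillAll)

	// Wait for the pullers to finish, but no longer than timeoutMainLoop.
	g.pullers.tomb.Kill(nil)
	select {
	case <-g.pullers.Done():
	case <-time.After(timeoutMainLoop):
	}

	// Kill the main loop; its last act is a final flush of the client.
	g.tomb.Kill(nil)
	if err := g.tomb.Wait(); err != nil {
		logger.Noticef("Cannot shut down gatherer: %v", err)
	}
}
```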

## QA

I've included some sample implementations of `logClient`
[here](https://github.com/barrettj12/pebble/blob/logfwd-fake/internals/overlord/logstate/fake.go).
They just print the logs to stdout. These can be used to verify that the log
forwarding mechanics work properly.

Create a simple logging service, e.g.

```bash
#!/bin/bash
while true; do
  echo "Hello"
  sleep 1
done
```

and a simple plan using this service

```yaml
services:
  svc1: &logsvc
    command: /home/jb/git/canonical/pebble/logfwd-impl2/pebble/logsvc
    startup: enabled
    override: merge
  svc2: *logsvc

log-targets:
  tgt1:
    override: merge
    services: [all]
    type: loki
    location: unnecessary
```

Add the [`fake.go`](https://github.com/barrettj12/pebble/blob/logfwd-fake/internals/overlord/logstate/fake.go)
file to the repo.

Comment out the following line:
https://github.com/canonical/pebble/blob/3e904f9d22f297b68cba2dc33c9cf8e1bbbadd90/internals/overlord/logstate/gatherer.go#L356
and replace it with one of the following:

```go
return &nonBufferingClient{}, nil          // unbuffered
return &bufferingClient{}, nil             // unlimited-size buffer, will flush on timeout only
return &bufferingClient{threshold: 3}, nil // buffer with max size: 3 logs
```

You might also want to change the gatherer's tick period:
https://github.com/canonical/pebble/blob/3e904f9d22f297b68cba2dc33c9cf8e1bbbadd90/internals/overlord/logstate/gatherer.go#L32

Run Pebble with

```bash
go run ./cmd/pebble run
```

and verify that the logs are being printed to stdout.

---

JUJU-3776
barrettj12 authored Aug 22, 2023
1 parent fd34da1 commit 315ef31
Showing 12 changed files with 1,046 additions and 27 deletions.
`internals/overlord/logstate/gatherer.go` (new file, 319 additions, 0 deletions):

```go
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
	"context"
	"fmt"
	"time"

	"gopkg.in/tomb.v2"

	"github.com/canonical/pebble/internals/logger"
	"github.com/canonical/pebble/internals/plan"
	"github.com/canonical/pebble/internals/servicelog"
)

const (
	parserSize    = 4 * 1024
	bufferTimeout = 1 * time.Second

	// These constants control the maximum time allowed for each teardown step.
	timeoutCurrentFlush = 1 * time.Second
	timeoutPullers      = 2 * time.Second
	timeoutMainLoop     = 3 * time.Second
	// timeoutFinalFlush is measured from when the gatherer's main loop finishes,
	// NOT from when Stop() is called like the other constants.
	timeoutFinalFlush = 2 * time.Second
)

// logGatherer is responsible for collecting service logs from a bunch of
// services, and sending them to its logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine.
// A logGatherer will spawn a separate logPuller for each service it collects
// logs from. Each logPuller will run in a separate goroutine, and send logs to
// the logGatherer via a shared channel.
// The logGatherer will "flush" the client:
// - after a timeout (1s) has passed since the first log was written;
// - when it is told to shut down.
//
// The client may also flush itself when its internal buffer reaches a certain
// size.
// Calling the Stop() method will tear down the logGatherer and all of its
// associated logPullers. Stop() can be called from an outside goroutine.
type logGatherer struct {
	logGathererArgs

	targetName string
	// tomb for the main loop
	tomb tomb.Tomb

	client logClient
	// Context to pass to client methods
	clientCtx context.Context
	// cancel func for clientCtx - can be used during teardown if required, to
	// ensure the client is not blocking subsequent teardown steps.
	clientCancel context.CancelFunc

	pullers *pullerGroup
	// All pullers send logs on this channel, received by main loop
	entryCh chan servicelog.Entry
}

// logGathererArgs allows overriding the newLogClient method and time values
// in testing.
type logGathererArgs struct {
	bufferTimeout     time.Duration
	timeoutFinalFlush time.Duration
	// method to get a new client
	newClient func(*plan.LogTarget) (logClient, error)
}

func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
	return newLogGathererInternal(target, logGathererArgs{})
}

// newLogGathererInternal contains the actual creation code for a logGatherer.
// This function is used in the real implementation, but also allows overriding
// certain configuration values for testing.
func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
	args = fillDefaultArgs(args)
	client, err := args.newClient(target)
	if err != nil {
		return nil, fmt.Errorf("cannot create log client: %w", err)
	}

	g := &logGatherer{
		logGathererArgs: args,

		targetName: target.Name,
		client:     client,
		entryCh:    make(chan servicelog.Entry),
		pullers:    newPullerGroup(target.Name),
	}
	g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
	g.tomb.Go(g.loop)
	g.tomb.Go(g.pullers.tomb.Wait)

	return g, nil
}

func fillDefaultArgs(args logGathererArgs) logGathererArgs {
	if args.bufferTimeout == 0 {
		args.bufferTimeout = bufferTimeout
	}
	if args.timeoutFinalFlush == 0 {
		args.timeoutFinalFlush = timeoutFinalFlush
	}
	if args.newClient == nil {
		args.newClient = newLogClient
	}
	return args
}

// PlanChanged is called by the LogManager when the plan is changed, if this
// gatherer's target exists in the new plan.
func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
	// Remove old pullers
	for _, svcName := range g.pullers.Services() {
		svc, svcExists := pl.Services[svcName]
		if !svcExists {
			g.pullers.Remove(svcName)
			continue
		}

		tgt := pl.LogTargets[g.targetName]
		if !svc.LogsTo(tgt) {
			g.pullers.Remove(svcName)
		}
	}

	// Add new pullers
	for _, service := range pl.Services {
		target := pl.LogTargets[g.targetName]
		if !service.LogsTo(target) {
			continue
		}

		buffer, bufferExists := buffers[service.Name]
		if !bufferExists {
			// We don't yet have a reference to the service's ring buffer
			// Need to wait until ServiceStarted
			continue
		}

		g.pullers.Add(service.Name, buffer, g.entryCh)
	}
}

// ServiceStarted is called by the LogManager on the start of a service which
// logs to this gatherer's target.
func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
	g.pullers.Add(service.Name, buffer, g.entryCh)
}

// The main control loop for the logGatherer. loop receives logs from the
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
	timer := newTimer()
	defer timer.Stop()

mainLoop:
	for {
		select {
		case <-g.tomb.Dying():
			break mainLoop

		case <-timer.Expired():
			// Mark timer as unset
			timer.Stop()
			err := g.client.Flush(g.clientCtx)
			if err != nil {
				logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
			}

		case entry := <-g.entryCh:
			err := g.client.Write(g.clientCtx, entry)
			if err != nil {
				logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
			}
			timer.EnsureSet(g.bufferTimeout)
		}
	}

	// Final flush to send any remaining logs buffered in the client
	// We need to create a new context, as the previous one may have been cancelled.
	ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
	defer cancel()
	err := g.client.Flush(ctx)
	if err != nil {
		logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
	}
	return nil
}

// Stop tears down the gatherer and associated resources (pullers, client).
// This method will block until gatherer teardown is complete.
//
// The teardown process has several steps:
// - If the main loop is in the middle of a flush when we call Stop, this
// will block the pullers from sending logs to the gatherer. Hence, wait
// for the current flush to complete.
// - Wait for the pullers to pull the final logs from the iterator.
// - Kill the main loop.
// - Flush out any final logs buffered in the client.
func (g *logGatherer) Stop() {
	// Wait up to timeoutCurrentFlush for the current flush to complete (if any)
	time.AfterFunc(timeoutCurrentFlush, g.clientCancel)

	// Wait up to timeoutPullers for the pullers to pull the final logs from the
	// iterator and send to the main loop.
	time.AfterFunc(timeoutPullers, func() {
		logger.Debugf("gatherer %q: force killing log pullers", g.targetName)
		g.pullers.KillAll()
	})

	// Kill the main loop once either:
	// - all the pullers are done
	// - timeoutMainLoop has passed
	g.pullers.tomb.Kill(nil)
	select {
	case <-g.pullers.Done():
		logger.Debugf("gatherer %q: pullers have finished", g.targetName)
	case <-time.After(timeoutMainLoop):
		logger.Debugf("gatherer %q: force killing main loop", g.targetName)
	}

	g.tomb.Kill(nil)
	// Wait for final flush in the main loop
	err := g.tomb.Wait()
	if err != nil {
		logger.Noticef("Cannot shut down gatherer: %v", err)
	}
}

// timer wraps time.Timer and provides a better API.
type timer struct {
	timer *time.Timer
	set   bool
}

func newTimer() timer {
	t := timer{
		timer: time.NewTimer(1 * time.Hour),
	}
	t.Stop()
	return t
}

func (t *timer) Expired() <-chan time.Time {
	return t.timer.C
}

func (t *timer) Stop() {
	t.timer.Stop()
	t.set = false
	// Drain timer channel
	select {
	case <-t.timer.C:
	default:
	}
}

func (t *timer) EnsureSet(timeout time.Duration) {
	if t.set {
		return
	}

	t.timer.Reset(timeout)
	t.set = true
}

// logClient handles requests to a specific type of log target. It encodes
// log messages in the required format, and sends the messages using the
// protocol required by that log target.
// For example, a logClient for Loki would encode the log messages in the
// JSON format expected by Loki, and send them over HTTP(S).
//
// logClient implementations have some freedom about the semantics of these
// methods. For a buffering client (e.g. HTTP):
// - Write could add the log to the client's internal buffer, calling Flush
// when this buffer reaches capacity.
// - Flush would prepare and send a request with the buffered logs.
//
// For a non-buffering client (e.g. TCP), Write could serialise the log
// directly to the open connection, while Flush would be a no-op.
type logClient interface {
	// Write adds the given log entry to the client. Depending on the
	// implementation of the client, this may send the log to the remote target,
	// or simply add the log to an internal buffer, flushing that buffer when
	// required.
	Write(context.Context, servicelog.Entry) error

	// Flush sends buffered logs (if any) to the remote target. For clients which
	// don't buffer logs, Flush should be a no-op.
	Flush(context.Context) error
}

func newLogClient(target *plan.LogTarget) (logClient, error) {
	switch target.Type {
	//case plan.LokiTarget: TODO
	//case plan.SyslogTarget: TODO
	default:
		return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
	}
}
```
