diff --git a/sdjournal/journal_test.go b/sdjournal/journal_test.go index 925dd6c4..d559f91f 100755 --- a/sdjournal/journal_test.go +++ b/sdjournal/journal_test.go @@ -17,6 +17,7 @@ package sdjournal import ( "bytes" + "context" "errors" "fmt" "io" @@ -84,6 +85,63 @@ func TestJournalFollow(t *testing.T) { } } +func TestJournalFollowTail(t *testing.T) { + r, err := NewJournalReader(JournalReaderConfig{ + Since: time.Duration(-15) * time.Second, + Matches: []Match{ + { + Field: SD_JOURNAL_FIELD_SYSTEMD_UNIT, + Value: "NetworkManager.service", + }, + }, + }) + + if err != nil { + t.Fatalf("Error opening journal: %s", err) + } + + if r == nil { + t.Fatal("Got a nil reader") + } + + defer r.Close() + + // start writing some test entries + done := make(chan struct{}, 1) + errCh := make(chan error, 1) + defer close(done) + go func() { + for { + select { + case <-done: + return + default: + if perr := journal.Print(journal.PriInfo, "test message %s", time.Now()); err != nil { + errCh <- perr + return + } + + time.Sleep(time.Second) + } + } + }() + + // and follow the reader synchronously + entries := make(chan *JournalEntry) + timeout := time.Duration(5) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err = r.FollowTail(entries, ctx); err != nil { + t.Fatalf("Error during follow: %s", err) + } + + select { + case err := <-errCh: + t.Fatalf("Error writing to journal: %s", err) + default: + } +} + func TestJournalWait(t *testing.T) { id := time.Now().String() j, err := NewJournal() diff --git a/sdjournal/read.go b/sdjournal/read.go index 51a060fb..fc7e4a3f 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -16,10 +16,12 @@ package sdjournal import ( + "context" "errors" "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -256,6 +258,59 @@ process: } } +// SkipN skips the next n entries and returns the number of skipped entries and an eventual error. +func (r *JournalReader) SkipN(n int) (int, error) { + if n < 0 { + return -1, errors.New("can not skip by negative number " + strconv.Itoa(n)) + } + var i int + for i < n { + c, err := r.journal.Next() + if err != nil { + return i, err + } else if c == 0 { + return i, nil + } + i += 1 + } + return i, nil +} + +// FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. +// It will start from the next unread entry. +func (r *JournalReader) FollowTail(entries chan *JournalEntry, ctx context.Context) error { + defer close(entries) + + for { + select { + case <-ctx.Done(): + fmt.Println("Context done, exit FollowTail") + return nil + default: + } + + status := r.journal.Wait(200 * time.Millisecond) + if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { + continue + } + + for { + if c, err := r.journal.Next(); err != nil { + return err + } else if c == 0 { + // EOF, should mean we're at the tail + break + } + + if entry, err := r.journal.GetEntry(); err != nil { + return err + } else { + entries <- entry + } + } + } +} + // simpleMessageFormatter is the default formatter. // It returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field.