From d644ef3dcd4b13f5383e24adf10670d5aff97a29 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 10:46:39 +0100 Subject: [PATCH 1/5] Add new follow method to follow entries from next to tail --- sdjournal/read.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdjournal/read.go b/sdjournal/read.go index 51a060fb..23b0be5b 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,6 +256,39 @@ process: } } +// FollowTail synchronously follows the JournalReader, writing each new journal entry to writer. The +// follow will continue until a single time.Time is received on the until channel. +func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { + defer close(entries) + + // skip first entry which has already been read + if _, err := r.journal.Next(); err != nil { + return err + } + + for { + status := r.journal.Wait(200 * time.Millisecond) + if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { + continue + } + + for { + if c, err := r.journal.Next(); err != nil { + return err + } else if c == 0 { + // EOF, should mean we're at the tail + break + } + + if entry, err := r.journal.GetEntry(); err != nil { + return err + } else { + entries <- entry + } + } + } +} + // simpleMessageFormatter is the default formatter. // It returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field. From 92cdb09e700bcc2473b596777441cd47e61088f1 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 12:42:55 +0100 Subject: [PATCH 2/5] Update method description --- sdjournal/read.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 23b0be5b..2c21ade5 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,8 +256,8 @@ process: } } -// FollowTail synchronously follows the JournalReader, writing each new journal entry to writer. The -// follow will continue until a single time.Time is received on the until channel. +// FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. +// It will start from the next unread entry. func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { defer close(entries) From 9ceb5b8f710169bfb6444fc5627cf38237de476a Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 12:52:22 +0100 Subject: [PATCH 3/5] Add SkipN method to allow follow-decoupled entry skipping --- sdjournal/read.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 2c21ade5..2012d46c 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,16 +256,25 @@ process: } } +// SkipN skips the next n entries and returns the number of skipped entries and an eventual error. +func (r *JournalReader) SkipN(n int) (int, error) { + var i int + for i := 1; i <= n; i++ { + c, err := r.journal.Next() + if err != nil { + return i, err + } else if c == 0 { + return i, nil + } + } + return i, nil +} + // FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. // It will start from the next unread entry. func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { defer close(entries) - // skip first entry which has already been read - if _, err := r.journal.Next(); err != nil { - return err - } - for { status := r.journal.Wait(200 * time.Millisecond) if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { From 325976a1d44e7ec53cff9e4d7488ba178c0250c1 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Thu, 31 Mar 2022 16:33:16 +0200 Subject: [PATCH 4/5] Improve error handling, refactor loop, add context to FollowTail --- sdjournal/read.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 2012d46c..fc7e4a3f 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -16,10 +16,12 @@ package sdjournal import ( + "context" "errors" "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -258,24 +260,35 @@ process: // SkipN skips the next n entries and returns the number of skipped entries and an eventual error. func (r *JournalReader) SkipN(n int) (int, error) { + if n < 0 { + return -1, errors.New("can not skip by negative number " + strconv.Itoa(n)) + } var i int - for i := 1; i <= n; i++ { + for i < n { c, err := r.journal.Next() if err != nil { return i, err } else if c == 0 { return i, nil } + i += 1 } return i, nil } // FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. // It will start from the next unread entry. -func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { +func (r *JournalReader) FollowTail(entries chan *JournalEntry, ctx context.Context) error { defer close(entries) for { + select { + case <-ctx.Done(): + fmt.Println("Context done, exit FollowTail") + return nil + default: + } + status := r.journal.Wait(200 * time.Millisecond) if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { continue From 7c68517deb08b6b2d13fbd2fa7d549ceab78cca0 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 13 May 2022 15:11:22 +0200 Subject: [PATCH 5/5] Add basic test case for FollowTail --- sdjournal/journal_test.go | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/sdjournal/journal_test.go b/sdjournal/journal_test.go index 925dd6c4..d559f91f 100755 --- a/sdjournal/journal_test.go +++ b/sdjournal/journal_test.go @@ -17,6 +17,7 @@ package sdjournal import ( "bytes" + "context" "errors" "fmt" "io" @@ -84,6 +85,63 @@ func TestJournalFollow(t *testing.T) { } } +func TestJournalFollowTail(t *testing.T) { + r, err := NewJournalReader(JournalReaderConfig{ + Since: time.Duration(-15) * time.Second, + Matches: []Match{ + { + Field: SD_JOURNAL_FIELD_SYSTEMD_UNIT, + Value: "NetworkManager.service", + }, + }, + }) + + if err != nil { + t.Fatalf("Error opening journal: %s", err) + } + + if r == nil { + t.Fatal("Got a nil reader") + } + + defer r.Close() + + // start writing some test entries + done := make(chan struct{}, 1) + errCh := make(chan error, 1) + defer close(done) + go func() { + for { + select { + case <-done: + return + default: + if perr := journal.Print(journal.PriInfo, "test message %s", time.Now()); err != nil { + errCh <- perr + return + } + + time.Sleep(time.Second) + } + } + }() + + // and follow the reader synchronously + entries := make(chan *JournalEntry) + timeout := time.Duration(5) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err = r.FollowTail(entries, ctx); err != nil { + t.Fatalf("Error during follow: %s", err) + } + + select { + case err := <-errCh: + t.Fatalf("Error writing to journal: %s", err) + default: + } +} + func TestJournalWait(t *testing.T) { id := time.Now().String() j, err := NewJournal()