From bfb9348cf9e621bc1a6654d245199d3da296a181 Mon Sep 17 00:00:00 2001 From: Shailesh Ahuja Date: Thu, 22 Aug 2024 20:37:55 +0800 Subject: [PATCH 1/3] Add UUID to queue --- autopaho/auto.go | 5 ++-- autopaho/queue/file/queue.go | 39 +++++++++++++++++++---------- autopaho/queue/file/queue_test.go | 14 ++++++++--- autopaho/queue/memory/queue.go | 16 +++++++----- autopaho/queue/memory/queue_test.go | 24 +++++++++++++++--- autopaho/queue/queue.go | 12 +++++---- autopaho/queue_test.go | 4 +-- go.mod | 1 + go.sum | 2 ++ 9 files changed, 81 insertions(+), 36 deletions(-) diff --git a/autopaho/auto.go b/autopaho/auto.go index 7e57446..381a4db 100644 --- a/autopaho/auto.go +++ b/autopaho/auto.go @@ -471,7 +471,8 @@ func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish if _, err := p.Packet().WriteTo(&b); err != nil { return err } - return c.queue.Enqueue(&b) + _, err := c.queue.Enqueue(&b) + return err } // TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only, it @@ -561,7 +562,7 @@ connectionLoop: time.Sleep(1 * time.Second) continue } - r, err := entry.Reader() + _, r, err := entry.Reader() if err != nil { c.errors.Printf("error retrieving reader for queue entry: %s", err) if err := entry.Leave(); err != nil { diff --git a/autopaho/queue/file/queue.go b/autopaho/queue/file/queue.go index 36a5254..b0b7422 100644 --- a/autopaho/queue/file/queue.go +++ b/autopaho/queue/file/queue.go @@ -20,10 +20,12 @@ import ( "io" "os" "path/filepath" + "strings" "sync" "time" "github.com/eclipse/paho.golang/autopaho/queue" + "github.com/google/uuid" ) // A queue implementation that stores all data on disk @@ -131,10 +133,10 @@ func (q *Queue) WaitForEmpty() chan struct{} { } // Enqueue add item to the queue. -func (q *Queue) Enqueue(p io.Reader) error { +func (q *Queue) Enqueue(p io.Reader) (uuid.UUID, error) { q.mu.Lock() defer q.mu.Unlock() - err := q.put(p) + id, err := q.put(p) if err == nil && q.queueEmpty { q.queueEmpty = false for _, c := range q.waiting { @@ -142,7 +144,7 @@ func (q *Queue) Enqueue(p io.Reader) error { } q.waiting = q.waiting[:0] } - return err + return id, err } // Peek retrieves the oldest item from the queue (without removing it) @@ -165,23 +167,24 @@ func (q *Queue) Peek() (queue.Entry, error) { } // put writes out an item to disk -func (q *Queue) put(p io.Reader) error { +func (q *Queue) put(p io.Reader) (uuid.UUID, error) { + id := uuid.New() // Use CreateTemp to generate a file with a unique name (it will be removed when packet has been transmitted) - f, err := os.CreateTemp(q.path, q.prefix+"*"+q.extension) + f, err := os.Create(filepath.Join(q.path, q.prefix+id.String()+q.extension)) if err != nil { - return err + return uuid.Nil, err } if _, err = io.Copy(f, p); err != nil { f.Close() _ = os.Remove(f.Name()) // Attempt to remove the partial file (not much we can do if this fails) - return err + return uuid.Nil, err } if err = f.Close(); err != nil { _ = os.Remove(f.Name()) // Attempt to remove the partial file (not much we can do if this fails) - return err + return uuid.Nil, err } - return nil + return id, nil } // get() returns a ReadCloser that accesses the oldest file available @@ -195,7 +198,16 @@ func (q *Queue) get() (entry, error) { if err != nil { return entry{}, err } - return entry{f: f}, nil + + // Extract UUID from filename + fileNameUUID := strings.TrimSuffix(strings.TrimPrefix(filepath.Base(fn), q.prefix), q.extension) + uuid, err := uuid.Parse(fileNameUUID) + if err != nil { + f.Close() + return entry{}, fmt.Errorf("failed to parse UUID from filename: %w", err) + } + + return entry{f: f, uuid: uuid}, nil } // oldestEntry returns the filename of the oldest entry in the queue (if any - io.EOF means none) @@ -238,12 +250,13 @@ func (q *Queue) oldestEntry() (string, error) { // entry is used to return a queue entry from Peek type entry struct { - f *os.File + f *os.File + uuid uuid.UUID } // Reader provides access to the file contents -func (e entry) Reader() (io.Reader, error) { - return e.f, nil +func (e entry) Reader() (uuid.UUID, io.Reader, error) { + return e.uuid, e.f, nil } // Leave closes the entry leaving it in the queue (will be returned on subsequent calls to Peek) diff --git a/autopaho/queue/file/queue_test.go b/autopaho/queue/file/queue_test.go index c296381..91480c2 100644 --- a/autopaho/queue/file/queue_test.go +++ b/autopaho/queue/file/queue_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/eclipse/paho.golang/autopaho/queue" + "github.com/google/uuid" ) // TestFileQueue some basic tests of the queue @@ -50,7 +51,8 @@ func TestFileQueue(t *testing.T) { default: } testEntry := []byte("This is a test") - if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil { + _, err = q.Enqueue(bytes.NewReader(testEntry)) + if err != nil { t.Fatalf("error adding to queue: %s", err) } select { @@ -61,7 +63,7 @@ func TestFileQueue(t *testing.T) { const entryFormat = "Queue entry %d for testing" for i := 0; i < 10; i++ { - if err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil { + if _, err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil { t.Fatalf("error adding entry %d: %s", i, err) } time.Sleep(time.Nanosecond) // Short delay due to file system time resolution @@ -78,10 +80,13 @@ func TestFileQueue(t *testing.T) { if err != nil { t.Fatalf("error peeking entry %d: %s", i, err) } - r, err := entry.Reader() + id, r, err := entry.Reader() if err != nil { t.Fatalf("error getting reader for entry %d: %s", i, err) } + if id == uuid.Nil { + t.Fatalf("expected non-nil UUID for entry %d", i) + } buf := &bytes.Buffer{} if _, err = buf.ReadFrom(r); err != nil { t.Fatalf("error reading entry %d: %s", i, err) @@ -114,7 +119,8 @@ func TestLeaveAndError(t *testing.T) { } testEntry := []byte("This is a test") - if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil { + _, err = q.Enqueue(bytes.NewReader(testEntry)) + if err != nil { t.Fatalf("error adding to queue: %s", err) } diff --git a/autopaho/queue/memory/queue.go b/autopaho/queue/memory/queue.go index 92f44a9..d477e7c 100644 --- a/autopaho/queue/memory/queue.go +++ b/autopaho/queue/memory/queue.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/eclipse/paho.golang/autopaho/queue" + "github.com/google/uuid" ) // A queue implementation that stores all data in RAM @@ -32,6 +33,7 @@ type Queue struct { messages [][]byte waiting []chan<- struct{} // closed when something arrives in the queue waitingForEmpty []chan<- struct{} // closed when queue is empty + uniqueIDs []uuid.UUID } // New creates a new memory-based queue @@ -68,20 +70,21 @@ func (q *Queue) WaitForEmpty() chan struct{} { } // Enqueue add item to the queue. -func (q *Queue) Enqueue(p io.Reader) error { +func (q *Queue) Enqueue(p io.Reader) (uuid.UUID, error) { var b bytes.Buffer _, err := b.ReadFrom(p) if err != nil { - return fmt.Errorf("Queue.Push failed to read into buffer: %w", err) + return uuid.Nil, fmt.Errorf("Queue.Push failed to read into buffer: %w", err) } q.mu.Lock() defer q.mu.Unlock() q.messages = append(q.messages, b.Bytes()) + q.uniqueIDs = append(q.uniqueIDs, uuid.New()) for _, c := range q.waiting { close(c) } q.waiting = q.waiting[:0] - return nil + return q.uniqueIDs[len(q.uniqueIDs)-1], nil } // Peek retrieves the oldest item from the queue (without removing it) @@ -97,13 +100,13 @@ func (q *Queue) Peek() (queue.Entry, error) { // Reader implements Entry.Reader - As the entry will always be the first item in the queue this is implemented // against Queue rather than as a separate struct. -func (q *Queue) Reader() (io.Reader, error) { +func (q *Queue) Reader() (uuid.UUID, io.Reader, error) { q.mu.Lock() defer q.mu.Unlock() if len(q.messages) == 0 { - return nil, queue.ErrEmpty + return uuid.Nil, nil, queue.ErrEmpty } - return bytes.NewReader(q.messages[0]), nil + return q.uniqueIDs[0], bytes.NewReader(q.messages[0]), nil } // Leave implements Entry.Leave - the entry (will be returned on subsequent calls to Peek) @@ -128,6 +131,7 @@ func (q *Queue) remove() error { initialLen := len(q.messages) if initialLen > 0 { q.messages = q.messages[1:] + q.uniqueIDs = q.uniqueIDs[1:] } if initialLen <= 1 { // Queue is now, or was already, empty for _, c := range q.waitingForEmpty { diff --git a/autopaho/queue/memory/queue_test.go b/autopaho/queue/memory/queue_test.go index 5394a8b..b87a82d 100644 --- a/autopaho/queue/memory/queue_test.go +++ b/autopaho/queue/memory/queue_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/eclipse/paho.golang/autopaho/queue" + "github.com/google/uuid" ) // TestMemoryQueue some basic tests of the queue @@ -45,9 +46,13 @@ func TestMemoryQueue(t *testing.T) { default: } testEntry := []byte("This is a test") - if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil { + id, err := q.Enqueue(bytes.NewReader(testEntry)) + if err != nil { t.Fatalf("error adding to queue: %s", err) } + if id == uuid.Nil { + t.Fatalf("expected non-nil UUID, got nil") + } select { case <-queueNotEmpty: case <-time.After(time.Second): @@ -56,9 +61,13 @@ func TestMemoryQueue(t *testing.T) { const entryFormat = "Queue entry %d for testing" for i := 0; i < 10; i++ { - if err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil { + id, err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))) + if err != nil { t.Fatalf("error adding entry %d: %s", i, err) } + if id == uuid.Nil { + t.Fatalf("expected non-nil UUID for entry %d, got nil", i) + } } // Remove the initial "This is a test" entry @@ -73,10 +82,13 @@ func TestMemoryQueue(t *testing.T) { if err != nil { t.Fatalf("error peeking entry %d: %s", i, err) } - r, err := entry.Reader() + id, r, err := entry.Reader() if err != nil { t.Fatalf("error getting reader for entry %d: %s", i, err) } + if id == uuid.Nil { + t.Fatalf("expected non-nil UUID for entry %d, got nil", i) + } buf := &bytes.Buffer{} if _, err = buf.ReadFrom(r); err != nil { t.Fatalf("error reading entry %d: %s", i, err) @@ -105,9 +117,13 @@ func TestLeaveAndQuarantine(t *testing.T) { } testEntry := []byte("This is a test") - if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil { + id, err := q.Enqueue(bytes.NewReader(testEntry)) + if err != nil { t.Fatalf("error adding to queue: %s", err) } + if id == uuid.Nil { + t.Fatalf("expected non-nil UUID, got nil") + } // Peek and leave the entry in the queue if entry, err := q.Peek(); err != nil { diff --git a/autopaho/queue/queue.go b/autopaho/queue/queue.go index 2fa4a91..2080c9b 100644 --- a/autopaho/queue/queue.go +++ b/autopaho/queue/queue.go @@ -18,6 +18,8 @@ package queue import ( "errors" "io" + + "github.com/google/uuid" ) var ( @@ -28,10 +30,10 @@ var ( // Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again) // `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requestes should be considered invalid) type Entry interface { - Reader() (io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader - Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek). - Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation - Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) + Reader() (uuid.UUID, io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader + Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek). + Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation + Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) } // Queue provides the functionality needed to manage queued messages @@ -41,7 +43,7 @@ type Queue interface { Wait() chan struct{} // Enqueue add item to the queue. - Enqueue(p io.Reader) error + Enqueue(p io.Reader) (uuid.UUID, error) // Peek retrieves the oldest item from the queue without removing it // Users must call one of Close, Remove, or Quarantine when done with the entry, and before calling Peek again. diff --git a/autopaho/queue_test.go b/autopaho/queue_test.go index 64d17e4..6007f9f 100644 --- a/autopaho/queue_test.go +++ b/autopaho/queue_test.go @@ -86,7 +86,7 @@ func TestQueuedMessages(t *testing.T) { // Add a corrupt item to the queue (zero bytes) - this should be logged and ignored q := memqueue.New() - if err := q.Enqueue(bytes.NewReader(nil)); err != nil { + if _, err := q.Enqueue(bytes.NewReader(nil)); err != nil { t.Fatalf("failed to add corrupt zero byte item to queue") } @@ -303,7 +303,7 @@ func TestPreloadPublish(t *testing.T) { w.Close() }() - if err := q.Enqueue(r); err != nil { + if _, err := q.Enqueue(r); err != nil { t.Fatalf("failed to enqueue: %s", err) } } diff --git a/go.mod b/go.mod index b15edb7..1c665c4 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 8b2ead0..1082b14 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= From 9e2d1966a838914545cb737830eca128e3ce6ee9 Mon Sep 17 00:00:00 2001 From: Shailesh Ahuja Date: Fri, 23 Aug 2024 10:34:11 +0800 Subject: [PATCH 2/3] Update docs --- autopaho/queue/file/queue.go | 4 ++-- autopaho/queue/queue.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/autopaho/queue/file/queue.go b/autopaho/queue/file/queue.go index b0b7422..799d838 100644 --- a/autopaho/queue/file/queue.go +++ b/autopaho/queue/file/queue.go @@ -169,7 +169,7 @@ func (q *Queue) Peek() (queue.Entry, error) { // put writes out an item to disk func (q *Queue) put(p io.Reader) (uuid.UUID, error) { id := uuid.New() - // Use CreateTemp to generate a file with a unique name (it will be removed when packet has been transmitted) + // Create a file with a unique name, it will be removed when packet has been transmitted f, err := os.Create(filepath.Join(q.path, q.prefix+id.String()+q.extension)) if err != nil { return uuid.Nil, err @@ -254,7 +254,7 @@ type entry struct { uuid uuid.UUID } -// Reader provides access to the file contents +// Reader provides access to the id and file contents func (e entry) Reader() (uuid.UUID, io.Reader, error) { return e.uuid, e.f, nil } diff --git a/autopaho/queue/queue.go b/autopaho/queue/queue.go index 2080c9b..08178b5 100644 --- a/autopaho/queue/queue.go +++ b/autopaho/queue/queue.go @@ -30,7 +30,7 @@ var ( // Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again) // `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requestes should be considered invalid) type Entry interface { - Reader() (uuid.UUID, io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader + Reader() (uuid.UUID, io.Reader, error) // Provides access to the id, file contents, subsequent calls may return the same reader Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek). Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) @@ -42,7 +42,7 @@ type Queue interface { // queue is empty at the time of the call) Wait() chan struct{} - // Enqueue add item to the queue. + // Enqueue add item to the queue, returns the id of the entry Enqueue(p io.Reader) (uuid.UUID, error) // Peek retrieves the oldest item from the queue without removing it From 68ad21652618364a2d069b2cdbd9a243f0576649 Mon Sep 17 00:00:00 2001 From: Shailesh Ahuja Date: Fri, 23 Aug 2024 11:05:57 +0800 Subject: [PATCH 3/3] Create a struct for queueitem in memory impl --- autopaho/queue/memory/queue.go | 35 ++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/autopaho/queue/memory/queue.go b/autopaho/queue/memory/queue.go index d477e7c..b6e796d 100644 --- a/autopaho/queue/memory/queue.go +++ b/autopaho/queue/memory/queue.go @@ -27,13 +27,18 @@ import ( // A queue implementation that stores all data in RAM +// queueItem represents a single item in the queue +type queueItem struct { + message []byte + uniqueID uuid.UUID +} + // Queue - basic memory based queue type Queue struct { mu sync.Mutex - messages [][]byte + items []queueItem waiting []chan<- struct{} // closed when something arrives in the queue waitingForEmpty []chan<- struct{} // closed when queue is empty - uniqueIDs []uuid.UUID } // New creates a new memory-based queue @@ -45,7 +50,7 @@ func New() *Queue { func (q *Queue) Wait() chan struct{} { c := make(chan struct{}) q.mu.Lock() - if len(q.messages) > 0 { + if len(q.items) > 0 { q.mu.Unlock() close(c) return c @@ -59,7 +64,7 @@ func (q *Queue) Wait() chan struct{} { func (q *Queue) WaitForEmpty() chan struct{} { c := make(chan struct{}) q.mu.Lock() - if len(q.messages) == 0 { + if len(q.items) == 0 { q.mu.Unlock() close(c) return c @@ -78,23 +83,26 @@ func (q *Queue) Enqueue(p io.Reader) (uuid.UUID, error) { } q.mu.Lock() defer q.mu.Unlock() - q.messages = append(q.messages, b.Bytes()) - q.uniqueIDs = append(q.uniqueIDs, uuid.New()) + newItem := queueItem{ + message: b.Bytes(), + uniqueID: uuid.New(), + } + q.items = append(q.items, newItem) for _, c := range q.waiting { close(c) } q.waiting = q.waiting[:0] - return q.uniqueIDs[len(q.uniqueIDs)-1], nil + return newItem.uniqueID, nil } // Peek retrieves the oldest item from the queue (without removing it) func (q *Queue) Peek() (queue.Entry, error) { q.mu.Lock() defer q.mu.Unlock() - if len(q.messages) == 0 { + if len(q.items) == 0 { return nil, queue.ErrEmpty } - // Queue implements Entry directly (as this always references q.messages[0] + // Queue implements Entry directly (as this always references q.items[0] return q, nil } @@ -103,10 +111,10 @@ func (q *Queue) Peek() (queue.Entry, error) { func (q *Queue) Reader() (uuid.UUID, io.Reader, error) { q.mu.Lock() defer q.mu.Unlock() - if len(q.messages) == 0 { + if len(q.items) == 0 { return uuid.Nil, nil, queue.ErrEmpty } - return q.uniqueIDs[0], bytes.NewReader(q.messages[0]), nil + return q.items[0].uniqueID, bytes.NewReader(q.items[0].message), nil } // Leave implements Entry.Leave - the entry (will be returned on subsequent calls to Peek) @@ -128,10 +136,9 @@ func (q *Queue) Quarantine() error { func (q *Queue) remove() error { q.mu.Lock() defer q.mu.Unlock() - initialLen := len(q.messages) + initialLen := len(q.items) if initialLen > 0 { - q.messages = q.messages[1:] - q.uniqueIDs = q.uniqueIDs[1:] + q.items = q.items[1:] } if initialLen <= 1 { // Queue is now, or was already, empty for _, c := range q.waitingForEmpty {