From 643568993b57fb21c961efe84f9f4da0989afb9b Mon Sep 17 00:00:00 2001 From: Kajetan Date: Sat, 10 Feb 2024 15:55:17 +0100 Subject: [PATCH] Clean up --- tests/configs/.rr-init.yaml | 13 ---------- tests/configs/.rr-pq.yaml | 12 --------- tests/jobs_test.go | 50 ++++++++++++++++++------------------- 3 files changed, 25 insertions(+), 50 deletions(-) diff --git a/tests/configs/.rr-init.yaml b/tests/configs/.rr-init.yaml index 6ba9a26..3f878ec 100644 --- a/tests/configs/.rr-init.yaml +++ b/tests/configs/.rr-init.yaml @@ -33,25 +33,12 @@ jobs: driver: google-pub-sub config: prefetch: 1000 - visibility_timeout: 0 - wait_time_seconds: 0 - queue: default - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 30 tags: test: "tag" test-2: driver: google-pub-sub config: - prefetch: 1000 - queue: default-2 - attributes: - MessageRetentionPeriod: 86400 tags: test: "tag" consume: [ "test-1", "test-2" ] diff --git a/tests/configs/.rr-pq.yaml b/tests/configs/.rr-pq.yaml index 4318edb..701c8d3 100644 --- a/tests/configs/.rr-pq.yaml +++ b/tests/configs/.rr-pq.yaml @@ -31,15 +31,6 @@ jobs: driver: google-pub-sub config: prefetch: 1000 - visibility_timeout: 0 - wait_time_seconds: 0 - queue: default-1-pq - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 30 tags: test: "tag-pq" @@ -47,9 +38,6 @@ jobs: driver: google-pub-sub config: prefetch: 1000 - queue: default-2-pq - attributes: - MessageRetentionPeriod: 86400 tags: test: "tag" consume: [ "test-1-pq", "test-2-pq" ] diff --git a/tests/jobs_test.go b/tests/jobs_test.go index ca28daa..3946ab8 100644 --- a/tests/jobs_test.go +++ b/tests/jobs_test.go @@ -282,7 +282,7 @@ func TestJobsError(t *testing.T) { time.Sleep(time.Second * 5) } -func TestRemovePQ(t *testing.T) { +func TestAutoAck(t *testing.T) { err := helpers.CleanEmulator() if err != nil { assert.FailNow(t, "error", err.Error()) @@ -291,8 +291,8 @@ func TestRemovePQ(t *testing.T) { cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ - Version: "2023.2.0", - Path: "configs/.rr-pq.yaml", + Version: "2023.3.0", + Path: "configs/.rr-init.yaml", Prefix: "rr", } @@ -355,27 +355,18 @@ func TestRemovePQ(t *testing.T) { }() time.Sleep(time.Second * 3) - - for i := 0; i < 10; i++ { - t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601")) - t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601")) - } - time.Sleep(time.Second * 3) - - t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq")) + t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001")) + t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001")) + time.Sleep(time.Second * 2) + t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2")) stopCh <- struct{}{} wg.Wait() - assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len()) - assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len()) - assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) - assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) - assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len()) - assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len()) + require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len()) } -func TestAutoAck(t *testing.T) { +func TestRemovePQ(t *testing.T) { err := helpers.CleanEmulator() if err != nil { assert.FailNow(t, "error", err.Error()) @@ -384,8 +375,8 @@ func TestAutoAck(t *testing.T) { cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ - Version: "2023.3.0", - Path: "configs/.rr-init.yaml", + Version: "2023.2.0", + Path: "configs/.rr-pq.yaml", Prefix: "rr", } @@ -448,13 +439,22 @@ func TestAutoAck(t *testing.T) { }() time.Sleep(time.Second * 3) - t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001")) - t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001")) - time.Sleep(time.Second * 2) - t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2")) + + for i := 0; i < 10; i++ { + t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601")) + t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601")) + } + time.Sleep(time.Second * 3) + + t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq")) stopCh <- struct{}{} wg.Wait() - require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len()) + assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) + assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len()) }