Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Feb 10, 2024
1 parent c69ad8b commit 6435689
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 50 deletions.
13 changes: 0 additions & 13 deletions tests/configs/.rr-init.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,12 @@ jobs:
driver: google-pub-sub
config:
prefetch: 1000
visibility_timeout: 0
wait_time_seconds: 0
queue: default
attributes:
DelaySeconds: 0
MaximumMessageSize: 262144
MessageRetentionPeriod: 345600
ReceiveMessageWaitTimeSeconds: 0
VisibilityTimeout: 30
tags:
test: "tag"

test-2:
driver: google-pub-sub
config:
prefetch: 1000
queue: default-2
attributes:
MessageRetentionPeriod: 86400
tags:
test: "tag"
consume: [ "test-1", "test-2" ]
12 changes: 0 additions & 12 deletions tests/configs/.rr-pq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,13 @@ jobs:
driver: google-pub-sub
config:
prefetch: 1000
visibility_timeout: 0
wait_time_seconds: 0
queue: default-1-pq
attributes:
DelaySeconds: 0
MaximumMessageSize: 262144
MessageRetentionPeriod: 345600
ReceiveMessageWaitTimeSeconds: 0
VisibilityTimeout: 30
tags:
test: "tag-pq"

test-2-pq:
driver: google-pub-sub
config:
prefetch: 1000
queue: default-2-pq
attributes:
MessageRetentionPeriod: 86400
tags:
test: "tag"
consume: [ "test-1-pq", "test-2-pq" ]
50 changes: 25 additions & 25 deletions tests/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestJobsError(t *testing.T) {
time.Sleep(time.Second * 5)
}

func TestRemovePQ(t *testing.T) {
func TestAutoAck(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
Expand All @@ -291,8 +291,8 @@ func TestRemovePQ(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2023.2.0",
Path: "configs/.rr-pq.yaml",
Version: "2023.3.0",
Path: "configs/.rr-init.yaml",
Prefix: "rr",
}

Expand Down Expand Up @@ -355,27 +355,18 @@ func TestRemovePQ(t *testing.T) {
}()

time.Sleep(time.Second * 3)

for i := 0; i < 10; i++ {
t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601"))
t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601"))
}
time.Sleep(time.Second * 3)

t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq"))
t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001"))
t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001"))
time.Sleep(time.Second * 2)
t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2"))

stopCh <- struct{}{}
wg.Wait()

assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len())
require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len())
}

func TestAutoAck(t *testing.T) {
func TestRemovePQ(t *testing.T) {
err := helpers.CleanEmulator()
if err != nil {
assert.FailNow(t, "error", err.Error())
Expand All @@ -384,8 +375,8 @@ func TestAutoAck(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "2023.3.0",
Path: "configs/.rr-init.yaml",
Version: "2023.2.0",
Path: "configs/.rr-pq.yaml",
Prefix: "rr",
}

Expand Down Expand Up @@ -448,13 +439,22 @@ func TestAutoAck(t *testing.T) {
}()

time.Sleep(time.Second * 3)
t.Run("PushPipeline", helpers.PushToPipe("test-1", true, "127.0.0.1:6001"))
t.Run("PushPipeline", helpers.PushToPipe("test-2", true, "127.0.0.1:6001"))
time.Sleep(time.Second * 2)
t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-1", "test-2"))

for i := 0; i < 10; i++ {
t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601"))
t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601"))
}
time.Sleep(time.Second * 3)

t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq"))

stopCh <- struct{}{}
wg.Wait()

require.Equal(t, 4, oLogger.FilterMessageSnippet("auto ack is turned on, message acknowledged").Len())
assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len())
}

0 comments on commit 6435689

Please sign in to comment.