Skip to content

Commit

Permalink
feat: add option WithRemotingTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
twz915 committed Nov 27, 2023
1 parent 0a329cc commit 1ec876d
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 2 deletions.
9 changes: 9 additions & 0 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ func WithLimiter(limiter Limiter) Option {
}
}

// WithRemotingTimeout set remote client timeout options
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
}
}

func WithTls(useTls bool) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.UseTls = useTls
Expand Down
15 changes: 15 additions & 0 deletions consumer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"reflect"
"testing"
"time"
)

func getFieldString(obj interface{}, field string) string {
Expand All @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
}).String()
}

func TestWithRemotingTimeout(t *testing.T) {
opt := defaultPushConsumerOptions()
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultPushConsumerOptions()
unitName := "unsh"
Expand Down
9 changes: 9 additions & 0 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ func WithCompressLevel(level int) Option {
}
}

// WithRemotingTimeout set remote client timeout options
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
return func(opts *producerOptions) {
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
}
}

func WithTls(useTls bool) Option {
return func(opts *producerOptions) {
opts.ClientOptions.RemotingClientConfig.UseTls = useTls
Expand Down
15 changes: 15 additions & 0 deletions producer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package producer
import (
"reflect"
"testing"
"time"
)

func getFieldString(obj interface{}, field string) string {
Expand All @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
}).String()
}

func TestWithRemotingTimeout(t *testing.T) {
opt := defaultProducerOptions()
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultProducerOptions()
unitName := "unsh"
Expand Down
5 changes: 3 additions & 2 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"

errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/google/uuid"
"github.com/pkg/errors"
)

type defaultProducer struct {
Expand Down

0 comments on commit 1ec876d

Please sign in to comment.