Skip to content

Commit

Permalink
Merge branch 'main' into fix_multi_uris
Browse files Browse the repository at this point in the history
  • Loading branch information
Gsantomaggio authored Jun 12, 2024
2 parents f055cf8 + e5b3ba9 commit d41df0a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 24 deletions.
5 changes: 3 additions & 2 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type Client struct {
serverProperties map[string]string
}

func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
func newClient(connectionName string, broker *Broker,
tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
var clientBroker = broker
if broker == nil {
clientBroker = newBrokerDefault()
Expand Down Expand Up @@ -100,7 +101,7 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete
mutex: &sync.Mutex{},
destructor: &sync.Once{},
},
socketCallTimeout: defaultSocketCallTimeout,
socketCallTimeout: rpcTimeOut,
availableFeatures: newAvailableFeatures(),
}
c.setConnectionName(connectionName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.queryOffset won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand All @@ -193,7 +193,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.queryPublisherSequence won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand All @@ -203,7 +203,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.StreamStats won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down Expand Up @@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down Expand Up @@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down
46 changes: 30 additions & 16 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
if options == nil {
options = NewEnvironmentOptions()
}
client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration)

if options.RPCTimeout <= 0 {
options.RPCTimeout = defaultSocketCallTimeout
}

client := newClient("go-stream-locator", nil,
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
defer func(client *Client) {
err := client.Close()
if err != nil {
Expand Down Expand Up @@ -96,7 +102,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
}
func (env *Environment) newReconnectClient() (*Client, error) {
broker := env.options.ConnectionParameters[0]
client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration)
client := newClient("go-stream-locator", broker, env.options.TCPParameters,
env.options.SaslConfiguration, env.options.RPCTimeout)

err := client.connect()
tentatives := 1
Expand All @@ -110,7 +117,7 @@ func (env *Environment) newReconnectClient() (*Client, error) {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(len(env.options.ConnectionParameters))
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
env.options.SaslConfiguration)
env.options.SaslConfiguration, env.options.RPCTimeout)
tentatives = tentatives + 1
err = client.connect()

Expand Down Expand Up @@ -153,7 +160,7 @@ func (env *Environment) NewProducer(streamName string, producerOptions *Producer
return nil, err
}

return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver)
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout)
}

func (env *Environment) StreamExists(streamName string) (bool, error) {
Expand Down Expand Up @@ -243,7 +250,7 @@ func (env *Environment) NewConsumer(streamName string,
return nil, err
}

return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver)
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout)
}

func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) {
Expand Down Expand Up @@ -272,6 +279,7 @@ type EnvironmentOptions struct {
MaxProducersPerClient int
MaxConsumersPerClient int
AddressResolver *AddressResolver
RPCTimeout time.Duration
}

func NewEnvironmentOptions() *EnvironmentOptions {
Expand All @@ -281,6 +289,7 @@ func NewEnvironmentOptions() *EnvironmentOptions {
ConnectionParameters: []*Broker{},
TCPParameters: newTCPParameterDefault(),
SaslConfiguration: newSaslConfigurationDefault(),
RPCTimeout: defaultSocketCallTimeout,
}
}

Expand Down Expand Up @@ -443,6 +452,11 @@ func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptio
return envOptions
}

func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions {
envOptions.RPCTimeout = timeout
return envOptions
}

type environmentCoordinator struct {
mutex *sync.Mutex
mutexContext *sync.RWMutex
Expand Down Expand Up @@ -524,7 +538,7 @@ func (c *Client) maybeCleanConsumers(streamName string) {
}

func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
options *ProducerOptions) (*Producer, error) {
options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()
cc.mutexContext.Lock()
Expand All @@ -542,7 +556,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
}

if clientResult == nil {
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
}

err := clientResult.connect()
Expand All @@ -559,7 +573,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
if err != nil {
return nil, err
}
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
err = clientResult.connect()
if err != nil {
return nil, err
Expand All @@ -576,8 +590,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
return producer, nil
}

func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration)
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
chMeta := make(chan metaDataUpdateEvent, 1)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand All @@ -598,7 +612,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le

func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
streamName string, messagesHandler MessagesHandler,
options *ConsumerOptions) (*Consumer, error) {
options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()
cc.mutexContext.Lock()
Expand All @@ -612,7 +626,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
}

if clientResult == nil {
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration)
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
chMeta := make(chan metaDataUpdateEvent)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand Down Expand Up @@ -677,7 +691,7 @@ func newProducers(maxItemsForClient int) *producersEnvironment {
}

func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName string,
options *ProducerOptions, resolver *AddressResolver) (*Producer, error) {
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
leader, err := clientLocator.BrokerLeader(streamName)
Expand All @@ -697,7 +711,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
leader.cloneFrom(clientLocator.broker, resolver)

producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters,
clientLocator.saslConfiguration, streamName, options)
clientLocator.saslConfiguration, streamName, options, rpcTimeOut)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -742,7 +756,7 @@ func newConsumerEnvironment(maxItemsForClient int) *consumersEnvironment {

func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName string,
messagesHandler MessagesHandler,
consumerOptions *ConsumerOptions, resolver *AddressResolver) (*Consumer, error) {
consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
consumerBroker, err := clientLocator.BrokerForConsumer(streamName)
Expand All @@ -767,7 +781,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
consumer, err := ps.consumersCoordinator[coordinatorKey].
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters,
clientLocator.saslConfiguration,
streamName, messagesHandler, consumerOptions)
streamName, messagesHandler, consumerOptions, rpcTimeout)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ var _ = Describe("Environment test", func() {
MaxProducersPerClient: 1,
MaxConsumersPerClient: 1,
AddressResolver: nil,
RPCTimeout: defaultSocketCallTimeout,
})

Expect(err).NotTo(HaveOccurred())
Expand Down

0 comments on commit d41df0a

Please sign in to comment.