diff --git a/admin/admin.go b/admin/admin.go index 487c8b44..62175af4 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -77,6 +77,12 @@ func WithNamespace(namespace string) AdminOption { } } +func WithTls(useTls bool) AdminOption { + return func(options *adminOptions) { + options.ClientOptions.RemotingClientConfig.UseTls = useTls + } +} + type admin struct { cli internal.RMQClient diff --git a/consumer/option.go b/consumer/option.go index ac7dd931..24acf7c3 100644 --- a/consumer/option.go +++ b/consumer/option.go @@ -381,3 +381,9 @@ func WithLimiter(limiter Limiter) Option { opts.Limiter = limiter } } + +func WithTls(useTls bool) Option { + return func(opts *consumerOptions) { + opts.ClientOptions.RemotingClientConfig.UseTls = useTls + } +} diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go new file mode 100644 index 00000000..248c8371 --- /dev/null +++ b/examples/consumer/tls/main.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func main() { + c, _ := rocketmq.NewPushConsumer( + consumer.WithGroupName("testGroup"), + consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + consumer.WithTls(true), + ) + err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, + msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for i := range msgs { + fmt.Printf("subscribe callback: %v \n", msgs[i]) + } + + return consumer.ConsumeSuccess, nil + }) + if err != nil { + fmt.Println(err.Error()) + } + // Note: start after subscribe + err = c.Start() + if err != nil { + fmt.Println(err.Error()) + os.Exit(-1) + } + time.Sleep(time.Hour) + err = c.Shutdown() + if err != nil { + fmt.Printf("shutdown Consumer error: %s", err.Error()) + } +} diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go new file mode 100644 index 00000000..c926c054 --- /dev/null +++ b/examples/producer/tls/main.go @@ -0,0 +1,62 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "os" + "strconv" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" +) + +// Package main implements a simple producer to send message. +func main() { + p, _ := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + producer.WithRetry(2), + producer.WithTls(true), + ) + err := p.Start() + if err != nil { + fmt.Printf("start producer error: %s", err.Error()) + os.Exit(1) + } + topic := "test" + + for i := 0; i < 10; i++ { + msg := &primitive.Message{ + Topic: topic, + Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), + } + res, err := p.SendSync(context.Background(), msg) + + if err != nil { + fmt.Printf("send message error: %s\n", err) + } else { + fmt.Printf("send message success: result=%s\n", res.String()) + } + } + err = p.Shutdown() + if err != nil { + fmt.Printf("shutdown producer error: %s", err.Error()) + } +} diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go index 40136895..45dfbbf5 100644 --- a/internal/remote/remote_client.go +++ b/internal/remote/remote_client.go @@ -38,6 +38,7 @@ type TcpOption struct { ConnectionTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration + UseTls bool } //go:generate mockgen -source remote_client.go -destination mock_remote_client.go -self_package github.com/apache/rocketmq-client-go/v2/internal/remote --package remote RemotingClient diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go index 93ed8379..3b7d1644 100644 --- a/internal/remote/tcp_conn.go +++ b/internal/remote/tcp_conn.go @@ -18,6 +18,7 @@ package remote import ( "context" + "crypto/tls" "net" "sync" "time" @@ -34,11 +35,19 @@ type tcpConnWrapper struct { func initConn(ctx context.Context, addr string, config *RemotingClientConfig) (*tcpConnWrapper, error) { var d net.Dialer - d.KeepAlive = config.KeepAliveDuration d.Deadline = time.Now().Add(config.ConnectionTimeout) - conn, err := d.DialContext(ctx, "tcp", addr) + var conn net.Conn + var err error + if config.UseTls { + conn, err = tls.DialWithDialer(&d, "tcp", addr, &tls.Config{ + InsecureSkipVerify: true, + }) + } else { + conn, err = d.DialContext(ctx, "tcp", addr) + } + if err != nil { return nil, err } diff --git a/producer/option.go b/producer/option.go index c3a0dc42..6e43cc25 100644 --- a/producer/option.go +++ b/producer/option.go @@ -178,3 +178,9 @@ func WithCompressLevel(level int) Option { opts.CompressLevel = level } } + +func WithTls(useTls bool) Option { + return func(opts *producerOptions) { + opts.ClientOptions.RemotingClientConfig.UseTls = useTls + } +}