Skip to content

Commit

Permalink
fix(publisher): fix redis command usage (#184)
Browse files Browse the repository at this point in the history
This pull request introduces Redis integration into the `publisher`
service, refactoring the code to accommodate the new Redis client and
updating the handling of Redis commands. The most important changes
include adding the Redis client configuration, modifying the `Publisher`
and `Service` structs, and updating Redis command methods.

### Redis Integration:

*
[`publisher/cmd/worker/main.go`](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R14):
Added Redis client configuration and passed the Redis client to the
`tiup.NewPublisher` function.
[[1]](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R14)
[[2]](diffhunk://#diff-0c11d49a84dbe2803f3a531ba845598d4d2c9f4be0b64f1f2ff347d8e9afcb45R52-R65)

### Struct Modifications:

*
[`publisher/pkg/config/config.go`](diffhunk://#diff-41ecac7d460be2b3759c4d88b60e7177542db7888ba0c8254f8eab123a17b64bR8-L20):
Added a `Redis` struct to hold Redis configuration and updated the
`Worker` and `Service` structs to include this new `Redis` field.

### Redis Command Updates:

*
[`publisher/pkg/impl/tiup/publisher.go`](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L22-R26):
Changed the `redisClient` type from `*redis.Client` to `redis.Cmdable`
and updated Redis command methods to use `SetXX` instead of `Set`.
[[1]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L22-R26)
[[2]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L45-R45)
[[3]](diffhunk://#diff-620e3293cf87ba54e2ed9269601e334343caa82c36ecd1371d17fa71ce8393c5L55-R57)
*
[`publisher/pkg/impl/tiup/service.go`](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL22-R28):
Updated the `redisClient` type and changed the Redis command method from
`SetXX` to `SetNX` for initial status setting.
[[1]](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL22-R28)
[[2]](diffhunk://#diff-df9108e5a06158c0822dfd658fdfcbf5fa1ca7bcde2884ecaa32a0d79bf515faL77-R77)

Signed-off-by: wuhuizuo <[email protected]>

---------

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Oct 22, 2024
1 parent ed099a3 commit a7470b6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 18 deletions.
11 changes: 10 additions & 1 deletion publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"syscall"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -48,12 +49,20 @@ func main() {
}
}

// Configure Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: config.Redis.Addr,
Password: config.Redis.Password,
Username: config.Redis.Username,
DB: config.Redis.DB,
})

ctx := log.Logger.WithContext(context.Background())
// Create TiUP publisher
var handler *tiup.Publisher
{
var err error
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger)
handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient)
if err != nil {
log.Fatal().Err(err).Msg("Error creating handler")
}
Expand Down
19 changes: 11 additions & 8 deletions publisher/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ type Worker struct {
KafkaBasic `yaml:",inline" json:",inline"`
ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"`
} `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"`
LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"`
}

type Service struct {
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
} `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
Kafka KafkaBasic `yaml:"kafka" json:"kafka,omitempty"`
Redis Redis `yaml:"redis" json:"redis,omitempty"`
EventSource string `yaml:"event_source" json:"event_source,omitempty"`
}

type Redis struct {
Addr string `yaml:"addr" json:"addr,omitempty"`
DB int `yaml:"db" json:"db,omitempty"`
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
}

type KafkaBasic struct {
Expand Down
12 changes: 6 additions & 6 deletions publisher/pkg/impl/tiup/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ type Publisher struct {
mirrorURL string
larkWebhookURL string
logger zerolog.Logger
redisClient *redis.Client
redisClient redis.Cmdable
}

func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL}
func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) {
handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient}
if logger == nil {
handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
} else {
Expand All @@ -42,7 +42,7 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result {
if !slices.Contains(p.SupportEventTypes(), event.Type()) {
return cloudevents.ResultNACK
}
p.redisClient.Set(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL)

data := new(PublishRequest)
if err := event.DataAs(&data); err != nil {
Expand All @@ -52,9 +52,9 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result {
result := p.handleImpl(data)
switch {
case cloudevents.IsACK(result):
p.redisClient.Set(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL)
default:
p.redisClient.Set(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL)
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateFailed, redis.KeepTTL)
p.notifyLark(&data.Publish, result)
}

Expand Down
6 changes: 3 additions & 3 deletions publisher/pkg/impl/tiup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
type tiupsrvc struct {
logger *zerolog.Logger
kafkaWriter *kafka.Writer
redisClient *redis.Client
redisClient redis.Cmdable
eventSource string
stateTTL time.Duration
}

// NewTiup returns the tiup service implementation.
func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient *redis.Client, eventSrc string) gentiup.Service {
func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service {
return &tiupsrvc{
logger: logger,
kafkaWriter: kafkaWriter,
Expand Down Expand Up @@ -74,7 +74,7 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub

// 5. Init the request dealing status in redis with the request id.
for _, requestID := range requestIDs {
if err := s.redisClient.SetXX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil {
if err := s.redisClient.SetNX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil {
return nil, fmt.Errorf("failed to set initial status in Redis: %v", err)
}
}
Expand Down

0 comments on commit a7470b6

Please sign in to comment.