From 2e1e85dca11b5a3af86e9bf6ef21ce3a0245c3c9 Mon Sep 17 00:00:00 2001 From: wuhuizuo Date: Mon, 28 Oct 2024 20:41:00 +0800 Subject: [PATCH] fix(publisher): control nightly interval from configuration file (#186) Signed-off-by: wuhuizuo --------- Signed-off-by: wuhuizuo --- publisher/cmd/publisher/init.go | 25 +++++++ publisher/cmd/publisher/main.go | 67 ++++++++----------- publisher/cmd/worker/main.go | 11 ++- publisher/design/design.go | 4 +- publisher/example/config/worker-example.yaml | 6 +- publisher/gen/http/openapi.json | 2 +- publisher/gen/http/openapi.yaml | 1 + publisher/gen/http/openapi3.json | 2 +- publisher/gen/http/openapi3.yaml | 5 +- .../gen/http/tiup/client/encode_decode.go | 4 +- publisher/pkg/config/config.go | 9 ++- publisher/pkg/impl/tiup/funcs.go | 2 +- publisher/pkg/impl/tiup/publisher.go | 43 ++++++------ publisher/pkg/impl/tiup/service.go | 19 +++--- publisher/pkg/impl/tiup/types.go | 2 +- 15 files changed, 118 insertions(+), 84 deletions(-) create mode 100644 publisher/cmd/publisher/init.go diff --git a/publisher/cmd/publisher/init.go b/publisher/cmd/publisher/init.go new file mode 100644 index 00000000..c28686be --- /dev/null +++ b/publisher/cmd/publisher/init.go @@ -0,0 +1,25 @@ +package main + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" + + "github.com/PingCAP-QE/ee-apps/publisher/pkg/config" +) + +// Load and parse configuration +func loadConfig(configFile string) (config.Service, error) { + var config config.Service + { + configData, err := os.ReadFile(configFile) + if err != nil { + return config, fmt.Errorf("error reading config file: %v", err) + } + if err := yaml.Unmarshal(configData, &config); err != nil { + return config, fmt.Errorf("error parsing config file: %v", err) + } + } + return config, nil +} diff --git a/publisher/cmd/publisher/main.go b/publisher/cmd/publisher/main.go index c4aab12e..53152dcb 100644 --- a/publisher/cmd/publisher/main.go +++ b/publisher/cmd/publisher/main.go @@ -16,10 +16,8 @@ import ( "github.com/segmentio/kafka-go" "goa.design/clue/debug" "goa.design/clue/log" - "gopkg.in/yaml.v3" gentiup "github.com/PingCAP-QE/ee-apps/publisher/gen/tiup" - "github.com/PingCAP-QE/ee-apps/publisher/pkg/config" "github.com/PingCAP-QE/ee-apps/publisher/pkg/impl/tiup" ) @@ -54,11 +52,36 @@ func main() { logLevel = zerolog.DebugLevel } zerolog.SetGlobalLevel(logLevel) + logger := zerolog.New(os.Stderr).With().Timestamp().Str("service", gentiup.ServiceName).Logger() - // Initialize the services. - tiupSvc, err := initTiupService(*configFile) + // Load and parse configuration + config, err := loadConfig(*configFile) if err != nil { - log.Fatalf(ctx, err, "failed to initialize service") + log.Fatalf(ctx, err, "failed to load configuration") + } + + // Initialize the services. + var ( + tiupSvc gentiup.Service + ) + { + // Configure Kafka kafkaWriter + kafkaWriter := kafka.NewWriter(kafka.WriterConfig{ + Brokers: config.Kafka.Brokers, + Topic: config.Kafka.Topic, + Balancer: &kafka.LeastBytes{}, + Logger: kafka.LoggerFunc(logger.Printf), + }) + + // Configure Redis client + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + Username: config.Redis.Username, + DB: config.Redis.DB, + }) + + tiupSvc = tiup.NewService(&logger, kafkaWriter, redisClient, config.EventSource) } // Wrap the services in endpoints that can be invoked from other services @@ -127,37 +150,3 @@ func main() { wg.Wait() log.Printf(ctx, "exited") } - -func initTiupService(configFile string) (gentiup.Service, error) { - // Load and parse configuration - var config config.Service - { - configData, err := os.ReadFile(configFile) - if err != nil { - return nil, fmt.Errorf("error reading config file: %v", err) - } - if err := yaml.Unmarshal(configData, &config); err != nil { - return nil, fmt.Errorf("error parsing config file: %v", err) - } - } - - logger := zerolog.New(os.Stderr).With().Timestamp().Str("service", gentiup.ServiceName).Logger() - - // Configure Kafka kafkaWriter - kafkaWriter := kafka.NewWriter(kafka.WriterConfig{ - Brokers: config.Kafka.Brokers, - Topic: config.Kafka.Topic, - Balancer: &kafka.LeastBytes{}, - Logger: kafka.LoggerFunc(logger.Printf), - }) - - // Configure Redis client - redisClient := redis.NewClient(&redis.Options{ - Addr: config.Redis.Addr, - Password: config.Redis.Password, - Username: config.Redis.Username, - DB: config.Redis.DB, - }) - - return tiup.NewTiup(&logger, kafkaWriter, redisClient, config.EventSource), nil -} diff --git a/publisher/cmd/worker/main.go b/publisher/cmd/worker/main.go index 7a897a18..fa923fcd 100644 --- a/publisher/cmd/worker/main.go +++ b/publisher/cmd/worker/main.go @@ -9,6 +9,7 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/cloudevents/sdk-go/v2/event" "github.com/go-redis/redis/v8" @@ -62,7 +63,15 @@ func main() { var handler *tiup.Publisher { var err error - handler, err = tiup.NewPublisher(config.MirrorUrl, config.LarkWebhookURL, &log.Logger, redisClient) + nigthlyInterval, err := time.ParseDuration(config.Options.NightlyInterval) + if err != nil { + log.Fatal().Err(err).Msg("Error parsing nightly interval") + } + handler, err = tiup.NewPublisher(&log.Logger, redisClient, tiup.PublisherOptions{ + MirrorURL: config.Options.MirrorURL, + LarkWebhookURL: config.Options.LarkWebhookURL, + NightlyInterval: nigthlyInterval, + }) if err != nil { log.Fatal().Err(err).Msg("Error creating handler") } diff --git a/publisher/design/design.go b/publisher/design/design.go index cd7797ae..e8444b3b 100644 --- a/publisher/design/design.go +++ b/publisher/design/design.go @@ -4,7 +4,7 @@ package design import ( _ "goa.design/plugins/v3/zerologger" - . "goa.design/goa/v3/dsl" + . "goa.design/goa/v3/dsl" //nolint ) var _ = API("publisher", func() { @@ -57,7 +57,7 @@ var _ = Service("tiup", func() { Required("request_id") }) Result(String, "request state", func() { - Enum("queued", "processing", "success", "failed") + Enum("queued", "processing", "success", "failed", "canceled") }) HTTP(func() { GET("/publish-request/{request_id}") diff --git a/publisher/example/config/worker-example.yaml b/publisher/example/config/worker-example.yaml index 5be4ad13..bb74e36d 100644 --- a/publisher/example/config/worker-example.yaml +++ b/publisher/example/config/worker-example.yaml @@ -3,5 +3,7 @@ kafka: - example-bootstrap.kafka:9092 topic: example-topic consumer_group: example-group -mirror_url: http://tiup.mirror.site -lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here. \ No newline at end of file +options: + mirror_url: http://tiup.mirror.site + lark_webhook_url: https://feishu.custom-bot-webhook # create and copy the url then paste here. + nightly_interval: 1h \ No newline at end of file diff --git a/publisher/gen/http/openapi.json b/publisher/gen/http/openapi.json index cd062e8b..8be87663 100644 --- a/publisher/gen/http/openapi.json +++ b/publisher/gen/http/openapi.json @@ -1 +1 @@ -{"swagger":"2.0","info":{"title":"Publish API","description":"Publish API","contact":{"name":"WuHui Zuo","email":"wuhui.zuo@pingcap.com","url":"https://github.com/wuhuizuo"},"version":"1.0.0"},"host":"0.0.0.0:80","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/tiup/publish-request":{"post":{"tags":["tiup"],"summary":"request-to-publish tiup","operationId":"tiup#request-to-publish","parameters":[{"name":"Request-To-PublishRequestBody","in":"body","required":true,"schema":{"$ref":"#/definitions/TiupRequestToPublishRequestBody","required":["artifact_url","tiup-mirror"]}}],"responses":{"200":{"description":"OK response.","schema":{"type":"array","items":{"type":"string","example":"Cupiditate suscipit hic quidem voluptates nostrum necessitatibus."}}}},"schemes":["http"]}},"/tiup/publish-request/{request_id}":{"get":{"tags":["tiup"],"summary":"query-publishing-status tiup","operationId":"tiup#query-publishing-status","parameters":[{"name":"request_id","in":"path","description":"request track id","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","enum":["queued","processing","success","failed"]}}},"schemes":["http"]}}},"definitions":{"TiupRequestToPublishRequestBody":{"title":"TiupRequestToPublishRequestBody","type":"object","properties":{"artifact_url":{"type":"string","description":"The full url of the pushed image, contain the tag part. It will parse the repo from it.","example":"Optio necessitatibus ipsa incidunt."},"request_id":{"type":"string","description":"The request id","example":"Et ullam."},"tiup-mirror":{"type":"string","description":"Staging is http://tiup.pingcap.net:8988, product is http://tiup.pingcap.net:8987.","default":"http://tiup.pingcap.net:8988","example":"Illo harum quis voluptatem."},"version":{"type":"string","description":"Force set the version. Default is the artifact version read from `org.opencontainers.image.version` of the manifest config.","example":"Expedita et necessitatibus ut molestias."}},"example":{"artifact_url":"Et suscipit et.","request_id":"Corrupti ut natus aut ipsam reprehenderit.","tiup-mirror":"Assumenda tempora autem accusantium est.","version":"Ab expedita repellendus."},"required":["artifact_url","tiup-mirror"]}}} \ No newline at end of file +{"swagger":"2.0","info":{"title":"Publish API","description":"Publish API","contact":{"name":"WuHui Zuo","email":"wuhui.zuo@pingcap.com","url":"https://github.com/wuhuizuo"},"version":"1.0.0"},"host":"0.0.0.0:80","consumes":["application/json","application/xml","application/gob"],"produces":["application/json","application/xml","application/gob"],"paths":{"/tiup/publish-request":{"post":{"tags":["tiup"],"summary":"request-to-publish tiup","operationId":"tiup#request-to-publish","parameters":[{"name":"Request-To-PublishRequestBody","in":"body","required":true,"schema":{"$ref":"#/definitions/TiupRequestToPublishRequestBody","required":["artifact_url","tiup-mirror"]}}],"responses":{"200":{"description":"OK response.","schema":{"type":"array","items":{"type":"string","example":"Cupiditate suscipit hic quidem voluptates nostrum necessitatibus."}}}},"schemes":["http"]}},"/tiup/publish-request/{request_id}":{"get":{"tags":["tiup"],"summary":"query-publishing-status tiup","operationId":"tiup#query-publishing-status","parameters":[{"name":"request_id","in":"path","description":"request track id","required":true,"type":"string"}],"responses":{"200":{"description":"OK response.","schema":{"type":"string","enum":["queued","processing","success","failed","canceled"]}}},"schemes":["http"]}}},"definitions":{"TiupRequestToPublishRequestBody":{"title":"TiupRequestToPublishRequestBody","type":"object","properties":{"artifact_url":{"type":"string","description":"The full url of the pushed image, contain the tag part. It will parse the repo from it.","example":"Optio necessitatibus ipsa incidunt."},"request_id":{"type":"string","description":"The request id","example":"Et ullam."},"tiup-mirror":{"type":"string","description":"Staging is http://tiup.pingcap.net:8988, product is http://tiup.pingcap.net:8987.","default":"http://tiup.pingcap.net:8988","example":"Illo harum quis voluptatem."},"version":{"type":"string","description":"Force set the version. Default is the artifact version read from `org.opencontainers.image.version` of the manifest config.","example":"Expedita et necessitatibus ut molestias."}},"example":{"artifact_url":"Et suscipit et.","request_id":"Corrupti ut natus aut ipsam reprehenderit.","tiup-mirror":"Assumenda tempora autem accusantium est.","version":"Ab expedita repellendus."},"required":["artifact_url","tiup-mirror"]}}} \ No newline at end of file diff --git a/publisher/gen/http/openapi.yaml b/publisher/gen/http/openapi.yaml index 09aeffa9..f95cf57f 100644 --- a/publisher/gen/http/openapi.yaml +++ b/publisher/gen/http/openapi.yaml @@ -64,6 +64,7 @@ paths: - processing - success - failed + - canceled schemes: - http definitions: diff --git a/publisher/gen/http/openapi3.json b/publisher/gen/http/openapi3.json index b73223a2..a22b8d6d 100644 --- a/publisher/gen/http/openapi3.json +++ b/publisher/gen/http/openapi3.json @@ -1 +1 @@ -{"openapi":"3.0.3","info":{"title":"Publish API","description":"Publish API","contact":{"name":"WuHui Zuo","url":"https://github.com/wuhuizuo","email":"wuhui.zuo@pingcap.com"},"version":"1.0.0"},"servers":[{"url":"http://0.0.0.0:80"}],"paths":{"/tiup/publish-request":{"post":{"tags":["tiup"],"summary":"request-to-publish tiup","operationId":"tiup#request-to-publish","requestBody":{"required":true,"content":{"application/json":{"schema":{"$ref":"#/components/schemas/RequestToPublishRequestBody"},"example":{"artifact_url":"Omnis expedita.","request_id":"Est sequi placeat.","tiup-mirror":"Pariatur rerum consectetur deleniti architecto sunt.","version":"Dicta id perferendis rem a."}}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"array","items":{"type":"string","example":"Eum saepe nihil omnis dolorem eveniet."},"description":"request track ids","example":["Rerum voluptas dolore.","Eveniet vero voluptas.","Voluptates voluptatem accusamus nisi omnis quia molestias."]},"example":["Pariatur consequuntur itaque est.","Exercitationem atque amet optio."]}}}}}},"/tiup/publish-request/{request_id}":{"get":{"tags":["tiup"],"summary":"query-publishing-status tiup","operationId":"tiup#query-publishing-status","parameters":[{"name":"request_id","in":"path","description":"request track id","required":true,"schema":{"type":"string","description":"request track id","example":"Aliquam ut laborum nulla."},"example":"Error qui dolores et."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","description":"request state","example":"queued","enum":["queued","processing","success","failed"]},"example":"failed"}}}}}}},"components":{"schemas":{"RequestToPublishRequestBody":{"type":"object","properties":{"artifact_url":{"type":"string","description":"The full url of the pushed image, contain the tag part. It will parse the repo from it.","example":"Et eum odit eos ratione."},"request_id":{"type":"string","description":"The request id","example":"Voluptas ratione hic libero nisi."},"tiup-mirror":{"type":"string","description":"Staging is http://tiup.pingcap.net:8988, product is http://tiup.pingcap.net:8987.","default":"http://tiup.pingcap.net:8988","example":"Rerum tempore voluptas."},"version":{"type":"string","description":"Force set the version. Default is the artifact version read from `org.opencontainers.image.version` of the manifest config.","example":"Laboriosam optio omnis cupiditate magnam nisi."}},"example":{"artifact_url":"Rerum voluptate accusantium optio.","request_id":"Atque vero in molestiae odit consequatur.","tiup-mirror":"Quibusdam nisi quam.","version":"Magni quia adipisci excepturi."},"required":["artifact_url","tiup-mirror"]}}},"tags":[{"name":"tiup","description":"TiUP Publisher service"}]} \ No newline at end of file +{"openapi":"3.0.3","info":{"title":"Publish API","description":"Publish API","contact":{"name":"WuHui Zuo","url":"https://github.com/wuhuizuo","email":"wuhui.zuo@pingcap.com"},"version":"1.0.0"},"servers":[{"url":"http://0.0.0.0:80"}],"paths":{"/tiup/publish-request":{"post":{"tags":["tiup"],"summary":"request-to-publish tiup","operationId":"tiup#request-to-publish","requestBody":{"required":true,"content":{"application/json":{"schema":{"$ref":"#/components/schemas/RequestToPublishRequestBody"},"example":{"artifact_url":"Omnis expedita.","request_id":"Est sequi placeat.","tiup-mirror":"Pariatur rerum consectetur deleniti architecto sunt.","version":"Dicta id perferendis rem a."}}}},"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"array","items":{"type":"string","example":"Eum saepe nihil omnis dolorem eveniet."},"description":"request track ids","example":["Rerum voluptas dolore.","Eveniet vero voluptas.","Voluptates voluptatem accusamus nisi omnis quia molestias."]},"example":["Pariatur consequuntur itaque est.","Exercitationem atque amet optio."]}}}}}},"/tiup/publish-request/{request_id}":{"get":{"tags":["tiup"],"summary":"query-publishing-status tiup","operationId":"tiup#query-publishing-status","parameters":[{"name":"request_id","in":"path","description":"request track id","required":true,"schema":{"type":"string","description":"request track id","example":"Aliquam ut laborum nulla."},"example":"Error qui dolores et."}],"responses":{"200":{"description":"OK response.","content":{"application/json":{"schema":{"type":"string","description":"request state","example":"processing","enum":["queued","processing","success","failed","canceled"]},"example":"queued"}}}}}}},"components":{"schemas":{"RequestToPublishRequestBody":{"type":"object","properties":{"artifact_url":{"type":"string","description":"The full url of the pushed image, contain the tag part. It will parse the repo from it.","example":"Et eum odit eos ratione."},"request_id":{"type":"string","description":"The request id","example":"Voluptas ratione hic libero nisi."},"tiup-mirror":{"type":"string","description":"Staging is http://tiup.pingcap.net:8988, product is http://tiup.pingcap.net:8987.","default":"http://tiup.pingcap.net:8988","example":"Rerum tempore voluptas."},"version":{"type":"string","description":"Force set the version. Default is the artifact version read from `org.opencontainers.image.version` of the manifest config.","example":"Laboriosam optio omnis cupiditate magnam nisi."}},"example":{"artifact_url":"Rerum voluptate accusantium optio.","request_id":"Atque vero in molestiae odit consequatur.","tiup-mirror":"Quibusdam nisi quam.","version":"Magni quia adipisci excepturi."},"required":["artifact_url","tiup-mirror"]}}},"tags":[{"name":"tiup","description":"TiUP Publisher service"}]} \ No newline at end of file diff --git a/publisher/gen/http/openapi3.yaml b/publisher/gen/http/openapi3.yaml index cff11258..50a28058 100644 --- a/publisher/gen/http/openapi3.yaml +++ b/publisher/gen/http/openapi3.yaml @@ -69,13 +69,14 @@ paths: schema: type: string description: request state - example: queued + example: processing enum: - queued - processing - success - failed - example: failed + - canceled + example: queued components: schemas: RequestToPublishRequestBody: diff --git a/publisher/gen/http/tiup/client/encode_decode.go b/publisher/gen/http/tiup/client/encode_decode.go index 1a84eed9..2afdbcd3 100644 --- a/publisher/gen/http/tiup/client/encode_decode.go +++ b/publisher/gen/http/tiup/client/encode_decode.go @@ -138,8 +138,8 @@ func DecodeQueryPublishingStatusResponse(decoder func(*http.Response) goahttp.De if err != nil { return nil, goahttp.ErrDecodingError("tiup", "query-publishing-status", err) } - if !(body == "queued" || body == "processing" || body == "success" || body == "failed") { - err = goa.MergeErrors(err, goa.InvalidEnumValueError("body", body, []any{"queued", "processing", "success", "failed"})) + if !(body == "queued" || body == "processing" || body == "success" || body == "failed" || body == "canceled") { + err = goa.MergeErrors(err, goa.InvalidEnumValueError("body", body, []any{"queued", "processing", "success", "failed", "canceled"})) } if err != nil { return nil, goahttp.ErrValidationError("tiup", "query-publishing-status", err) diff --git a/publisher/pkg/config/config.go b/publisher/pkg/config/config.go index b9c5b653..fd794e13 100644 --- a/publisher/pkg/config/config.go +++ b/publisher/pkg/config/config.go @@ -5,9 +5,12 @@ type Worker struct { KafkaBasic `yaml:",inline" json:",inline"` ConsumerGroup string `yaml:"consumer_group" json:"consumer_group,omitempty"` } `yaml:"kafka" json:"kafka,omitempty"` - Redis Redis `yaml:"redis" json:"redis,omitempty"` - MirrorUrl string `yaml:"mirror_url" json:"mirror_url,omitempty"` - LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` + Redis Redis `yaml:"redis" json:"redis,omitempty"` + Options struct { + MirrorURL string `yaml:"mirror_url" json:"mirror_url,omitempty"` + LarkWebhookURL string `yaml:"lark_webhook_url" json:"lark_webhook_url,omitempty"` + NightlyInterval string `yaml:"nightly_interval" json:"nightly_interval,omitempty"` + } } type Service struct { diff --git a/publisher/pkg/impl/tiup/funcs.go b/publisher/pkg/impl/tiup/funcs.go index cf7e08bb..aad1fb02 100644 --- a/publisher/pkg/impl/tiup/funcs.go +++ b/publisher/pkg/impl/tiup/funcs.go @@ -229,7 +229,7 @@ func AnalyzeFromOciArtifact(repo, tag string) ([]PublishRequest, error) { return publishRequests, nil } -func AnalyzeFromOciArtifactUrl(url string) ([]PublishRequest, error) { +func analyzeFromOciArtifactUrl(url string) ([]PublishRequest, error) { repo, tag, err := splitRepoAndTag(url) if err != nil { return nil, err diff --git a/publisher/pkg/impl/tiup/publisher.go b/publisher/pkg/impl/tiup/publisher.go index dd8e303d..852b5558 100644 --- a/publisher/pkg/impl/tiup/publisher.go +++ b/publisher/pkg/impl/tiup/publisher.go @@ -17,28 +17,30 @@ import ( ) type Publisher struct { - mirrorURL string - larkWebhookURL string - nightlyTTL time.Duration - logger zerolog.Logger - redisClient redis.Cmdable + logger zerolog.Logger + redisClient redis.Cmdable + options PublisherOptions } -func NewPublisher(mirrorURL, larkWebhookURL string, logger *zerolog.Logger, redisClient redis.Cmdable) (*Publisher, error) { - handler := Publisher{mirrorURL: mirrorURL, larkWebhookURL: larkWebhookURL, redisClient: redisClient} +type PublisherOptions struct { + MirrorURL string + LarkWebhookURL string + NightlyInterval time.Duration +} + +func NewPublisher(logger *zerolog.Logger, redisClient redis.Cmdable, options PublisherOptions) (*Publisher, error) { + handler := Publisher{options: options, redisClient: redisClient} if logger == nil { handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger() } else { handler.logger = *logger } - handler.nightlyTTL = DefaultNightlyInternal - return &handler, nil } func (p *Publisher) SupportEventTypes() []string { - return []string{EventTypeTiupPublishRequest} + return []string{EventTypePublishRequest} } // Handle for test case run events @@ -53,7 +55,7 @@ func (p *Publisher) Handle(event cloudevents.Event) cloudevents.Result { return cloudevents.NewReceipt(false, "invalid data: %v", err) } - result := p.rateLimit(data, p.nightlyTTL, p.handleImpl) + result := p.rateLimit(data, p.options.NightlyInterval, p.handleImpl) switch { case cloudevents.IsACK(result): p.redisClient.SetXX(context.Background(), event.ID(), PublishStateSuccess, redis.KeepTTL) @@ -74,7 +76,7 @@ func (p *Publisher) rateLimit(data *PublishRequest, ttl time.Duration, run func( } // Add rate limiting - rateLimitKey := fmt.Sprintf("ratelimit:tiup:%s:%s:%s:%s", p.mirrorURL, data.Publish.Name, data.Publish.OS, data.Publish.Arch) + rateLimitKey := fmt.Sprintf("ratelimit:tiup:%s:%s:%s:%s", p.options.MirrorURL, data.Publish.Name, data.Publish.OS, data.Publish.Arch) count, err := p.redisClient.Incr(context.Background(), rateLimitKey).Result() if err != nil { p.logger.Err(err).Msg("rate limit check failed") @@ -86,12 +88,15 @@ func (p *Publisher) rateLimit(data *PublishRequest, ttl time.Duration, run func( p.redisClient.Expire(context.Background(), rateLimitKey, ttl) return run(data) } + p.logger.Debug().Str("key", rateLimitKey).Msg("cache key") p.logger.Warn(). - Str("mirror", p.mirrorURL). + Str("mirror", p.options.MirrorURL). Str("pkg", data.Publish.Name). Str("os", data.Publish.OS). Str("arch", data.Publish.Arch). + Dur("ttl", ttl). + Int64("count", count). Msg("rate limit execeeded for package") return fmt.Errorf("skip: rate limit exceeded for package %s", data.Publish.Name) @@ -115,7 +120,7 @@ func (p *Publisher) handleImpl(data *PublishRequest) cloudevents.Result { // 3. check the package is in the mirror. // printf 'post_check "$(tiup mirror show)/%s-%s-%s-%s.tar.gz" "%s"\n' \ - remoteURL := fmt.Sprintf("%s/%s-%s-%s-%s.tar.gz", p.mirrorURL, data.Publish.Name, data.Publish.Version, data.Publish.OS, data.Publish.Arch) + remoteURL := fmt.Sprintf("%s/%s-%s-%s-%s.tar.gz", p.options.MirrorURL, data.Publish.Name, data.Publish.Version, data.Publish.OS, data.Publish.Arch) if err := postCheck(saveTo, remoteURL); err != nil { p.logger.Err(err).Str("remote", remoteURL).Msg("post check failed") return cloudevents.NewReceipt(false, "post check failed: %v", err) @@ -126,7 +131,7 @@ func (p *Publisher) handleImpl(data *PublishRequest) cloudevents.Result { } func (p *Publisher) notifyLark(publishInfo *PublishInfo, err error) { - if p.larkWebhookURL == "" { + if p.options.LarkWebhookURL == "" { return } @@ -135,7 +140,7 @@ func (p *Publisher) notifyLark(publishInfo *PublishInfo, err error) { publishInfo.Version, publishInfo.OS, publishInfo.Arch, - p.mirrorURL, + p.options.MirrorURL, err) payload := map[string]interface{}{ @@ -150,7 +155,7 @@ func (p *Publisher) notifyLark(publishInfo *PublishInfo, err error) { p.logger.Err(err).Msg("failed to marshal JSON payload") } - resp, err := http.Post(p.larkWebhookURL, "application/json", bytes.NewBuffer(jsonPayload)) + resp, err := http.Post(p.options.LarkWebhookURL, "application/json", bytes.NewBuffer(jsonPayload)) if err != nil { p.logger.Err(err).Msg("failed to send notification to Lark") } @@ -168,7 +173,7 @@ func (p *Publisher) publish(file string, info *PublishInfo) error { } command := exec.Command("tiup", args...) command.Env = os.Environ() - command.Env = append(command.Env, "TIUP_MIRRORS="+p.mirrorURL) + command.Env = append(command.Env, "TIUP_MIRRORS="+p.options.MirrorURL) p.logger.Debug().Any("args", command.Args).Any("env", command.Args).Msg("will execute tiup command") output, err := command.Output() if err != nil { @@ -176,7 +181,7 @@ func (p *Publisher) publish(file string, info *PublishInfo) error { return err } p.logger.Info(). - Str("mirror", p.mirrorURL). + Str("mirror", p.options.MirrorURL). Str("output", string(output)). Msg("tiup package publish success") diff --git a/publisher/pkg/impl/tiup/service.go b/publisher/pkg/impl/tiup/service.go index 02a1f093..139a11ff 100644 --- a/publisher/pkg/impl/tiup/service.go +++ b/publisher/pkg/impl/tiup/service.go @@ -24,8 +24,8 @@ type tiupsrvc struct { stateTTL time.Duration } -// NewTiup returns the tiup service implementation. -func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service { +// NewService returns the tiup service implementation. +func NewService(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redis.Cmdable, eventSrc string) gentiup.Service { return &tiupsrvc{ logger: logger, kafkaWriter: kafkaWriter, @@ -39,7 +39,7 @@ func NewTiup(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClient redi func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPublishPayload) (res []string, err error) { s.logger.Info().Msgf("tiup.request-to-publish") // 1. Analyze the artifact_url to get the repo and tag and the tiup package information. - publishRequests, err := AnalyzeFromOciArtifactUrl(p.ArtifactURL) + publishRequests, err := analyzeFromOciArtifactUrl(p.ArtifactURL) if err != nil { return nil, err } @@ -49,10 +49,10 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub } } - // 3. Compose cloud events with the analyzed results. + // 2. Compose cloud events with the analyzed results. events := s.composeEvents(publishRequests) - // 4. Send it to kafka topic with the request id as key and the event as value. + // 3. Send it to kafka topic with the request id as key and the event as value. var messages []kafka.Message for _, event := range events { bs, _ := event.MarshalJSON() @@ -72,21 +72,20 @@ func (s *tiupsrvc) RequestToPublish(ctx context.Context, p *gentiup.RequestToPub requestIDs = append(requestIDs, event.ID()) } - // 5. Init the request dealing status in redis with the request id. + // 4. Init the request dealing status in redis with the request id. for _, requestID := range requestIDs { if err := s.redisClient.SetNX(ctx, requestID, PublishStateQueued, s.stateTTL).Err(); err != nil { return nil, fmt.Errorf("failed to set initial status in Redis: %v", err) } } - // 6. Return the request id. + // 5. Return the request id. return requestIDs, nil } // QueryPublishingStatus implements query-publishing-status. func (s *tiupsrvc) QueryPublishingStatus(ctx context.Context, p *gentiup.QueryPublishingStatusPayload) (res string, err error) { s.logger.Info().Msgf("tiup.query-publishing-status") - // 1. Get the request dealing status from redis with the request id. status, err := s.redisClient.Get(ctx, p.RequestID).Result() if err != nil { @@ -105,9 +104,9 @@ func (s *tiupsrvc) composeEvents(requests []PublishRequest) []cloudevents.Event for _, request := range requests { event := cloudevents.NewEvent() event.SetID(uuid.New().String()) - event.SetType(EventTypeTiupPublishRequest) + event.SetType(EventTypePublishRequest) event.SetSource(s.eventSource) - event.SetSubject(EventTypeTiupPublishRequest) + event.SetSubject(EventTypePublishRequest) event.SetData(cloudevents.ApplicationJSON, request) ret = append(ret, event) } diff --git a/publisher/pkg/impl/tiup/types.go b/publisher/pkg/impl/tiup/types.go index 1b877066..ebfca3e8 100644 --- a/publisher/pkg/impl/tiup/types.go +++ b/publisher/pkg/impl/tiup/types.go @@ -3,7 +3,7 @@ package tiup import "time" const ( - EventTypeTiupPublishRequest = "net.pingcap.tibuild.tiup-publish-request" + EventTypePublishRequest = "net.pingcap.tibuild.tiup-publish-request" FromTypeOci = "oci" FromTypeHTTP = "http"