refactor(publisher): simplify the publishing logic to fileserver (#212)

BREAKING CHANGE:
The target file keys on the fileserver have changed; some of the new file
keys are:

- tidb:
  - download/builds/pingcap/tidb/master/abc123def/linux_amd64/tidb.tar.gz
  - download/builds/pingcap/tidb/master/abc123def/linux_amd64/br.tar.gz
  - download/builds/pingcap/tidb/master/abc123def/linux_amd64/dumpling.tar.gz
  - download/builds/pingcap/tidb/master/abc123def/linux_amd64/tidb-lightning.tar.gz
  - download/builds/pingcap/tidb/master/abc123def/linux_amd64/tidb-lightning-ctl.tar.gz
- tikv:
  - download/builds/tikv/tikv/master/abc123def/linux_amd64/tikv.tar.gz
  - download/builds/tikv/tikv/master/abc123def/linux_amd64/tikv-ctl.tar.gz
- ...

The target ref keys are:

- tidb: download/refs/pingcap/tidb/master/sha1
- tikv: download/refs/tikv/tikv/master/sha1
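
For illustration, the new key layout could be composed roughly as in the
following sketch. The helper names and parameters are assumptions for the
example, not code from this commit:

```go
package main

import "fmt"

// buildKey follows the artifact key pattern listed above:
// download/builds/<org>/<repo>/<branch>/<commit-sha>/<os_arch>/<file>
func buildKey(org, repo, branch, sha, platform, file string) string {
	return fmt.Sprintf("download/builds/%s/%s/%s/%s/%s/%s",
		org, repo, branch, sha, platform, file)
}

// refKey follows the ref key pattern listed above:
// download/refs/<org>/<repo>/<branch>/sha1
func refKey(org, repo, branch string) string {
	return fmt.Sprintf("download/refs/%s/%s/%s/sha1", org, repo, branch)
}

func main() {
	fmt.Println(buildKey("pingcap", "tidb", "master", "abc123def", "linux_amd64", "tidb.tar.gz"))
	// download/builds/pingcap/tidb/master/abc123def/linux_amd64/tidb.tar.gz
	fmt.Println(refKey("pingcap", "tidb", "master"))
	// download/refs/pingcap/tidb/master/sha1
}
```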

Signed-off-by: wuhuizuo <[email protected]>
wuhuizuo authored Dec 17, 2024
1 parent 020157a commit 492a0b1
Showing 11 changed files with 318 additions and 517 deletions.
22 changes: 10 additions & 12 deletions publisher/pkg/impl/fileserver_service.go
@@ -39,13 +39,13 @@ func NewFileserver(logger *zerolog.Logger, kafkaWriter *kafka.Writer, redisClien
func (s *fileserversrvc) RequestToPublish(ctx context.Context, p *fileserver.RequestToPublishPayload) (res []string, err error) {
s.logger.Info().Msgf("fileserver.request-to-publish")
// 1. Analyze the artifact_url to get the repo and tag and the tiup package information.
publishRequests, err := analyzeFsFromOciArtifactUrl(p.ArtifactURL)
publishRequest, err := analyzeFsFromOciArtifactUrl(p.ArtifactURL)
if err != nil {
return nil, err
}

// 2. Compose cloud events with the analyzed results.
events := s.composeEvents(publishRequests)
events := s.composeEvents(publishRequest)

// 3. Send it to kafka topic with the request id as key and the event as value.
var messages []kafka.Message
@@ -93,17 +93,15 @@ func (s *fileserversrvc) QueryPublishingStatus(ctx context.Context, p *fileserve
return status, nil
}

func (s *fileserversrvc) composeEvents(requests []PublishRequest) []cloudevents.Event {
func (s *fileserversrvc) composeEvents(request *PublishRequestFS) []cloudevents.Event {
var ret []cloudevents.Event
for _, request := range requests {
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetType(EventTypeFsPublishRequest)
event.SetSource(s.eventSource)
event.SetSubject(request.Publish.Name)
event.SetData(cloudevents.ApplicationJSON, request)
ret = append(ret, event)
}
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetType(EventTypeFsPublishRequest)
event.SetSource(s.eventSource)
event.SetSubject(request.Publish.Repo)
event.SetData(cloudevents.ApplicationJSON, request)
ret = append(ret, event)

return ret
}
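
For context, composeEvents now emits a single event per fileserver publish
request instead of fanning out one event per item. The sketch below shows a
possible shape of that payload, inferred only from the field accesses in this
diff (data.From.Oci, from.File, request.Publish.Repo, publishInfo.CommitSHA);
the real struct definitions are not part of this excerpt, so treat every field
here as an assumption:

```go
// Hypothetical payload shapes, for illustration only.
type OciFile struct {
	Repo string // OCI artifact repository
	Tag  string // OCI artifact tag
	File string // file name inside the artifact to publish
}

type FromFS struct {
	Oci *OciFile // source location of the tarballs
}

type PublishInfoFS struct {
	Repo      string // e.g. "pingcap/tidb"
	Branch    string // e.g. "master"
	CommitSHA string // e.g. "abc123def"
}

type PublishRequestFS struct {
	From    FromFS
	Publish PublishInfoFS
}
```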
96 changes: 44 additions & 52 deletions publisher/pkg/impl/fileserver_worker.go
@@ -72,7 +72,7 @@ func (p *fsWorker) Handle(event cloudevents.Event) cloudevents.Result {
}
p.redisClient.SetXX(context.Background(), event.ID(), PublishStateProcessing, redis.KeepTTL)

data := new(PublishRequest)
data := new(PublishRequestFS)
if err := event.DataAs(&data); err != nil {
return cloudevents.NewReceipt(false, "invalid data: %v", err)
}
@@ -91,32 +91,57 @@ func (p *fsWorker) Handle(event cloudevents.Event) cloudevents.Result {
return result
}

func (p *fsWorker) handle(data *PublishRequest) cloudevents.Result {
// 1. get the the file content from data.From.
err := doWithOCIFile(data.From.Oci, func(input io.Reader) error {
return doWithTempFileFromReader(input, func(inputF *os.File) error {
// 2. publish the tarball.
return p.publish(inputF, &data.Publish)
func (p *fsWorker) handle(data *PublishRequestFS) cloudevents.Result {
// 1. upload all the tarballs
for fromFile, targetKey := range targetFsFullPaths(&data.Publish) {
from := *data.From.Oci
from.File = fromFile
err := doWithOCIFile(&from, func(input io.Reader) error {
return doWithTempFileFromReader(input, func(inputF *os.File) error {
// 2. publish the tarball.
return p.publish(inputF, targetKey)
})
})
if err != nil {
p.logger.
Err(err).
Str("bucket", p.options.S3.BucketName).
Str("key", targetKey).
Msg("failed to upload file to KS3 bucket.")
return err
}
}

// 2. update git ref sha: download/refs/<repo>/<branch>/sha1
refKV := targetFsRefKeyValue(&data.Publish)
_, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(p.options.S3.BucketName),
Key: aws.String(refKV[0]),
Body: bytes.NewReader([]byte(refKV[1])),
})
if err != nil {
p.logger.Err(err).Msg("publish to fileserver failed")
p.logger.
Err(err).
Str("bucket", p.options.S3.BucketName).
Str("key", refKV[0]).
Msg("failed to update content in KS3 bucket.")
return cloudevents.NewReceipt(false, "publish to fileserver failed: %v", err)
}
p.logger.Info().Msg("publish to fileserver success")

p.logger.Debug().Str("bucket", p.options.S3.BucketName).
Str("key", refKV[0]).Msg("publish success")
return cloudevents.ResultACK
}

func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) {
func (p *fsWorker) notifyLark(publishInfo *PublishInfoFS, err error) {
if p.options.LarkWebhookURL == "" {
return
}

message := fmt.Sprintf("Failed to publish %s/%s/%s file to fileserver: %v",
publishInfo.Name,
publishInfo.Version,
publishInfo.EntryPoint,
publishInfo.Repo,
publishInfo.CommitSHA,
"*",
err)

payload := map[string]interface{}{
Expand All @@ -142,50 +167,17 @@ func (p *fsWorker) notifyLark(publishInfo *PublishInfo, err error) {
}
}

func (p *fsWorker) publish(content io.ReadSeeker, info *PublishInfo) error {
keys := targetFsFullPaths(info)
if len(keys) == 0 {
return nil
}

func (p *fsWorker) publish(content io.ReadSeeker, targetKey string) error {
bucketName := p.options.S3.BucketName

// upload the artifact files to KS3 bucket.
for _, key := range keys {
if _, err := content.Seek(0, io.SeekStart); err != nil {
return err
}

_, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: content,
})
if err != nil {
p.logger.
Err(err).
Str("bucket", bucketName).
Str("key", key).
Msg("failed to upload file to KS3 bucket.")
return err
}
if _, err := content.Seek(0, io.SeekStart); err != nil {
return err
}

// update git ref sha: download/refs/pingcap/<comp>/<branch>/sha1
refKV := targetFsRefKeyValue(info)
_, err := p.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(refKV[0]),
Body: bytes.NewReader([]byte(refKV[1])),
Key: aws.String(targetKey),
Body: content,
})
if err != nil {
p.logger.
Err(err).
Str("bucket", bucketName).
Str("key", refKV[0]).
Msg("failed to upload content in KS3 bucket.")
return err
}

return nil
return err
}
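
In the new handle flow above, targetFsFullPaths is ranged over with string
keys and values (source file name to target key), and targetFsRefKeyValue is
read via [0] and [1] (ref key and its value). The sketch below is merely
consistent with that usage and with the key patterns from the commit message;
it builds on the hypothetical PublishInfoFS shown earlier, assumes fmt is
imported, and is not the actual implementation:

```go
// Sketch only: the real helpers live elsewhere in the publisher package.
func targetFsFullPaths(p *PublishInfoFS) map[string]string {
	// The platform segment is hardcoded here for brevity; the real code
	// presumably derives it from the artifact.
	prefix := fmt.Sprintf("download/builds/%s/%s/%s/linux_amd64",
		p.Repo, p.Branch, p.CommitSHA)
	// map: file name inside the OCI artifact -> target key on the fileserver
	return map[string]string{
		"tidb.tar.gz": prefix + "/tidb.tar.gz",
		"br.tar.gz":   prefix + "/br.tar.gz",
	}
}

func targetFsRefKeyValue(p *PublishInfoFS) [2]string {
	// [0] = download/refs/<repo>/<branch>/sha1, [1] = the commit SHA
	return [2]string{
		fmt.Sprintf("download/refs/%s/%s/sha1", p.Repo, p.Branch),
		p.CommitSHA,
	}
}
```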
4 changes: 2 additions & 2 deletions publisher/pkg/impl/funcs.go
@@ -19,7 +19,7 @@ import (
"github.com/PingCAP-QE/ee-apps/dl/pkg/oci"
)

func downloadFile(data *PublishRequest) (string, error) {
func downloadFile(data *PublishRequestTiUP) (string, error) {
switch data.From.Type {
case FromTypeOci:
// save to file with `saveTo` param:
@@ -53,7 +53,7 @@ func downloadFileFromReader(input io.Reader) (ret string, err error) {
ret = input.Name()
return nil
})

return
}
