Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for storing the "local" pool on Azure #1074

Merged
merged 13 commits into from
Jun 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
- name: Make
env:
RUN_LONG_TESTS: 'yes'
AZURE_STORAGE_ENDPOINT: "127.0.0.1:10000"
AZURE_STORAGE_ENDPOINT: "http://127.0.0.1:10000/devstoreaccount1"
AZURE_STORAGE_ACCOUNT: "devstoreaccount1"
AZURE_STORAGE_ACCESS_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
run: |
Expand Down
24 changes: 23 additions & 1 deletion api/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"net/http"
"os"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -482,7 +483,16 @@
var e error

// provision download location
task.TempDownPath, e = context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename)
if pp, ok := context.PackagePool().(aptly.LocalPackagePool); ok {
task.TempDownPath, e = pp.GenerateTempPath(task.File.Filename)
} else {
var file *os.File
file, e = os.CreateTemp("", task.File.Filename)
if e == nil {
task.TempDownPath = file.Name()
file.Close()
}

Check warning on line 494 in api/mirror.go

View check run for this annotation

Codecov / codecov/patch

api/mirror.go#L489-L494

Added lines #L489 - L494 were not covered by tests
}
if e != nil {
pushError(e)
continue
Expand Down Expand Up @@ -530,6 +540,18 @@
log.Info().Msgf("%s: Background processes finished", b.Name)
close(taskFinished)

defer func() {
for _, task := range queue {
if task.TempDownPath == "" {
continue

Check warning on line 546 in api/mirror.go

View check run for this annotation

Codecov / codecov/patch

api/mirror.go#L546

Added line #L546 was not covered by tests
}

if err := os.Remove(task.TempDownPath); err != nil && !os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Failed to delete %s: %v\n", task.TempDownPath, err)
}

Check warning on line 551 in api/mirror.go

View check run for this annotation

Codecov / codecov/patch

api/mirror.go#L550-L551

Added lines #L550 - L551 were not covered by tests
}
}()

select {
case <-context.Done():
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: interrupted")
Expand Down
6 changes: 4 additions & 2 deletions aptly/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type PackagePool interface {
Import(srcPath, basename string, checksums *utils.ChecksumInfo, move bool, storage ChecksumStorage) (path string, err error)
// LegacyPath returns legacy (pre 1.1) path to package file (relative to root)
LegacyPath(filename string, checksums *utils.ChecksumInfo) (string, error)
// Stat returns Unix stat(2) info
Stat(path string) (os.FileInfo, error)
// Size returns the size of the given file in bytes.
Size(path string) (size int64, err error)
// Open returns ReadSeekerCloser to access the file
Open(path string) (ReadSeekerCloser, error)
// FilepathList returns file paths of all the files in the pool
Expand All @@ -47,6 +47,8 @@ type PackagePool interface {

// LocalPackagePool is implemented by PackagePools residing on the same filesystem
type LocalPackagePool interface {
// Stat returns Unix stat(2) info
Stat(path string) (os.FileInfo, error)
// GenerateTempPath generates temporary path for download (which is fast to import into package pool later on)
GenerateTempPath(filename string) (string, error)
// Link generates hardlink to destination path
Expand Down
129 changes: 128 additions & 1 deletion azure/azure.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,129 @@
// Package azure handles publishing to Azure Storage
package azure

// Package azure handles publishing to Azure Storage

import (
"context"
"encoding/hex"
"fmt"
"io"
"net/url"
"path/filepath"
"time"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aptly-dev/aptly/aptly"
)

func isBlobNotFound(err error) bool {
storageError, ok := err.(azblob.StorageError)
return ok && storageError.ServiceCode() == azblob.ServiceCodeBlobNotFound
}

type azContext struct {
container azblob.ContainerURL
prefix string
}

func newAzContext(accountName, accountKey, container, prefix, endpoint string) (*azContext, error) {
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, err
}

Check warning on line 32 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L31-L32

Added lines #L31 - L32 were not covered by tests

if endpoint == "" {
endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
}

Check warning on line 36 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L35-L36

Added lines #L35 - L36 were not covered by tests

url, err := url.Parse(fmt.Sprintf("%s/%s", endpoint, container))
if err != nil {
return nil, err
}

Check warning on line 41 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L40-L41

Added lines #L40 - L41 were not covered by tests

containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{}))

result := &azContext{
container: containerURL,
prefix: prefix,
}

return result, nil
}

func (az *azContext) blobPath(path string) string {
return filepath.Join(az.prefix, path)
}

func (az *azContext) blobURL(path string) azblob.BlobURL {
return az.container.NewBlobURL(az.blobPath(path))
}

func (az *azContext) internalFilelist(prefix string, progress aptly.Progress) (paths []string, md5s []string, err error) {
const delimiter = "/"
paths = make([]string, 0, 1024)
md5s = make([]string, 0, 1024)
prefix = filepath.Join(az.prefix, prefix)
if prefix != "" {
prefix += delimiter
}

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := az.container.ListBlobsFlatSegment(
context.Background(), marker, azblob.ListBlobsSegmentOptions{
Prefix: prefix,
MaxResults: 1,
Details: azblob.BlobListingDetails{Metadata: true}})
if err != nil {
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, az, err)
}

Check warning on line 78 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L77-L78

Added lines #L77 - L78 were not covered by tests

marker = listBlob.NextMarker

for _, blob := range listBlob.Segment.BlobItems {
if prefix == "" {
paths = append(paths, blob.Name)
} else {
paths = append(paths, blob.Name[len(prefix):])
}
md5s = append(md5s, fmt.Sprintf("%x", blob.Properties.ContentMD5))
}

if progress != nil {
time.Sleep(time.Duration(500) * time.Millisecond)
progress.AddBar(1)
}

Check warning on line 94 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L92-L94

Added lines #L92 - L94 were not covered by tests
}

return paths, md5s, nil
}

func (az *azContext) putFile(blob azblob.BlobURL, source io.Reader, sourceMD5 string) error {
uploadOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: 4 * 1024 * 1024,
MaxBuffers: 8,
}

if len(sourceMD5) > 0 {
decodedMD5, err := hex.DecodeString(sourceMD5)
if err != nil {
return err
}

Check warning on line 110 in azure/azure.go

View check run for this annotation

Codecov / codecov/patch

azure/azure.go#L109-L110

Added lines #L109 - L110 were not covered by tests
uploadOptions.BlobHTTPHeaders = azblob.BlobHTTPHeaders{
ContentMD5: decodedMD5,
}
}

_, err := azblob.UploadStreamToBlockBlob(
context.Background(),
source,
blob.ToBlockBlobURL(),
uploadOptions,
)

return err
}

// String
func (az *azContext) String() string {
return fmt.Sprintf("Azure: %s/%s", az.container, az.prefix)
}
Loading
Loading