Skip to content

Commit 7f70436

Browse files
authored
samples(storage): add a sample for downloading chunks concurrently using Transfer Manager (#5402)
* Add sample download_chunks_concurrently with test. * Improve test and comments. * Improve cleanup in download_chunks_concurrently_test.go * Improve comments and tests for download_chunks_concurrently * Remove unnecessary space. * Resolve comments.
1 parent e85b254 commit 7f70436

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// [START storage_download_chunks_concurrently]
16+
package transfermanager
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
"os"
23+
24+
"cloud.google.com/go/storage"
25+
"cloud.google.com/go/storage/transfermanager"
26+
)
27+
28+
// downloadChunksConcurrently downloads a single file in chunks, concurrently in a process pool.
29+
func downloadChunksConcurrently(w io.Writer, bucketName, blobName, filename string) error {
30+
// bucketName := "your-bucket-name"
31+
// blobName := "target-file"
32+
// filename := "path/to/your/local/file.txt"
33+
34+
// The chunkSize is the size of each chunk to be downloaded.
35+
// The performance impact of this value depends on the use case.
36+
// For example, for a slow network, using a smaller chunkSize may be better.
37+
// Providing this parameter is optional and the default value is 32 MiB.
38+
chunkSize := 16 * 1024 * 1024 // 16 MiB
39+
40+
// The maximum number of workers to use for the operation.
41+
// Please note, providing this parameter is optional.
42+
// The performance impact of this value depends on the use case.
43+
// To download one large file, the default value: NumCPU / 2 is usually fine.
44+
workers := 8
45+
46+
ctx := context.Background()
47+
client, err := storage.NewClient(ctx)
48+
if err != nil {
49+
return fmt.Errorf("storage.NewClient: %w", err)
50+
}
51+
defer client.Close()
52+
53+
d, err := transfermanager.NewDownloader(client, transfermanager.WithPartSize(int64(chunkSize)), transfermanager.WithWorkers(workers))
54+
if err != nil {
55+
return fmt.Errorf("transfermanager.NewDownloader: %w", err)
56+
}
57+
58+
f, err := os.Create(filename)
59+
if err != nil {
60+
return fmt.Errorf("os.Create: %w", err)
61+
}
62+
63+
in := &transfermanager.DownloadObjectInput{
64+
Bucket: bucketName,
65+
Object: blobName,
66+
Destination: f,
67+
}
68+
69+
if err := d.DownloadObject(ctx, in); err != nil {
70+
return fmt.Errorf("d.DownloadObject: %w", err)
71+
}
72+
73+
// Wait for all downloads to complete and close the downloader.
74+
// This allows to synchronize the download processes.
75+
results, err := d.WaitAndClose()
76+
if err != nil {
77+
return fmt.Errorf("d.WaitAndClose: %w", err)
78+
}
79+
80+
// Process the downloader result.
81+
if len(results) != 1 {
82+
return fmt.Errorf("expected 1 result, got %d", len(results))
83+
}
84+
result := results[0]
85+
if result.Err != nil {
86+
fmt.Fprintf(w, "download of %v failed with error %v\n", result.Object, result.Err)
87+
}
88+
fmt.Fprintf(w, "Downloaded %v to %v.\n", blobName, filename)
89+
90+
return nil
91+
}
92+
93+
// [END storage_download_chunks_concurrently]
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package transfermanager
16+
17+
import (
18+
"bytes"
19+
"context"
20+
"crypto/rand"
21+
"fmt"
22+
"io"
23+
"log"
24+
"os"
25+
"strings"
26+
"testing"
27+
28+
"cloud.google.com/go/storage"
29+
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
30+
"github.com/google/uuid"
31+
)
32+
33+
// Names shared by the test fixtures in this file.
const (
33+
// testPrefix is the prefix of the bucket name generated in TestMain.
34+
testPrefix = "storage-objects-test"
35+
// downloadObject is the name of the object uploaded in TestMain and
// downloaded by the tests.
downloadObject = "tm-obj-download"
36+
)
37+
38+
// Fixture state populated once by TestMain and read by the tests.
var (
39+
// tmBucketName is the name of the bucket created for this test run.
tmBucketName string
40+
// storageClient is the client used to create and delete the fixtures.
storageClient *storage.Client
41+
// downloadData holds the bytes uploaded as downloadObject; tests compare
// downloaded content against it.
downloadData []byte
42+
)
43+
44+
func TestMain(m *testing.M) {
45+
ctx := context.Background()
46+
tc, _ := testutil.ContextMain(m)
47+
48+
var err error
49+
50+
// Create fixture client & bucket to use across tests.
51+
storageClient, err = storage.NewClient(ctx)
52+
if err != nil {
53+
log.Fatalf("storage.NewClient: %v", err)
54+
}
55+
tmBucketName = fmt.Sprintf("%s-%s", testPrefix, uuid.New().String())
56+
bucket := storageClient.Bucket(tmBucketName)
57+
if err := bucket.Create(ctx, tc.ProjectID, nil); err != nil {
58+
log.Fatalf("Bucket(%q).Create: %v", tmBucketName, err)
59+
}
60+
61+
// Create object fixture for download tests.
62+
w := bucket.Object(downloadObject).NewWriter(ctx)
63+
downloadData = make([]byte, 2*1024*1024) // 2 MiB
64+
if _, err := rand.Read(downloadData); err != nil {
65+
log.Fatalf("rand.Read: %v", err)
66+
}
67+
if _, err := io.Copy(w, bytes.NewReader(downloadData)); err != nil {
68+
log.Fatalf("uploading object: %v", err)
69+
}
70+
if err := w.Close(); err != nil {
71+
log.Fatalf("closing writer: %v", err)
72+
}
73+
74+
// Run tests.
75+
exitCode := m.Run()
76+
77+
// Cleanup bucket and objects.
78+
if err := testutil.DeleteBucketIfExists(ctx, storageClient, tmBucketName); err != nil {
79+
log.Printf("deleting bucket: %v", err)
80+
}
81+
os.Exit(exitCode)
82+
}
83+
84+
func TestDownloadChunksConcurrently(t *testing.T) {
85+
bucketName := tmBucketName
86+
blobName := downloadObject
87+
88+
// Create a temporary file to download to, ensuring we have permissions
89+
// and the file is cleaned up.
90+
f, err := os.CreateTemp("", "tm-file-test-")
91+
if err != nil {
92+
t.Fatalf("os.CreateTemp: %v", err)
93+
}
94+
fileName := f.Name()
95+
f.Close() // Close the file so the download can write to it.
96+
defer os.Remove(fileName)
97+
98+
var buf bytes.Buffer
99+
if err := downloadChunksConcurrently(&buf, bucketName, blobName, fileName); err != nil {
100+
t.Errorf("downloadChunksConcurrently: %v", err)
101+
}
102+
103+
if got, want := buf.String(), fmt.Sprintf("Downloaded %v to %v", blobName, fileName); !strings.Contains(got, want) {
104+
t.Errorf("got %q, want to contain %q", got, want)
105+
}
106+
107+
// Verify that the downloaded data is the same as the uploaded data.
108+
downloadedBytes, err := os.ReadFile(fileName)
109+
if err != nil {
110+
t.Fatalf("os.ReadFile(%q): %v", fileName, err)
111+
}
112+
113+
if !bytes.Equal(downloadedBytes, downloadData) {
114+
t.Errorf("downloaded data does not match uploaded data. got %d bytes, want %d bytes", len(downloadedBytes), len(downloadData))
115+
}
116+
}

0 commit comments

Comments
 (0)