Skip to content

Commit

Permalink
storage: Use Attributes API to reduce the amount of ObjectStore i…
Browse files Browse the repository at this point in the history
…nstances (#8517)
  • Loading branch information
Turbo87 authored Apr 25, 2024
1 parent 842f5bc commit 5f069aa
Showing 1 changed file with 36 additions and 58 deletions.
94 changes: 36 additions & 58 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::prefix::PrefixStore;
use object_store::{ClientOptions, ObjectStore, Result};
use reqwest::header::CACHE_CONTROL;
use reqwest::header::{HeaderMap, HeaderValue};
use object_store::{Attribute, Attributes, ClientOptions, ObjectStore, Result};
use secrecy::{ExposeSecret, SecretString};
use std::fs;
use std::path::PathBuf;
Expand Down Expand Up @@ -110,14 +108,8 @@ impl StorageConfig {

pub struct Storage {
cdn_prefix: Option<String>,

store: Box<dyn ObjectStore>,
crate_upload_store: Box<dyn ObjectStore>,
readme_upload_store: Box<dyn ObjectStore>,
db_dump_upload_store: Arc<dyn ObjectStore>,

index_store: Box<dyn ObjectStore>,
index_upload_store: Box<dyn ObjectStore>,
store: Arc<dyn ObjectStore>,
index_store: Arc<dyn ObjectStore>,
}

impl Storage {
Expand All @@ -130,37 +122,24 @@ impl Storage {

match &config.backend {
StorageBackend::S3 { default, index } => {
let options = ClientOptions::default();
let store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_CRATE, CACHE_CONTROL_IMMUTABLE);
let crate_upload_store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_README, CACHE_CONTROL_README);
let readme_upload_store = build_s3(default, options);
let options = ClientOptions::default()
// The `BufWriter::new()` API currently does not allow
// specifying any file attributes, so we need to set the
// content type here instead for the database dump upload.
.with_content_type_for_suffix("gz", CONTENT_TYPE_DB_DUMP);

let options =
ClientOptions::default().with_default_content_type(CONTENT_TYPE_DB_DUMP);
let db_dump_upload_store = build_s3(default, options);

let options = ClientOptions::default();
let index_store = build_s3(index, options);
let store = build_s3(default, options);

let options = client_options(CONTENT_TYPE_INDEX, CACHE_CONTROL_INDEX);
let index_upload_store = build_s3(index, options);
let index_store = build_s3(index, Default::default());

if cdn_prefix.is_none() {
panic!("Missing S3_CDN environment variable");
}

Self {
store: Box::new(store),
crate_upload_store: Box::new(crate_upload_store),
readme_upload_store: Box::new(readme_upload_store),
db_dump_upload_store: Arc::new(db_dump_upload_store),
cdn_prefix,
index_store: Box::new(index_store),
index_upload_store: Box::new(index_upload_store),
store: Arc::new(store),
index_store: Arc::new(index_store),
}
}

Expand All @@ -185,13 +164,9 @@ impl Storage {
let index_store: Arc<dyn ObjectStore> = Arc::new(local_index);

Self {
store: Box::new(store.clone()),
crate_upload_store: Box::new(store.clone()),
readme_upload_store: Box::new(store.clone()),
db_dump_upload_store: store,
cdn_prefix,
index_store: Box::new(index_store.clone()),
index_upload_store: Box::new(index_store),
store,
index_store,
}
}

Expand All @@ -200,13 +175,9 @@ impl Storage {
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

Self {
store: Box::new(store.clone()),
crate_upload_store: Box::new(store.clone()),
readme_upload_store: Box::new(store.clone()),
db_dump_upload_store: store.clone(),
cdn_prefix,
index_store: Box::new(PrefixStore::new(store.clone(), "index")),
index_upload_store: Box::new(PrefixStore::new(store, "index")),
store: store.clone(),
index_store: Arc::new(PrefixStore::new(store, "index")),
}
}
}
Expand Down Expand Up @@ -253,22 +224,38 @@ impl Storage {
#[instrument(skip(self, bytes))]
pub async fn upload_crate_file(&self, name: &str, version: &str, bytes: Bytes) -> Result<()> {
let path = crate_file_path(name, version);
self.crate_upload_store.put(&path, bytes.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_CRATE),
(Attribute::CacheControl, CACHE_CONTROL_IMMUTABLE),
]);
let opts = attributes.into();
self.store.put_opts(&path, bytes.into(), opts).await?;
Ok(())
}

#[instrument(skip(self, bytes))]
pub async fn upload_readme(&self, name: &str, version: &str, bytes: Bytes) -> Result<()> {
let path = readme_path(name, version);
self.readme_upload_store.put(&path, bytes.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_README),
(Attribute::CacheControl, CACHE_CONTROL_README),
]);
let opts = attributes.into();
self.store.put_opts(&path, bytes.into(), opts).await?;
Ok(())
}

#[instrument(skip(self, content))]
pub async fn sync_index(&self, name: &str, content: Option<String>) -> Result<()> {
let path = crates_io_index::Repository::relative_index_file_for_url(name).into();
if let Some(content) = content {
self.index_upload_store.put(&path, content.into()).await?;
let attributes = Attributes::from_iter([
(Attribute::ContentType, CONTENT_TYPE_INDEX),
(Attribute::CacheControl, CACHE_CONTROL_INDEX),
]);
let payload = content.into();
let opts = attributes.into();
self.index_store.put_opts(&path, payload, opts).await?;
} else {
self.index_store.delete(&path).await?;
}
Expand All @@ -278,7 +265,7 @@ impl Storage {

#[instrument(skip(self))]
pub async fn upload_db_dump(&self, target: &str, local_path: &StdPath) -> anyhow::Result<()> {
let store = self.db_dump_upload_store.clone();
let store = self.store.clone();

// Open the local tarball file
let mut local_file = File::open(local_path).await?;
Expand Down Expand Up @@ -318,15 +305,6 @@ impl Storage {
}
}

fn client_options(content_type: &str, cache_control: &'static str) -> ClientOptions {
let mut headers = HeaderMap::new();
headers.insert(CACHE_CONTROL, HeaderValue::from_static(cache_control));

ClientOptions::default()
.with_default_content_type(content_type)
.with_default_headers(headers)
}

fn build_s3(config: &S3Config, client_options: ClientOptions) -> AmazonS3 {
AmazonS3Builder::new()
.with_region(config.region.as_deref().unwrap_or(DEFAULT_REGION))
Expand Down

0 comments on commit 5f069aa

Please sign in to comment.