Skip to content

Commit

Permalink
feat(object store): introduce object prefix for opendal object store (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Jun 17, 2024
1 parent e34c83b commit 1b8a563
Show file tree
Hide file tree
Showing 34 changed files with 239 additions and 49 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal data_directory
internal parallel_compact_size_mb
internal sstable_size_mb
internal state_store
internal use_new_object_prefix_strategy
postmaster backup_storage_directory
postmaster backup_storage_url
postmaster barrier_interval_ms
Expand Down
1 change: 1 addition & 0 deletions proto/java_binding.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ message ReadPlan {
catalog.Table table_catalog = 7;

repeated uint32 vnode_ids = 8;
bool use_new_object_prefix_strategy = 9;
}
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ message SystemParams {
optional bool pause_on_next_bootstrap = 13;
optional string wasm_storage_url = 14 [deprecated = true];
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
}

message GetSystemParamsRequest {}
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ macro_rules! for_all_params {
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
}
};
}
Expand Down Expand Up @@ -376,6 +377,7 @@ macro_rules! impl_system_params_for_test {
ret.state_store = Some("hummock+memory".to_string());
ret.backup_storage_url = Some("memory".into());
ret.backup_storage_directory = Some("backup".into());
ret.use_new_object_prefix_strategy = Some(false);
ret
}
};
Expand Down Expand Up @@ -441,6 +443,7 @@ mod tests {
(MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"),
(PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"),
(ENABLE_TRACING_KEY, "true"),
(USE_NEW_OBJECT_PREFIX_STRATEGY_KEY, "false"),
("a_deprecated_param", "foo"),
];

Expand Down
8 changes: 8 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ where
self.inner().data_directory.as_ref().unwrap()
}

fn use_new_object_prefix_strategy(&self) -> bool {
*self
.inner()
.use_new_object_prefix_strategy
.as_ref()
.unwrap()
}

fn backup_storage_url(&self) -> &str {
self.inner().backup_storage_url.as_ref().unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub async fn compute_node_serve(
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, &opts);

info!("Starting compute node",);
info!("> config: {:?}", config);
info!(
Expand Down Expand Up @@ -211,6 +210,7 @@ pub async fn compute_node_serve(
storage_metrics.clone(),
compactor_metrics.clone(),
await_tree_config.clone(),
system_params.use_new_object_prefix_strategy(),
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,4 @@ This page is automatically generated by `./risedev generate-example-config`
| pause_on_next_bootstrap | Whether to pause all data sources on next bootstrap. | false |
| sstable_size_mb | Target size of the Sstable. | 256 |
| state_store | URL for the state store | |
| use_new_object_prefix_strategy | Whether to split object prefix. | |
8 changes: 7 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub enum BenchCommands {
#[clap(long, default_value_t = 1)]
threads: usize,
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
},
}

Expand Down Expand Up @@ -86,9 +88,13 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
mv_name,
threads,
data_dir,
use_new_object_prefix_strategy,
} => {
let (hummock, metrics) = context
.hummock_store_with_metrics(HummockServiceOpts::from_env(data_dir)?)
.hummock_store_with_metrics(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
let mut handlers = vec![];
Expand Down
6 changes: 5 additions & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ pub async fn list_kv(
epoch: u64,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
if is_max_epoch(epoch) {
tracing::info!("using MAX EPOCH as epoch");
Expand Down
16 changes: 13 additions & 3 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct SstDumpArgs {
print_table: bool,
#[clap(short = 'd')]
data_dir: Option<String>,
#[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
use_new_object_prefix_strategy: bool,
}

pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> {
Expand All @@ -72,7 +74,10 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
if args.print_level {
// Level information is retrieved from meta service
let hummock = context
.hummock_store(HummockServiceOpts::from_env(args.data_dir.clone())?)
.hummock_store(HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?)
.await?;
let version = hummock.inner().get_pinned_version().version().clone();
let sstable_store = hummock.sstable_store();
Expand Down Expand Up @@ -108,8 +113,13 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}
} else {
// Object information is retrieved from object store. Meta service is not required.
let hummock_service_opts = HummockServiceOpts::from_env(args.data_dir.clone())?;
let sstable_store = hummock_service_opts.create_sstable_store().await?;
let hummock_service_opts = HummockServiceOpts::from_env(
args.data_dir.clone(),
args.use_new_object_prefix_strategy,
)?;
let sstable_store = hummock_service_opts
.create_sstable_store(args.use_new_object_prefix_strategy)
.await?;
if let Some(obj_id) = &args.object_id {
let obj_store = sstable_store.store();
let obj_path = sstable_store.get_sst_data_path(*obj_id);
Expand Down
8 changes: 6 additions & 2 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ pub async fn print_user_key_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
user_key: String,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let user_key_bytes = hex::decode(user_key.clone()).unwrap_or_else(|_| {
panic!("cannot decode user key {} into raw bytes", user_key);
});
let user_key = UserKey::decode(&user_key_bytes);
println!("user key: {user_key:?}");

let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down Expand Up @@ -178,8 +180,10 @@ pub async fn print_version_delta_in_archive(
archive_ids: Vec<HummockVersionId>,
data_dir: String,
sst_id: HummockSstableObjectId,
use_new_object_prefix_strategy: bool,
) -> anyhow::Result<()> {
let hummock_opts = HummockServiceOpts::from_env(Some(data_dir.clone()))?;
let hummock_opts =
HummockServiceOpts::from_env(Some(data_dir.clone()), use_new_object_prefix_strategy)?;
let hummock = context.hummock_store(hummock_opts).await?;
let sstable_store = hummock.sstable_store();
let archive_object_store = sstable_store.store();
Expand Down
24 changes: 20 additions & 4 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,35 @@ pub fn make_storage_table<S: StateStore>(
))
}

pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option<String>) -> Result<()> {
pub async fn scan(
context: &CtlContext,
mv_name: String,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog(meta_client, mv_name).await?;
do_scan(table, hummock).await
}

pub async fn scan_id(context: &CtlContext, table_id: u32, data_dir: Option<String>) -> Result<()> {
pub async fn scan_id(
context: &CtlContext,
table_id: u32,
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<()> {
let meta_client = context.meta_client().await?;
let hummock = context
.hummock_store(HummockServiceOpts::from_env(data_dir)?)
.hummock_store(HummockServiceOpts::from_env(
data_dir,
use_new_object_prefix_strategy,
)?)
.await?;
let table = get_table_catalog_by_id(meta_client, table_id).await?;
do_scan(table, hummock).await
Expand Down
16 changes: 14 additions & 2 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct HummockServiceOpts {
pub hummock_url: String,
pub data_dir: Option<String>,

use_new_object_prefix_strategy: bool,

heartbeat_handle: Option<JoinHandle<()>>,
heartbeat_shutdown_sender: Option<Sender<()>>,
}
Expand All @@ -55,7 +57,10 @@ impl HummockServiceOpts {
/// Currently, we will read these variables for meta:
///
/// * `RW_HUMMOCK_URL`: hummock store address
pub fn from_env(data_dir: Option<String>) -> Result<Self> {
pub fn from_env(
data_dir: Option<String>,
use_new_object_prefix_strategy: bool,
) -> Result<Self> {
let hummock_url = match env::var("RW_HUMMOCK_URL") {
Ok(url) => {
if !url.starts_with("hummock+") {
Expand All @@ -80,11 +85,13 @@ impl HummockServiceOpts {
bail!(MESSAGE);
}
};

Ok(Self {
hummock_url,
data_dir,
heartbeat_handle: None,
heartbeat_shutdown_sender: None,
use_new_object_prefix_strategy,
})
}

Expand Down Expand Up @@ -142,6 +149,7 @@ impl HummockServiceOpts {
metrics.storage_metrics.clone(),
metrics.compactor_metrics.clone(),
None,
self.use_new_object_prefix_strategy,
)
.await?;

Expand All @@ -157,7 +165,10 @@ impl HummockServiceOpts {
}
}

pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
pub async fn create_sstable_store(
&self,
use_new_object_prefix_strategy: bool,
) -> Result<Arc<SstableStore>> {
let object_store = build_remote_object_store(
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
Expand Down Expand Up @@ -190,6 +201,7 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
use_new_object_prefix_strategy,
meta_cache,
block_cache,
})))
Expand Down
Loading

0 comments on commit 1b8a563

Please sign in to comment.