Skip to content

Commit

Permalink
Merge branch 'main' into feature/drop-view
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 authored Jul 11, 2024
2 parents 5fe5489 + 7ad248d commit ae4d9a0
Show file tree
Hide file tree
Showing 35 changed files with 445 additions and 236 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ GreptimeDB uses the [Apache 2.0 license](https://github.com/GreptimeTeam/greptim
- To ensure that community is free and confident in its ability to use your contributions, please sign the Contributor License Agreement (CLA) which will be incorporated in the pull request process.
- Make sure all files have proper license header (running `docker run --rm -v $(pwd):/github/workspace ghcr.io/korandoru/hawkeye-native:v3 format` from the project root).
- Make sure all your codes are formatted and follow the [coding style](https://pingcap.github.io/style-guide/rust/) and [style guide](docs/style-guide.md).
- Make sure all unit tests are passed (using `cargo test --workspace` or [nextest](https://nexte.st/index.html) `cargo nextest run`).
- Make sure all unit tests are passed using [nextest](https://nexte.st/index.html) `cargo nextest run`.
- Make sure all clippy warnings are fixed (you can check it locally by running `cargo clippy --workspace --all-targets -- -D warnings`).

#### `pre-commit` Hooks
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,14 @@ run-it-in-container: start-etcd ## Run integration tests in dev-builder.
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
make test sqlness-test BUILD_JOBS=${BUILD_JOBS}

.PHONY: run-cluster-with-etcd
run-cluster-with-etcd: ## Run greptime cluster with etcd in docker-compose.
.PHONY: start-cluster
start-cluster: ## Start the greptimedb cluster with etcd by using docker compose.
docker compose -f ./docker/docker-compose/cluster-with-etcd.yaml up

.PHONY: stop-cluster
stop-cluster: ## Stop the greptimedb cluster that created by docker compose.
docker compose -f ./docker/docker-compose/cluster-with-etcd.yaml stop

##@ Docs
config-docs: ## Generate configuration documentation from toml files.
docker run --rm \
Expand Down
2 changes: 2 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
Expand Down Expand Up @@ -428,6 +429,7 @@
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
Expand Down
4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ data_freeze_threshold = 32768
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"

[[region_engine]]
## Enable the file engine.
[region_engine.file]

## The logging options.
[logging]
## The directory to store the log files.
Expand Down
4 changes: 4 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ data_freeze_threshold = 32768
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"

[[region_engine]]
## Enable the file engine.
[region_engine.file]

## The logging options.
[logging]
## The directory to store the log files.
Expand Down
47 changes: 39 additions & 8 deletions docker/docker-compose/cluster-with-etcd.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
x-custom:
initial_cluster_token: &initial_cluster_token "--initial-cluster-token=etcd-cluster"
common_settings: &common_settings
etcd_initial_cluster_token: &etcd_initial_cluster_token "--initial-cluster-token=etcd-cluster"
etcd_common_settings: &etcd_common_settings
image: quay.io/coreos/etcd:v3.5.10
entrypoint: /usr/local/bin/etcd
greptimedb_image: &greptimedb_image docker.io/greptimedb/greptimedb:latest

services:
etcd0:
<<: *common_settings
<<: *etcd_common_settings
container_name: etcd0
ports:
- 2379:2379
Expand All @@ -22,7 +23,7 @@ services:
- --election-timeout=1250
- --initial-cluster=etcd0=http://etcd0:2380
- --initial-cluster-state=new
- *initial_cluster_token
- *etcd_initial_cluster_token
volumes:
- /tmp/greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
healthcheck:
Expand All @@ -34,7 +35,7 @@ services:
- greptimedb

metasrv:
image: docker.io/greptime/greptimedb:latest
image: *greptimedb_image
container_name: metasrv
ports:
- 3002:3002
Expand All @@ -56,27 +57,34 @@ services:
- greptimedb

datanode0:
image: docker.io/greptime/greptimedb:latest
image: *greptimedb_image
container_name: datanode0
ports:
- 3001:3001
- 5000:5000
command:
- datanode
- start
- --node-id=0
- --rpc-addr=0.0.0.0:3001
- --rpc-hostname=datanode0:3001
- --metasrv-addrs=metasrv:3002
- --http-addr=0.0.0.0:5000
volumes:
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
healthcheck:
test: [ "CMD", "curl", "-f", "http://datanode0:5000/health" ]
interval: 5s
timeout: 3s
retries: 5
depends_on:
metasrv:
condition: service_healthy
networks:
- greptimedb

frontend0:
image: docker.io/greptime/greptimedb:latest
image: *greptimedb_image
container_name: frontend0
ports:
- 4000:4000
Expand All @@ -91,8 +99,31 @@ services:
- --rpc-addr=0.0.0.0:4001
- --mysql-addr=0.0.0.0:4002
- --postgres-addr=0.0.0.0:4003
healthcheck:
test: [ "CMD", "curl", "-f", "http://frontend0:4000/health" ]
interval: 5s
timeout: 3s
retries: 5
depends_on:
metasrv:
datanode0:
condition: service_healthy
networks:
- greptimedb

flownode0:
image: *greptimedb_image
container_name: flownode0
ports:
- 4004:4004
command:
- flownode
- start
- --node-id=0
- --metasrv-addrs=metasrv:3002
- --rpc-addr=0.0.0.0:4004
- --rpc-hostname=flownode0:4004
depends_on:
frontend0:
condition: service_healthy
networks:
- greptimedb
Expand Down
55 changes: 31 additions & 24 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::service_config::datanode::DatanodeClientOptions;
use meta_client::MetaClientOptions;
Expand Down Expand Up @@ -71,18 +72,21 @@ fn test_load_datanode_example_config() {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
..Default::default()
})],
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),
],
logging: LoggingOptions {
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
Expand Down Expand Up @@ -207,18 +211,21 @@ fn test_load_standalone_example_config() {
sync_period: Some(Duration::from_secs(10)),
..Default::default()
}),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
..Default::default()
})],
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),
],
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
Expand Down
36 changes: 29 additions & 7 deletions src/common/datasource/src/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

Expand Down Expand Up @@ -184,14 +186,16 @@ impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
/// Returns number of rows written.
pub async fn stream_to_parquet(
mut stream: SendableRecordBatchStream,
schema: datatypes::schema::SchemaRef,
store: ObjectStore,
path: &str,
concurrency: usize,
) -> Result<usize> {
let write_props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build();
let schema = stream.schema();
let write_props = column_wise_config(
WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
schema,
)
.build();
let inner_writer = store
.writer_with(path)
.concurrent(concurrency)
Expand All @@ -200,7 +204,7 @@ pub async fn stream_to_parquet(
.map(|w| w.into_futures_async_write().compat_write())
.context(WriteObjectSnafu { path })?;

let mut writer = AsyncArrowWriter::try_new(inner_writer, schema, Some(write_props))
let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
.context(WriteParquetSnafu { path })?;
let mut rows_written = 0;

Expand All @@ -216,6 +220,24 @@ pub async fn stream_to_parquet(
Ok(rows_written)
}

/// Customizes per-column properties.
fn column_wise_config(
mut props: WriterPropertiesBuilder,
schema: SchemaRef,
) -> WriterPropertiesBuilder {
// Disable dictionary for timestamp column, since for increasing timestamp column,
// the dictionary pages will be larger than data pages.
for col in schema.column_schemas() {
if col.data_type.is_timestamp() {
let path = ColumnPath::new(vec![col.name.clone()]);
props = props
.set_column_dictionary_enabled(path.clone(), false)
.set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
}
}
props
}

#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
Expand Down
4 changes: 0 additions & 4 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
compression: config.compression,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
Expand All @@ -114,7 +113,6 @@ mod tests {
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;

use super::*;
use crate::config::kafka::common::BackoffConfig;
Expand Down Expand Up @@ -207,7 +205,6 @@ mod tests {
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = DatanodeKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
Expand All @@ -229,7 +226,6 @@ mod tests {
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
Expand Down
5 changes: 0 additions & 5 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};

use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
Expand All @@ -27,9 +26,6 @@ use crate::BROKER_ENDPOINT;
pub struct DatanodeKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
Expand All @@ -46,7 +42,6 @@ impl Default for DatanodeKafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
Expand Down
Loading

0 comments on commit ae4d9a0

Please sign in to comment.