Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support basic postgres tvf #18811

Merged
merged 35 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
df14470
interim: add tvf to binder
kwannoel Oct 7, 2024
b56f281
bind postgres_query table_function
kwannoel Oct 7, 2024
ccc90fb
add boilerplate for optimizer plan nodes
kwannoel Oct 7, 2024
22ec6b3
update plan node fields
kwannoel Oct 7, 2024
514e031
fmt
kwannoel Oct 8, 2024
b8f8aea
update proto + add batch executor skeleton
kwannoel Oct 8, 2024
7e19074
fmt
kwannoel Oct 8, 2024
26a7a11
interim commit: add pg tvf functionality
kwannoel Oct 8, 2024
4abe65c
support serde in postgres_query executor
kwannoel Oct 8, 2024
b1a286d
add tests
kwannoel Oct 8, 2024
e2e20f0
fix table function error in madsim + fix postgres_query tvf test
kwannoel Oct 8, 2024
2b5946e
consume all remaining chunks
kwannoel Oct 8, 2024
6d09259
add logs
kwannoel Oct 8, 2024
c2b444b
fix connection not polled
kwannoel Oct 9, 2024
172943b
fix distribution
kwannoel Oct 9, 2024
d526c2a
fmt
kwannoel Oct 9, 2024
111a7a5
fix
kwannoel Oct 9, 2024
89d3ca1
fix
kwannoel Oct 9, 2024
2afb409
clean
kwannoel Oct 9, 2024
032230a
fix
kwannoel Oct 9, 2024
b532620
address review comments
kwannoel Oct 10, 2024
b5e76a5
test all supported datatypes
kwannoel Oct 10, 2024
4bc2797
fix
kwannoel Oct 10, 2024
9885b3a
fix
kwannoel Oct 10, 2024
735bc75
fix
kwannoel Oct 10, 2024
5b66d01
cleanup slt
kwannoel Oct 10, 2024
c254e71
match quotes
kwannoel Oct 10, 2024
650ecd9
fix test
kwannoel Oct 10, 2024
1413bd4
Update src/batch/src/executor/postgres_query.rs
kwannoel Oct 10, 2024
66464d9
add timezone
kwannoel Oct 10, 2024
4674982
Update src/frontend/src/binder/expr/function/mod.rs
kwannoel Oct 10, 2024
fe10528
fix
kwannoel Oct 10, 2024
ad46398
fix
kwannoel Oct 10, 2024
4a58d05
fix
kwannoel Oct 10, 2024
8ea47ed
fix slt
kwannoel Oct 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ risedev slt './e2e_test/source/cdc/cdc.check_new_rows.slt'
# drop relations
risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt'

echo "--- postgres_query tvf test"
risedev slt './e2e_test/source/tvf/postgres_query.slt'
Comment on lines +144 to +145
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to put in e2e_test/source_inline, and no need to modify CI script

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently inline source test does not have a postgres service managed by risedev. We can refactor it after #18099.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


echo "--- Kill cluster"
risedev ci-kill
export RISINGWAVE_CI=true
Expand Down
40 changes: 40 additions & 0 deletions e2e_test/source/tvf/postgres_query.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
system ok
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use $RISEDEV_ envvars, instead of hard-coded ones. Then you can run it easier locally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current e2e-source-test does not have a postgres service managed by risedev. We can refactor it after #18099.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already have risedev managed pg #16662

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be configured as a service in the risedev config. But switching the risedev configuration may cause diff conflicts with #18099. There could be conflicts with the ci-managed postgres instance as well.

Migrating everything together in #18858 after #18099 will be more straightforward.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, we already have a dedicated risedev profile, so I don't think it will affect other stuff..

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt'

risingwave/risedev.yml

Lines 952 to 979 in c80cbf3

ci-inline-source-test:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
meta-backend: etcd
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: pubsub
persist-data: true
- use: kafka
user-managed: true
address: message_queue
port: 29092
- use: schema-registry
user-managed: true
address: schemaregistry
port: 8082
- use: mysql
port: 3306
address: mysql
user: root
password: 123456
user-managed: true

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean to write the new tests introduced in the new style, without changing existing tests. If you mean migrating all tests to the style later after that PR, it also sounds good

CREATE TABLE test (
id bigint primary key,
v1 bool,
v2 smallint,
v3 integer,
v4 bigint,
v5 real,
v6 double precision,
v7 numeric,
v8 date,
v9 time,
v10 timestamp,
v11 timestamptz,
v12 text,
v13 varchar,
v14 interval,
v15 jsonb,
v16 bytea
);"

system ok
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c "
INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00', 'text', 'varchar', '1 day', '{}', '\\x01';"
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
"

query II
select * from postgres_query('db', '5432', 'postgres', 'postgres', 'cdc_test', 'select * from test where id > 90;');
----
91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 00:00:00+00:00 text varchar 1 day {} \x5c783031
12 changes: 12 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ message FileScanNode {
repeated string file_location = 7;
}

// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message PostgresQueryNode {
repeated plan_common.ColumnDesc columns = 1;
string hostname = 2;
string port = 3;
string username = 4;
string password = 5;
string database = 6;
string query = 7;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
}
Expand Down Expand Up @@ -373,6 +384,7 @@ message PlanNode {
LogRowSeqScanNode log_row_seq_scan = 37;
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
PostgresQueryNode postgres_query = 40;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
2 changes: 2 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ message TableFunction {
JSONB_TO_RECORDSET = 17;
// file scan
FILE_SCAN = 19;
// postgres query
POSTGRES_QUERY = 20;
// User defined table function
USER_DEFINED = 100;
}
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-metrics = "0.3.0"
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
use risingwave_storage::error::StorageError;
use thiserror::Error;
use thiserror_ext::Construct;
use tokio_postgres;
use tonic::Status;

use crate::worker_manager::worker_node_manager::FragmentId;
Expand Down Expand Up @@ -127,6 +128,13 @@ pub enum BatchError {
ParquetError,
),

#[error(transparent)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the error messages from Postgres can be clearly distinguished from ours, given that we're using transparent and not including any prompt here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we introduce more external system queries and directly interact with them in the batch crate, I'm concerned that BatchError might become another maintainability hell just like ConnectorError.

Just FYI, in #15086, we made ConnectorError a wrapper of anyhow::Error because it has too many variants (may check the documentation on def_anyhow_wrapper for the underlying ideas). For example, shall we introduce a ExtSystemError with this approach (or even reuse ConnectorError) and make it the only variant for all external system queries in BatchError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI, in #15086, we made ConnectorError a wrapper of anyhow::Error because it has too many variants (may check the documentation on def_anyhow_wrapper for the underlying ideas). For example, shall we introduce a ExtSystemError with this approach (or even reuse ConnectorError) and make it the only variant for all external system queries in BatchError?

I think this a fair point. There's quite a few tvf to be supported. And we should preemptively introduce an ExtSystemError to deal with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize there's more external system errors like iceberg, parquet from before this PR, and not just postgres error, which was introduced in this PR.
I will handle it in a separate PR since this is a larger refactor: #18860


    #[error(transparent)]
    Iceberg(
        #[from]
        #[backtrace]
        iceberg::Error,
    ),

    #[error(transparent)]
    Parquet(
        #[from]
        #[backtrace]
        ParquetError,
    ),

    #[error(transparent)]
    Postgres(
        #[from]
        #[backtrace]
        tokio_postgres::Error,
    ),

Postgres(
#[from]
#[backtrace]
tokio_postgres::Error,
),

// Make the ref-counted type to be a variant for easier code structuring.
// TODO(error-handling): replace with `thiserror_ext::Arc`
#[error(transparent)]
Expand Down
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod max_one_row;
mod merge_sort;
mod merge_sort_exchange;
mod order_by;
mod postgres_query;
mod project;
mod project_set;
mod row_seq_scan;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub use max_one_row::*;
pub use merge_sort::*;
pub use merge_sort_exchange::*;
pub use order_by::*;
pub use postgres_query::*;
pub use project::*;
pub use project_set::*;
use risingwave_common::array::DataChunk;
Expand Down Expand Up @@ -244,6 +246,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::MaxOneRow => MaxOneRowExecutor,
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
NodeBody::PostgresQuery => PostgresQueryExecutorBuilder,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
Expand Down
190 changes: 190 additions & 0 deletions src/batch/src/executor/postgres_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use thiserror_ext::AsReport;
use tokio_postgres;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder};
use crate::task::BatchTaskContext;

/// `PostgresQuery` executor. Runs a query against a Postgres database.
pub struct PostgresQueryExecutor {
schema: Schema,
host: String,
port: String,
username: String,
password: String,
database: String,
query: String,
identity: String,
}

impl Executor for PostgresQueryExecutor {
fn schema(&self) -> &risingwave_common::catalog::Schema {
&self.schema
}

fn identity(&self) -> &str {
&self.identity
}

fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
self.do_execute().boxed()
}
}

pub fn postgres_row_to_owned_row(
row: tokio_postgres::Row,
schema: &Schema,
) -> Result<OwnedRow, BatchError> {
let mut datums = vec![];
for i in 0..schema.fields.len() {
let rw_field = &schema.fields[i];
let name = rw_field.name.as_str();
let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?;
datums.push(datum);
}
Ok(OwnedRow::new(datums))
}

// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter.
fn postgres_cell_to_scalar_impl(
row: &tokio_postgres::Row,
data_type: &DataType,
i: usize,
name: &str,
) -> Result<Datum, BatchError> {
let datum = match data_type {
DataType::Boolean
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
| DataType::Date
| DataType::Time
| DataType::Timestamp
| DataType::Timestamptz
| DataType::Jsonb
| DataType::Interval
| DataType::Varchar
| DataType::Bytea => {
// ScalarAdapter is also fine. But ScalarImpl is more efficient
row.try_get::<_, Option<ScalarImpl>>(i)?
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}
DataType::Decimal => {
// Decimal is more efficient than PgNumeric in ScalarAdapter
let val = row.try_get::<_, Option<Decimal>>(i)?;
val.map(ScalarImpl::from)
}
_ => {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
tracing::warn!(name, ?data_type, "unsupported data type, set to null");
None
}
};
Ok(datum)
}

impl PostgresQueryExecutor {
pub fn new(
schema: Schema,
host: String,
port: String,
username: String,
password: String,
database: String,
query: String,
identity: String,
) -> Self {
Self {
schema,
host,
port,
username,
password,
database,
query,
identity,
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
async fn do_execute(self: Box<Self>) {
tracing::debug!("postgres_query_executor: started");
let conn_str = format!(
"host={} port={} user={} password={} dbname={}",
self.host, self.port, self.username, self.password, self.database
);
let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;

tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::error!(
"postgres_query_executor: connection error: {:?}",
e.as_report()
);
}
});

// TODO(kwannoel): Use pagination using CURSOR.
let rows = client.query(&self.query, &[]).await?;
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
let mut builder = DataChunkBuilder::new(self.schema.data_types(), 1024);
tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
// deserialize the rows
for row in rows {
let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
if let Some(chunk) = builder.append_one_row(owned_row) {
yield chunk;
}
}
if let Some(chunk) = builder.consume_all() {
yield chunk;
}
return Ok(());
}
}

pub struct PostgresQueryExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
_inputs: Vec<BoxedExecutor>,
) -> crate::error::Result<BoxedExecutor> {
let postgres_query_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::PostgresQuery
)?;

Ok(Box::new(PostgresQueryExecutor::new(
Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)),
postgres_query_node.hostname.clone(),
postgres_query_node.port.clone(),
postgres_query_node.username.clone(),
postgres_query_node.password.clone(),
postgres_query_node.database.clone(),
postgres_query_node.query.clone(),
source.plan_node().get_identity().clone(),
)))
}
}
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
"fs",
] }
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/binder/expr/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ impl Binder {
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_file_scan(args)?.into());
}
// `postgres_query` table function
if func_name.eq("postgres_query") {
reject_syntax!(
arg_list.variadic,
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_postgres_query(args)?.into());
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}
// UDTF
if let Some(ref udf) = udf
&& udf.kind.is_table()
Expand Down
Loading
Loading