Skip to content

Commit

Permalink
fix test_query_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Sep 25, 2019
1 parent 96577d4 commit 153fb2d
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 18 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.15"
version = "0.1.16"
authors = ["Mikhail Sukharev <[email protected]>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand All @@ -14,8 +14,8 @@ edition = "2018"
exclude = ["test/*"]

[dependencies]
log = "0.4.6"
futures = "0.1.28"
log = "0.4.8"
futures = "0.1.29"
tokio = "0.1.22"
tokio-timer = "0.2.11"
hostname = "^0.1"
Expand All @@ -30,8 +30,8 @@ byteorder = "1.3.1"
failure = "0.1"
failure_derive = "0.1"
url="^2"
lazy_static = "1.3.0"
lazy_static = "1.4.0"

[dev-dependencies]
env_logger = "0.6.2"
rand = "0.7.0"
env_logger = "^0.7"
rand = "^0.7"
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ parameters:
- `retry_timeout` - Amount of time to wait before next retry. (defaults to `5 sec`).
- `ping_timeout` - Timeout for ping (defaults to `500 ms`).

- `query_timeout` - Timeout for queries (defaults to `180 sec`).
- `query_block_timeout` - Timeout for each block in a query (defaults to `180 sec`).

example:
```url
tcp://user:password@host:9000/clicks?compression=lz4&ping_timeout=42ms
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
//! - `retry_timeout` - Amount of time to wait before next retry. (defaults to `5 sec`).
//! - `ping_timeout` - Timeout for ping (defaults to `500 ms`).
//!
//! - `query_timeout` - Timeout for queries (defaults to `180 sec`).
//! - `query_block_timeout` - Timeout for each block in a query (defaults to `180 sec`).
//!
//! example:
//! ```url
//! tcp://user:password@host:9000/clicks?compression=lz4&ping_timeout=42ms
Expand Down
3 changes: 1 addition & 2 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,12 @@ mod test {

#[test]
fn test_query_timeout() {
let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=5ms");
let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=10ms");
let pool = Pool::new(test_db_url.to_string());

let done = pool.get_handle()
.and_then(|c| c.query("SELECT sleep(10)").fetch_all());


run(done).unwrap_err();

let info = pool.info();
Expand Down
2 changes: 1 addition & 1 deletion src/types/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
binary::{protocol, Encoder},
binary::{Encoder, protocol},
client_info,
types::{Block, ClickhouseResult, Context, Query},
};
Expand Down
9 changes: 5 additions & 4 deletions src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const DEFAULT_MIN_CONNS: usize = 10;
const DEFAULT_MAX_CONNS: usize = 20;

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum State {
Raw(Options),
Url(String),
Expand Down Expand Up @@ -168,10 +169,10 @@ pub struct Options {
/// Timeout for connection (defaults to `500 ms`)
pub(crate) connection_timeout: Duration,

/// Timeout for queries (defaults to `180,000 ms`)
/// Timeout for queries (defaults to `180 sec`)
pub(crate) query_timeout: Duration,

/// Timeout for each block in a query (defaults to `180,000 ms`)
/// Timeout for each block in a query (defaults to `180 sec`)
pub(crate) query_block_timeout: Duration,
}

Expand All @@ -192,8 +193,8 @@ impl Default for Options {
retry_timeout: Duration::from_secs(5),
ping_timeout: Duration::from_millis(500),
connection_timeout: Duration::from_millis(500),
query_timeout: Duration::from_millis(180_000),
query_block_timeout: Duration::from_millis(180_000),
query_timeout: Duration::from_secs(180),
query_block_timeout: Duration::from_secs(180),
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/types/query_result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,13 @@ impl QueryResult {

/// Fetch data from table. It returns a block that contains all rows.
pub fn fetch_all(self) -> BoxFuture<(ClientHandle, Block)> {
let timeout = try_opt!(self.client.context.options.get()).query_timeout;

wrap_future(
self.fold_blocks(Vec::new(), |mut blocks, block| {
if !block.is_empty() {
blocks.push(block);
}
Ok(blocks)
})
.timeout(timeout)
.map_err(Error::from)
.map(|(h, blocks)| (h, Block::concat(blocks.as_slice())))
)
Expand All @@ -99,6 +96,7 @@ impl QueryResult {
Fut::Future: Send,
T: Send + 'static,
{
let timeout = try_opt!(self.client.context.options.get()).query_timeout;
let context = self.client.context.clone();
let pool = self.client.pool.clone();
let release_pool = self.client.pool.clone();
Expand All @@ -122,9 +120,10 @@ impl QueryResult {
_ => Either::Right(future::err(Error::Driver(DriverError::UnexpectedPacket))),
})
.map(|(c, t)| (c.unwrap(), t))
.timeout(timeout)
.map_err(move |err| {
release_pool.release_conn();
err
err.into()
}),
)
}
Expand Down Expand Up @@ -187,6 +186,7 @@ impl QueryResult {

let context = c.context.clone();
let pool = c.pool.clone();
let release_pool = c.pool.clone();

BlockStream::new(
c.inner
Expand All @@ -197,7 +197,10 @@ impl QueryResult {
pool,
)
.timeout(timeout)
.map_err(Error::from)
.map_err(move |err| {
release_pool.clone().release_conn();
err.into()
})
})
}

Expand Down

0 comments on commit 153fb2d

Please sign in to comment.