Skip to content

Commit

Permalink
Timeout (#48)
Browse files Browse the repository at this point in the history
Timeout (#48)
  • Loading branch information
gr211 authored Oct 20, 2023
1 parent 91d0d3d commit b4f6173
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 87 deletions.
92 changes: 41 additions & 51 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies = ["release"]
script = '''
mkdir -p ${DESTDIR}/usr/local/bin
sudo cp target/release/kinesis-tailr ${DESTDIR}/usr/local/bin/
echo Installed to ${DESTDIR}/usr/local/bin/kinesis-tailr
'''

[tasks.install]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The [release page](https://github.com/grumlimited/kinesis-tailr/releases) provid
--from-datetime <FROM_DATETIME> Start datetime position to tail from. ISO 8601 format
--to-datetime <TO_DATETIME> End datetime position to tail up to. ISO 8601 format
--max-messages <MAX_MESSAGES> Maximum number of messages to retrieve
--timeout <TIMEOUT> Exit if no messages received after <timeout> seconds
--max-attempts <MAX_ATTEMPTS> Maximum number of aws sdk retries. Increase if you are seeing throttling errors [default: 3]
--no-color Disable color output
--print-delimiter Print a delimiter between each payload
Expand Down
4 changes: 4 additions & 0 deletions src/cli_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct Opt {
#[structopt(long)]
pub max_messages: Option<u32>,

/// Exit if no messages received after <timeout> seconds.
#[structopt(long)]
pub timeout: Option<u16>,

/// Maximum number of aws sdk retries. Increase if you are seeing throttling errors.
#[structopt(long)]
#[clap(default_value_t = 3)]
Expand Down
4 changes: 2 additions & 2 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ where
.send(TickerMessage::CountUpdate(ShardCountUpdate {
shard_id: self.get_config().shard_id.clone(),
millis_behind,
nb_records,
}))
.await
.expect("Could not send TickerUpdate to tx_ticker_updates");
Expand Down Expand Up @@ -236,8 +237,7 @@ where

tx_shard_iterator_progress
.send(shard_iterator_progress)
.await
.unwrap();
.await?;
} else {
debug!(
"{} records in batch for shard-id {} and {} records before {}",
Expand Down
13 changes: 2 additions & 11 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_shard_iterator::{
GetShardIteratorError, GetShardIteratorOutput,
};
use aws_sdk_kinesis::operation::list_shards::{ListShardsError, ListShardsOutput};
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
use chrono::Utc;
use log::{debug, info};
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -180,15 +179,7 @@ pub async fn get_shards(client: &AwsKinesisClient, stream: &str) -> io::Result<V

Ok(shards)
}
Err(e) => {
let message = match e.downcast_ref::<SdkError<ListShardsError>>() {
Some(SdkError::ServiceError(inner)) => inner.err().to_string(),
Some(other) => other.to_string(),
_ => e.to_string(),
};

Err(io::Error::new(io::ErrorKind::Other, message))
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e))),
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/kinesis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::primitives::DateTime;
use chrono::Utc;
use chrono::{Duration, Utc};
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
Expand All @@ -32,6 +32,8 @@ pub enum ShardProcessorADT {
pub enum ProcessError {
#[error("The stream panicked: {0}")]
PanicError(String),
#[error("The stream timed out after {0}.")]
Timeout(Duration),
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
6 changes: 4 additions & 2 deletions src/kinesis/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ async fn produced_record_is_processed() {
ticker_update,
TickerMessage::CountUpdate(ShardCountUpdate {
shard_id: "shardId-000000000000".to_string(),
millis_behind: 1000
millis_behind: 1000,
nb_records: 1
})
);

Expand Down Expand Up @@ -176,7 +177,8 @@ async fn beyond_to_timestamp_is_received() {
ticker_update,
TickerMessage::CountUpdate(ShardCountUpdate {
shard_id: "shardId-000000000000".to_string(),
millis_behind: 1000
millis_behind: 1000,
nb_records: 1
})
);

Expand Down
Loading

0 comments on commit b4f6173

Please sign in to comment.