Skip to content

Commit

Permalink
Removed some unneeded clone()s (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
gr211 authored May 13, 2024
1 parent 8c7066d commit 88bdf52
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 65 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Arch Linux.
--to-datetime <TO_DATETIME> End datetime position to tail up to. ISO 8601 format
--max-messages <MAX_MESSAGES> Maximum number of messages to retrieve
--timeout <TIMEOUT> Exit if no messages received after <timeout> seconds
--max-attempts <MAX_ATTEMPTS> Maximum number of aws sdk retries. Increase if you are seeing throttling errors [default: 3]
--max-attempts <MAX_ATTEMPTS> Maximum number of aws sdk retries. Increase if you are seeing throttling errors [default: 10]
--no-color Disable color output
--print-delimiter Print a delimiter between each payload
--print-key Print the partition key
Expand All @@ -52,7 +52,6 @@ Arch Linux.
--progress Print progress status
--shard-id <SHARD_ID> Shard ID to tail from. Repeat option for each shard ID to filter on
-o, --output-file <OUTPUT_FILE> Output file to write to
-c, --concurrent <CONCURRENT> Concurrent number of shards to tail
-v, --verbose Display additional information
--base64 Base64 encode payloads (eg. for binary data)
--utf8 Forces UTF-8 printable payloads
Expand Down
8 changes: 1 addition & 7 deletions src/cli_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use chrono::{DateTime, Utc};
use clap::Parser;
use log::info;

pub const SEMAPHORE_DEFAULT_SIZE: usize = 50;

#[derive(Debug, Parser)]
#[command(
version = "{#RELEASE_VERSION} - Grum Ltd\nReport bugs to https://github.com/grumlimited/kinesis-tailr/issues"
Expand Down Expand Up @@ -42,7 +40,7 @@ pub struct Opt {

/// Maximum number of aws sdk retries. Increase if you are seeing throttling errors.
#[structopt(long)]
#[clap(default_value_t = 3)]
#[clap(default_value_t = 10)]
pub max_attempts: u32,

/// Disable color output
Expand Down Expand Up @@ -81,10 +79,6 @@ pub struct Opt {
#[structopt(long, short)]
pub output_file: Option<String>,

/// Concurrent number of shards to tail
#[structopt(short, long)]
pub concurrent: Option<usize>,

/// Display additional information
#[structopt(short, long)]
pub verbose: bool,
Expand Down
16 changes: 2 additions & 14 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_kinesis::operation::get_records::GetRecordsError;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use chrono::prelude::*;
use chrono::Utc;
use log::{debug, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Duration};
Expand Down Expand Up @@ -42,14 +43,6 @@ where
self.seed_shards(tx_shard_iterator_progress.clone()).await?;

while let Some(res) = rx_shard_iterator_progress.recv().await {
let permit = self
.get_config()
.semaphore
.clone()
.acquire_owned()
.await
.unwrap();

let res_clone = res.clone();

match res.next_shard_iterator {
Expand Down Expand Up @@ -112,8 +105,6 @@ where
rx_shard_iterator_progress.close();
}
};

drop(permit);
}

debug!("ShardProcessor {} finished", self.get_config().shard_id);
Expand All @@ -138,8 +129,6 @@ where
&self,
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()> {
let permit = self.get_config().semaphore.clone().acquire_owned().await?;

debug!("Seeding shard {}", self.get_config().shard_id);

match self.get_iterator().await {
Expand All @@ -160,7 +149,6 @@ where
}
}

drop(permit);
Ok(())
}

Expand Down
6 changes: 1 addition & 5 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use aws_sdk_kinesis::types::Shard;
use chrono::Utc;
use log::{debug, info};
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;
use tokio::time::sleep;

use crate::aws::client::AwsKinesisClient;
Expand All @@ -33,7 +32,6 @@ pub fn new(
shard_id: String,
from_datetime: Option<chrono::DateTime<Utc>>,
to_datetime: Option<chrono::DateTime<Utc>>,
semaphore: Arc<Semaphore>,
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
tx_ticker_updates: Option<Sender<TickerMessage>>,
) -> Box<dyn ShardProcessor<AwsKinesisClient> + Send + Sync> {
Expand All @@ -46,7 +44,6 @@ pub fn new(
stream,
shard_id: Arc::new(shard_id),
to_datetime,
semaphore,
tx_records,
tx_ticker_updates,
},
Expand All @@ -58,7 +55,6 @@ pub fn new(
stream,
shard_id: Arc::new(shard_id),
to_datetime,
semaphore,
tx_records,
tx_ticker_updates,
},
Expand Down Expand Up @@ -194,5 +190,5 @@ pub fn wait_milliseconds() -> u64 {
use rand::prelude::*;
let mut rng = thread_rng();

rng.gen_range(50..=1000)
rng.gen_range(50..=100)
}
2 changes: 0 additions & 2 deletions src/kinesis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;

#[derive(Debug, Clone)]
pub struct ShardIteratorProgress {
Expand Down Expand Up @@ -50,7 +49,6 @@ pub struct ShardProcessorConfig {
pub stream: String,
pub shard_id: Arc<String>,
pub to_datetime: Option<chrono::DateTime<Utc>>,
pub semaphore: Arc<Semaphore>,
pub tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
pub tx_ticker_updates: Option<Sender<TickerMessage>>,
}
Expand Down
21 changes: 1 addition & 20 deletions src/kinesis/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use aws_sdk_kinesis::types::error::InvalidArgumentException;
use aws_sdk_kinesis::types::{Record, Shard};
use chrono::prelude::*;
use chrono::Utc;
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::mpsc;

use crate::aws::stream::StreamClient;
use crate::kinesis::helpers;
Expand All @@ -34,15 +34,12 @@ async fn seed_shards_test() {
done: Arc::new(Mutex::new(false)),
};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let processor = ShardProcessorLatest {
client,
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: None,
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand Down Expand Up @@ -72,15 +69,12 @@ async fn seed_shards_test_timestamp_in_future() {

let client = TestTimestampInFutureKinesisClient {};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let processor = ShardProcessorAtTimestamp {
client,
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: None,
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand All @@ -102,15 +96,12 @@ async fn produced_record_is_processed() {
done: Arc::new(Mutex::new(false)),
};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let processor = ShardProcessorLatest {
client: client.clone(),
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: None,
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand Down Expand Up @@ -149,16 +140,13 @@ async fn beyond_to_timestamp_is_received() {
done: Arc::new(Mutex::new(false)),
};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap();
let processor = ShardProcessorLatest {
client,
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: Some(to_datetime),
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand Down Expand Up @@ -190,16 +178,13 @@ async fn has_records_beyond_end_ts_when_has_end_ts() {
done: Arc::new(Mutex::new(false)),
};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap();
let processor = ShardProcessorLatest {
client,
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: Some(to_datetime),
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand Down Expand Up @@ -251,15 +236,12 @@ async fn has_records_beyond_end_ts_when_no_end_ts() {
done: Arc::new(Mutex::new(false)),
};

let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(10));

let processor = ShardProcessorLatest {
client,
config: ShardProcessorConfig {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: None,
semaphore,
tx_records,
tx_ticker_updates: Some(tx_ticker_updates),
},
Expand Down Expand Up @@ -303,7 +285,6 @@ async fn handle_iterator_refresh_ok() {
stream: "test".to_string(),
shard_id: Arc::new("shardId-000000000000".to_string()),
to_datetime: None,
semaphore: Arc::new(Semaphore::new(10)),
tx_records: mpsc::channel::<Result<ShardProcessorADT, ProcessError>>(10).0,
tx_ticker_updates: Some(mpsc::channel::<TickerMessage>(10).0),
},
Expand Down
16 changes: 1 addition & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![allow(clippy::result_large_err)]

use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::mpsc;
use tokio::task::JoinSet;

use kinesis::helpers::get_shards;
Expand Down Expand Up @@ -115,8 +113,6 @@ async fn main() -> Result<()> {
};

let shard_processors = {
let semaphore = semaphore(shard_count, opt.concurrent);

selected_shards
.iter()
.map(|shard_id| {
Expand All @@ -126,7 +122,6 @@ async fn main() -> Result<()> {
shard_id.clone(),
from_datetime,
to_datetime,
semaphore.clone(),
tx_records.clone(),
tx_ticker_updates.clone(),
);
Expand All @@ -150,12 +145,3 @@ async fn main() -> Result<()> {

Ok(())
}

fn semaphore(shard_count: usize, concurrent: Option<usize>) -> Arc<Semaphore> {
let concurrent = match concurrent {
Some(concurrent) => concurrent,
None => std::cmp::min(shard_count, SEMAPHORE_DEFAULT_SIZE),
};

Arc::new(Semaphore::new(concurrent))
}

0 comments on commit 88bdf52

Please sign in to comment.