Skip to content

Commit

Permalink
Removed cloned shard_id + syntax clean ups (#49)
Browse files Browse the repository at this point in the history
Removed cloned shard_id + syntax clean ups (#49)
  • Loading branch information
gr211 authored Oct 22, 2023
1 parent b4f6173 commit 056ee6f
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 50 deletions.
23 changes: 12 additions & 11 deletions src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod client {
use aws_config::meta::region::RegionProviderChain;
use aws_config::retry::RetryConfig;
use aws_sdk_kinesis::config::Region;
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
Expand Down Expand Up @@ -49,7 +50,7 @@ pub mod client {

fn get_region(&self) -> Option<&Region>;

fn to_aws_datetime(timestamp: &chrono::DateTime<Utc>) -> DateTime {
fn aws_datetime(timestamp: &chrono::DateTime<Utc>) -> DateTime {
DateTime::from_millis(timestamp.timestamp_millis())
}
}
Expand All @@ -66,7 +67,7 @@ pub mod client {
None => self.client.list_shards().stream_name(stream),
};

builder.send().await.map_err(|e| e.into())
builder.send().await.map_err(Into::into)
}

async fn get_records(&self, shard_iterator: &str) -> Result<GetRecordsOutput> {
Expand All @@ -75,8 +76,8 @@ pub mod client {
.shard_iterator(shard_iterator)
.send()
.await
.map_err(|e| e.into_service_error())
.map_err(|e| e.into())
.map_err(SdkError::into_service_error)
.map_err(Into::into)
}

async fn get_shard_iterator_at_timestamp(
Expand All @@ -88,13 +89,13 @@ pub mod client {
self.client
.get_shard_iterator()
.shard_iterator_type(ShardIteratorType::AtTimestamp)
.timestamp(Self::to_aws_datetime(timestamp))
.timestamp(Self::aws_datetime(timestamp))
.stream_name(stream)
.shard_id(shard_id)
.send()
.await
.map_err(|e| e.into_service_error())
.map_err(|e| e.into())
.map_err(SdkError::into_service_error)
.map_err(Into::into)
}

async fn get_shard_iterator_at_sequence(
Expand All @@ -111,8 +112,8 @@ pub mod client {
.shard_id(shard_id)
.send()
.await
.map_err(|e| e.into_service_error())
.map_err(|e| e.into())
.map_err(SdkError::into_service_error)
.map_err(Into::into)
}

async fn get_shard_iterator_latest(
Expand All @@ -127,8 +128,8 @@ pub mod client {
.shard_id(shard_id)
.send()
.await
.map_err(|e| e.into_service_error())
.map_err(|e| e.into())
.map_err(SdkError::into_service_error)
.map_err(Into::into)
}

fn get_region(&self) -> Option<&Region> {
Expand Down
20 changes: 7 additions & 13 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ where
match res.next_shard_iterator {
Some(shard_iterator) => {
let result = self
.publish_records_shard(
&shard_iterator,
self.get_config().tx_ticker_updates.clone(),
tx_shard_iterator_progress.clone(),
)
.publish_records_shard(&shard_iterator, tx_shard_iterator_progress.clone())
.await;

if let Err(e) = result {
Expand Down Expand Up @@ -97,7 +93,9 @@ where
None => {
if let Some(sender) = self.get_config().tx_ticker_updates {
sender
.send(TickerMessage::RemoveShard(res.shard_id.clone()))
.send(TickerMessage::RemoveShard(
self.get_config().shard_id.clone(),
))
.await
.expect("Could not send RemoveShard to tx_ticker_updates");
};
Expand Down Expand Up @@ -137,13 +135,11 @@ where

match self.get_iterator().await {
Ok(resp) => {
let shard_iterator: Option<String> = resp.shard_iterator().map(|s| s.into());
tx_shard_iterator_progress
.clone()
.send(ShardIteratorProgress {
shard_id: self.get_config().shard_id,
last_sequence_id: None,
next_shard_iterator: shard_iterator,
next_shard_iterator: resp.shard_iterator().map(str::to_string),
})
.await?;
}
Expand All @@ -165,10 +161,10 @@ where
async fn publish_records_shard(
&self,
shard_iterator: &str,
tx_ticker_updates: Option<Sender<TickerMessage>>,
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()> {
let resp = self.get_config().client.get_records(shard_iterator).await?;
let tx_ticker_updates = self.get_config().tx_ticker_updates;

let next_shard_iterator = resp.next_shard_iterator();

Expand Down Expand Up @@ -230,9 +226,8 @@ where
.map(|s| s.into());

let shard_iterator_progress = ShardIteratorProgress {
shard_id: self.get_config().shard_id,
last_sequence_id,
next_shard_iterator: next_shard_iterator.map(|s| s.into()),
next_shard_iterator: next_shard_iterator.map(str::to_string),
};

tx_shard_iterator_progress
Expand All @@ -252,7 +247,6 @@ where

tx_shard_iterator_progress
.send(ShardIteratorProgress {
shard_id: self.get_config().shard_id.clone(),
last_sequence_id: None,
next_shard_iterator: None,
})
Expand Down
17 changes: 7 additions & 10 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aws_sdk_kinesis::operation::get_shard_iterator::{
GetShardIteratorError, GetShardIteratorOutput,
};
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
use aws_sdk_kinesis::types::Shard;
use chrono::Utc;
use log::{debug, info};
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -101,12 +102,12 @@ where
.map(|output| {
(
Some(last_sequence_id.clone()),
output.shard_iterator().map(|v| v.to_string()),
output.shard_iterator().map(str::to_string),
)
}),
None => get_latest_iterator(iterator_provider)
.await
.map(|output| (None, output.shard_iterator().map(|v| v.to_string()))),
.map(|output| (None, output.shard_iterator().map(str::to_string))),
};

match result {
Expand All @@ -118,7 +119,6 @@ where

tx_shard_iterator_progress
.send(ShardIteratorProgress {
shard_id: shard_iterator_progress.shard_id.clone(),
last_sequence_id: sequence_id,
next_shard_iterator: Some(iterator),
})
Expand Down Expand Up @@ -166,13 +166,10 @@ pub async fn get_shards(client: &AwsKinesisClient, stream: &str) -> io::Result<V
Ok(_) => {
let shards: Vec<String> = results
.iter()
.flat_map(|r| {
r.shards()
.unwrap()
.iter()
.map(|s| s.shard_id().unwrap().to_string())
.collect::<Vec<String>>()
})
.filter_map(ListShardsOutput::shards)
.flat_map(|s| s.iter())
.filter_map(Shard::shard_id)
.map(str::to_string)
.collect::<Vec<String>>();

info!("Found {} shards", shards.len());
Expand Down
2 changes: 0 additions & 2 deletions src/kinesis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use tokio::sync::Semaphore;

#[derive(Debug, Clone)]
pub struct ShardIteratorProgress {
pub(crate) shard_id: String,
pub(crate) last_sequence_id: Option<String>,
pub(crate) next_shard_iterator: Option<String>,
}
Expand Down Expand Up @@ -103,7 +102,6 @@ pub trait ShardProcessor<K: KinesisClient>: Send + Sync {
async fn publish_records_shard(
&self,
shard_iterator: &str,
tx_ticker: Option<Sender<TickerMessage>>,
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()>;

Expand Down
3 changes: 0 additions & 3 deletions src/kinesis/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ async fn seed_shards_test() {

let shard_iterator_progress = rx_shard_iterator_progress.recv().await.unwrap();

assert_eq!(shard_iterator_progress.shard_id, "shardId-000000000000");
assert_eq!(
shard_iterator_progress.next_shard_iterator,
Some("shard_iterator_latest".to_string())
Expand Down Expand Up @@ -296,7 +295,6 @@ async fn has_records_beyond_end_ts_when_no_end_ts() {
#[tokio::test]
async fn handle_iterator_refresh_ok() {
let shard_iterator_progress = ShardIteratorProgress {
shard_id: "shardId-000000000000".to_string(),
last_sequence_id: Some("sequence_id".to_string()),
next_shard_iterator: Some("some_iterator".to_string()),
};
Expand Down Expand Up @@ -331,7 +329,6 @@ async fn handle_iterator_refresh_ok() {

let progress = rx_shard_iterator_progress.recv().await.unwrap();

assert_eq!(progress.shard_id, "shardId-000000000000".to_string());
assert_eq!(progress.last_sequence_id, Some("sequence_id".to_string()));
assert_eq!(
progress.next_shard_iterator,
Expand Down
22 changes: 11 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,18 @@ async fn main() -> Result<()> {
let shard_id = shard_id.clone();
let semaphore = semaphore.clone();

tokio::spawn(async move {
let shard_processor = kinesis::helpers::new(
client.clone(),
stream_name,
shard_id,
from_datetime,
to_datetime,
semaphore,
tx_records.clone(),
tx_ticker_updates.clone(),
);
let shard_processor = kinesis::helpers::new(
client.clone(),
stream_name,
shard_id,
from_datetime,
to_datetime,
semaphore,
tx_records.clone(),
tx_ticker_updates.clone(),
);

tokio::spawn(async move {
shard_processor.run().await.unwrap();
})
})
Expand Down

0 comments on commit 056ee6f

Please sign in to comment.