Skip to content

Commit

Permalink
Removed cloned shard_id
Browse files Browse the repository at this point in the history
  • Loading branch information
gr211 committed Oct 22, 2023
1 parent 9a9880b commit 09a249b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
12 changes: 5 additions & 7 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aws_sdk_kinesis::operation::get_shard_iterator::{
GetShardIteratorError, GetShardIteratorOutput,
};
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
use aws_sdk_kinesis::types::Shard;
use chrono::Utc;
use log::{debug, info};
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -165,13 +166,10 @@ pub async fn get_shards(client: &AwsKinesisClient, stream: &str) -> io::Result<V
Ok(_) => {
let shards: Vec<String> = results
.iter()
.flat_map(|r| {
r.shards()
.unwrap()
.iter()
.map(|s| s.shard_id().unwrap().to_string())
.collect::<Vec<String>>()
})
.filter_map(ListShardsOutput::shards)
.flat_map(|s| s.iter())
.filter_map(Shard::shard_id)
.map(str::to_string)
.collect::<Vec<String>>();

info!("Found {} shards", shards.len());
Expand Down
22 changes: 11 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,18 @@ async fn main() -> Result<()> {
let shard_id = shard_id.clone();
let semaphore = semaphore.clone();

tokio::spawn(async move {
let shard_processor = kinesis::helpers::new(
client.clone(),
stream_name,
shard_id,
from_datetime,
to_datetime,
semaphore,
tx_records.clone(),
tx_ticker_updates.clone(),
);
let shard_processor = kinesis::helpers::new(
client.clone(),
stream_name,
shard_id,
from_datetime,
to_datetime,
semaphore,
tx_records.clone(),
tx_ticker_updates.clone(),
);

tokio::spawn(async move {
shard_processor.run().await.unwrap();
})
})
Expand Down

0 comments on commit 09a249b

Please sign in to comment.