
Ensure active_workers are released (#2155)
Ensure that active workers are released by running all tasks to
completion (try_join_all vs. join_all).
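
For reference, a minimal standalone sketch of the behavioral difference (not taken from this commit; the toy ready-made futures are illustrative only): try_join_all resolves as soon as one future returns Err and drops the rest, so cleanup that runs late in a sibling future may never happen, whereas join_all drives every future to completion.

use futures::future::{join_all, ready, try_join_all};

#[tokio::main]
async fn main() {
    let futs = || vec![ready(Ok::<_, ()>(1)), ready(Err(())), ready(Ok(3))];

    // Short-circuits on the first Err and drops any futures that have not finished.
    assert_eq!(try_join_all(futs()).await, Err(()));

    // Polls every future to completion and returns each individual result.
    assert_eq!(join_all(futs()).await, vec![Ok(1), Err(()), Ok(3)]);
}

This is why the per-worker release of active_clients is only guaranteed to run once the top-level combinator no longer short-circuits.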

In addition, add a check to skip a tick if the pool is overloaded.
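
A minimal sketch of that guard (the tick_loop helper is illustrative, not from this commit): sqlx's Pool::try_acquire returns None instead of waiting when no connection is immediately available, so the loop can simply skip the current tick.

use sqlx::PgPool;
use std::time::Duration;

async fn tick_loop(pool: PgPool) {
    let mut interval = tokio::time::interval(Duration::from_secs(5));

    loop {
        interval.tick().await;

        // Every connection is checked out; skip this tick rather than queue behind them.
        let conn = match pool.try_acquire() {
            Some(conn) => conn,
            None => continue,
        };

        // ... query for available workers with `conn`, then release it promptly ...
        drop(conn);
    }
}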

Signed-off-by: Joe Grund <[email protected]>
jgrund authored Aug 13, 2020
1 parent 4e5ade0 commit bdb4a34
Showing 1 changed file with 73 additions and 48 deletions.
121 changes: 73 additions & 48 deletions iml-task-runner/src/main.rs
@@ -2,11 +2,11 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use futures::{future::try_join_all, lock::Mutex, TryFutureExt};
use futures::{future::join_all, lock::Mutex, FutureExt, TryFutureExt};
use iml_action_client::invoke_rust_agent;
use iml_postgres::{
get_db_pool,
sqlx::{self, Done, Executor, PgPool},
sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres},
};
use iml_tracing::tracing;
use iml_wire_types::{
@@ -29,7 +29,7 @@ const FID_LIMIT: i64 = 2000;
const DELAY: Duration = Duration::from_secs(5);

async fn available_workers(
pool: &PgPool,
conn: &mut PoolConnection<Postgres>,
active: Arc<Mutex<HashSet<i32>>>,
) -> Result<Vec<LustreClient>, error::ImlTaskRunnerError> {
let ids = active.lock().await;
@@ -46,7 +46,7 @@ async fn available_workers(
"#,
&ids
)
.fetch_all(pool)
.fetch_all(conn)
.await?;

Ok(clients)
@@ -200,21 +200,30 @@ async fn send_work(
match invoke_rust_agent(fqdn, &action, &args).await {
Err(e) => {
tracing::info!("Failed to send {} to {}: {:?}", &action, fqdn, e);

return trans.rollback().map_ok(|_| 0).err_into().await;
}
Ok(res) => {
let agent_result: AgentResult = serde_json::from_value(res)?;

match agent_result {
Ok(data) => {
tracing::debug!("Success {} on {}: {:?}", &action, fqdn, data);
tracing::debug!("Success {} on {}: {:?}", action, fqdn, data);

let errors: Vec<FidError> = serde_json::from_value(data)?;
failed += errors.len();

if task.keep_failed {
let task_id = task.id;

for err in errors.iter() {
let fid = LustreFid::from_str(&err.fid)
.expect("FIXME: This needs proper error handling");
let fid = match LustreFid::from_str(&err.fid) {
Ok(x) => x,
Err(e) => {
tracing::info!("Could not convert FidError {:?} to LustreFid. Error: {:?}", err, e);
continue;
}
};

// #FIXME: This would be better as a bulk insert
if let Err(e) = trans
@@ -243,6 +252,7 @@ async fn send_work(
}
Err(err) => {
tracing::info!("Failed {} on {}: {}", &action, fqdn, err);

return trans.rollback().map_ok(|_| 0).err_into().await;
}
}
@@ -271,6 +281,27 @@ async fn send_work(
Ok(completed as i64)
}

async fn run_tasks(fqdn: &str, worker: &LustreClient, xs: Vec<Task>, pool: &PgPool) {
let fsname = &worker.filesystem;
let host_id = worker.host_id;

let xs = xs.into_iter().map(|task| async move {
for _ in 0..10_u8 {
let rc = send_work(&pool, &fqdn, &fsname, &task, host_id)
.inspect_err(|e| tracing::warn!("send_work({}) failed {:?}", task.name, e))
.await?;

if rc < FID_LIMIT {
break;
}
}

Ok::<_, error::ImlTaskRunnerError>(())
});

join_all(xs).await;
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();
@@ -283,58 +314,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
interval.tick().await;

let workers = available_workers(&pg_pool, Arc::clone(&active_clients))
.await
.unwrap_or_default();
let mut conn = match pg_pool.try_acquire() {
Some(x) => x,
None => {
tracing::info!(
"Could not acquire connection (pool full), will try again next tick"
);
continue;
}
};

active_clients
.lock()
.await
.extend(workers.iter().map(|w| w.id));
let workers = available_workers(&mut conn, Arc::clone(&active_clients)).await?;
drop(conn);

tokio::spawn({
try_join_all(workers.into_iter().map(|worker| {
let pg_pool = pg_pool.clone();
let active_clients = Arc::clone(&active_clients);
{
let mut x = active_clients.lock().await;

async move {
let tasks = tasks_per_worker(&pg_pool, &worker).await?;
let fqdn = worker_fqdn(&pg_pool, &worker).await?;
let host_id = worker.host_id;
x.extend(workers.iter().map(|w| w.id));

let rc = try_join_all(tasks.into_iter().map(|task| {
let pg_pool = pg_pool.clone();
let fsname = &worker.filesystem;
let fqdn = &fqdn;
tracing::debug!("Active Clients {:?}", x);
}

async move {
let mut count = 0;
let xs = workers.into_iter().map(|worker| {
let pg_pool = pg_pool.clone();
let active_clients = Arc::clone(&active_clients);
let active_clients2 = Arc::clone(&active_clients);
let worker_id = worker.id;

loop {
let rc = send_work(&pg_pool, &fqdn, &fsname, &task, host_id)
.await
.map_err(|e| {
tracing::warn!("send_work({}) failed {:?}", task.name, e);
e
})?;
async move {
let tasks = tasks_per_worker(&pg_pool, &worker).await?;
let fqdn = worker_fqdn(&pg_pool, &worker).await?;

count += 1;
run_tasks(&fqdn, &worker, tasks, &pg_pool).await;

if rc < FID_LIMIT || count > 10 {
break;
}
}
Ok::<_, error::ImlTaskRunnerError>(())
}
.then(move |x| async move {
let mut c = active_clients2.lock().await;

Ok::<_, error::ImlTaskRunnerError>(())
}
}))
.await;
c.remove(&worker_id);

active_clients.lock().await.remove(&worker.id);
tracing::debug!("Releasing Client {:?}. Active Clients {:?}", worker_id, c);

rc
}
}))
x
})
});

tokio::spawn(join_all(xs));
}
}
