From a612761adf5e12d163726804118712eacde45cea Mon Sep 17 00:00:00 2001
From: Jacob Nelson
Date: Mon, 12 Oct 2015 23:31:14 -0700
Subject: [PATCH] revert earlier scaling fix; add automatic poll tick estimation

---
 system/tasks/TaskingScheduler.cpp | 14 +++------
 system/tasks/TaskingScheduler.hpp | 52 +++++++++++++++++++++++--------
 2 files changed, 44 insertions(+), 22 deletions(-)

diff --git a/system/tasks/TaskingScheduler.cpp b/system/tasks/TaskingScheduler.cpp
index afe8bb6b4..14bd07b27 100644
--- a/system/tasks/TaskingScheduler.cpp
+++ b/system/tasks/TaskingScheduler.cpp
@@ -38,9 +38,8 @@
 #include "../PerformanceTools.hpp"
 
 /// TODO: this should be based on some actual time-related metric so behavior is predictable across machines
-DEFINE_int64( periodic_poll_ticks, 0, "number of ticks to wait before polling periodic queue for one core (set to 0 for auto-growth)");
-DEFINE_int64( periodic_poll_ticks_base, 28000, "number of ticks to wait before polling periodic queue for one core (see _growth for increase)");
-DEFINE_int64( periodic_poll_ticks_growth, 281, "number of ticks to add per core");
+DEFINE_int64( periodic_poll_ticks, 0, "number of ticks to wait before polling periodic queue for one core (set to 0 for dynamic balance using poll_factor)");
+DEFINE_double( poll_factor, 6.0, "ratio between time spent working and time spent polling network (default 6:1)");
 
 DEFINE_bool(poll_on_idle, true, "have tasking layer poll aggregator if it has nothing better to do");
 
@@ -85,6 +84,7 @@ TaskingScheduler global_scheduler;
   , work_args( NULL )
   , previous_periodic_ts( 0 )
   , periodic_poll_ticks( 0 )
+  , dynamic_poll_ticks( true )
   , in_no_switch_region_( false )
   , prev_ts( 0 )
   , prev_stats_blob_ts( 0 )
@@ -101,13 +101,9 @@ void TaskingScheduler::init ( Worker * master_arg, TaskManager * taskman ) {
   current_thread = master;
   task_manager = taskman;
   work_args = new task_worker_args( taskman, this );
-  if( 0 == FLAGS_periodic_poll_ticks ) {
-    periodic_poll_ticks = FLAGS_periodic_poll_ticks_base + global_communicator.cores * FLAGS_periodic_poll_ticks_growth;
-    if( 0 == global_communicator.mycore ) {
-      VLOG(2) << "Actual periodic poll ticks value is " << periodic_poll_ticks;
-    }
-  } else {
+  if( 0 != FLAGS_periodic_poll_ticks ) {
     periodic_poll_ticks = FLAGS_periodic_poll_ticks;
+    dynamic_poll_ticks = false;
   }
 }
 
diff --git a/system/tasks/TaskingScheduler.hpp b/system/tasks/TaskingScheduler.hpp
index 640808828..ef8ce4f99 100644
--- a/system/tasks/TaskingScheduler.hpp
+++ b/system/tasks/TaskingScheduler.hpp
@@ -59,8 +59,6 @@
 GRAPPA_DECLARE_METRIC( SimpleMetric, scheduler_context_switches );
 GRAPPA_DECLARE_METRIC( SimpleMetric, scheduler_count);
 
-
-
 // forward declarations
 namespace Grappa {
 namespace impl { void idle_flush_rdma_aggregator(); }
@@ -71,6 +69,8 @@ namespace Metrics { void sample_all(); }
 bool idle_flush_aggregator();
 
 DECLARE_int64( periodic_poll_ticks );
+DECLARE_double( poll_factor );
+
 DECLARE_bool(poll_on_idle);
 DECLARE_bool(flush_on_idle);
 DECLARE_bool(rdma_flush_on_idle);
@@ -130,8 +130,9 @@ class TaskingScheduler : public Scheduler {
   // STUB: replace with real periodic threads
   Grappa::Timestamp previous_periodic_ts;
   Grappa::Timestamp periodic_poll_ticks;
+  bool dynamic_poll_ticks;
   inline bool should_run_periodic( Grappa::Timestamp current_ts ) {
-    return current_ts - previous_periodic_ts > periodic_poll_ticks;
+    return (current_ts - previous_periodic_ts) > periodic_poll_ticks;
   }
 
   Worker * periodicDequeue(Grappa::Timestamp current_ts) {
@@ -140,6 +141,9 @@ class TaskingScheduler : public Scheduler {
     // Grappa::Timestamp current_ts = Grappa::timestamp();
     if( should_run_periodic( current_ts ) ) {
+      // record time at start of periodic worker execution
+      previous_periodic_ts = Grappa::force_tick();
+      // run periodic thread
       return periodicQ.dequeue();
     } else {
       return NULL;
     }
@@ -193,16 +197,19 @@ class TaskingScheduler : public Scheduler {
     // Grappa::Metrics::dump_stats_blob();
     // }
 
-    // check for periodic tasks
-    result = periodicDequeue(current_ts);
-    if (result != NULL) {
-      // DVLOG(5) << current_thread->id << " scheduler: pick periodic";
-      *(stats.state_timers[ stats.prev_state ]) += (current_ts - prev_ts) / tick_scale;
-      stats.prev_state = TaskingSchedulerMetrics::StatePoll;
-      prev_ts = current_ts;
-      return result;
+    // Check for periodic tasks. Skip if we ran the periodic
+    // thread in the last scheduling cycle to give new tasks a
+    // chance to start.
+    if( stats.prev_state != TaskingSchedulerMetrics::StatePoll ) {
+      result = periodicDequeue(current_ts);
+      if (result != NULL) {
+        // DVLOG(5) << current_thread->id << " scheduler: pick periodic";
+        *(stats.state_timers[ stats.prev_state ]) += (current_ts - prev_ts) / tick_scale;
+        stats.prev_state = TaskingSchedulerMetrics::StatePoll;
+        prev_ts = current_ts;
+        return result;
+      }
     }
-
     // check ready tasks
     result = readyQ.dequeue();
 
@@ -379,8 +386,27 @@ class TaskingScheduler : public Scheduler {
   /// Put the Worker into the periodic queue
   void periodic( Worker * thr ) {
     periodicQ.enqueue( thr );
-    Grappa::tick();
+
+    // record time at end of periodic worker execution
     Grappa::Timestamp current_ts = Grappa::force_tick();
+
+    if( dynamic_poll_ticks ) { // set next timeout based on this poll time
+      // first make sure we have a starting timestamp.
+      if( previous_periodic_ts == 0 ) {
+        previous_periodic_ts = current_ts;
+      }
+
+      // now figure out how long polling took
+      auto poll_ticks = current_ts - previous_periodic_ts;
+
+
+      if( poll_ticks > 0 ) {
+        // assuming the timestamp counter hasn't rolled over,
+        // compute next time to run polling thread
+        periodic_poll_ticks = static_cast<Grappa::Timestamp>( FLAGS_poll_factor * poll_ticks );
+      }
+    }
+
    previous_periodic_ts = current_ts;
  }
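
A minimal standalone sketch of the arithmetic the new dynamic_poll_ticks path performs (illustration only, not part of the patch): the next poll timeout is set to poll_factor times however many ticks the poll itself consumed, so the scheduler spends roughly poll_factor units of time running tasks for every unit spent polling the network. Plain int64_t stands in for Grappa::Timestamp, and the tick values below are made up.

// Illustration of the poll-interval estimation; not Grappa source code.
#include <cstdint>
#include <iostream>

int main() {
  const double poll_factor = 6.0;      // same default as the --poll_factor flag
  int64_t previous_periodic_ts = 0;    // set when the periodic worker is dequeued
  int64_t periodic_poll_ticks = 0;     // ticks to wait before polling again

  // Suppose a poll starts at tick 1000 (recorded in periodicDequeue via
  // force_tick) and the periodic worker re-enqueues itself at tick 1200
  // (recorded in periodic()).
  const int64_t poll_start_ts = 1000;
  const int64_t poll_end_ts = 1200;

  previous_periodic_ts = poll_start_ts;
  const int64_t poll_ticks = poll_end_ts - previous_periodic_ts;  // 200 ticks spent polling
  if (poll_ticks > 0) {
    // Wait roughly poll_factor times as long as the poll took: 6 * 200 = 1200 ticks.
    periodic_poll_ticks = static_cast<int64_t>(poll_factor * poll_ticks);
  }
  previous_periodic_ts = poll_end_ts;

  std::cout << "polling took " << poll_ticks << " ticks; next poll after "
            << periodic_poll_ticks << " ticks of task execution\n";
  return 0;
}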