137 changes: 120 additions & 17 deletions scheds/rust/scx_rusty/src/bpf/main.bpf.c
@@ -198,6 +198,7 @@ struct {
__uint(map_flags, 0);
} node_data SEC(".maps");


struct lock_wrapper {
struct bpf_spin_lock lock;
};
@@ -302,6 +303,7 @@ static void task_load_adj(struct task_ctx *taskc,
{
taskc->runnable = runnable;
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
ravg_accumulate(&taskc->l3_rd, taskc->l3_traffic, now, load_half_life);
}

static struct bucket_ctx *lookup_dom_bucket(dom_ptr dom_ctx,
@@ -343,7 +345,7 @@ static u64 scale_inverse_fair(u64 value, u64 weight)
return value * 100 / weight;
}

static void dom_dcycle_adj(dom_ptr domc, u32 weight, u64 now, bool runnable)
static void dom_dcycle_adj(dom_ptr domc, u32 weight, u64 now, bool runnable, s64 l3_adj)
{
struct bucket_ctx *bucket;
struct lock_wrapper *lockw;
@@ -363,19 +365,22 @@ static void dom_dcycle_adj(dom_ptr domc, u32 weight, u64 now, bool runnable)

bpf_spin_lock(&lockw->lock);
bucket->dcycle += adj;
bucket->l3 += l3_adj;
ravg_accumulate(&bucket->rd, bucket->dcycle, now, load_half_life);
ravg_accumulate(&bucket->l3_rd, bucket->l3, now, load_half_life);
bpf_spin_unlock(&lockw->lock);

if (adj < 0 && (s64)bucket->dcycle < 0)
scx_bpf_error("cpu%d dom%u bucket%u load underflow (dcycle=%lld adj=%lld)",
if ((adj < 0 && (s64)bucket->dcycle < 0) || ((s64) bucket->l3 - l3_adj < 0))
scx_bpf_error("cpu%d dom%u bucket%u load underflow (dcycle=%lld l3=%lld adj=%lld l3_adj=%lld)",
bpf_get_smp_processor_id(), dom_id, bucket_idx,
bucket->dcycle, adj);
bucket->dcycle, bucket->l3, adj, l3_adj);

if (debug >=2 &&
(!domc->dbg_dcycle_printed_at || now - domc->dbg_dcycle_printed_at >= 1000000000)) {
bpf_printk("DCYCLE ADJ dom=%u bucket=%u adj=%lld dcycle=%u avg_dcycle=%llu",
bpf_printk("ADJ dom=%u bucket=%u adj=%lld dcycle=%u avg_dcycle=%llu avg_l3=%llu",
dom_id, bucket_idx, adj, bucket->dcycle,
ravg_read(&bucket->rd, now, load_half_life) >> RAVG_FRAC_BITS);
ravg_read(&bucket->rd, now, load_half_life) >> RAVG_FRAC_BITS,
ravg_read(&bucket->l3_rd, now, load_half_life) >> RAVG_FRAC_BITS);
domc->dbg_dcycle_printed_at = now;
}
}
@@ -387,8 +392,9 @@ static void dom_dcycle_xfer_task(struct task_struct *p, struct task_ctx *taskc,
struct bucket_ctx *from_bucket, *to_bucket;
u32 idx = 0, weight = taskc->weight;
struct lock_wrapper *from_lockw, *to_lockw;
struct ravg_data task_dcyc_rd;
struct ravg_data task_dcyc_rd, task_l3_rd;
u64 from_dcycle[2], to_dcycle[2], task_dcycle;
u64 from_l3[2], to_l3[2], task_l3;

from_lockw = lookup_dom_bkt_lock(from_domc->id, weight);
to_lockw = lookup_dom_bkt_lock(to_domc->id, weight);
@@ -409,49 +415,78 @@ */
*/
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
task_dcyc_rd = taskc->dcyc_rd;
if (debug >= 2)
task_l3_rd = taskc->l3_rd;
if (debug >= 2) {
task_dcycle = ravg_read(&task_dcyc_rd, now, load_half_life);
task_l3 = ravg_read(&task_l3_rd, now, load_half_life);
}

/* transfer out of @from_domc */
bpf_spin_lock(&from_lockw->lock);
if (taskc->runnable)
if (taskc->runnable) {
from_bucket->dcycle--;
from_bucket->l3 -= taskc->l3_traffic;
}

if (debug >= 2)
if (debug >= 2) {
from_dcycle[0] = ravg_read(&from_bucket->rd, now, load_half_life);
from_l3[0] = ravg_read(&from_bucket->l3_rd, now, load_half_life);
}

ravg_transfer(&from_bucket->rd, from_bucket->dcycle,
&task_dcyc_rd, taskc->runnable, load_half_life, false);
ravg_transfer(&from_bucket->l3_rd, from_bucket->l3,
&task_l3_rd, taskc->l3_traffic, load_half_life, false);

if (debug >= 2)

if (debug >= 2) {
from_dcycle[1] = ravg_read(&from_bucket->rd, now, load_half_life);
from_l3[1] = ravg_read(&from_bucket->l3_rd, now, load_half_life);
}

bpf_spin_unlock(&from_lockw->lock);

/* transfer into @to_domc */
bpf_spin_lock(&to_lockw->lock);
if (taskc->runnable)
if (taskc->runnable) {
to_bucket->dcycle++;
to_bucket->l3 += taskc->l3_traffic;
}

if (debug >= 2)
if (debug >= 2) {
to_dcycle[0] = ravg_read(&to_bucket->rd, now, load_half_life);
to_l3[0] = ravg_read(&to_bucket->l3_rd, now, load_half_life);
}

ravg_transfer(&to_bucket->rd, to_bucket->dcycle,
&task_dcyc_rd, taskc->runnable, load_half_life, true);
ravg_transfer(&to_bucket->l3_rd, to_bucket->l3,
&task_l3_rd, taskc->l3_traffic, load_half_life, true);

if (debug >= 2)

if (debug >= 2) {
to_dcycle[1] = ravg_read(&to_bucket->rd, now, load_half_life);
to_l3[1] = ravg_read(&to_bucket->l3_rd, now, load_half_life);
}

bpf_spin_unlock(&to_lockw->lock);

if (debug >= 2)
if (debug >= 2) {
bpf_printk("XFER DCYCLE dom%u->%u task=%lu from=%lu->%lu to=%lu->%lu",
from_domc->id, to_domc->id,
task_dcycle >> RAVG_FRAC_BITS,
from_dcycle[0] >> RAVG_FRAC_BITS,
from_dcycle[1] >> RAVG_FRAC_BITS,
to_dcycle[0] >> RAVG_FRAC_BITS,
to_dcycle[1] >> RAVG_FRAC_BITS);
bpf_printk("XFER L3 dom%u->%u task=%lu from=%lu->%lu to=%lu->%lu",
from_domc->id, to_domc->id,
task_l3 >> RAVG_FRAC_BITS,
from_l3[0] >> RAVG_FRAC_BITS,
from_l3[1] >> RAVG_FRAC_BITS,
to_l3[0] >> RAVG_FRAC_BITS,
to_l3[1] >> RAVG_FRAC_BITS);
}
}

static u64 dom_min_vruntime(dom_ptr domc)
@@ -572,6 +607,70 @@ static void refresh_tune_params(void)
}
}

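/* kfunc that casts a BPF program context pointer to the kernel's underlying struct. */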
void *bpf_cast_to_kern_ctx(void *) __ksym;

/*
 * Performance counter callback. Runs once per memory-access sample and
 * counts the sample toward the current task's L3 traffic estimate.
 */
SEC("perf_event")
int read_sample(struct bpf_perf_event_data_kern __kptr *arg)
{
struct bpf_perf_event_data_kern *ctx, a;
union perf_mem_data_src data_src;
struct perf_sample_data data;
struct task_ctx *taskc;
struct task_struct *p;
int ret;

ctx = bpf_cast_to_kern_ctx(arg);

if ((ret = bpf_probe_read_kernel(&a, sizeof(a), ctx))) {
scx_bpf_error("[0] %s: bpf_probe_read_kernel failed", __func__);
return -EACCES;
}

if ((ret = bpf_probe_read_kernel(&data, sizeof(data), a.data))) {
scx_bpf_error("%s: bpf_probe_read_kernel failed", __func__);
return -EACCES;
}

data_src = ctx->data->data_src;
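/* Skip samples that carry no sample data or whose memory op is unavailable (PERF_MEM_OP_NA). */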
if (!ctx->data->sample_flags || data_src.mem_op == 1)
return 0;


p = bpf_get_current_task_btf();
if (!p) {
bpf_printk("could not retrieve current task");
return 0;
}

/* Benign failure for tasks not in scx, e.g., idle. */
taskc = try_lookup_task_ctx(p);
if (!taskc)
return 0;


/* Require level 3 because it is really spammy. */
if (debug >= 3) {
bpf_printk("(1/2) %s\t(0x%lx,0x%lx,0x%lx) ",
data_src.mem_op == 2 ? "LOAD" : (data_src.mem_op == 4 ? "STORE" : "UNKNOWN"),
data_src.mem_lvl_num,
data_src.mem_snoop,
data_src.mem_remote
);
bpf_printk("(2/2) [%llx, %llx] 0x%lx",
ctx->data->phys_addr,
ctx->data->addr,
data_src.mem_dtlb
);
}

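/* Count this sample toward the task's L3 traffic estimate for the next period. */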
taskc->l3_next += 1;

return 0;
}

static u64 min(u64 a, u64 b)
{
return a <= b ? a : b;
@@ -1431,7 +1530,7 @@ void BPF_STRUCT_OPS(rusty_runnable, struct task_struct *p, u64 enq_flags)
wakee_ctx->is_kworker = p->flags & PF_WQ_WORKER;

task_load_adj(wakee_ctx, now, true);
dom_dcycle_adj(wakee_ctx->domc, wakee_ctx->weight, now, true);
dom_dcycle_adj(wakee_ctx->domc, wakee_ctx->weight, now, true, wakee_ctx->l3_traffic);

if (fifo_sched)
return;
Expand Down Expand Up @@ -1554,7 +1653,11 @@ void BPF_STRUCT_OPS(rusty_quiescent, struct task_struct *p, u64 deq_flags)
return;

task_load_adj(taskc, now, false);
dom_dcycle_adj(domc, taskc->weight, now, false);
dom_dcycle_adj(domc, taskc->weight, now, false, -taskc->l3_traffic);

/* Update our current L3 traffic prediction. */
taskc->l3_traffic = taskc->l3_next;
taskc->l3_next = 0;

if (fifo_sched)
return;
7 changes: 7 additions & 0 deletions scheds/rust/scx_rusty/src/bpf/types.h
@@ -63,6 +63,10 @@ struct task_ctx {
u32 pid;

struct ravg_data dcyc_rd;
struct ravg_data l3_rd;

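/* L3 traffic estimate for the current period and the accumulator for the next one. */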
u64 l3_traffic;
u64 l3_next;
};

/* XXXETSAL Same rationale as for dom_ptr. Remove once we dump Clang 18.*/
@@ -76,6 +80,9 @@ typedef struct task_ctx *task_ptr;
struct bucket_ctx {
u64 dcycle;
struct ravg_data rd;

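/* Aggregate L3 traffic of the bucket's tasks and its running average. */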
u64 l3;
struct ravg_data l3_rd;
};

struct dom_active_tasks {
11 changes: 9 additions & 2 deletions scheds/rust/scx_rusty/src/load_balance.rs
@@ -460,6 +460,7 @@ pub struct LoadBalancer<'a, 'b> {

lb_apply_weight: bool,
balance_load: bool,
l3_balancing: bool,
}

// Verify that the number of buckets is a factor of the maximum weight to
Expand All @@ -476,6 +477,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
skip_kworkers: bool,
lb_apply_weight: bool,
balance_load: bool,
l3_balancing: bool,
) -> Self {
Self {
skel,
Expand All @@ -489,6 +491,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
balance_load,

dom_group,
l3_balancing,
}
}

@@ -573,7 +576,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {

for bucket in 0..NUM_BUCKETS {
let bucket_ctx = &dom_ctx.buckets[bucket as usize];
let rd = &bucket_ctx.rd;
let rd = if self.l3_balancing { &bucket_ctx.l3_rd } else { &bucket_ctx.rd };
let duty_cycle = ravg_read(
rd.val,
rd.val_at,
@@ -653,7 +656,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
continue;
}

let rd = &taskc.dcyc_rd;
let rd = if self.l3_balancing { &taskc.l3_rd } else { &taskc.dcyc_rd };
let mut load = ravg_read(
rd.val,
rd.val_at,
@@ -769,6 +772,10 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
}

let load = *(task.load);
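// Nothing to migrate if the candidate task carries no load.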
if load == 0.0f64 {
return Ok(None);
}

let taskc_p = task.taskc_p;
task.migrated.set(true);
std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
19 changes: 19 additions & 0 deletions scheds/rust/scx_rusty/src/main.rs
@@ -15,6 +15,9 @@ use tuner::Tuner;
pub mod load_balance;
use load_balance::LoadBalancer;

pub mod perf;
use perf::init_perf_counters;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
@@ -227,6 +230,10 @@ struct Opts {
/// prioritize energy efficiency. When in doubt, use 0 or 1024.
#[clap(long, default_value = "0")]
perf: u32,

/// Use L3 traffic sampling for load balancing instead of CPU load.
#[clap(long, default_value="false")]
l3_balancing: bool,
}

fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
Expand Down Expand Up @@ -348,6 +355,8 @@ struct Scheduler<'a> {

tuner: Tuner,
stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
_pefds: Vec<(i32, libbpf_rs::Link)>,
l3_balancing: bool,
}

impl<'a> Scheduler<'a> {
Expand Down Expand Up @@ -444,7 +453,14 @@ impl<'a> Scheduler<'a> {

// Attach.
let mut skel = scx_ops_load!(skel, rusty, uei)?;

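// Open a sampling perf counter per CPU (count hardcoded to 32 here) and attach the BPF sample handler.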
let mut pefds: Vec<(i32, libbpf_rs::Link)> = vec![];
for i in 0..32 {
pefds.push(init_perf_counters(&mut skel, &i)?);
}

let struct_ops = Some(scx_ops_attach!(skel, rusty)?);

let stats_server = StatsServer::new(stats::server_data()).launch()?;

for (id, dom) in domains.doms().iter() {
@@ -466,6 +482,7 @@ impl<'a> Scheduler<'a> {
tune_interval: Duration::from_secs_f64(opts.tune_interval),
balance_load: !opts.no_load_balance,
balanced_kworkers: opts.balanced_kworkers,
l3_balancing: opts.l3_balancing,

dom_group: domains.clone(),
proc_reader,
@@ -482,6 +499,7 @@ impl<'a> Scheduler<'a> {
opts.slice_us_overutil * 1000,
)?,
stats_server,
_pefds: pefds,
})
}

@@ -560,6 +578,7 @@ impl<'a> Scheduler<'a> {
self.balanced_kworkers,
self.tuner.fully_utilized,
self.balance_load,
self.l3_balancing,
);

lb.load_balance()?;