Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
milesgranger committed Jan 17, 2023
1 parent 8e5fa8b commit 1f49ebb
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 93 deletions.
100 changes: 100 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,101 @@
/target
flaco/*.so
flaco/*.c
*.html
*.dat
heaptrack.*
.idea/
__pycache__/

# Distribution / packaging
wheelhouse/
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

*.cpp

# Jupyter Notebook
.ipynb_checkpoints
prof/
# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env

# virtualenv
.venv
venv/
ENV/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site
149 changes: 69 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,105 +1,117 @@
use std::{
mem::take,
sync::mpsc::{channel, Sender},
thread,
time::Duration,
};

#[deny(missing_docs)]
use pyo3::prelude::*;
use pyo3::{
exceptions::{PyBrokenPipeError, PyValueError},
ffi::{PyGILState_Check, PyGILState_STATE},
exceptions::{PyBrokenPipeError, PyTimeoutError, PyValueError},
PyResult,
};
use std::{
mem::take,
sync::mpsc::{channel, RecvTimeoutError, Sender},
thread,
time::{Duration, Instant},
};

#[pymodule]
fn gilknocker(_py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(wrap_pyfunction!(lock_and_release_gil, m)?)?;
m.add_class::<KnockKnock>()?;
Ok(())
}

pub type Milliseconds = u128;

/// Struct for polling, knocking on the GIL,
/// checking if it's locked in the current thread
///
/// Example
/// -------
/// ```python
/// from gilknocker import KnockKnock
/// knocker = KnockKnock(100) # try to reacquire the gil every 100 microseconds
/// knocker.start()
/// ... some smart code ...
/// knocker.stop()
/// knocker.contention_metric # float between 0-1 indicating GIL contention
/// ```
#[pyclass(name = "KnockKnock")]
#[derive(Default)]
pub struct KnockKnock {
handle: Option<thread::JoinHandle<(Milliseconds, Milliseconds)>>,
handle: Option<thread::JoinHandle<f32>>,
channel: Option<Sender<bool>>,
time_locked_ms: Option<Milliseconds>,
time_unlocked_ms: Option<Milliseconds>,
monitor_interval: u64,
contention_metric: Option<f32>,
interval: Duration,
timeout: Duration,
}

#[pymethods]
impl KnockKnock {
/// Initialize with interval (microseconds), as the time between trying to acquire the GIL,
/// and timeout (seconds) as time to wait for monitoring thread to exit.
#[new]
pub fn __init__(monitor_interval_ms: u64) -> PyResult<Self> {
pub fn __new__(interval_micros: Option<u64>, timeout_secs: Option<u64>) -> PyResult<Self> {
let interval = Duration::from_micros(interval_micros.unwrap_or_else(|| 10));
let timeout = Duration::from_secs(timeout_secs.unwrap_or_else(|| 5));
if timeout <= interval {
return Err(PyValueError::new_err(format!(
"`interval` ({:?}) must be less than `timeout` ({:?})",
interval, timeout
)));
}
Ok(KnockKnock {
monitor_interval: monitor_interval_ms,
interval,
timeout,
..Default::default()
})
}
/// Get time locked
pub fn time_locked_ms(&self) -> Option<Milliseconds> {
self.time_locked_ms
}
/// Get time unlocked
pub fn time_unlocked_ms(&self) -> Option<Milliseconds> {
self.time_unlocked_ms

/// Get the contention metric, not _specific_ meaning other than a higher
/// value (closer to 1) indicates increased contention when acquiring the GIL.
/// and lower indicates less contention, with 0 theoretically indicating zero
/// contention.
#[getter]
pub fn contention_metric(&self) -> Option<f32> {
self.contention_metric
}

/// Start polling the GIL to check if it's locked.
pub fn start(&mut self, py: Python) -> () {
let (send, recv) = channel();
self.channel = Some(send);

let interval = self.monitor_interval;

let interval = self.interval;
let handle = py.allow_threads(move || {
thread::spawn(move || {
let mut time_locked_ms = 0;
let mut time_unlocked_ms = 0;
let duration = Duration::from_millis(interval);
loop {
if let Ok(stop) = recv.try_recv() {
if stop {
break;
}
}
unsafe {
if PyGILState_Check() == PyGILState_STATE::PyGILState_LOCKED as i32 {
time_locked_ms += duration.as_millis();
} else {
time_unlocked_ms += duration.as_millis();
}
}
thread::sleep(duration);
let mut time_to_acquire = Duration::from_millis(0);
let runtime = Instant::now();
while recv
.recv_timeout(interval)
.map_err(|e| e != RecvTimeoutError::Disconnected)
.unwrap_or(true)
{
let start = Instant::now();
time_to_acquire += Python::with_gil(move |_py| start.elapsed());
}
(time_locked_ms, time_unlocked_ms)
time_to_acquire.as_micros() as f32 / runtime.elapsed().as_micros() as f32
})
});
self.handle = Some(handle);
}

/// Stop polling the GIL.
pub fn stop(&mut self) -> PyResult<()> {
// Kill loop
if let Some(send) = &self.channel {
send.send(true)
.map_err(|e| PyBrokenPipeError::new_err(e.to_string()))?
}
self.channel = None;

// Recv time locked
match take(&mut self.handle) {
Some(handle) => {
(self.time_locked_ms, self.time_unlocked_ms) = handle
.join()
.map(|(locked, unlocked)| (Some(locked), Some(unlocked)))
.unwrap();
if let Some(send) = take(&mut self.channel) {
send.send(false)
.map_err(|e| PyBrokenPipeError::new_err(e.to_string()))?;

let start = Instant::now();
while !handle.is_finished() {
thread::sleep(Duration::from_millis(100));
if start.elapsed() > self.timeout {
return Err(PyTimeoutError::new_err("Failed to join knocker thread."));
}
}
}
self.contention_metric = handle.join().ok();
Ok(())
}
None => Err(PyValueError::new_err(
Expand All @@ -108,26 +120,3 @@ impl KnockKnock {
}
}
}

/// Lock and release the GIL in one function; for sanity check of GIL state monitoring.
#[pyfunction]
pub fn lock_and_release_gil(py: Python<'_>, lock_for_ms: u64, release_for_ms: u64) {
thread::sleep(Duration::from_millis(lock_for_ms));
py.allow_threads(move || {
// let handle = thread::spawn(move || {
// thread::sleep(Duration::from_millis(release_for_ms));
// });
// handle.join().unwrap();
let _ = ack(4, 1);
});
}

fn ack(n: u64, m: u64) -> u64 {
if n == 0 {
m + 1
} else if m == 0 {
ack(n - 1, 1)
} else {
ack(n - 1, ack(n, m - 1))
}
}
Binary file not shown.
77 changes: 64 additions & 13 deletions tests/test_knockknock.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,70 @@
from concurrent.futures import ThreadPoolExecutor, wait
from gilknocker import KnockKnock, lock_and_release_gil
import numpy as np
import threading
import time
from gilknocker import KnockKnock


def test_knockknock():
knocker = KnockKnock(5)
assert knocker.time_locked_ms() is None
N_THREADS = 4
N_PTS = 4096

n = 10

def flaky(n_tries=5):
def wrapper(func):
def _wrapper(*args, **kwargs):
for _ in range(n_tries - 1):
try:
return func(*args, **kwargs)
except:
pass
return func(*args, **kwargs)

return _wrapper

return wrapper


def a_lotta_gil():
"""Keep the GIL busy"""
for i in range(100_000_000):
pass


def a_little_gil():
"""Work which releases the GIL"""
for i in range(2):
x = np.random.randn(N_PTS, N_PTS)
x[:] = np.fft.fft2(x).real


def _run(target):
knocker = KnockKnock(interval_micros=1000, timeout_secs=1)
knocker.start()
jobs = []
with ThreadPoolExecutor(10) as executor:
for _ in range(n):
jobs.append(executor.submit(lock_and_release_gil, 0, 5000))
wait(jobs)
threads = []
for i in range(N_THREADS):
thread = threading.Thread(target=target, daemon=True)
threads.append(thread)
thread.start()

for thread in threads:
thread.join()
knocker.stop()
assert knocker.time_unlocked_ms() > 0
breakpoint()
print(f"Metric: {knocker.contention_metric}")
return knocker


@flaky()
def test_knockknock_busy():
knocker = _run(a_lotta_gil)
assert knocker.contention_metric > 0.9


@flaky()
def test_knockknock_available_gil():
knocker = _run(a_little_gil)
assert knocker.contention_metric < 0.01


# Manual verification with py-spy
# busy should give high GIL %
if __name__ == "__main__":
test_knockknock_busy()

0 comments on commit 1f49ebb

Please sign in to comment.