Add two thread calibration

This commit is contained in:
GuillaumeDIDIER 2020-07-02 15:40:30 +02:00
parent bd4028f521
commit 77a40a24be
2 changed files with 527 additions and 0 deletions

View File

@ -0,0 +1,165 @@
use core::sync::atomic::{AtomicBool,Ordering};
use core::sync::atomic::spin_loop_hint;
use std::sync::Arc;
use std::thread;
use cache_utils::mmap::MMappedMemory;
use nix::sched::{CpuSet, sched_getaffinity};
use cache_utils::calibration::{calibrate_fixed_freq_2_thread, CalibrateOperation2T, load_and_flush, HistParams, CFLUSH_BUCKET_NUMBER, CFLUSH_BUCKET_SIZE, CFLUSH_NUM_ITER, Verbosity, only_flush};
use cache_utils::{maccess, noop, flush};
use nix::unistd::Pid;
/*
fn wait(turn_lock: &AtomicBool, turn: bool) {
while turn_lock.load(Ordering::Acquire) != turn {
spin_loop_hint();
}
assert_eq!(turn_lock.load(Ordering::Relaxed), turn);
}
fn next(turn_lock: &AtomicBool) {
turn_lock.fetch_xor(true, Ordering::Release);
}
fn ping(turn_lock: &AtomicBool) {
wait(turn_lock, false);
println!("ping");
next(turn_lock);
}
fn pong_thread(turn_lock: Arc<AtomicBool>, stop: Arc<AtomicBool>) {
while pong(&turn_lock, &stop) {
}
}
fn pong(turn_lock: &AtomicBool, stop: &AtomicBool) -> bool {
wait(turn_lock, true);
if stop.load(Ordering::Relaxed) {
return false;
}
println!("pong");
next(turn_lock);
true
}
fn joke() {
let turn_counter = Arc::new(AtomicBool::new(false));
let stop = Arc::new(AtomicBool::new(false));
let tcc = turn_counter.clone();
let sc = stop.clone();
let thread = thread::spawn(|| {
pong_thread(tcc, sc)
});
for _ in 0..10 {
ping(&turn_counter);
}
wait(&turn_counter, false);
stop.store(true, Ordering::Relaxed);
next(&turn_counter);
thread.join().unwrap();
println!("Okay");
}
*/
use core::arch::x86_64 as arch_x86;
unsafe fn multiple_access(p: *const u8) {
maccess::<u8>(p);
maccess::<u8>(p);
arch_x86::_mm_mfence();
maccess::<u8>(p);
arch_x86::_mm_mfence();
maccess::<u8>(p);
arch_x86::_mm_mfence();
maccess::<u8>(p);
maccess::<u8>(p);
}
const SIZE: usize = 2 << 20;
fn main() {
// Grab a slice of memory
let m = MMappedMemory::new(SIZE);
let array = m.slice();
let cache_line_size = 64;
// Generate core iterator
let mut core_pairs: Vec<(usize, usize)> = Vec::new();
let mut i = 1;
let old = sched_getaffinity(Pid::from_raw(0)).unwrap();
while i < CpuSet::count() {
if old.is_set(i).unwrap() {
core_pairs.push((0, i));
println!("{},{}", 0, i);
}
i = i << 1;
}
// operations
// Call calibrate 2T \o/
let verbose_level = Verbosity::RawResult;
unsafe {
let pointer = (&array[0]) as *const u8;
if pointer as usize & (cache_line_size - 1) != 0 {
panic!("not aligned nicely");
}
calibrate_fixed_freq_2_thread(pointer,
64,
array.len() as isize,
&mut core_pairs.into_iter(),
&[
CalibrateOperation2T {
prepare: multiple_access,
op: only_flush,
name: "clflush_remote_hit",
display_name: "clflush remote hit",
},
CalibrateOperation2T {
prepare: multiple_access,
op: load_and_flush,
name: "clflush_shared_hit",
display_name: "clflush shared hit",
},
CalibrateOperation2T {
prepare: flush,
op: only_flush,
name: "clflush_miss_f",
display_name: "clflush miss - f",
},
CalibrateOperation2T {
prepare: flush,
op: load_and_flush,
name: "clflush_local_hit_f",
display_name: "clflush local hit - f",
},
CalibrateOperation2T {
prepare: noop::<u8>,
op: only_flush,
name: "clflush_miss_n",
display_name: "clflush miss - n",
},
CalibrateOperation2T {
prepare: noop::<u8>,
op: load_and_flush,
name: "clflush_local_hit_n",
display_name: "clflush local hit - n",
},
],
HistParams {
bucket_number: CFLUSH_BUCKET_NUMBER,
bucket_size: CFLUSH_BUCKET_SIZE,
iterations: CFLUSH_NUM_ITER,
},
verbose_level,
);
}
}

View File

@ -9,12 +9,30 @@ use core::arch::x86_64 as arch_x86;
#[cfg(feature = "no_std")]
use polling_serial::{serial_print as print, serial_println as println};
//#[cfg(feature = "use_std")]
//use nix::errno::Errno;
#[cfg(feature = "use_std")]
use nix::sched::{sched_getaffinity, sched_setaffinity, CpuSet};
#[cfg(feature = "use_std")]
use nix::unistd::Pid;
//#[cfg(feature = "use_std")]
//use nix::Error::Sys;
#[cfg(feature = "use_std")]
use std::sync::Arc;
#[cfg(feature = "use_std")]
use std::thread;
extern crate alloc;
use crate::calibration::Verbosity::*;
use alloc::vec;
use alloc::vec::Vec;
use core::cmp::min;
use itertools::Itertools;
use core::sync::atomic::{AtomicPtr, AtomicBool, Ordering, spin_loop_hint};
use core::ptr::{/*null,*/ null_mut};
use nix::Error;
use atomic::Atomic;
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub enum Verbosity {
@ -412,6 +430,350 @@ fn calibrate_impl_fixed_freq(
ret
}
#[cfg(feature = "use_std")]
pub struct CalibrateOperation2T<'a> {
pub prepare: unsafe fn(*const u8) -> (),
pub op: unsafe fn(*const u8) -> u64,
pub name: &'a str,
pub display_name: &'a str,
}
#[cfg(feature = "use_std")]
pub struct CalibrateResult2T {
pub main_core: usize,
pub helper_core: usize,
pub res: Result<Vec<CalibrateResult>, nix::Error>, // TODO
// TODO
}
fn wait(turn_lock: &AtomicBool, turn: bool) {
while turn_lock.load(Ordering::Acquire) != turn {
spin_loop_hint();
}
assert_eq!(turn_lock.load(Ordering::Relaxed), turn);
}
fn next(turn_lock: &AtomicBool) {
turn_lock.fetch_xor(true, Ordering::Release);
}
#[cfg(feature = "use_std")]
pub unsafe fn calibrate_fixed_freq_2_thread<I: Iterator<Item = (usize, usize)>>(
p: *const u8,
increment: usize,
len: isize,
cores: &mut I,
operations: &[CalibrateOperation2T],
hist_params: HistParams,
verbosity_level: Verbosity,
) -> Vec<CalibrateResult2T> {
calibrate_fixed_freq_2_thread_impl(p, increment, len, cores, operations, hist_params, verbosity_level)
}
fn calibrate_fixed_freq_2_thread_impl<I: Iterator<Item = (usize, usize)>>(
p: *const u8,
increment: usize,
len: isize,
cores: &mut I,
operations: &[CalibrateOperation2T],
hist_params: HistParams,
verbosity_level: Verbosity,
) -> Vec<CalibrateResult2T> {
if verbosity_level >= Thresholds {
println!(
"Calibrating {}...",
operations
.iter()
.map(|operation| { operation.display_name })
.format(", ")
);
}
let to_bucket = |time: u64| -> usize { time as usize / hist_params.bucket_size };
let from_bucket = |bucket: usize| -> u64 { (bucket * hist_params.bucket_size) as u64 };
let slicing = if let Some(uarch) = MicroArchitecture::get_micro_architecture() {
Some(cache_slicing(uarch, 8))
} else {
None
};
let h = if let Some(s) = slicing {
if s.can_hash() {
Some(|addr: usize| -> usize { slicing.unwrap().hash(addr).unwrap() })
} else {
None
}
} else {
None
};
let mut ret = Vec::new();
let helper_thread_params = Arc::new(HelperThreadParams{
turn: AtomicBool::new(false),
stop: AtomicBool::new(true),
op: Atomic::new(operations[0].prepare),
address: AtomicPtr::new(null_mut()),
});
if verbosity_level >= Thresholds {
print!("CSV: main_core, helper_core, address, ");
if h.is_some() {
print!("hash, ");
}
println!(
"{} min, {} median, {} max",
operations
.iter()
.map(|operation| operation.name)
.format(" min, "),
operations
.iter()
.map(|operation| operation.name)
.format(" median, "),
operations
.iter()
.map(|operation| operation.name)
.format(" max, ")
);
}
if verbosity_level >= RawResult {
print!("RESULT:main_core,helper_core,address,");
if h.is_some() {
print!("hash,");
}
println!(
"time,{}",
operations
.iter()
.map(|operation| operation.name)
.format(",")
);
}
let old = sched_getaffinity(Pid::from_raw(0)).unwrap();
for (main_core, helper_core) in cores {
// set main thread affinity
if verbosity_level >= Thresholds {
println!("Calibration for main_core {}, helper {}.", main_core, helper_core);
}
let mut core = CpuSet::new();
match core.set(main_core) {
Ok(_) => {},
Err(e) => {
ret.push(CalibrateResult2T{main_core, helper_core, res:Err(e)});
continue;
}
}
match sched_setaffinity(Pid::from_raw(0), &core) {
Ok(_) => {},
Err(e) => {
ret.push(CalibrateResult2T{main_core, helper_core, res:Err(e)});
continue;
}
}
helper_thread_params.stop.store(false, Ordering::Relaxed);
// set up the helper thread
let htp = helper_thread_params.clone();
let hc = helper_core;
let helper_thread = thread::spawn(move || {
calibrate_fixed_freq_2_thread_helper(htp, hc)
});
// do the calibration
let mut calibrate_result_vec = Vec::new();
for i in (0..len).step_by(increment) {
let pointer = unsafe { p.offset(i) };
helper_thread_params.address.store(p as *mut u8, Ordering::Relaxed);
let hash = h.map(|h| h(pointer as usize));
if verbosity_level >= Thresholds {
print!("Calibration for {:p}", pointer);
if let Some(h) = hash {
print!(" (hash: {:x})", h)
}
println!();
}
// TODO add some useful impl to CalibrateResults
let mut calibrate_result = CalibrateResult {
offset: i,
histogram: Vec::new(),
median: vec![0; operations.len()],
min: vec![0; operations.len()],
max: vec![0; operations.len()],
};
calibrate_result.histogram.reserve(operations.len());
for op in operations {
helper_thread_params.op.store(op.prepare, Ordering::Relaxed);
let mut hist = vec![0; hist_params.bucket_number];
for _ in 0..hist_params.iterations {
next(&helper_thread_params.turn);
wait(&helper_thread_params.turn, false);
let time = unsafe { (op.op)(pointer) };
let bucket = min(hist_params.bucket_number - 1, to_bucket(time));
hist[bucket] += 1;
}
calibrate_result.histogram.push(hist);
}
let mut sums = vec![0; operations.len()];
let median_thresholds: Vec<u32> = calibrate_result
.histogram
.iter()
.map(|h| (hist_params.iterations - h[hist_params.bucket_number - 1]) / 2)
.collect();
for j in 0..hist_params.bucket_number - 1 {
if verbosity_level >= RawResult {
print!("RESULT:{},{},{:p},", main_core, helper_core, pointer);
if let Some(h) = hash {
print!("{:x},", h);
}
print!("{}", from_bucket(j));
}
// ignore the last bucket : spurious context switches etc.
for op in 0..operations.len() {
let hist = &calibrate_result.histogram[op][j];
let min = &mut calibrate_result.min[op];
let max = &mut calibrate_result.max[op];
let med = &mut calibrate_result.median[op];
let sum = &mut sums[op];
if verbosity_level >= RawResult {
print!(",{}", hist);
}
if *min == 0 {
// looking for min
if *hist > SPURIOUS_THRESHOLD {
*min = from_bucket(j);
}
} else if *hist > SPURIOUS_THRESHOLD {
*max = from_bucket(j);
}
if *med == 0 {
*sum += *hist;
if *sum >= median_thresholds[op] {
*med = from_bucket(j);
}
}
}
if verbosity_level >= RawResult {
println!();
}
}
if verbosity_level >= Thresholds {
for (j, op) in operations.iter().enumerate() {
println!(
"{}: min {}, median {}, max {}",
op.display_name,
calibrate_result.min[j],
calibrate_result.median[j],
calibrate_result.max[j]
);
}
print!("CSV: {},{},{:p}, ", main_core, helper_core, pointer);
if let Some(h) = hash {
print!("{:x}, ", h)
}
println!(
"{}, {}, {}",
calibrate_result.min.iter().format(", "),
calibrate_result.median.iter().format(", "),
calibrate_result.max.iter().format(", ")
);
}
calibrate_result_vec.push(calibrate_result);
}
ret.push(CalibrateResult2T{
main_core,
helper_core,
res: Ok(calibrate_result_vec)
});
// terminate the thread
helper_thread_params.stop.store(true, Ordering::Relaxed);
next(&helper_thread_params.turn);
wait(&helper_thread_params.turn, false);
// join thread.
helper_thread.join();
}
sched_setaffinity(Pid::from_raw(0), &old).unwrap();
ret
// return the result
// TODO
}
#[cfg(feature = "use_std")]
struct HelperThreadParams {
turn: AtomicBool,
stop: AtomicBool,
op: Atomic<unsafe fn(*const u8)>,
address: AtomicPtr<u8>,
}
#[cfg(feature = "use_std")]
fn calibrate_fixed_freq_2_thread_helper(
params: Arc<HelperThreadParams>,
helper_core: usize,
) -> Result<(), Error> {
// set thread affinity
let mut core = CpuSet::new();
match core.set(helper_core) {
Ok(_) => {},
Err(_e) => {
unimplemented!();
}
}
match sched_setaffinity(Pid::from_raw(0), &core) {
Ok(_) => {},
Err(_e) => {
unimplemented!();
}
}
loop {
// grab lock
wait(&params.turn, true);
if params.stop.load(Ordering::Relaxed) {
next(&params.turn);
return Ok(());
}
// get the relevant parameters
let addr: *const u8 = params.address.load(Ordering::Relaxed);
let op = params.op.load(Ordering::Relaxed);
unsafe {op(addr)};
// release lock
next(&params.turn);
}
}
#[allow(non_snake_case)]
pub fn calibrate_L3_miss_hit(
array: &[u8],