diff --git a/cache_utils/src/bin/two_thread_cal.rs b/cache_utils/src/bin/two_thread_cal.rs
new file mode 100644
index 0000000..707f534
--- /dev/null
+++ b/cache_utils/src/bin/two_thread_cal.rs
@@ -0,0 +1,133 @@
+use core::sync::atomic::{AtomicBool,Ordering};
+use core::sync::atomic::spin_loop_hint;
+use std::sync::Arc;
+use std::thread;
+use cache_utils::mmap::MMappedMemory;
+use nix::sched::{CpuSet, sched_getaffinity};
+use cache_utils::calibration::{calibrate_fixed_freq_2_thread, CalibrateOperation2T, load_and_flush, HistParams, CFLUSH_BUCKET_NUMBER, CFLUSH_BUCKET_SIZE, CFLUSH_NUM_ITER, Verbosity, only_flush};
+use cache_utils::{maccess, noop, flush};
+use nix::unistd::Pid;
+
+use core::arch::x86_64 as arch_x86;
+
+/// Prepare step of the "remote hit" and "shared hit" measurements: reads `p`
+/// six times, with a memory fence after the second, third and fourth read.
+///
+/// # Safety
+///
+/// `p` must be valid for a one-byte read.
+unsafe fn multiple_access(p: *const u8) {
+    maccess::<u8>(p);
+    maccess::<u8>(p);
+    arch_x86::_mm_mfence();
+    maccess::<u8>(p);
+    arch_x86::_mm_mfence();
+    maccess::<u8>(p);
+    arch_x86::_mm_mfence();
+    maccess::<u8>(p);
+    maccess::<u8>(p);
+}
+
+/// Size in bytes of the buffer handed to the calibration (2 MiB).
+const SIZE: usize = 2 << 20;
+
+/// Alignment required of the buffer, and stride between calibrated addresses.
+const CACHE_LINE_SIZE: usize = 64;
+
+fn main() {
+    // Grab a slice of memory
+    let m = MMappedMemory::new(SIZE);
+    let array = m.slice();
+
+    // Generate the (main core, helper core) pairs: the main core is always 0
+    // and the helper is each power-of-two core index allowed by the current
+    // affinity mask.
+    let mut core_pairs: Vec<(usize, usize)> = Vec::new();
+    let old = sched_getaffinity(Pid::from_raw(0)).unwrap();
+    let mut i = 1;
+    while i < CpuSet::count() {
+        if old.is_set(i).unwrap() {
+            core_pairs.push((0, i));
+            println!("{},{}", 0, i);
+        }
+        i <<= 1;
+    }
+
+    let verbose_level = Verbosity::RawResult;
+
+    let pointer = array.as_ptr();
+    if pointer as usize & (CACHE_LINE_SIZE - 1) != 0 {
+        panic!("not aligned nicely");
+    }
+
+    // Each entry pairs what the helper core does to the line (`prepare`)
+    // with what the main core then measures (`op`).
+    let operations = [
+        CalibrateOperation2T {
+            prepare: multiple_access,
+            op: only_flush,
+            name: "clflush_remote_hit",
+            display_name: "clflush remote hit",
+        },
+        CalibrateOperation2T {
+            prepare: multiple_access,
+            op: load_and_flush,
+            name: "clflush_shared_hit",
+            display_name: "clflush shared hit",
+        },
+        CalibrateOperation2T {
+            prepare: flush,
+            op: only_flush,
+            name: "clflush_miss_f",
+            display_name: "clflush miss - f",
+        },
+        CalibrateOperation2T {
+            prepare: flush,
+            op: load_and_flush,
+            name: "clflush_local_hit_f",
+            display_name: "clflush local hit - f",
+        },
+        CalibrateOperation2T {
+            prepare: noop::<u8>,
+            op: only_flush,
+            name: "clflush_miss_n",
+            display_name: "clflush miss - n",
+        },
+        CalibrateOperation2T {
+            prepare: noop::<u8>,
+            op: load_and_flush,
+            name: "clflush_local_hit_n",
+            display_name: "clflush local hit - n",
+        },
+    ];
+
+    // SAFETY: `pointer` is the start of `array`, `array.len()` bounds every
+    // offset the calibration derives from it, and `array` (borrowed from `m`)
+    // outlives the call.
+    let results = unsafe {
+        calibrate_fixed_freq_2_thread(
+            pointer,
+            CACHE_LINE_SIZE,
+            array.len() as isize,
+            &mut core_pairs.into_iter(),
+            &operations,
+            HistParams {
+                bucket_number: CFLUSH_BUCKET_NUMBER,
+                bucket_size: CFLUSH_BUCKET_SIZE,
+                iterations: CFLUSH_NUM_ITER,
+            },
+            verbose_level,
+        )
+    };
+
+    // A core pair whose threads could not be pinned carries an error instead
+    // of measurements; report it instead of dropping it silently.
+    for result in &results {
+        if let Err(e) = &result.res {
+            eprintln!(
+                "Calibration failed for main core {}, helper core {}: {}",
+                result.main_core, result.helper_core, e
+            );
+        }
+    }
+}
diff --git a/cache_utils/src/calibration.rs b/cache_utils/src/calibration.rs
index 46bcf30..e66729f 100644
--- a/cache_utils/src/calibration.rs
+++ b/cache_utils/src/calibration.rs
@@ -9,12 +9,30 @@ use core::arch::x86_64 as arch_x86;
 #[cfg(feature = "no_std")]
 use polling_serial::{serial_print as print, serial_println as println};
 
+#[cfg(feature = "use_std")]
+use nix::sched::{sched_getaffinity, sched_setaffinity, CpuSet};
+#[cfg(feature = "use_std")]
+use nix::unistd::Pid;
+#[cfg(feature = "use_std")]
+use nix::Error;
+#[cfg(feature = "use_std")]
+use std::sync::Arc;
+#[cfg(feature = "use_std")]
+use std::thread;
+
 extern crate alloc;
 use crate::calibration::Verbosity::*;
 use alloc::vec;
 use alloc::vec::Vec;
 use core::cmp::min;
 use itertools::Itertools;
+#[cfg(feature = "use_std")]
+use atomic::Atomic;
+#[cfg(feature = "use_std")]
+use core::ptr::null_mut;
+#[cfg(feature = "use_std")]
+use core::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, Ordering};
+
 
 #[derive(Ord, PartialOrd, Eq, PartialEq)]
 pub enum Verbosity {
@@ -412,6 +430,426 @@ fn calibrate_impl_fixed_freq(
     ret
 }
 
+/// One operation to calibrate with the help of a second thread.
+///
+/// For every sample the helper thread first runs `prepare` on the target
+/// address, then the main thread runs `op` on it and bins the returned value.
+#[cfg(feature = "use_std")]
+pub struct CalibrateOperation2T<'a> {
+    pub prepare: unsafe fn(*const u8) -> (),
+    pub op: unsafe fn(*const u8) -> u64,
+    pub name: &'a str,
+    pub display_name: &'a str,
+}
+
+/// Outcome of the calibration for one `(main_core, helper_core)` pair.
+#[cfg(feature = "use_std")]
+pub struct CalibrateResult2T {
+    pub main_core: usize,
+    pub helper_core: usize,
+    /// One entry per calibrated address, or the error that prevented pinning
+    /// one of the two threads.
+    pub res: Result<Vec<CalibrateResult>, nix::Error>, // TODO
+
+    // TODO
+}
+
+/// Spins until `turn_lock` holds `turn`.
+#[cfg(feature = "use_std")]
+fn wait(turn_lock: &AtomicBool, turn: bool) {
+    while turn_lock.load(Ordering::Acquire) != turn {
+        spin_loop_hint();
+    }
+    assert_eq!(turn_lock.load(Ordering::Relaxed), turn);
+}
+
+/// Hands the turn over to the other thread by flipping `turn_lock`.
+#[cfg(feature = "use_std")]
+fn next(turn_lock: &AtomicBool) {
+    turn_lock.fetch_xor(true, Ordering::Release);
+}
+
+/// Pins the calling thread to `core`.
+#[cfg(feature = "use_std")]
+fn pin_current_thread(core: usize) -> Result<(), Error> {
+    let mut cpu_set = CpuSet::new();
+    cpu_set.set(core)?;
+    sched_setaffinity(Pid::from_raw(0), &cpu_set)
+}
+
+/// Calibrates `operations` on every `(main_core, helper_core)` pair yielded by
+/// `cores`, at the addresses `p`, `p + increment`, ... below `p + len`.
+///
+/// The calling thread is pinned to the main core and a helper thread to the
+/// helper core. For every sample the helper runs the operation's `prepare` on
+/// the address, then the calling thread runs `op` and bins its result into a
+/// histogram. The calling thread's affinity is restored before returning.
+///
+/// A pair for which a thread cannot be pinned yields an `Err` entry and is
+/// skipped. An empty `operations` slice yields an empty vector.
+///
+/// # Safety
+///
+/// `p..p + len` must lie within a single allocation, and every `prepare` and
+/// `op` must be sound to call on each calibrated address.
+///
+/// # Panics
+///
+/// Panics if `increment` is zero, if the affinity of the calling thread cannot
+/// be read or restored, or if the helper thread panics.
+#[cfg(feature = "use_std")]
+pub unsafe fn calibrate_fixed_freq_2_thread<I: Iterator<Item = (usize, usize)>>(
+    p: *const u8,
+    increment: usize,
+    len: isize,
+    cores: &mut I,
+    operations: &[CalibrateOperation2T],
+    hist_params: HistParams,
+    verbosity_level: Verbosity,
+) -> Vec<CalibrateResult2T> {
+    calibrate_fixed_freq_2_thread_impl(
+        p,
+        increment,
+        len,
+        cores,
+        operations,
+        hist_params,
+        verbosity_level,
+    )
+}
+
+/// Implementation of [`calibrate_fixed_freq_2_thread`].
+///
+/// # Safety
+///
+/// Same contract as [`calibrate_fixed_freq_2_thread`].
+#[cfg(feature = "use_std")]
+unsafe fn calibrate_fixed_freq_2_thread_impl<I: Iterator<Item = (usize, usize)>>(
+    p: *const u8,
+    increment: usize,
+    len: isize,
+    cores: &mut I,
+    operations: &[CalibrateOperation2T],
+    hist_params: HistParams,
+    verbosity_level: Verbosity,
+) -> Vec<CalibrateResult2T> {
+    // Without operations there is nothing to measure, and the helper thread
+    // parameters need an initial `prepare` function.
+    let first_operation = match operations.first() {
+        Some(operation) => operation,
+        None => return Vec::new(),
+    };
+
+    if verbosity_level >= Thresholds {
+        println!(
+            "Calibrating {}...",
+            operations
+                .iter()
+                .map(|operation| { operation.display_name })
+                .format(", ")
+        );
+    }
+
+    let to_bucket = |time: u64| -> usize { time as usize / hist_params.bucket_size };
+    let from_bucket = |bucket: usize| -> u64 { (bucket * hist_params.bucket_size) as u64 };
+
+    let slicing =
+        MicroArchitecture::get_micro_architecture().map(|uarch| cache_slicing(uarch, 8));
+
+    let h = if let Some(s) = slicing {
+        if s.can_hash() {
+            Some(|addr: usize| -> usize { slicing.unwrap().hash(addr).unwrap() })
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    let mut ret = Vec::new();
+
+    let helper_thread_params = Arc::new(HelperThreadParams {
+        turn: AtomicBool::new(false),
+        stop: AtomicBool::new(true),
+        op: Atomic::new(first_operation.prepare),
+        address: AtomicPtr::new(null_mut()),
+    });
+
+    if verbosity_level >= Thresholds {
+        print!("CSV: main_core, helper_core, address, ");
+        if h.is_some() {
+            print!("hash, ");
+        }
+        println!(
+            "{} min, {} median, {} max",
+            operations
+                .iter()
+                .map(|operation| operation.name)
+                .format(" min, "),
+            operations
+                .iter()
+                .map(|operation| operation.name)
+                .format(" median, "),
+            operations
+                .iter()
+                .map(|operation| operation.name)
+                .format(" max, ")
+        );
+    }
+
+    if verbosity_level >= RawResult {
+        print!("RESULT:main_core,helper_core,address,");
+        if h.is_some() {
+            print!("hash,");
+        }
+        println!(
+            "time,{}",
+            operations
+                .iter()
+                .map(|operation| operation.name)
+                .format(",")
+        );
+    }
+
+    let old = sched_getaffinity(Pid::from_raw(0)).unwrap();
+
+    for (main_core, helper_core) in cores {
+        if verbosity_level >= Thresholds {
+            println!(
+                "Calibration for main_core {}, helper {}.",
+                main_core, helper_core
+            );
+        }
+
+        // Pin the measuring thread to the main core.
+        if let Err(e) = pin_current_thread(main_core) {
+            ret.push(CalibrateResult2T {
+                main_core,
+                helper_core,
+                res: Err(e),
+            });
+            continue;
+        }
+
+        helper_thread_params.stop.store(false, Ordering::Relaxed);
+
+        // Set up the helper thread. It signals through `ready_tx` once it is
+        // pinned; if it fails it drops the sender instead, so the failure
+        // surfaces here rather than as an endless spin on `turn`.
+        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
+        let htp = helper_thread_params.clone();
+        let helper_thread = thread::spawn(move || {
+            calibrate_fixed_freq_2_thread_helper(htp, helper_core, ready_tx)
+        });
+
+        if ready_rx.recv().is_err() {
+            // The helper exited without touching `turn`: collect its error.
+            let e = helper_thread
+                .join()
+                .expect("calibration helper thread panicked")
+                .expect_err("calibration helper thread exited without an error");
+            ret.push(CalibrateResult2T {
+                main_core,
+                helper_core,
+                res: Err(e),
+            });
+            continue;
+        }
+
+        // do the calibration
+        let mut calibrate_result_vec = Vec::new();
+
+        for i in (0..len).step_by(increment) {
+            // SAFETY: `i` is in `0..len`, and the caller guarantees that
+            // `p..p + len` lies within a single allocation.
+            let pointer = unsafe { p.offset(i) };
+            // The helper must prepare the address being measured, not the
+            // start of the buffer.
+            helper_thread_params
+                .address
+                .store(pointer as *mut u8, Ordering::Relaxed);
+
+            let hash = h.map(|h| h(pointer as usize));
+
+            if verbosity_level >= Thresholds {
+                print!("Calibration for {:p}", pointer);
+                if let Some(h) = hash {
+                    print!(" (hash: {:x})", h)
+                }
+                println!();
+            }
+
+            // TODO add some useful impl to CalibrateResults
+            let mut calibrate_result = CalibrateResult {
+                offset: i,
+                histogram: Vec::new(),
+                median: vec![0; operations.len()],
+                min: vec![0; operations.len()],
+                max: vec![0; operations.len()],
+            };
+            calibrate_result.histogram.reserve(operations.len());
+
+            for op in operations {
+                helper_thread_params.op.store(op.prepare, Ordering::Relaxed);
+                let mut hist = vec![0; hist_params.bucket_number];
+                for _ in 0..hist_params.iterations {
+                    // Let the helper run `prepare`, wait for the turn to come
+                    // back, then run the measured operation.
+                    next(&helper_thread_params.turn);
+                    wait(&helper_thread_params.turn, false);
+                    // SAFETY: the caller guarantees `op` is sound on every
+                    // calibrated address.
+                    let time = unsafe { (op.op)(pointer) };
+                    let bucket = min(hist_params.bucket_number - 1, to_bucket(time));
+                    hist[bucket] += 1;
+                }
+                calibrate_result.histogram.push(hist);
+            }
+
+            let mut sums = vec![0; operations.len()];
+
+            let median_thresholds: Vec<_> = calibrate_result
+                .histogram
+                .iter()
+                .map(|h| (hist_params.iterations - h[hist_params.bucket_number - 1]) / 2)
+                .collect();
+
+            // ignore the last bucket : spurious context switches etc.
+            for j in 0..hist_params.bucket_number - 1 {
+                if verbosity_level >= RawResult {
+                    print!("RESULT:{},{},{:p},", main_core, helper_core, pointer);
+                    if let Some(h) = hash {
+                        print!("{:x},", h);
+                    }
+                    print!("{}", from_bucket(j));
+                }
+                for op in 0..operations.len() {
+                    let hist = &calibrate_result.histogram[op][j];
+                    let min = &mut calibrate_result.min[op];
+                    let max = &mut calibrate_result.max[op];
+                    let med = &mut calibrate_result.median[op];
+                    let sum = &mut sums[op];
+                    if verbosity_level >= RawResult {
+                        print!(",{}", hist);
+                    }
+
+                    if *min == 0 {
+                        // looking for min
+                        if *hist > SPURIOUS_THRESHOLD {
+                            *min = from_bucket(j);
+                        }
+                    } else if *hist > SPURIOUS_THRESHOLD {
+                        *max = from_bucket(j);
+                    }
+
+                    if *med == 0 {
+                        *sum += *hist;
+                        if *sum >= median_thresholds[op] {
+                            *med = from_bucket(j);
+                        }
+                    }
+                }
+                if verbosity_level >= RawResult {
+                    println!();
+                }
+            }
+            if verbosity_level >= Thresholds {
+                for (j, op) in operations.iter().enumerate() {
+                    println!(
+                        "{}: min {}, median {}, max {}",
+                        op.display_name,
+                        calibrate_result.min[j],
+                        calibrate_result.median[j],
+                        calibrate_result.max[j]
+                    );
+                }
+                print!("CSV: {},{},{:p}, ", main_core, helper_core, pointer);
+                if let Some(h) = hash {
+                    print!("{:x}, ", h)
+                }
+                println!(
+                    "{}, {}, {}",
+                    calibrate_result.min.iter().format(", "),
+                    calibrate_result.median.iter().format(", "),
+                    calibrate_result.max.iter().format(", ")
+                );
+            }
+            calibrate_result_vec.push(calibrate_result);
+        }
+
+        ret.push(CalibrateResult2T {
+            main_core,
+            helper_core,
+            res: Ok(calibrate_result_vec),
+        });
+
+        // Terminate the helper: raise `stop`, hand it the turn and wait for it
+        // to hand the turn back on its way out.
+        helper_thread_params.stop.store(true, Ordering::Relaxed);
+        next(&helper_thread_params.turn);
+        wait(&helper_thread_params.turn, false);
+        // The helper can only fail before it signals readiness, so an error at
+        // this point is a broken invariant.
+        helper_thread
+            .join()
+            .expect("calibration helper thread panicked")
+            .expect("calibration helper thread failed after signalling readiness");
+    }
+
+    sched_setaffinity(Pid::from_raw(0), &old).unwrap();
+
+    ret
+}
+
+/// State shared between the calibrating thread and its helper thread.
+#[cfg(feature = "use_std")]
+struct HelperThreadParams {
+    /// Whose turn it is: `false` for the calibrating thread, `true` for the helper.
+    turn: AtomicBool,
+    /// Raised by the calibrating thread to make the helper return on its next turn.
+    stop: AtomicBool,
+    /// Function the helper runs on `address` during its turn.
+    op: Atomic<unsafe fn(*const u8)>,
+    /// Address the helper passes to `op`.
+    address: AtomicPtr<u8>,
+}
+
+/// Body of the helper thread.
+///
+/// Pins itself to `helper_core`, signals readiness through `ready`, then on
+/// each of its turns runs `params.op` on `params.address` until `params.stop`
+/// is raised. Returns early, without signalling, if pinning fails.
+#[cfg(feature = "use_std")]
+fn calibrate_fixed_freq_2_thread_helper(
+    params: Arc<HelperThreadParams>,
+    helper_core: usize,
+    ready: std::sync::mpsc::Sender<()>,
+) -> Result<(), Error> {
+    // set thread affinity
+    pin_current_thread(helper_core)?;
+    // If the calibrating thread is gone, nobody will ever hand over a turn.
+    if ready.send(()).is_err() {
+        return Ok(());
+    }
+
+    loop {
+        // grab lock
+        wait(&params.turn, true);
+        if params.stop.load(Ordering::Relaxed) {
+            next(&params.turn);
+            return Ok(());
+        }
+        // get the relevant parameters
+        let addr: *const u8 = params.address.load(Ordering::Relaxed);
+        let op = params.op.load(Ordering::Relaxed);
+        // SAFETY: `addr` and `op` were stored by the calibrating thread before
+        // it handed over the turn; its caller vouches for their soundness.
+        unsafe { op(addr) };
+        // release lock
+        next(&params.turn);
+    }
+}
+
 #[allow(non_snake_case)]
 pub fn calibrate_L3_miss_hit(
     array: &[u8],