Add a new worker called "script", which uses LLVM to compile scripts parsed by the previously introduced parser into bytecode for execution. It also provides a set of functionality available at runtime for such scripts. Note that this is a breaking change to the interface: previously the workload configuration was passed as the first positional argument; now it has to be passed via `-c workload.toml`, to distinguish it from a script passed via `-f script.ber`.
Molter73
left a comment
Still need to go through script.rs, but it is late on a Friday, so I'll come back to this later. Overall looks good!
I'm getting an error when trying to compile:
error: No suitable version of LLVM was found system-wide or pointed
to by LLVM_SYS_201_PREFIX.
Consider using `llvmenv` to compile an appropriate copy of LLVM, and
refer to the llvm-sys documentation for more information.
llvm-sys: https://crates.io/crates/llvm-sys
llvmenv: https://crates.io/crates/llvmenv
--> /home/mmoltras/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/llvm-sys-201.0.1/src/lib.rs:533:1
|
533 | / std::compile_error!(concat!(
534 | | "No suitable version of LLVM was found system-wide or pointed
535 | | to by LLVM_SYS_",
536 | | env!("CARGO_PKG_VERSION_MAJOR"),
... |
543 | | llvmenv: https://crates.io/crates/llvmenv"
544 | | ));
| |__^
Do we have any guidance on what version of LLVM we should have available?
| flag_f: Option<String>, | ||
| } | ||
| fn run_script(script_path: String) -> Vec<(i32, u64)> { |
It is probably a good idea to move this function and run_workload to a separate file; we should aim to have a simple main function in here and nothing else.
| let works = apply_rules(works); | ||
| works.into_iter().for_each(|node| { | ||
| debug!("AST NODE: {:?}", node); | ||
| let Node::Work { | ||
| name: _, | ||
| args, | ||
| instructions: _, | ||
| dist: _, | ||
| } = node | ||
| else { | ||
| unreachable!() | ||
| }; | ||
| let workers: u32 = args | ||
| .get("workers") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
| info!("Config: {:?}", config); | ||
| let duration: u64 = args | ||
| .get("duration") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
| let h: Vec<_> = (0..workers) | ||
| .map(|_| { | ||
| let worker = new_script_worker(node.clone()); | ||
| match fork() { | ||
| Ok(Fork::Parent(child)) => { | ||
| info!("Child {}", child); | ||
| Some((child, duration)) | ||
| } | ||
| Ok(Fork::Child) => { | ||
| worker.run_payload().unwrap(); | ||
| None | ||
| } | ||
| Err(e) => { | ||
| warn!("Failed: {e:?}"); | ||
| None | ||
| } | ||
| } | ||
| }) | ||
| .collect(); | ||
| handles.extend(h); | ||
| }); | ||
| handles.iter().filter_map(|i| *i).collect() |
I think this is equivalent and (maybe) slightly easier to read:
| apply_rules(works) | |
| .into_iter() | |
| .map(|node| { | |
| let Node::Work { args, .. } = node else { | |
| unreachable!() | |
| }; | |
| let workers: u32 = args | |
| .get("workers") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| let duration: u64 = args | |
| .get("duration") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| (0..workers) | |
| .filter_map(|_| { | |
| let worker = new_script_worker(node.clone()); | |
| match fork() { | |
| Ok(Fork::Parent(child)) => { | |
| info!("Child {}", child); | |
| Some((child, duration)) | |
| } | |
| Ok(Fork::Child) => { | |
| worker.run_payload().unwrap(); | |
| None | |
| } | |
| Err(e) => { | |
| warn!("Failed: {e:?}"); | |
| None | |
| } | |
| } | |
| }) | |
| .collect::<Vec<_>>() | |
| }) | |
| .flatten() | |
| .collect() |
| let workers: u32 = args | ||
| .get("workers") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
| info!("Config: {:?}", config); | ||
| let duration: u64 = args | ||
| .get("duration") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); |
I wonder if we could give args a more concise type that handles all this logic under the hood 🤔
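One way such a type could look is sketched below. The `Args` wrapper and its `get_or` method are hypothetical names, not part of the PR; the only assumption taken from the diff is that the arguments are stored as a string-to-string map and parsed on lookup with a default for missing keys.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical wrapper around the raw string arguments of a work unit.
pub struct Args(HashMap<String, String>);

impl Args {
    pub fn new(raw: HashMap<String, String>) -> Self {
        Args(raw)
    }

    /// Parse the value stored under `key` as `T`.
    /// A missing key yields `default`; a present but malformed value is
    /// returned as an error instead of panicking.
    pub fn get_or<T: FromStr>(&self, key: &str, default: T) -> Result<T, T::Err> {
        match self.0.get(key) {
            Some(value) => value.parse(),
            None => Ok(default),
        }
    }
}
```

With something like this, the two lookups in the diff might shrink to `args.get_or::<u32>("workers", 0)` and `args.get_or::<u64>("duration", 0)`, and the caller decides how to handle a parse error.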
| .collect(); | ||
| info!("In total: {}", upper); | ||
| handles.iter().filter_map(|i| *i).collect() |
We can probably change the map operation in line 72 to filter_map and replace this line with just handles.
This will also save an iteration and allocation, since this line will iterate over all the elements of handles, remove the None elements and allocate a new vector for the remaining items, all of which can be done during the original iteration.
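The difference can be sketched in isolation. In the snippet below `spawn_stub` is a made-up stand-in for the `fork()` match in the diff (it returns `None` for every third index purely for illustration); the two collectors contrast the two-pass and one-pass shapes.

```rust
/// Stand-in for one `fork()` outcome as seen by the parent:
/// `Some((pid, duration))` for a started child, `None` otherwise.
fn spawn_stub(i: i32, duration: u64) -> Option<(i32, u64)> {
    // Pretend every third spawn fails.
    if i % 3 == 0 { None } else { Some((i, duration)) }
}

/// Two passes: collect the `Option`s, then filter them into a second vector.
fn collect_two_pass(workers: i32, duration: u64) -> Vec<(i32, u64)> {
    let handles: Vec<Option<(i32, u64)>> =
        (0..workers).map(|i| spawn_stub(i, duration)).collect();
    handles.iter().filter_map(|h| *h).collect()
}

/// One pass: drop the `None`s while iterating, allocating a single vector.
fn collect_one_pass(workers: i32, duration: u64) -> Vec<(i32, u64)> {
    (0..workers).filter_map(|i| spawn_stub(i, duration)).collect()
}
```

Both functions return the same elements in the same order; the one-pass version simply never materialises the intermediate `Vec<Option<_>>`.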
| // Ignore processes without specified duration -- we don't want | ||
| // neither terminate them, nor count against processes to compare. | ||
| let watched_processes = processes | ||
| .iter() | ||
| .filter(|(_, duration)| *duration > 0) | ||
| .collect::<Vec<_>>(); | ||
| // Find all processes with expired duration. If we've received | ||
| // SIGTERM, get all processes. | ||
| let expired = watched_processes | ||
| .iter() | ||
| .filter(|(_, duration)| { | ||
| *duration < elapsed | ||
| || terminating.load(Ordering::Relaxed) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| for (handle, _) in &expired { | ||
| info!("Terminating: {}", *handle); | ||
| let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL); | ||
| } | ||
| }); | ||
| } | ||
| if expired.len() == watched_processes.len() { | ||
| break; | ||
| } |
I think this is equivalent (barring some comments):
| for (handle, _) in processes.extract_if(.., |(_, duration)| { | |
| (*duration > 0 && *duration < elapsed) | |
| || terminating.load(Ordering::Relaxed) | |
| }) { | |
| let _ = kill(Pid::from_raw(handle), Signal::SIGKILL); | |
| } | |
| if processes.is_empty() | |
| || processes.iter().all(|(_, duration)| *duration == 0) | |
| { | |
| break; | |
| } | |
Good point, I had to call this out.
Molter73
left a comment
Ok, now I'm done going through the changes :)
| context: LLVMContextRef, | ||
| } | ||
| /// # Safety |
I see a few # Safety comments with no concrete guidelines on what callers should look out for. We might want to update them with something like this:
| /// # Safety | |
| /// | |
| /// The caller must ensure the pointer is valid and points to a null terminated C-string. |
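As an illustration of how such a contract pairs with the code that relies on it, here is a minimal sketch. `name_len` is a hypothetical helper, not a function from the PR; it only uses `CStr::from_ptr` from the standard library, whose own safety requirements are what the doc comment restates.

```rust
use std::ffi::{c_char, CStr};

/// Return the length in bytes of a C string (hypothetical helper).
///
/// # Safety
///
/// The caller must ensure `name` is non-null, points to a null-terminated
/// C string, and stays valid and unmodified for the duration of the call.
pub unsafe extern "C" fn name_len(name: *const c_char) -> usize {
    // SAFETY: the requirements of `CStr::from_ptr` are exactly the ones
    // documented above, and the caller has promised to uphold them.
    let name = unsafe { CStr::from_ptr(name) };
    name.to_bytes().len()
}
```

Stating the contract this concretely lets each call site justify its `unsafe` block against a checklist rather than against a bare heading.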
| /// # Safety | ||
| /// | ||
| /// Open a file with create and write permissions and write to it. |
This comment probably needs updating.
| stream.write_all(b"Hello\n").unwrap(); | ||
| let mut buf = vec![]; | ||
| stream.read_exact(&mut buf).unwrap(); |
read_exact reads exactly the number of bytes needed to fill buf. Since buf is defined as an empty vector, does that mean this call always returns immediately? Do we know what we should expect as an answer? Maybe we can read a fixed length by declaring an array instead, as proposed in the read_exact docs:
let mut buf = [0; 4];
stream.read_exact(&mut buf).unwrap();
https://doc.rust-lang.org/std/io/trait.Read.html#method.read_exact
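The behaviour in question can be checked without a socket. The sketch below swaps the stream for an in-memory `std::io::Cursor`, which is an assumption made only to keep the example self-contained; `read_into_empty` and `read_four` are illustrative names.

```rust
use std::io::{Cursor, Read};

/// Read with an empty buffer: zero bytes are requested, so the call
/// succeeds immediately and the reader position does not move.
fn read_into_empty(input: &[u8]) -> (Vec<u8>, u64) {
    let mut stream = Cursor::new(input);
    let mut buf: Vec<u8> = vec![];
    stream.read_exact(&mut buf).unwrap();
    (buf, stream.position())
}

/// Read with a fixed-size array: exactly four bytes are consumed, and a
/// shorter input is reported as an error.
fn read_four(input: &[u8]) -> std::io::Result<[u8; 4]> {
    let mut stream = Cursor::new(input);
    let mut buf = [0u8; 4];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
```

On a real socket the fixed-size variant blocks until four bytes arrive or the peer closes the connection, so the expected reply length has to be known up front.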
| /// | ||
| /// Spawn a process with a random argument. | ||
| #[unsafe(no_mangle)] | ||
| pub unsafe extern "C" fn task(name: *const i8, random: bool) -> u64 { |
Looks like random is only being logged, are we keeping this here as a placeholder for future work?
| builder: LLVMBuilderRef, | ||
| ee: LLVMExecutionEngineRef, | ||
| module: LLVMModuleRef, | ||
| module_state: &HashMap<String, LLVMValueRef>, | ||
| module_runtime: &HashMap<String, (LLVMValueRef, LLVMTypeRef)>, |
AFAICT, the jit_instruction and get_arg_value methods are always called with references to the same values. Would it make sense to move these into ScriptWorker itself and simply pass &self to jit_instruction? Or is this done because these values are only needed when creating the ScriptWorker in new and can be dropped afterwards? If the latter is the case, would it make sense to have some sort of auxiliary ScriptWorkerBuilder struct that can own these values and drop them along with itself once the final ScriptWorker is built?
I know I'm being a bit finicky here, I just want to see if we can slightly simplify these methods.
| }; | ||
| // Populate the global mapping with observed runtime functions | ||
| for instr in instructions { |
I'm being a bit dense: why are we iterating over the instructions twice? Could we not do this mapping before the LLVM... calls in between the loops?
| let Node::Work { | ||
| name: _, | ||
| args: _, | ||
| instructions, | ||
| dist: _, | ||
| } = node.clone() | ||
| else { | ||
| unreachable!() | ||
| }; | ||
| // Populate the global mapping with observed runtime functions | ||
| for instr in instructions { |
We should not need to clone the entire node here, the only thing we do with each instruction is to match it in the loop, so this should work unless I'm missing something:
| // Populate the global mapping with observed runtime functions | |
| for instr in node.instructions { |
| let Node::Work { | ||
| name: _, | ||
| args: _, | ||
| instructions, | ||
| dist: _, | ||
| } = node.clone() | ||
| else { | ||
| unreachable!() | ||
| }; |
This might need a bit of refactoring and I know it is not a big deal because we are only doing this during start up, but we are cloning the entire node in order to access its underlying instructions... Twice!
Consider instead adding a get_instructions method to Node that will let us access the instructions through a reference, something like this:
impl Node {
    pub fn get_instructions(&self) -> Option<&[Instruction]> {
        match self {
            Node::Machine { .. } => None,
            Node::Work { instructions, .. } => Some(instructions),
        }
    }
}
| let Node::Work { | ||
| name: _, | ||
| args: _, | ||
| instructions: _, | ||
| dist, | ||
| } = self.node.clone() | ||
| else { | ||
| unreachable!() | ||
| }; |
Similar comment here about adding a way to reach the dist field without needing to clone the entire node.
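A borrowing accessor for `dist` could follow the same pattern. The sketch below uses placeholder definitions: the real `Node` and the type of its `dist` field are not shown in this thread, so `Dist`, the `name` fields and the method name `dist` are assumptions made for illustration.

```rust
/// Placeholder for whatever type the `dist` field really has.
#[derive(Debug, Clone, PartialEq)]
pub struct Dist(pub String);

/// Placeholder for the parser's `Node`; only the shape matters here.
#[derive(Debug, Clone)]
pub enum Node {
    Machine { name: String },
    Work { name: String, dist: Dist },
}

impl Node {
    /// Borrow the distribution of a `Work` node without cloning the node.
    pub fn dist(&self) -> Option<&Dist> {
        match self {
            Node::Work { dist, .. } => Some(dist),
            Node::Machine { .. } => None,
        }
    }
}
```

A caller that currently destructures `self.node.clone()` could then write `let Some(dist) = self.node.dist() else { unreachable!() };` and keep working with a reference.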
| thread::sleep(time::Duration::from_millis( | ||
| (interval * 1000.0).round() as u64, | ||
| )); |
Would this be equivalent?
| thread::sleep(time::Duration::from_secs_f64( | |
| interval.round(), | |
| )); |
Add a new worker called "script", which uses LLVM to compile scripts parsed by the previously introduced parser into bytecode for execution. It also provides a set of functionality available at runtime for such scripts. It doesn't implement the machine state part yet.
Note that this is a breaking change to the interface: previously the workload configuration was passed as the first positional argument; now it has to be passed via -c workload.toml, to distinguish it from a script passed via -f script.ber.