Script worker #55

Open
erthalion wants to merge 1 commit into main from feature/new-script-worker

@erthalion
Collaborator

Add a new worker called "script", which uses LLVM to compile the output of the previously introduced parser into bytecode for execution. It also provides a set of functions that are available to such scripts at runtime. The machine state part is not implemented yet.

Note that this is a breaking change to the interface. Previously the workload configuration was passed as the first positional argument; now it has to be passed via -c workload.toml, to distinguish it from a script passed via -f script.ber.

Contributor

@Molter73 left a comment

Still need to go through script.rs, but it is late on a Friday, so I'll come back to this later. Overall looks good!

I'm getting an error when trying to compile:

error: No suitable version of LLVM was found system-wide or pointed
              to by LLVM_SYS_201_PREFIX.

              Consider using `llvmenv` to compile an appropriate copy of LLVM, and
              refer to the llvm-sys documentation for more information.

              llvm-sys: https://crates.io/crates/llvm-sys
              llvmenv: https://crates.io/crates/llvmenv
   --> /home/mmoltras/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/llvm-sys-201.0.1/src/lib.rs:533:1
    |
533 | / std::compile_error!(concat!(
534 | |     "No suitable version of LLVM was found system-wide or pointed
535 | |        to by LLVM_SYS_",
536 | |     env!("CARGO_PKG_VERSION_MAJOR"),
...   |
543 | |        llvmenv: https://crates.io/crates/llvmenv"
544 | | ));
    | |__^

Do we have any guidance on what version of LLVM we should have available?

flag_f: Option<String>,
}

fn run_script(script_path: String) -> Vec<(i32, u64)> {
Contributor

It is probably a good idea to move this function and run_workload to a separate file; we should aim to have a simple main function in here and nothing else.

Comment on lines +75 to +128
let works = apply_rules(works);

works.into_iter().for_each(|node| {
debug!("AST NODE: {:?}", node);

let Node::Work {
name: _,
args,
instructions: _,
dist: _,
} = node
else {
unreachable!()
};

let workers: u32 = args
.get("workers")
.cloned()
.unwrap_or(String::from("0"))
.parse()
.unwrap();

info!("Config: {:?}", config);
let duration: u64 = args
.get("duration")
.cloned()
.unwrap_or(String::from("0"))
.parse()
.unwrap();

let h: Vec<_> = (0..workers)
.map(|_| {
let worker = new_script_worker(node.clone());

match fork() {
Ok(Fork::Parent(child)) => {
info!("Child {}", child);
Some((child, duration))
}
Ok(Fork::Child) => {
worker.run_payload().unwrap();
None
}
Err(e) => {
warn!("Failed: {e:?}");
None
}
}
})
.collect();

handles.extend(h);
});

handles.iter().filter_map(|i| *i).collect()
Contributor

I think this is equivalent and (maybe) slightly easier to read:

Suggested change
apply_rules(works)
    .into_iter()
    .map(|node| {
        let Node::Work { args, .. } = node else {
            unreachable!()
        };
        let workers: u32 = args
            .get("workers")
            .cloned()
            .unwrap_or(String::from("0"))
            .parse()
            .unwrap();
        let duration: u64 = args
            .get("duration")
            .cloned()
            .unwrap_or(String::from("0"))
            .parse()
            .unwrap();
        (0..workers)
            .filter_map(|_| {
                let worker = new_script_worker(node.clone());
                match fork() {
                    Ok(Fork::Parent(child)) => {
                        info!("Child {}", child);
                        Some((child, duration))
                    }
                    Ok(Fork::Child) => {
                        worker.run_payload().unwrap();
                        None
                    }
                    Err(e) => {
                        warn!("Failed: {e:?}");
                        None
                    }
                }
            })
            .collect::<Vec<_>>()
    })
    .flatten()
    .collect()

Comment on lines +90 to +102
let workers: u32 = args
.get("workers")
.cloned()
.unwrap_or(String::from("0"))
.parse()
.unwrap();

info!("Config: {:?}", config);
let duration: u64 = args
.get("duration")
.cloned()
.unwrap_or(String::from("0"))
.parse()
.unwrap();
Contributor

I wonder if we could give args a more concise type that handles all this logic under the hood 🤔
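
One possible shape for such a type, as a minimal sketch only: it assumes `args` is a `HashMap<String, String>` (which the `.get(..).cloned().parse()` chain above suggests), and the names `WorkArgs` and `get_or` are hypothetical, not part of the PR.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical wrapper around the raw `args` map of a work node.
struct WorkArgs(HashMap<String, String>);

impl WorkArgs {
    /// Parse the value stored under `key`, falling back to `default`
    /// when the key is absent. A value that is present but malformed
    /// is reported as an error instead of panicking.
    fn get_or<T: FromStr>(&self, key: &str, default: T) -> Result<T, T::Err> {
        match self.0.get(key) {
            Some(raw) => raw.parse(),
            None => Ok(default),
        }
    }
}
```

With something like this, the two blocks above would shrink to calls such as `args.get_or::<u32>("workers", 0)` and `args.get_or::<u64>("duration", 0)`, and the caller decides how to handle a malformed value.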

.collect();

info!("In total: {}", upper);
handles.iter().filter_map(|i| *i).collect()
Contributor

We can probably change the map operation in line 72 to filter_map and replace this line with just handles.

This will also save an iteration and allocation, since this line will iterate over all the elements of handles, remove the None elements and allocate a new vector for the remaining items, all of which can be done during the original iteration.
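
As a rough illustration of the point about the extra pass, the sketch below contrasts the two shapes on a stand-in function. `try_spawn` is hypothetical and only mimics the `Option` produced per worker; no process is forked.

```rust
/// Stand-in for the outcome of one spawn attempt: `Some(pid)` in the
/// parent, `None` in the child or on failure. Odd inputs "fail" here
/// purely for illustration.
fn try_spawn(i: i32) -> Option<i32> {
    if i % 2 == 0 { Some(1000 + i) } else { None }
}

/// Two passes: collect every Option, then filter into a second Vec.
fn two_pass(n: i32) -> Vec<i32> {
    let handles: Vec<Option<i32>> = (0..n).map(try_spawn).collect();
    handles.iter().filter_map(|h| *h).collect()
}

/// One pass: `filter_map` drops the `None`s while iterating, so no
/// intermediate Vec<Option<_>> is allocated.
fn one_pass(n: i32) -> Vec<i32> {
    (0..n).filter_map(try_spawn).collect()
}
```

Both functions return the same handles; only the second avoids the intermediate vector.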

Comment on lines +239 to +263
// Ignore processes without specified duration -- we don't want
// neither terminate them, nor count against processes to compare.
let watched_processes = processes
.iter()
.filter(|(_, duration)| *duration > 0)
.collect::<Vec<_>>();

// Find all processes with expired duration. If we've received
// SIGTERM, get all processes.
let expired = watched_processes
.iter()
.filter(|(_, duration)| {
*duration < elapsed
|| terminating.load(Ordering::Relaxed)
})
.collect::<Vec<_>>();

for (handle, _) in &expired {
info!("Terminating: {}", *handle);
let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL);
}
});
}

if expired.len() == watched_processes.len() {
break;
}
Contributor

I think this is equivalent (barring some comments):

Suggested change
for (handle, _) in processes.extract_if(.., |(_, duration)| {
    (*duration > 0 && *duration < elapsed)
        || terminating.load(Ordering::Relaxed)
}) {
    let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL);
}
if processes.is_empty()
    || processes.iter().all(|(_, duration)| *duration == 0)
{
    break;
}
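
For comparison, here is a sketch of the same selection written with `Iterator::partition`, which does not rely on `extract_if`. It assumes `processes` holds `(pid, duration)` pairs as `(i32, u64)`; the function names are hypothetical, and the actual `kill` call is left out. Like the suggestion, it selects entries with a duration of 0 on shutdown as well, whereas the original loop filtered those out before looking at `terminating`.

```rust
/// Split `processes` (pid, duration) into the pids that should be
/// killed now and the entries to keep watching. Entries with a
/// duration of 0 have no deadline and are only selected on shutdown.
fn split_expired(
    processes: Vec<(i32, u64)>,
    elapsed: u64,
    terminating: bool,
) -> (Vec<i32>, Vec<(i32, u64)>) {
    let (expired, remaining): (Vec<_>, Vec<_>) = processes
        .into_iter()
        .partition(|(_, duration)| {
            terminating || (*duration > 0 && *duration < elapsed)
        });
    (expired.into_iter().map(|(pid, _)| pid).collect(), remaining)
}

/// The loop can stop once nothing with a deadline is left.
fn all_done(remaining: &[(i32, u64)]) -> bool {
    remaining.iter().all(|(_, duration)| *duration == 0)
}
```

Whether zero-duration processes should be killed on SIGTERM is a behavioural choice worth settling before picking either form.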

@erthalion
Collaborator Author

Do we have any guidance on what version of LLVM we should have available?

Good point, I should have called this out. The llvm-sys crate requires a specific LLVM version for each version of the crate, following the pattern: llvm-sys version = LLVM version * 10. For example, llvm-sys 201 from the error above corresponds to LLVM 20.1. It seems to be possible to build everything with an LLVM version higher than required, but we can stick with the default recommendation for now.
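
To make the version rule concrete, here is a small sketch that derives the expected LLVM release and the prefix variable from an llvm-sys version string. Both helper names are hypothetical; the `LLVM_SYS_<major>_PREFIX` pattern is taken from the compile error quoted earlier in this thread.

```rust
/// Hypothetical helper: given an llvm-sys crate version such as "201.0.1",
/// return the LLVM release it targets as (major, minor), following the
/// rule "llvm-sys version = LLVM version * 10".
fn llvm_release(llvm_sys_version: &str) -> Option<(u32, u32)> {
    let major: u32 = llvm_sys_version.split('.').next()?.parse().ok()?;
    Some((major / 10, major % 10))
}

/// Hypothetical helper: name of the environment variable that the build
/// error refers to, e.g. LLVM_SYS_201_PREFIX for llvm-sys 201.x.
fn prefix_var(llvm_sys_version: &str) -> Option<String> {
    let major = llvm_sys_version.split('.').next()?;
    // Reject anything whose major component is not a number.
    major.parse::<u32>().ok()?;
    Some(format!("LLVM_SYS_{major}_PREFIX"))
}
```

For llvm-sys 201.0.1 this yields LLVM 20.1 and `LLVM_SYS_201_PREFIX`, which is the variable to point at a matching LLVM installation when it is not found system-wide.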

Contributor

@Molter73 left a comment

Ok, now I'm done going through the changes :)

context: LLVMContextRef,
}

/// # Safety
Contributor

I see a few # Safety comments with no concrete guidelines on what callers should look out for, we might want to update them with something like this:

Suggested change
/// # Safety
///
/// The caller must ensure the pointer is valid and points to a null terminated C-string.
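
As an example of how such a contract reads next to the code that relies on it, here is a minimal sketch. The function `name_len` is hypothetical and not part of the PR; it only shows a `# Safety` section paired with the matching `CStr::from_ptr` call.

```rust
use std::ffi::{c_char, CStr};

/// Return the length in bytes of a C string, excluding the terminator.
///
/// # Safety
///
/// The caller must ensure that `name` is non-null, points to a
/// null-terminated C string, and stays valid for the duration of the call.
pub unsafe extern "C" fn name_len(name: *const c_char) -> usize {
    // SAFETY: upheld by the caller, see the contract above.
    unsafe { CStr::from_ptr(name) }.to_bytes().len()
}
```

The sketch uses `c_char` rather than `i8`, since the signedness of C's `char` differs between platforms.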


/// # Safety
///
/// Open a file with create and write permissions and write to it.
Contributor

This comment probably needs updating.


stream.write_all(b"Hello\n").unwrap();
let mut buf = vec![];
stream.read_exact(&mut buf).unwrap();
Contributor

read_exact reads exactly the number of bytes needed to fill buf. Since buf is defined as an empty vector, does that mean this call always returns immediately? Do we know what we should expect as an answer? Maybe we can try reading that length by declaring an array instead, as proposed in the read_exact docs:

let mut buf = [0; 4];
stream.read_exact(&mut buf).unwrap();

https://doc.rust-lang.org/std/io/trait.Read.html#method.read_exact
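
The behaviour can be checked without a socket. The sketch below uses `std::io::Cursor` as a stand-in for the stream (an assumption for illustration only; the helper names are hypothetical) and compares an empty `Vec` with a fixed-size array.

```rust
use std::io::{Cursor, Read};

/// Read with an empty buffer: `read_exact` has nothing to fill, so it
/// succeeds at once and consumes no input. Returns the bytes read.
fn read_into_empty(input: &[u8]) -> Vec<u8> {
    let mut stream = Cursor::new(input);
    let mut buf: Vec<u8> = vec![];
    stream.read_exact(&mut buf).unwrap();
    buf
}

/// Read with a fixed-size buffer: exactly four bytes are consumed, and
/// a shorter input yields an `UnexpectedEof` error.
fn read_four(input: &[u8]) -> std::io::Result<[u8; 4]> {
    let mut stream = Cursor::new(input);
    let mut buf = [0u8; 4];
    stream.read_exact(&mut buf)?;
    Ok(buf)
}
```

So with `vec![]` the call never waits for the peer, while the array form blocks until four bytes arrive or the stream ends.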

///
/// Spawn a process with a random argument.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn task(name: *const i8, random: bool) -> u64 {
Contributor

Looks like random is only being logged, are we keeping this here as a placeholder for future work?

Comment on lines +183 to +187
builder: LLVMBuilderRef,
ee: LLVMExecutionEngineRef,
module: LLVMModuleRef,
module_state: &HashMap<String, LLVMValueRef>,
module_runtime: &HashMap<String, (LLVMValueRef, LLVMTypeRef)>,
Contributor

AFAICT, the jit_instruction and get_arg_value methods are always called with references to the same values. Would it make sense to move these into ScriptWorker itself and simply pass &self to jit_instruction? Or is this done because these values are only needed when creating the ScriptWorker in new and can be dropped afterwards? If the latter is the case, would it make sense to have some sort of auxiliary ScriptWorkerBuilder struct that owns these values and drops them along with itself once the final ScriptWorker is built?

I know I'm being a bit finicky here, I just want to see if we can slightly simplify these methods.

};

// Populate the global mapping with observed runtime functions
for instr in instructions {
Contributor

I'm being a bit dense, why are we iterating over the instructions twice? Could we not do this mapping before the LLVM... calls in between the loops?

Comment on lines +468 to +479
let Node::Work {
name: _,
args: _,
instructions,
dist: _,
} = node.clone()
else {
unreachable!()
};

// Populate the global mapping with observed runtime functions
for instr in instructions {
Contributor

We should not need to clone the entire node here, the only thing we do with each instruction is to match it in the loop, so this should work unless I'm missing something:

Suggested change
// Populate the global mapping with observed runtime functions
for instr in node.instructions {

Comment on lines +394 to +402
let Node::Work {
name: _,
args: _,
instructions,
dist: _,
} = node.clone()
else {
unreachable!()
};
Contributor

This might need a bit of refactoring and I know it is not a big deal because we are only doing this during start up, but we are cloning the entire node in order to access its underlying instructions... Twice!

Consider instead adding a get_instructions method to Node that will let us access the instructions through a reference, something like this:

impl Node {
    pub fn get_instructions(&self) -> Option<&[Instruction]> {
        match self {
            Node::Machine { .. } => None,
            Node::Work { instructions, .. } => Some(&instructions),
        }
    }
}

Comment on lines +520 to +528
let Node::Work {
name: _,
args: _,
instructions: _,
dist,
} = self.node.clone()
else {
unreachable!()
};
Contributor

Similar comment here about adding a way to reach the dist field without needing to clone the entire node.
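
One way to do that, sketched with placeholder types: matching on `&self.node` instead of `self.node.clone()` borrows the field in place. The `Dist` enum, the reduced `Node`, and the `dist` accessor below are assumptions for illustration; the real types in the PR carry more fields.

```rust
/// Placeholder types; the real `Node` carries more fields.
#[derive(Debug, PartialEq)]
enum Dist {
    Exp { rate: f64 },
}

enum Node {
    Machine,
    Work { name: String, dist: Option<Dist> },
}

struct ScriptWorker {
    node: Node,
}

impl ScriptWorker {
    /// Borrow `dist` by matching on a reference to the node; only the
    /// field itself is borrowed, and nothing is cloned.
    fn dist(&self) -> Option<&Dist> {
        let Node::Work { dist, .. } = &self.node else {
            return None;
        };
        dist.as_ref()
    }
}
```

The same `let ... = &self.node else` pattern would also work inline, without a dedicated accessor.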

Comment on lines +548 to +550
thread::sleep(time::Duration::from_millis(
(interval * 1000.0).round() as u64,
));
Contributor

Would this be equivalent?

Suggested change
thread::sleep(time::Duration::from_secs_f64(
    interval.round(),
));
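
A quick way to answer the question is to compare the resulting durations, assuming `interval` is an `f64` number of seconds (which the `* 1000.0` in the original suggests). The function names below are hypothetical. The two forms are not equivalent: the original rounds to the nearest millisecond, while `interval.round()` rounds to the nearest whole second. Passing `interval` unrounded keeps the sub-second part, but `from_secs_f64` panics on negative or non-finite input, whereas the `as u64` cast saturates to zero for negative values.

```rust
use std::time::Duration;

/// Original form: round to the nearest millisecond.
fn sleep_millis(interval: f64) -> Duration {
    Duration::from_millis((interval * 1000.0).round() as u64)
}

/// Suggested form: `interval.round()` rounds to the nearest whole second.
fn sleep_rounded_secs(interval: f64) -> Duration {
    Duration::from_secs_f64(interval.round())
}

/// Variant that keeps sub-second precision without rounding.
fn sleep_secs(interval: f64) -> Duration {
    Duration::from_secs_f64(interval)
}
```

For an interval of 0.4 seconds the original sleeps for 400 ms, while the suggested form does not sleep at all.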
