Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## Version 0.9.0
- Added `continuous` jobs: a lightweight alternative to loop jobs. Continuous jobs automatically submit their continuation, but they neither track their cycle nor perform archival operations. See [the manual](https://vachalab.github.io/qq-manual/continuous_job.html) for more information.

***

## Version 0.8.0
- Added the `--transfer-mode` and `--archive-mode` options, which allow automatically transferring (and archiving, respectively) files from the working directory for jobs other than those that finished successfully. See [the manual](https://vachalab.github.io/qq-manual/transfer_modes.html) for more information.
- As a consequence of the above change, the behavior of `qq go`, `qq sync`, and `qq wipe` has been slightly adjusted.
Expand Down
2 changes: 1 addition & 1 deletion scripts/qq_scripts/gmx-eta
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Requires `uv`: https://docs.astral.sh/uv
# ]
#
# [tool.uv.sources]
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.8.0" }
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.9.0" }
# ///

import argparse
Expand Down
2 changes: 1 addition & 1 deletion scripts/qq_scripts/multi-check
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Requires `uv`: https://docs.astral.sh/uv
# ]
#
# [tool.uv.sources]
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.8.0" }
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.9.0" }
# ///

import argparse
Expand Down
2 changes: 1 addition & 1 deletion scripts/qq_scripts/multi-kill
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Requires `uv`: https://docs.astral.sh/uv
# ]
#
# [tool.uv.sources]
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.8.0" }
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.9.0" }
# ///

import argparse
Expand Down
2 changes: 1 addition & 1 deletion scripts/qq_scripts/multi-submit
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Requires `uv`: https://docs.astral.sh/uv
# ]
#
# [tool.uv.sources]
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.8.0" }
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.9.0" }
# ///

import argparse
Expand Down
12 changes: 12 additions & 0 deletions src/qq_lib/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,3 +725,15 @@ def available_work_dirs() -> str:
return ", ".join([f"'{work_dir_type}'" for work_dir_type in work_dirs])
except QQError:
return "??? (no batch system detected)"


def available_job_types() -> str:
    """
    List the job types defined by `JobType` as a single display string.

    Returns:
        str: The string form of every `JobType` member, each enclosed in
            single quotes and separated by a comma and a space.
    """
    # kept as a function-local import, exactly as in the original
    from qq_lib.properties.job_type import JobType

    quoted_names = (f"'{job_type!s}'" for job_type in JobType)
    return ", ".join(quoted_names)
1 change: 1 addition & 0 deletions src/qq_lib/properties/job_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class JobType(Enum):

STANDARD = 1
LOOP = 2
CONTINUOUS = 3

def __str__(self):
return self.name.lower()
Expand Down
36 changes: 23 additions & 13 deletions src/qq_lib/run/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def __init__(self, info_file: Path, host: str):
else:
self._archiver = None

if self._informer.info.job_type == JobType.CONTINUOUS:
self._should_resubmit = True

def prepare(self) -> None:
"""
Prepare the script for execution, setting up the archive
Expand Down Expand Up @@ -211,7 +214,7 @@ def execute(self) -> int:
# if the script returns an exit code corresponding to CFG.exit_codes.qq_run_no_resubmit,
# do not submit the next cycle of the job but return 0
if (
self._informer.info.loop_info is not None
self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]
and self._process.returncode == CFG.exit_codes.qq_run_no_resubmit
):
logger.debug(
Expand Down Expand Up @@ -243,7 +246,7 @@ def finalize(self) -> None:
- If not using scratch: No file operations are performed.
3. Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero
exit code).
4. Resubmits the job if it is a loop job and completed successfully.
4. Resubmits the job if it is a loop or continuous job and was completed successfully.

Raises:
QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
Expand Down Expand Up @@ -291,8 +294,8 @@ def finalize(self) -> None:
# update the qqinfo file
self._updateInfoFinished()

# if this is a loop job
if self._informer.info.job_type == JobType.LOOP:
# if this is a loop/continuous job
if self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]:
self._resubmit()
else:
# update the qqinfo file
Expand Down Expand Up @@ -646,25 +649,32 @@ def _reloadInfoAndEnsureValid(self, retry: bool = False) -> None:

def _resubmit(self) -> None:
"""
Resubmit the current loop job to the batch system if additional cycles remain.
Resubmit the current job if either of the following is true:
a) it is a loop job and additional cycles remain,
b) it is a continuous job that should be resubmitted.

Raises:
QQError: If the job cannot be resubmitted.
"""
if not (loop_info := self._informer.info.loop_info):
logger.debug("Loop info is undefined while resubmiting. This is a bug!")
return

if loop_info.current >= loop_info.end:
logger.info("This was the final cycle of the loop job. Not resubmitting.")
return

if not self._should_resubmit:
logger.info(
f"The script finished with an exit code of '{CFG.exit_codes.qq_run_no_resubmit}' indicating that the next cycle of the job should not be submitted. Not resubmitting."
)
return

if self._informer.info.job_type == JobType.LOOP:
if not (loop_info := self._informer.info.loop_info):
logger.warning(
"Loop info is undefined while resubmiting a loop job. This is a bug!"
)
return

if loop_info.current >= loop_info.end:
logger.info(
"This was the final cycle of the loop job. Not resubmitting."
)
return

logger.info("Resubmitting the job.")
logger.debug(
f"Resubmitting using the batch system '{str(self._batch_system)}'."
Expand Down
8 changes: 6 additions & 2 deletions src/qq_lib/submit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from click_option_group import optgroup

from qq_lib.core.click_format import GNUHelpColorsCommand
from qq_lib.core.common import available_work_dirs, get_runtime_files
from qq_lib.core.common import (
available_job_types,
available_work_dirs,
get_runtime_files,
)
from qq_lib.core.config import CFG
from qq_lib.core.error import QQError
from qq_lib.core.logger import get_logger
Expand Down Expand Up @@ -86,7 +90,7 @@ def complete_script(
"--job-type",
type=str,
default=None,
help="Type of the qq job. Defaults to 'standard'.",
help=f"Type of the qq job. Defaults to 'standard'. Available types: {available_job_types()}.",
)
@optgroup.option(
"--exclude",
Expand Down
67 changes: 49 additions & 18 deletions src/qq_lib/submit/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,41 +183,69 @@ def submit(self) -> str:

def continuesLoop(self) -> bool:
"""
Determine whether the submitted job is a continuation of a loop job.

Checks if an info file exists in the input directory that corresponds
to the previous cycle of the same loop job. A job is considered a valid
continuation if:
- An info file is found.
- Both the info file and the current job are loop jobs.
- The previous job finished successfully.
- The previous loop cycle number is exactly one less than the current one.
Determine whether the submitted job is a continuation of a loop/continuous job.

Returns:
bool: True if the job is a valid continuation of a previous loop job,
bool: True if the job is a valid continuation of a previous loop/continuous job,
False otherwise.
"""
try:
# only one qq info file can be present
# there should only be one info file for both loop jobs (runtime files are archived)
# and continuous jobs (runtime files overwrite each other)
info_file = get_info_file(self._input_dir)
informer = Informer.fromFile(info_file)

if (
informer.info.loop_info
and self._loop_info
and informer.info.job_state == NaiveState.FINISHED
and informer.info.loop_info.current == self._loop_info.current - 1
if self._loopJobContinuesLoop(informer) or self._continuousJobContinuesLoop(
informer
):
logger.debug("Valid loop job with a correct cycle.")
logger.debug("Valid loop job with a correct cycle or a continuous job.")
return True
logger.debug(
"Detected info file is either not a loop job or does not correspond to the previous cycle."
"Detected info file does not correspond to a resubmittable job."
)
return False
except QQError as e:
logger.debug(f"Could not read an info file: {e}.")
return False

def _loopJobContinuesLoop(self, previous: Informer) -> bool:
    """
    Check whether the job being submitted is the next cycle of a loop job.

    Args:
        previous (Informer): Informer associated with the previous job.

    Returns:
        bool: True only if both jobs carry loop information, the previous job
            is in the finished state, and the previous cycle number is exactly
            one lower than the current cycle number. False otherwise.
    """
    # the previous job must be a loop job
    previous_loop = previous.info.loop_info
    if previous_loop is None:
        return False

    # the job being submitted must be a loop job as well
    current_loop = self._loop_info
    if current_loop is None:
        return False

    # the previous job must have finished successfully
    previous_finished = previous.info.job_state == NaiveState.FINISHED
    if not previous_finished:
        return False

    # the current cycle must directly follow the previous one
    return previous_loop.current == current_loop.current - 1

def _continuousJobContinuesLoop(self, previous: Informer) -> bool:
    """
    Check whether the job being submitted continues a continuous job.

    Args:
        previous (Informer): Informer associated with the previous job.

    Returns:
        bool: True only if the previous job and the job being submitted are
            both of the continuous type and the previous job is in the
            finished state. False otherwise.
    """
    previous_info = previous.info

    # both the previous and the current job must be continuous jobs
    both_continuous = (
        previous_info.job_type == JobType.CONTINUOUS
        and self._job_type == JobType.CONTINUOUS
    )
    if not both_continuous:
        return False

    # the previous job must have finished successfully
    return previous_info.job_state == NaiveState.FINISHED

def getInputDir(self) -> Path:
"""
Get path to the job's input directory.
Expand Down Expand Up @@ -327,6 +355,9 @@ def _createEnvVarsDict(self) -> dict[str, str]:
env_vars[CFG.env_vars.loop_start] = str(self._loop_info.start)
env_vars[CFG.env_vars.loop_end] = str(self._loop_info.end)
env_vars[CFG.env_vars.archive_format] = self._loop_info.archive_format

# loop job- or continuous job-specific environment variables
if self._job_type in [JobType.LOOP, JobType.CONTINUOUS]:
env_vars[CFG.env_vars.no_resubmit] = str(CFG.exit_codes.qq_run_no_resubmit)

return env_vars
Expand Down
30 changes: 30 additions & 0 deletions tests/test_properties_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,36 @@ def test_get_command_line_for_resubmit_basic(sample_info):
]


def test_get_command_line_for_continuous(sample_info):
    """
    Verify the resubmission command line built for a continuous job.

    The job type is switched to continuous and explicit exclude/include file
    lists are set; the resulting command line must contain
    `--job-type continuous` together with the comma-joined file lists.
    """
    sample_info.job_type = JobType.CONTINUOUS
    sample_info.excluded_files = [
        Path("exclude.txt"),
        Path("inner/exclude2.txt"),
    ]
    sample_info.included_files = [
        Path("include.txt"),
        Path("inner/include2.txt"),
    ]

    command_line = sample_info.getCommandLineForResubmit()

    expected_command_line = [
        "script.sh",
        "--queue",
        "default",
        "--job-type",
        "continuous",
        "--batch-system",
        "PBS",
        "--depend",
        "afterok=12345.fake.server.com",
        "--ncpus",
        "8",
        "--work-dir",
        "scratch_local",
        "--account",
        "fake-account",
        "--exclude",
        "exclude.txt,inner/exclude2.txt",
        "--include",
        "include.txt,inner/include2.txt",
        "--transfer-mode",
        "success",
    ]
    assert command_line == expected_command_line


def test_get_command_line_full(sample_info):
sample_info.job_type = JobType.LOOP
sample_info.excluded_files = [Path("exclude.txt"), Path("inner/exclude2.txt")]
Expand Down
5 changes: 5 additions & 0 deletions tests/test_properties_job_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
def test_str_method():
    """Verify that every job type converts to its expected lowercase string."""
    expected_strings = [
        (JobType.STANDARD, "standard"),
        (JobType.LOOP, "loop"),
        (JobType.CONTINUOUS, "continuous"),
    ]
    for job_type, expected in expected_strings:
        assert str(job_type) == expected


@pytest.mark.parametrize(
Expand All @@ -22,6 +23,9 @@ def test_str_method():
("loop", JobType.LOOP),
("LOOP", JobType.LOOP),
("LoOp", JobType.LOOP),
("continuous", JobType.CONTINUOUS),
("CONTINUOUS", JobType.CONTINUOUS),
("ConTiNUOus", JobType.CONTINUOUS),
],
)
def test_from_str_valid(input_str, expected):
Expand All @@ -37,6 +41,7 @@ def test_from_str_valid(input_str, expected):
"123",
"standrd", # intentional typo
"looping",
"continous", # intentional typo
],
)
def test_from_str_invalid_raises(invalid_str):
Expand Down
20 changes: 13 additions & 7 deletions tests/test_run_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def terminate_and_stop():

def test_runner_resubmit_final_cycle():
informer_mock = MagicMock()
informer_mock.info.job_type = JobType.LOOP
informer_mock.info.loop_info.current = 5
informer_mock.info.loop_info.end = 5

Expand Down Expand Up @@ -1117,7 +1118,11 @@ def test_runner_execute_updates_info_and_runs_script(tmp_path):
assert retcode == 0


def test_runner_execute_handles_no_resubmit_exit_code(tmp_path):
@pytest.mark.parametrize(
"job_type",
[JobType.LOOP, JobType.CONTINUOUS],
)
def test_runner_execute_handles_no_resubmit_exit_code(tmp_path, job_type):
script_file = tmp_path / "script.sh"
script_file.write_text("#!/bin/bash\necho Hello\n")

Expand All @@ -1131,6 +1136,7 @@ def test_runner_execute_handles_no_resubmit_exit_code(tmp_path):
runner._informer.info.stdout_file = stdout_file
runner._informer.info.stderr_file = stderr_file
runner._informer.info.loop_info = MagicMock()
runner._informer.info.job_type = job_type
runner._should_resubmit = True

mock_process = MagicMock()
Expand Down Expand Up @@ -1319,8 +1325,8 @@ def test_runner_reload_info_without_retry(mock_informer_cls, mock_retryer_cls):
def test_runner_ensure_matches_job_with_matching_numeric_id():
informer = MagicMock()
informer.info.job_id = "12345.cluster.domain"
informer.matchesJob = (
lambda job_id: informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
informer.matchesJob = lambda job_id: (
informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
)

runner = Runner.__new__(Runner)
Expand All @@ -1333,8 +1339,8 @@ def test_runner_ensure_matches_job_with_matching_numeric_id():
def test_runner_ensure_matches_job_with_different_numeric_id_raises():
informer = MagicMock()
informer.info.job_id = "99999.cluster.domain"
informer.matchesJob = (
lambda job_id: informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
informer.matchesJob = lambda job_id: (
informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
)

runner = Runner.__new__(Runner)
Expand All @@ -1348,8 +1354,8 @@ def test_runner_ensure_matches_job_with_different_numeric_id_raises():
def test_runner_ensure_matches_job_with_partial_suffix_matching():
informer = MagicMock()
informer.info.job_id = "5678.random.server.org"
informer.matchesJob = (
lambda job_id: informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
informer.matchesJob = lambda job_id: (
informer.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
)

runner = Runner.__new__(Runner)
Expand Down
Loading
Loading