diff --git a/.github/workflows/test_python_scripts.yml b/.github/workflows/test_python_scripts.yml new file mode 100644 index 00000000..38911bdf --- /dev/null +++ b/.github/workflows/test_python_scripts.yml @@ -0,0 +1,29 @@ +name: Test Python scripts + +on: + workflow_dispatch: + pull_request: + push: + branches: + - master + +jobs: + test_python_scripts: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-python@v6 + with: + python-version: "3.12" + - name: Install dependencies + run: pip install pytest pytest-cov + - name: Run tests + run: python -m pytest scripts/*.py --cov=scripts --cov-report=xml + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v5 + with: + fail_ci_if_error: true + files: coverage.xml + flags: python_scripts + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/scripts/tool_communication.py b/scripts/tool_communication.py index d77e7733..4a4f10dc 100755 --- a/scripts/tool_communication.py +++ b/scripts/tool_communication.py @@ -37,6 +37,10 @@ import argparse import socket import os +import pytest + +RED = "\033[31m" +RESET = "\033[0m" # Custom formatter to show both default values and description formatting in the help message @@ -83,8 +87,6 @@ def check_tcp(ip, port, timeout=5.0): def main(args): - RED = "\033[31m" - RESET = "\033[0m" logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") @@ -97,27 +99,33 @@ def main(args): # Check IP and port reachability if not check_tcp(robot_ip, tcp_port): - logging.error( - f"{RED}Cannot reach {robot_ip}:{tcp_port}.\n" + raise ConnectionError( + f"Cannot reach {robot_ip}:{tcp_port}.\n" "Check that the IP address and port are correct.\n" "If so, ensure that the robot is powered on, reachable on the network, " - f"and that the ToolCommForwarder URCap is running.{RESET}" + f"and that the ToolCommForwarder URCap is running." + ) + + # Check if parent directory of device_name exists + parent_dir = os.path.dirname(local_device) + if parent_dir and not os.path.exists(parent_dir): + raise FileNotFoundError( + f"Parent directory '{parent_dir}' does not exist.\n" + "Socat needs an existing directory to create the PTY symlink.\n" + "Fix:\n" + f" - Create the parent directory, e.g. 'mkdir -p {parent_dir}'.\n" + f" - Use a different device name with an existing parent directory." ) - logging.info("Exiting tool communication script.") - return # Check if the device_name is a directory if os.path.isdir(local_device): - - logging.error( - f"{RED}'{local_device}' exists and is a directory.\n" + raise FileExistsError( + f"'{local_device}' exists and is a directory.\n" "Socat needs a file path to create a PTY symlink, but it cannot replace a directory.\n" "Fix:\n" " - Remove the directory.\n" - f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'. {RESET}" - ) - logging.info("Exiting tool communication script.") - return + f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'." + ) # Configure socat command socat_config = [ @@ -144,19 +152,64 @@ def main(args): # Error case when socat is not installed except FileNotFoundError: - logging.error(f"{RED}Socat not found in PATH. Install it (e.g. apt-get install socat). {RESET}") - logging.info("Exiting tool communication script.") - return + raise FileNotFoundError("Socat not found in PATH. Install it (e.g. apt-get install socat).") - # Other errors - except Exception as e: - logging.error(f"{RED}Unexpected error launching socat: {e} {RESET}") - logging.info("Exiting tool communication script.") - return - return +def test_check_tcp(): + assert not check_tcp("127.0.0.1", 0.1) + + +def test_check_tcp_open_port(): + bind_ip = "127.0.0.1" + bind_port = 54321 + args = argparse.Namespace( + robot_ip=bind_ip, + tcp_port=bind_port, + device_name="/tmp/nonexistent_dir/ttyUR" + ) + with pytest.raises(ConnectionError): + main(args) + + +def test_parent_dir_doesnt_exist(): + bind_ip = "127.0.0.1" + bind_port = 54321 + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind((bind_ip, bind_port)) + server.listen(1) + args = argparse.Namespace( + robot_ip=bind_ip, + tcp_port=bind_port, + device_name="/tmp/nonexistent_dir/ttyUR" + ) + with pytest.raises(FileNotFoundError) as exc_info: + main(args) + assert os.path.dirname(args.device_name) in str(exc_info.value) + server.close() + + +def test_device_name_is_directory(): + bind_ip = "127.0.0.1" + bind_port = 54321 + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind((bind_ip, bind_port)) + server.listen(1) + args = argparse.Namespace( + robot_ip=bind_ip, + tcp_port=bind_port, + device_name="/tmp/ttyUR_dir" + ) + os.makedirs(args.device_name, exist_ok=True) + with pytest.raises(FileExistsError) as exc_info: + main(args) + assert args.device_name in str(exc_info.value) + server.close() if __name__ == "__main__": args = get_args() - main(args) + try: + main(args) + except Exception as e: + logging.error(f"{RED}Unexpected error: {e} {RESET}") + logging.info("Exiting tool communication script.")