Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/test_python_scripts.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Test Python scripts

on:
workflow_dispatch:
pull_request:
push:
branches:
- master

jobs:
test_python_scripts:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: "3.12"
- name: Install dependencies
run: pip install pytest pytest-cov
- name: Run tests
run: python -m pytest scripts/*.py --cov=scripts --cov-report=xml
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v5
with:
fail_ci_if_error: true
files: coverage.xml
flags: python_scripts
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
101 changes: 77 additions & 24 deletions scripts/tool_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
import argparse
import socket
import os
import pytest

RED = "\033[31m"
RESET = "\033[0m"


# Custom formatter to show both default values and description formatting in the help message
Expand Down Expand Up @@ -83,8 +87,6 @@ def check_tcp(ip, port, timeout=5.0):


def main(args):
RED = "\033[31m"
RESET = "\033[0m"

logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")

Expand All @@ -97,27 +99,33 @@ def main(args):

# Check IP and port reachability
if not check_tcp(robot_ip, tcp_port):
logging.error(
f"{RED}Cannot reach {robot_ip}:{tcp_port}.\n"
raise ConnectionError(
f"Cannot reach {robot_ip}:{tcp_port}.\n"
"Check that the IP address and port are correct.\n"
"If so, ensure that the robot is powered on, reachable on the network, "
f"and that the ToolCommForwarder URCap is running.{RESET}"
f"and that the ToolCommForwarder URCap is running."
)

# Check if parent directory of device_name exists
parent_dir = os.path.dirname(local_device)
if parent_dir and not os.path.exists(parent_dir):
raise FileNotFoundError(
f"Parent directory '{parent_dir}' does not exist.\n"
"Socat needs an existing directory to create the PTY symlink.\n"
"Fix:\n"
f" - Create the parent directory, e.g. 'mkdir -p {parent_dir}'.\n"
f" - Use a different device name with an existing parent directory."
)
logging.info("Exiting tool communication script.")
return

# Check if the device_name is a directory
if os.path.isdir(local_device):

logging.error(
f"{RED}'{local_device}' exists and is a directory.\n"
raise FileExistsError(
f"'{local_device}' exists and is a directory.\n"
"Socat needs a file path to create a PTY symlink, but it cannot replace a directory.\n"
"Fix:\n"
" - Remove the directory.\n"
f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'. {RESET}"
)
logging.info("Exiting tool communication script.")
return
f" - Use a different device name, e.g. '--device-name /tmp/ttyUR0'."
)

# Configure socat command
socat_config = [
Expand All @@ -144,19 +152,64 @@ def main(args):

# Error case when socat is not installed
except FileNotFoundError:
logging.error(f"{RED}Socat not found in PATH. Install it (e.g. apt-get install socat). {RESET}")
logging.info("Exiting tool communication script.")
return
raise FileNotFoundError("Socat not found in PATH. Install it (e.g. apt-get install socat).")

# Other errors
except Exception as e:
logging.error(f"{RED}Unexpected error launching socat: {e} {RESET}")
logging.info("Exiting tool communication script.")
return

return
def test_check_tcp():
assert not check_tcp("127.0.0.1", 0.1)


def test_check_tcp_open_port():
bind_ip = "127.0.0.1"
bind_port = 54321
args = argparse.Namespace(
robot_ip=bind_ip,
tcp_port=bind_port,
device_name="/tmp/nonexistent_dir/ttyUR"
)
with pytest.raises(ConnectionError):
main(args)


def test_parent_dir_doesnt_exist():
bind_ip = "127.0.0.1"
bind_port = 54321
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((bind_ip, bind_port))
server.listen(1)
args = argparse.Namespace(
robot_ip=bind_ip,
tcp_port=bind_port,
device_name="/tmp/nonexistent_dir/ttyUR"
)
with pytest.raises(FileNotFoundError) as exc_info:
main(args)
assert os.path.dirname(args.device_name) in str(exc_info.value)
server.close()


def test_device_name_is_directory():
bind_ip = "127.0.0.1"
bind_port = 54321
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((bind_ip, bind_port))
server.listen(1)
args = argparse.Namespace(
robot_ip=bind_ip,
tcp_port=bind_port,
device_name="/tmp/ttyUR_dir"
)
os.makedirs(args.device_name, exist_ok=True)
with pytest.raises(FileExistsError) as exc_info:
main(args)
assert args.device_name in str(exc_info.value)
server.close()


if __name__ == "__main__":
args = get_args()
main(args)
try:
main(args)
except Exception as e:
logging.error(f"{RED}Unexpected error: {e} {RESET}")
logging.info("Exiting tool communication script.")
Loading