-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathconcore.py
More file actions
256 lines (221 loc) · 8.9 KB
/
concore.py
File metadata and controls
256 lines (221 loc) · 8.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import time
import logging
import os
import atexit
from ast import literal_eval
import sys
import re
import zmq
import numpy as np
import signal
import concore_base
# Module logger. Only a NullHandler is attached here, so this module installs
# no output handler of its own; the importing application decides where (and
# whether) 'concore' log records are emitted.
logger = logging.getLogger('concore')
logger.addHandler(logging.NullHandler())
# Mute noisy third-party loggers: only WARNING and above pass through.
logging.getLogger('matplotlib').setLevel(logging.WARNING)
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
# On Windows, register this process's PID so the study can be terminated safely.
# Previous approach: a single "concorekill.bat" overwritten by each node (race condition).
# New approach: append the PID to a shared registry file and generate a kill
# script that validates each PID before killing it.
# See: https://github.com/ControlCore-Project/concore/issues/391
_LOCK_LEN = 0x7FFFFFFF # lock range large enough to cover entire file
_BASE_DIR = os.path.abspath(".") # capture CWD before atexit can shift it
# Both files live in the directory that was current when this module was imported.
_PID_REGISTRY_FILE = os.path.join(_BASE_DIR, "concorekill_pids.txt")
_KILL_SCRIPT_FILE = os.path.join(_BASE_DIR, "concorekill.bat")
def _register_pid():
    """Append the current process ID to the shared PID registry file.

    One line of the form ``"<pid>\\n"`` is appended to ``_PID_REGISTRY_FILE``.
    On Windows the write is wrapped in an ``msvcrt.locking`` byte-range lock.

    Registration is best-effort: any ``OSError`` (file not writable, lock not
    obtained, ...) is swallowed and the function returns ``None`` either way.
    """
    entry = str(os.getpid()) + "\n"
    try:
        with open(_PID_REGISTRY_FILE, "a") as f:
            if not hasattr(sys, 'getwindowsversion'):
                f.write(entry)
                return
            import msvcrt
            try:
                # msvcrt.locking() locks a range that starts at the *current*
                # file position, and a file opened for append may start out
                # positioned at end-of-file.  Seek to offset 0 first so the
                # range locked here is the same range that is unlocked below
                # (and the same range _cleanup_pid locks from offset 0).
                f.seek(0)
                msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, _LOCK_LEN)
                f.write(entry)
                # Push the buffered text to disk while the lock is still held.
                # Append mode still places the data at the end of the file.
                f.flush()
            finally:
                try:
                    f.seek(0)
                    msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, _LOCK_LEN)
                except OSError:
                    # Nothing to unlock (the lock was never obtained).
                    pass
    except OSError:
        pass
def _cleanup_pid():
    """Remove this process's PID from the registry file (registered with atexit on Windows).

    The registry is rewritten without the current PID.  When no other PID
    remains, the registry file and the generated kill script are both deleted.
    On Windows the rewrite happens under an ``msvcrt.locking`` byte-range lock.
    Any ``OSError`` is swallowed; a missing registry file is a no-op.
    """
    own_pid = str(os.getpid())

    def _rewrite_without_own_pid(handle):
        # Keep every non-blank entry that is not this process.
        survivors = []
        for raw_line in handle:
            entry = raw_line.strip()
            if entry and entry != own_pid:
                survivors.append(entry)
        if survivors:
            handle.seek(0)
            handle.truncate()
            for entry in survivors:
                handle.write(entry + "\n")
            return
        # Last registered process: close first (an open file cannot be
        # deleted on Windows), then delete the registry and the kill script.
        handle.close()
        for stale_path in (_PID_REGISTRY_FILE, _KILL_SCRIPT_FILE):
            try:
                os.remove(stale_path)
            except OSError:
                pass

    try:
        if not os.path.exists(_PID_REGISTRY_FILE):
            return
        with open(_PID_REGISTRY_FILE, "r+") as f:
            if not hasattr(sys, 'getwindowsversion'):
                _rewrite_without_own_pid(f)
                return
            import msvcrt
            try:
                msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, _LOCK_LEN)
                _rewrite_without_own_pid(f)
            finally:
                try:
                    # Back to offset 0, where the lock was taken, before unlocking.
                    f.seek(0)
                    msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, _LOCK_LEN)
                except (OSError, ValueError):
                    # ValueError: the helper already closed the file.
                    pass
    except OSError:
        pass
def _write_kill_script():
    """Write the batch file at ``_KILL_SCRIPT_FILE`` that kills registered PIDs.

    The generated script reads the PID registry that sits next to it
    (``%~dp0``), uses ``wmic`` to check that each PID's command line contains
    "concore" before calling ``taskkill``, and finally deletes both the
    registry and itself.  Any ``OSError`` while writing is swallowed.
    """
    try:
        reg_name = os.path.basename(_PID_REGISTRY_FILE)
        bat_name = os.path.basename(_KILL_SCRIPT_FILE)
        batch_lines = [
            "@echo off",
            'if not exist "%~dp0' + reg_name + '" (',
            " echo No PID registry found. Nothing to kill.",
            " exit /b 0",
            ")",
            'for /f "usebackq tokens=*" %%p in ("%~dp0' + reg_name + '") do (',
            ' wmic process where "ProcessId=%%p" get CommandLine /value 2>nul | find /i "concore" >nul',
            " if not errorlevel 1 (",
            " echo Killing concore process %%p",
            " taskkill /F /PID %%p >nul 2>&1",
            " ) else (",
            " echo Skipping PID %%p - not a concore process or not running",
            " )",
            ")",
            'del /q "%~dp0' + reg_name + '" 2>nul',
            'del /q "%~dp0' + bat_name + '" 2>nul',
        ]
        # Every line, including the last, is terminated with CRLF.
        payload = "".join(text + "\r\n" for text in batch_lines)
        # newline="" stops Python from translating the explicit CRLFs.
        with open(_KILL_SCRIPT_FILE, "w", newline="") as f:
            f.write(payload)
    except OSError:
        pass
# Windows only (sys.getwindowsversion exists only there): record this PID,
# (re)generate the kill script, and arrange for the PID to be removed again
# when the interpreter exits.
if hasattr(sys, 'getwindowsversion'):
    _register_pid()
    _write_kill_script()
    atexit.register(_cleanup_pid)
# Re-export selected concore_base names under this module's namespace.
ZeroMQPort = concore_base.ZeroMQPort
convert_numpy_to_python = concore_base.convert_numpy_to_python
safe_literal_eval = concore_base.safe_literal_eval
parse_params = concore_base.parse_params
# Global variables
# NOTE(review): this module is handed to concore_base functions as `_mod`, so
# these are presumably read and updated there -- confirm against concore_base.
zmq_ports = {}
_cleanup_in_progress = False
last_read_status = "SUCCESS"
s = ''
olds = ''
delay = 1
retrycount = 0
inpath = "./in" #must be rel path for local
outpath = "./out"
simtime = 0
def _port_path(base, port_num):
    """Return *base* with *port_num* appended directly (no separator).

    For example ``"./in"`` and ``1`` give ``"./in1"``.
    """
    suffix = str(port_num)
    return base + suffix
# The params and maxtime files are looked up in the directory of input port 1
# ("./in1" with the default `inpath`).
concore_params_file = os.path.join(_port_path(inpath, 1), "concore.params")
concore_maxtime_file = os.path.join(_port_path(inpath, 1), "concore.maxtime")
# Load input/output ports if present; an empty dict is the default passed in.
iport = safe_literal_eval("concore.iport", {})
oport = safe_literal_eval("concore.oport", {})
# This module object itself; passed as the first argument to the concore_base
# functions wrapped below.
_mod = sys.modules[__name__]
# ===================================================================
# ZeroMQ Communication Wrapper
# ===================================================================
def init_zmq_port(port_name, port_type, address, socket_type_str):
    """Initialise a ZeroMQ port by delegating to ``concore_base.init_zmq_port``.

    All four arguments are forwarded unchanged, preceded by this module
    object.  Their exact semantics are defined by ``concore_base``.
    Returns ``None``.
    """
    concore_base.init_zmq_port(_mod, port_name, port_type, address, socket_type_str)
def terminate_zmq():
    """Clean up all ZMQ sockets and contexts before exit.

    Delegates to ``concore_base.terminate_zmq`` with this module object.
    Registered with ``atexit`` further down in this module.
    """
    concore_base.terminate_zmq(_mod)
def signal_handler(sig, frame):
    """Handle interrupt signals gracefully.

    Prints a notice, tears down ZMQ via ``concore_base.terminate_zmq`` and
    exits the process with status 0.

    Args:
        sig: Signal number, echoed in the printed message.
        frame: Current stack frame supplied by the ``signal`` module; unused.
    """
    print(f"\nReceived signal {sig}, shutting down gracefully...")
    # Drop the atexit hook first so ZMQ teardown is not run a second time
    # when sys.exit() below triggers the atexit handlers.
    try:
        atexit.unregister(terminate_zmq)
    except Exception:
        pass
    concore_base.terminate_zmq(_mod)
    sys.exit(0)
# Register cleanup handlers: ZMQ teardown on normal interpreter exit, and the
# same teardown (via signal_handler) on interrupt/termination signals.
atexit.register(terminate_zmq)
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
if not hasattr(sys, 'getwindowsversion'):
    signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only)
# Parameters loaded once at import time from the params file; queried through tryparam().
params = concore_base.load_params(concore_params_file)
#9/30/22
def tryparam(n, i):
    """Return parameter `n` from params dict, else default `i`.

    Args:
        n: Key to look up in the module-level ``params``.
        i: Value returned when ``n`` is not present.
    """
    return params.get(n, i)
#9/12/21
# ===================================================================
# Simulation Time Handling
# ===================================================================
def default_maxtime(default):
    """Read maximum simulation time from file or use default.

    Stores the result of ``safe_literal_eval(concore_maxtime_file, default)``
    in the module-level ``maxtime``; returns ``None``.

    Args:
        default: Fallback value handed to ``safe_literal_eval``.
    """
    global maxtime
    maxtime = safe_literal_eval(concore_maxtime_file, default)
# Set the module-level `maxtime` at import time, with 100 as the default
# handed to safe_literal_eval.
default_maxtime(100)
def unchanged():
    """Check if global string `s` is unchanged since last call.

    Delegates to ``concore_base.unchanged`` with this module object and
    returns its result.
    """
    return concore_base.unchanged(_mod)
# ===================================================================
# I/O Handling (File + ZMQ)
# ===================================================================
def read(port_identifier, name, initstr_val):
    """Read data from a ZMQ port or file-based port.

    Delegates to ``concore_base.read`` with this module object, then copies
    ``concore_base.last_read_status`` into this module's ``last_read_status``.

    Args:
        port_identifier: Port to read from; forwarded unchanged.
        name: Name of the item to read; forwarded unchanged.
        initstr_val: Forwarded unchanged (presumably the initial/fallback
            value string -- confirm against ``concore_base.read``).

    Returns:
        tuple: (data, success_flag) where success_flag is True if real
        data was received, False if a fallback/default was used.

    Also sets ``concore.last_read_status`` to one of:
        SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR,
        EMPTY_DATA, RETRIES_EXCEEDED.

    Backward compatibility:
        Legacy callers that do ``value = concore.read(...)`` will
        receive a tuple. They can adapt with::

            result = concore.read(...)
            if isinstance(result, tuple):
                value, ok = result
            else:
                value, ok = result, True

        Alternatively, check ``concore.last_read_status`` after the
        call.
    """
    global last_read_status
    result = concore_base.read(_mod, port_identifier, name, initstr_val)
    # Mirror the status recorded by concore_base so callers can read it here.
    last_read_status = concore_base.last_read_status
    return result
def write(port_identifier, name, val, delta=0):
    """Write a value to a port by delegating to ``concore_base.write``.

    All arguments are forwarded unchanged, preceded by this module object.
    Returns ``None``.

    Args:
        port_identifier: Port to write to.
        name: Name of the item to write.
        val: Value to write.
        delta: Defaults to 0 (presumably a simulation-time offset -- confirm
            against ``concore_base.write``).
    """
    concore_base.write(_mod, port_identifier, name, val, delta)
def initval(simtime_val_str):
    """Delegate to ``concore_base.initval`` with this module object and return its result.

    Args:
        simtime_val_str: Forwarded unchanged (presumably a string holding the
            simulation time followed by initial values -- confirm against
            ``concore_base.initval``).
    """
    return concore_base.initval(_mod, simtime_val_str)