Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions mypy/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,11 @@ def connect(self) -> None:
pid, connection_name = data["pid"], data["connection_name"]
assert isinstance(pid, int), f"Bad PID: {pid}"
assert isinstance(connection_name, str), f"Bad connection name: {connection_name}"
# Double-check this status file is created by us.
assert pid == self.proc.pid, f"PID mismatch: {pid} vs {self.proc.pid}"
if sys.platform != "win32":
# TODO(emmatyping): for some reason this does not work on Windows. Probably
# because we don't fork? We should check this
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we are not the first affected by this. Apparently virtual environments and other launchers on Windows create "wrapper" processes, so we will get a different PIDs, even if using sys.executable. Found this one https://stackoverflow.com/questions/64884701/subprocess-has-wrong-pid-on-windows (and a bunch of similar questions).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find! Somehow my searching didn't find that. I'll update the comment to reflect it.

# Double-check this status file is created by us.
assert pid == self.proc.pid, f"PID mismatch: {pid} vs {self.proc.pid}"
self.conn = IPCClient(connection_name, WORKER_CONNECTION_TIMEOUT)
return
except Exception as exc:
Expand Down
59 changes: 54 additions & 5 deletions mypy/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,60 @@ def ready_to_read(conns: list[IPCClient], timeout: float | None = None) -> list[

Return index of each readable connection in the original list.
"""
# TODO: add Windows support for this.
assert sys.platform != "win32"
connections = [conn.connection for conn in conns]
ready, _, _ = select(connections, [], [], timeout)
return [connections.index(r) for r in ready]
if sys.platform == "win32":
# Windows doesn't support select() on named pipes. Instead, start an overlapped
# ReadFile on each pipe (which internally creates an event via CreateEventW),
# then WaitForMultipleObjects on those events for efficient OS-level waiting.
# Any data consumed by the probe reads is stored into each connection's buffer
# so the subsequent read_bytes() call will find it via frame_from_buffer().
WAIT_FAILED = 0xFFFFFFFF
pending: list[tuple[int, _winapi.Overlapped]] = []
events: list[int] = []
ready: list[int] = []

for i, conn in enumerate(conns):
try:
ov, err = _winapi.ReadFile(conn.connection, 1, overlapped=True)
except OSError:
# Broken/closed pipe
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this except is needed. On Linux a closed socket will be returned as ready, so that the caller will fail with No data received when getting empty data from it. We want a similar behavior on Windows so that if one of the workers crashed, the coordinator should crash as well (showing the INTERNAL ERROR: ... thing).

So I guess we can either raise IPCError here, or append this one to ready.

if err == _winapi.ERROR_IO_PENDING:
events.append(ov.event)
pending.append((i, ov))
else:
# Data was immediately available (err == 0 or ERROR_MORE_DATA)
_, err = ov.GetOverlappedResult(True)
data = ov.getbuffer()
if data:
conn.buffer.extend(data)
ready.append(i)

# Wait only if nothing is immediately ready and there are pending operations
if not ready and events:
timeout_ms = int(timeout * 1000) if timeout is not None else _winapi.INFINITE
res = _winapi.WaitForMultipleObjects(events, False, timeout_ms)
if res == WAIT_FAILED:
for _, ov in pending:
ov.cancel()
raise IPCException(f"Failed to wait for connections: {_winapi.GetLastError()}")

# Check which pending operations completed, cancel the rest
for i, ov in pending:
if _winapi.WaitForSingleObject(ov.event, 0) == _winapi.WAIT_OBJECT_0:
_, err = ov.GetOverlappedResult(True)
data = ov.getbuffer()
if data:
conns[i].buffer.extend(data)
ready.append(i)
else:
ov.cancel()

return ready

else:
connections = [conn.connection for conn in conns]
ready, _, _ = select(connections, [], [], timeout)
return [connections.index(r) for r in ready]


def send(connection: IPCBase, data: IPCMessage) -> None:
Expand Down