diff --git a/core/src/monitor.rs b/core/src/monitor.rs index 10b446b6..a324434b 100644 --- a/core/src/monitor.rs +++ b/core/src/monitor.rs @@ -74,16 +74,10 @@ impl Monitor { #[cfg(unix)] extern "C" fn sigurg_handler(_: libc::c_int) { if let Ok(mut set) = SigSet::thread_get_mask() { - //删除对SIGURG信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数 - set.remove(Signal::SIGURG); - set.thread_set_mask() - .expect("Failed to remove SIGURG signal mask!"); - //不抢占处于Syscall状态的协程。 - //MonitorListener的设计理念是不对Syscall状态的协程发送信号。 + //MonitorListener的设计理念是只对Running状态的协程发送信号。 //但由于NOTIFY_NODE移除和monitor线程遍历之间存在竞态条件, //SIGURG可能在协程刚进入Syscall状态时到达。 - //如果此时抢占,协程会被放入syscall_map但无人唤醒(因为没有io_uring/epoll注册), - //导致死锁。 + //如果此时抢占,协程会被放入syscall_map但无人唤醒(因为没有io_uring/epoll注册), 导致死锁。 // Skip preemption for coroutines in Syscall state. // MonitorListener's design is to NOT send signals to Syscall-state // coroutines. However, a race between NOTIFY_NODE removal and the @@ -92,10 +86,14 @@ impl Monitor { // coroutine lands in the syscall map with no io_uring/epoll/timer // registration to wake it, causing a deadlock. if let Some(co) = SchedulableCoroutine::current() { - if matches!(co.state(), CoroutineState::Syscall((), _, _)) { + if !matches!(co.state(), CoroutineState::Running) { return; } } + //删除对SIGURG信号的屏蔽,使信号处理函数即使在处理中,也可以再次进入信号处理函数 + set.remove(Signal::SIGURG); + set.thread_set_mask() + .expect("Failed to remove SIGURG signal mask!"); if let Some(suspender) = SchedulableSuspender::current() { suspender.suspend(); } @@ -177,14 +175,25 @@ impl Monitor { let monitor = Self::get_instance(); Self::init_current(monitor); let notify_queue = unsafe { &*monitor.notify_queue.get() }; + //先收集超时节点快照,再逐个检查是否仍在队列中 + //(在收集和检查之间,on_state_changed可能已将节点移除——协程进入了Syscall状态) + let mut expired = Vec::new(); while MonitorState::Running == monitor.state.get() || !notify_queue.is_empty() { //只遍历,不删除,如果抢占调度失败,会在1ms后不断重试,相当于主动检测 - for node in notify_queue { - if now() < node.timestamp { - continue; - } + expired.clear(); + let current = now(); + expired.extend( + notify_queue + .iter() + .filter(|n| current >= n.timestamp) + .copied(), + ); + for node in &expired { //实际上只对陷入重度计算的协程发送信号抢占 //对于陷入执行系统调用的协程不发送信号(如果发送信号,会打断系统调用,进而降低总体性能) + if !notify_queue.contains(node) { + continue; + } cfg_if::cfg_if! { if #[cfg(unix)] { if pthread_kill(node.pthread, Signal::SIGURG).is_err() { @@ -519,58 +528,27 @@ std::arch::global_asm!( "ret", ); -// Thread-local flag for two-level preemption on Windows. -// Level 1: SuspendThread fires, do_preempt sets this flag and returns -// without switching coroutines — the thread continues executing -// and exits any critical section (heap allocation, IO, etc.). -// If it reaches a hooked syscall, the Nio/Iocp layer will call -// Suspender::suspend_with cooperatively. -// Level 2: If the flag is still set on the next SuspendThread (~1ms later), -// the coroutine is truly CPU-bound with no syscalls — do_preempt -// forces an immediate context switch. -#[cfg(windows)] -thread_local! { - static PREEMPT_PENDING: Cell = const { Cell::new(false) }; -} - #[cfg(windows)] extern "C" fn do_preempt() { - PREEMPT_PENDING.with(|flag| { - if flag.get() { - // Flag was already set from a previous SuspendThread attempt but the - // coroutine never yielded (no hooked syscalls) — it is truly CPU-bound. - // Force immediate suspension. - flag.set(false); - //不抢占处于Syscall状态的协程。 - //MonitorListener的设计理念是不对Syscall状态的协程发送信号。 - //但由于NOTIFY_NODE移除和monitor线程遍历之间存在竞态条件, - //SIGURG可能在协程刚进入Syscall状态时到达。 - //如果此时抢占,协程会被放入syscall_map但无人唤醒(因为没有io_uring/epoll注册), - //导致死锁。 - // Skip preemption for coroutines in Syscall state. - // MonitorListener's design is to NOT send signals to Syscall-state - // coroutines. However, a race between NOTIFY_NODE removal and the - // monitor's queue iteration can cause SIGURG to arrive just after - // the coroutine entered Syscall state. If preempted here, the - // coroutine lands in the syscall map with no io_uring/epoll/timer - // registration to wake it, causing a deadlock. - if let Some(co) = SchedulableCoroutine::current() { - if matches!(co.state(), CoroutineState::Syscall((), _, _)) { - return; - } - } - if let Some(suspender) = SchedulableSuspender::current() { - suspender.suspend(); - } - } else { - // First attempt: set the flag and return without suspending. - // preempt_asm will restore all registers and return to the original - // code. This gives the thread time to exit any critical section. - // If the coroutine reaches a hooked syscall, the Nio/Iocp layer - // will yield cooperatively via Suspender::suspend_with. - flag.set(true); + //MonitorListener的设计理念是只对Running状态的协程发送信号。 + //但由于NOTIFY_NODE移除和monitor线程遍历之间存在竞态条件, + //SIGURG可能在协程刚进入Syscall状态时到达。 + //如果此时抢占,协程会被放入syscall_map但无人唤醒(因为没有io_uring/epoll注册),导致死锁。 + // Skip preemption for coroutines in Syscall state. + // MonitorListener's design is to NOT send signals to Syscall-state + // coroutines. However, a race between NOTIFY_NODE removal and the + // monitor's queue iteration can cause SIGURG to arrive just after + // the coroutine entered Syscall state. If preempted here, the + // coroutine lands in the syscall map with no io_uring/epoll/timer + // registration to wake it, causing a deadlock. + if let Some(co) = SchedulableCoroutine::current() { + if !matches!(co.state(), CoroutineState::Running) { + return; } - }); + } + if let Some(suspender) = SchedulableSuspender::current() { + suspender.suspend(); + } } #[repr(C)] @@ -690,14 +668,6 @@ mod tests { assert_ne!(thread_id, 0, "Thread should have reported its ID"); // Directly call preempt_thread to preempt the running coroutine. - // Two-level preemption: the first call sets a cooperative flag (the - // coroutine continues running), the second call forces suspension. - assert!( - super::Monitor::preempt_thread(thread_id), - "preempt_thread should succeed (set cooperative flag)" - ); - // Allow the first preempt_asm to complete before the second call - std::thread::sleep(Duration::from_millis(1)); assert!( super::Monitor::preempt_thread(thread_id), "preempt_thread should succeed (force suspend)"