-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSynchronousQueueMS.kt
More file actions
92 lines (77 loc) · 2.97 KB
/
SynchronousQueueMS.kt
File metadata and controls
92 lines (77 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
 * Base type for the entries of the linked list that backs [SynchronousQueueMS].
 *
 * @property next atomic link to the successor node; it holds `null` until a
 * successor has been appended.
 */
abstract class Node(
    val next: AtomicReference<Node>
)
/**
 * Queue node for a suspended `receive()` call.
 *
 * @property action continuation of the waiting receiver; [SynchronousQueueMS.send]
 * resumes it with the element being handed over.
 * @param next link to the successor node; defaults to a fresh, empty reference.
 */
class Receiver<E>(
    val action: Continuation<E>,
    next: AtomicReference<Node> = AtomicReference<Node>()
) : Node(next)
/**
 * Queue node for a suspended `send()` call.
 *
 * @property element the value the sender is offering.
 * @property action continuation of the waiting sender; [SynchronousQueueMS.receive]
 * resumes it with [Unit] once it has taken [element].
 * @param next link to the successor node; defaults to a fresh, empty reference.
 */
class Sender<E>(
    val element: E,
    val action: Continuation<Unit>,
    next: AtomicReference<Node> = AtomicReference<Node>()
) : Node(next)
/**
 * Placeholder node that carries neither an element nor a continuation.
 *
 * [SynchronousQueueMS] uses one instance as the initial value of both its head
 * and its tail.
 */
class Dummy : Node(AtomicReference<Node>())
/**
 * Synchronous (rendezvous) queue built on a linked list of [Node]s whose [head]
 * and [tail] are advanced with compare-and-set.
 *
 * The node referenced by [head] is always a placeholder (initially [dummy],
 * later the most recently dequeued node); the waiting operations are the nodes
 * that follow it. At any moment the list contains only [Sender]s or only
 * [Receiver]s: an arriving operation either pairs up with the first waiter of
 * the opposite kind or appends itself behind [tail] and suspends.
 */
class SynchronousQueueMS<E> : SynchronousQueue<E> {
    val dummy = Dummy()
    val head = AtomicReference<Node>(dummy)
    val tail = AtomicReference<Node>(dummy)

    /**
     * Hands [element] to a receiver.
     *
     * If a receiver is already waiting it is dequeued and resumed with
     * [element], and this call returns without suspending. Otherwise a
     * [Sender] node is appended and the caller suspends until [receive]
     * resumes it.
     */
    override suspend fun send(element: E) {
        while (true) {
            val curTail = tail.get()
            if (head.get() !== curTail && curTail !is Sender<*>) {
                // Non-empty queue whose tail is not a sender: try to pair up
                // with the first waiting receiver.
                val curHead = head.get()
                val waiter = curHead.next.get()
                // Retry if the queue changed under us (first waiter is not a
                // receiver any more, or the queue has been drained).
                if (waiter !is Receiver<*> || head.get() === tail.get()) continue
                @Suppress("UNCHECKED_CAST")
                val receiver = waiter as Receiver<E>
                // A successful CAS gives this caller exclusive ownership of the
                // dequeued node, so the receiver must be resumed
                // unconditionally; otherwise it would stay suspended forever.
                if (head.compareAndSet(curHead, receiver)) {
                    receiver.action.resume(element)
                    return
                }
            } else {
                // Queue is empty or holds other senders: append and suspend.
                val outcome = suspendCoroutine<Any> { continuation ->
                    val newTail = Sender(element, continuation)
                    if (curTail.next.compareAndSet(null, newTail)) {
                        tail.compareAndSet(curTail, newTail)
                    } else {
                        // Another node was linked behind curTail first; wake up
                        // immediately and start over.
                        continuation.resume(RETRY)
                    }
                }
                if (outcome !== RETRY) return
            }
        }
    }

    /**
     * Takes an element from a sender.
     *
     * If a sender is already waiting it is dequeued, resumed with [Unit], and
     * its element is returned without suspending. Otherwise a [Receiver] node
     * is appended and the caller suspends until [send] resumes it with an
     * element.
     *
     * A `null` element (possible when [E] is a nullable type) is delivered
     * like any other value: the retry signal is a private sentinel object,
     * not `null`.
     *
     * @return the element handed over by the matching [send] call.
     */
    override suspend fun receive(): E {
        while (true) {
            val curTail = tail.get()
            if (head.get() !== curTail && curTail !is Receiver<*>) {
                // Non-empty queue whose tail is not a receiver: try to pair up
                // with the first waiting sender.
                val curHead = head.get()
                val waiter = curHead.next.get()
                // Retry if the queue changed under us (first waiter is not a
                // sender any more, or the queue has been drained).
                if (waiter !is Sender<*> || head.get() === tail.get()) continue
                @Suppress("UNCHECKED_CAST")
                val sender = waiter as Sender<E>
                // As in send(): winning the CAS means the sender is ours and
                // must be resumed unconditionally.
                if (head.compareAndSet(curHead, sender)) {
                    sender.action.resume(Unit)
                    return sender.element
                }
            } else {
                // Queue is empty or holds other receivers: append and suspend.
                val outcome = suspendCoroutine<Any?> { continuation ->
                    val newTail = Receiver(continuation)
                    if (curTail.next.compareAndSet(null, newTail)) {
                        tail.compareAndSet(curTail, newTail)
                    } else {
                        // Another node was linked behind curTail first; wake up
                        // immediately and start over.
                        continuation.resume(RETRY)
                    }
                }
                if (outcome === RETRY) continue
                // Anything other than the sentinel is the element passed to
                // resume() by send(), including a legitimate null.
                @Suppress("UNCHECKED_CAST")
                val received = outcome as E
                return received
            }
        }
    }

    private companion object {
        /**
         * Identity-compared marker used to resume a continuation whose node
         * could not be appended, telling the caller to retry the operation.
         */
        val RETRY = Any()
    }
}