Skip to content

Commit 8df066a

Browse files
authored
Merge pull request #1194 from ably/AIT-276/apply-on-ack
[AIT-276] feat: introduce ACK-based local application of LiveObjects ops
2 parents 16f03d1 + 33855f8 commit 8df066a

18 files changed

Lines changed: 1192 additions & 108 deletions

File tree

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) {
13181318
maxIdleInterval = connectionDetails.maxIdleInterval;
13191319
connectionStateTtl = connectionDetails.connectionStateTtl;
13201320
maxMessageSize = connectionDetails.maxMessageSize;
1321+
siteCode = connectionDetails.siteCode; // CD2j
13211322

13221323
/* set the clientId resolved from token, if any */
13231324
String clientId = connectionDetails.clientId;
@@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) {
20332034
private CMConnectivityListener connectivityListener;
20342035
private long connectionStateTtl = Defaults.connectionStateTtl;
20352036
public int maxMessageSize = Defaults.maxMessageSize;
2037+
public String siteCode; // CD2j
20362038
long maxIdleInterval = Defaults.maxIdleInterval;
20372039
private int disconnectedRetryAttempt = 0;
20382040

lib/src/main/java/io/ably/lib/types/ConnectionDetails.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public class ConnectionDetails {
7373
* Spec: CD2f, RTN14e, DF1a
7474
*/
7575
public Long connectionStateTtl;
76+
/**
77+
* An opaque string identifying the server instance that the client is connected to.
78+
* Used as a key in siteTimeserials maps for LiveObjects operations.
79+
* <p>
80+
* Spec: CD2j
81+
*/
82+
public String siteCode;
7683

7784
ConnectionDetails() {
7885
maxIdleInterval = Defaults.maxIdleInterval;
@@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
114121
case "connectionStateTtl":
115122
connectionStateTtl = unpacker.unpackLong();
116123
break;
124+
case "siteCode":
125+
siteCode = unpacker.unpackString();
126+
break;
117127
default:
118128
Log.v(TAG, "Unexpected field: " + fieldName);
119129
unpacker.skipValue();

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue
1212
import io.ably.lib.realtime.ChannelState
1313
import io.ably.lib.types.AblyException
1414
import io.ably.lib.types.ProtocolMessage
15+
import io.ably.lib.types.PublishResult
1516
import io.ably.lib.util.Log
1617
import kotlinx.coroutines.*
1718
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
@@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
3132

3233
internal var state = ObjectsState.Initialized
3334

35+
/**
36+
* Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo.
37+
* @spec RTO7b, RTO7b1
38+
*/
39+
internal val appliedOnAckSerials = mutableSetOf<String>()
40+
3441
/**
3542
* @spec RTO4 - Used for handling object messages and object sync messages
3643
*/
@@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
125132
)
126133
)
127134

128-
// RTO11g - Publish the message
129-
publish(arrayOf(msg))
135+
// RTO11i - publish and apply locally on ACK
136+
publishAndApply(arrayOf(msg))
130137

131-
// RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
132-
return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) {
133-
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
134-
}
138+
// RTO11h2 - Return existing object if found after apply
139+
return objectsPool.get(objectId) as? LiveMap
140+
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
135141
}
136142

137143
private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
@@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
161167
)
162168
)
163169

164-
// RTO12g - Publish the message
165-
publish(arrayOf(msg))
170+
// RTO12i - publish and apply locally on ACK
171+
publishAndApply(arrayOf(msg))
166172

167-
// RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
168-
return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) {
169-
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
170-
}
173+
// RTO12h2 - Return existing object if found after apply
174+
return objectsPool.get(objectId) as? LiveCounter
175+
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
171176
}
172177

173178
/**
@@ -182,15 +187,55 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
182187
/**
183188
* Spec: RTO15
184189
*/
185-
internal suspend fun publish(objectMessages: Array<ObjectMessage>) {
190+
internal suspend fun publish(objectMessages: Array<ObjectMessage>): PublishResult {
186191
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
187192
adapter.throwIfUnpublishableState(channelName)
188193
adapter.ensureMessageSizeWithinLimit(objectMessages)
189194
// RTO15e - Must construct the ProtocolMessage as per RTO15e1, RTO15e2, RTO15e3
190195
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
191196
protocolMessage.state = objectMessages
192197
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
193-
adapter.sendAsync(protocolMessage)
198+
return adapter.sendAsync(protocolMessage) // RTO15h
199+
}
200+
201+
/**
202+
* Publishes the given object messages and, upon receiving the ACK, immediately applies them
203+
* locally as synthetic inbound messages using the assigned serial and connection's siteCode.
204+
*
205+
* Spec: RTO20
206+
*/
207+
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>) {
208+
// RTO20b - publish, propagate failure
209+
val publishResult = publish(objectMessages)
210+
211+
// RTO20c - validate required info
212+
val siteCode = adapter.connectionManager.siteCode
213+
if (siteCode == null) {
214+
Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
215+
return
216+
}
217+
val serials = publishResult.serials
218+
if (serials == null || serials.size != objectMessages.size) {
219+
Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
220+
return
221+
}
222+
223+
// RTO20d - create synthetic inbound ObjectMessages
224+
val syntheticMessages = mutableListOf<ObjectMessage>()
225+
objectMessages.forEachIndexed { i, msg ->
226+
val serial = serials[i]
227+
if (serial == null) {
228+
Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping")
229+
return@forEachIndexed
230+
}
231+
syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3
232+
}
233+
if (syntheticMessages.isEmpty()) return
234+
235+
// RTO20e, RTO20f - dispatch to sequential scope for ordering
236+
withContext(sequentialScope.coroutineContext) {
237+
objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f)
238+
}
194239
}
195240

196241
/**
@@ -268,16 +313,30 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
268313
objectsManager.clearBufferedObjectOperations() // RTO4b5
269314
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
270315
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
271-
objectsManager.endSync(fromInitializedState) // RTO4b4
316+
objectsManager.endSync() // RTO4b4
272317
}
273318
}
274319
ChannelState.detached,
320+
ChannelState.suspended,
275321
ChannelState.failed -> {
276-
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
277-
objectsPool.clearObjectsData(false)
278-
objectsManager.clearSyncObjectsDataPool()
322+
val errorReason = try {
323+
adapter.getChannel(channelName).reason
324+
} catch (e: Exception) {
325+
null
326+
}
327+
val error = ablyException(
328+
"publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
329+
ErrorCode.PublishAndApplyFailedDueToChannelState,
330+
HttpStatusCode.BadRequest,
331+
cause = errorReason?.let { AblyException.fromErrorInfo(it) }
332+
)
333+
objectsManager.failBufferedAcks(error) // RTO20e1
334+
if (state != ChannelState.suspended) {
335+
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
336+
objectsPool.clearObjectsData(false)
337+
objectsManager.clearSyncObjectsDataPool()
338+
}
279339
}
280-
281340
else -> {
282341
// No action needed for other states
283342
}

liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) {
1111
// Channel mode and state validation error codes
1212
ChannelModeRequired(40_024),
1313
ChannelStateError(90_001),
14+
PublishAndApplyFailedDueToChannelState(92_008),
1415
}
1516

1617
internal enum class HttpStatusCode(public val code: Int) {

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject
44
import io.ably.lib.objects.type.ObjectUpdate
55
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
66
import io.ably.lib.objects.type.livemap.DefaultLiveMap
7+
import io.ably.lib.types.AblyException
78
import io.ably.lib.util.Log
9+
import kotlinx.coroutines.CompletableDeferred
810

911
/**
1012
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
@@ -21,6 +23,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
2123
* @spec RTO7 - Buffered object operations during sync
2224
*/
2325
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
26+
private var syncCompletionWaiter: CompletableDeferred<Unit>? = null
2427

2528
/**
2629
* Handles object messages (non-sync messages).
@@ -39,7 +42,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
3942
}
4043

4144
// Apply messages immediately if synced
42-
applyObjectMessages(objectMessages) // RTO8b
45+
applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b
4346
}
4447

4548
/**
@@ -62,7 +65,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
6265
if (syncTracker.hasSyncEnded()) {
6366
// defer the state change event until the next tick if this was a new sync sequence
6467
// to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
65-
endSync(isNewSync)
68+
endSync()
6669
}
6770
}
6871

@@ -78,25 +81,48 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
7881
bufferedObjectOperations.clear() // RTO5a2b
7982
syncObjectsDataPool.clear() // RTO5a2a
8083
currentSyncId = syncId
81-
stateChange(ObjectsState.Syncing, false)
84+
syncCompletionWaiter = CompletableDeferred()
85+
stateChange(ObjectsState.Syncing)
8286
}
8387

8488
/**
8589
* Ends the current sync sequence.
8690
*
8791
* @spec RTO5c - Applies sync data and buffered operations
8892
*/
89-
internal fun endSync(deferStateEvent: Boolean) {
93+
internal fun endSync() {
9094
Log.v(tag, "Ending sync sequence")
91-
applySync()
92-
// should apply buffered object operations after we applied the sync.
93-
// can use regular non-sync object.operation logic
94-
applyObjectMessages(bufferedObjectOperations) // RTO5c6
95-
96-
bufferedObjectOperations.clear() // RTO5c5
97-
syncObjectsDataPool.clear() // RTO5c4
98-
currentSyncId = null // RTO5c3
99-
stateChange(ObjectsState.Synced, deferStateEvent)
95+
applySync() // RTO5c1/2/7
96+
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
97+
bufferedObjectOperations.clear() // RTO5c5
98+
syncObjectsDataPool.clear() // RTO5c4
99+
currentSyncId = null // RTO5c3
100+
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
101+
stateChange(ObjectsState.Synced) // RTO5c8
102+
syncCompletionWaiter?.complete(Unit)
103+
syncCompletionWaiter = null
104+
}
105+
106+
/**
107+
* Called from publishAndApply (via withContext sequentialScope).
108+
* If SYNCED: apply immediately with LOCAL source.
109+
* If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply.
110+
*/
111+
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
112+
if (realtimeObjects.state != ObjectsState.Synced) {
113+
if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
114+
syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
115+
}
116+
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
117+
}
118+
119+
/**
120+
* Fails all pending apply waiters.
121+
* Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
122+
*/
123+
internal fun failBufferedAcks(error: AblyException) {
124+
syncCompletionWaiter?.completeExceptionally(error)
125+
syncCompletionWaiter = null
100126
}
101127

102128
/**
@@ -162,7 +188,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
162188
*
163189
* @spec RTO9 - Creates zero-value objects if they don't exist
164190
*/
165-
private fun applyObjectMessages(objectMessages: List<ObjectMessage>) {
191+
private fun applyObjectMessages(
192+
objectMessages: List<ObjectMessage>,
193+
source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL,
194+
) {
166195
// RTO9a
167196
for (objectMessage in objectMessages) {
168197
if (objectMessage.operation == null) {
@@ -177,14 +206,30 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
177206
Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}")
178207
continue
179208
}
209+
210+
// RTO9a3 - skip operations already applied on ACK (discard without taking any further action).
211+
// This check comes before zero-value object creation (RTO9a2a1) so that no zero-value object is
212+
// created for an objectId not yet in the pool when the echo is being discarded.
213+
// Note: siteTimeserials is NOT updated here intentionally — updating it to the echo's serial would
214+
// incorrectly reject older-but-unprocessed operations from the same site that arrive after the echo.
215+
if (objectMessage.serial != null &&
216+
realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
217+
Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo")
218+
realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
219+
continue // discard without taking any further action
220+
}
221+
180222
// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
181223
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
182224
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
183225
// since they need to be able to eventually initialize themselves from that *_CREATE op.
184226
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
185227
// and then we can always apply the operation on the existing object in the pool.
186228
val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
187-
obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
229+
val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3
230+
if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) {
231+
realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4
232+
}
188233
}
189234
}
190235

@@ -228,7 +273,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
228273
*
229274
* @spec RTO2 - Emits state change events for syncing and synced states
230275
*/
231-
private fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
276+
private fun stateChange(newState: ObjectsState) {
232277
if (realtimeObjects.state == newState) {
233278
return
234279
}
@@ -240,6 +285,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
240285
}
241286

242287
internal fun dispose() {
288+
syncCompletionWaiter?.cancel()
243289
syncObjectsDataPool.clear()
244290
bufferedObjectOperations.clear()
245291
disposeObjectsStateListeners()
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.ably.lib.objects
2+
3+
/** @spec RTO22 */
4+
internal enum class ObjectsOperationSource {
5+
LOCAL, // RTO22a - applied upon receipt of ACK
6+
CHANNEL // RTO22b - received over a Realtime channel
7+
}

0 commit comments

Comments
 (0)