Skip to content

Commit 391f451

Browse files
committed
[AIT-453] feat: add MAP_CLEAR operation support
- Introduced support for the `MAP_CLEAR` operation across LiveMap components. - Updated `MsgpackSerialization` to handle `MAP_CLEAR` serialization/deserialization. - Added `clearTimeserial` tracking in `DefaultLiveMap` to skip outdated operations. - Enhanced `LiveMapManager` to process `MAP_CLEAR` and synchronize state updates. - Comprehensive unit tests added for various `MAP_CLEAR` scenarios.
1 parent f430594 commit 391f451

5 files changed

Lines changed: 284 additions & 2 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal enum class ObjectOperationAction(val code: Int) {
2020
CounterCreate(3),
2121
CounterInc(4),
2222
ObjectDelete(5),
23+
MapClear(6),
2324
Unknown(-1); // code for unknown value during deserialization
2425
}
2526

@@ -109,6 +110,13 @@ internal data class CounterInc(
109110
*/
110111
internal object ObjectDelete
111112

113+
/**
114+
* Payload for MAP_CLEAR operation.
115+
* Spec: MCL*
116+
* No fields - action is sufficient
117+
*/
118+
internal object MapClear
119+
112120
/**
113121
* Payload for MAP_CREATE_WITH_OBJECT_ID operation.
114122
* Spec: MCRO*
@@ -176,7 +184,13 @@ internal data class ObjectsMap(
176184
* The map entries, indexed by key.
177185
* Spec: OMP3b
178186
*/
179-
val entries: Map<String, ObjectsMapEntry>? = null
187+
val entries: Map<String, ObjectsMapEntry>? = null,
188+
189+
/**
190+
* The serial value of the last MAP_CLEAR operation applied to the map.
191+
* Spec: OMP3c
192+
*/
193+
val clearTimeserial: String? = null,
180194
)
181195

182196
/**
@@ -255,6 +269,12 @@ internal data class ObjectOperation(
255269
* Spec: OOP3q
256270
*/
257271
val counterCreateWithObjectId: CounterCreateWithObjectId? = null,
272+
273+
/**
274+
* Payload for MAP_CLEAR operation.
275+
* Spec: OOP3r
276+
*/
277+
val mapClear: MapClear? = null,
258278
)
259279

260280
/**

liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.ably.lib.objects.MapCreate
1212
import io.ably.lib.objects.MapCreateWithObjectId
1313
import io.ably.lib.objects.MapRemove
1414
import io.ably.lib.objects.MapSet
15+
import io.ably.lib.objects.MapClear
1516
import io.ably.lib.objects.ObjectDelete
1617
import io.ably.lib.objects.ObjectsMapSemantics
1718
import io.ably.lib.objects.ObjectsCounter
@@ -174,6 +175,7 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) {
174175
if (objectDelete != null) fieldCount++
175176
if (mapCreateWithObjectId != null) fieldCount++
176177
if (counterCreateWithObjectId != null) fieldCount++
178+
if (mapClear != null) fieldCount++
177179

178180
packer.packMapHeader(fieldCount)
179181

@@ -224,6 +226,11 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) {
224226
counterCreateWithObjectId.writeMsgpack(packer)
225227
}
226228

229+
if (mapClear != null) {
230+
packer.packString("mapClear")
231+
packer.packMapHeader(0) // empty map, no fields
232+
}
233+
227234
}
228235

229236
/**
@@ -242,6 +249,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
242249
var objectDelete: ObjectDelete? = null
243250
var mapCreateWithObjectId: MapCreateWithObjectId? = null
244251
var counterCreateWithObjectId: CounterCreateWithObjectId? = null
252+
var mapClear: MapClear? = null
245253

246254
for (i in 0 until fieldCount) {
247255
val fieldName = unpacker.unpackString().intern()
@@ -271,6 +279,10 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
271279
}
272280
"mapCreateWithObjectId" -> mapCreateWithObjectId = readMapCreateWithObjectId(unpacker)
273281
"counterCreateWithObjectId" -> counterCreateWithObjectId = readCounterCreateWithObjectId(unpacker)
282+
"mapClear" -> {
283+
unpacker.skipValue() // empty map, consume it
284+
mapClear = MapClear
285+
}
274286
else -> unpacker.skipValue()
275287
}
276288
}
@@ -290,6 +302,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
290302
objectDelete = objectDelete,
291303
mapCreateWithObjectId = mapCreateWithObjectId,
292304
counterCreateWithObjectId = counterCreateWithObjectId,
305+
mapClear = mapClear,
293306
)
294307
}
295308

@@ -631,6 +644,7 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) {
631644

632645
if (semantics != null) fieldCount++
633646
if (entries != null) fieldCount++
647+
if (clearTimeserial != null) fieldCount++
634648

635649
packer.packMapHeader(fieldCount)
636650

@@ -647,6 +661,11 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) {
647661
value.writeMsgpack(packer)
648662
}
649663
}
664+
665+
if (clearTimeserial != null) {
666+
packer.packString("clearTimeserial")
667+
packer.packString(clearTimeserial)
668+
}
650669
}
651670

652671
/**
@@ -657,6 +676,7 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap {
657676

658677
var semantics: ObjectsMapSemantics? = null
659678
var entries: Map<String, ObjectsMapEntry>? = null
679+
var clearTimeserial: String? = null
660680

661681
for (i in 0 until fieldCount) {
662682
val fieldName = unpacker.unpackString().intern()
@@ -684,11 +704,12 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap {
684704
}
685705
entries = tempMap
686706
}
707+
"clearTimeserial" -> clearTimeserial = unpacker.unpackString()
687708
else -> unpacker.skipValue()
688709
}
689710
}
690711

691-
return ObjectsMap(semantics = semantics, entries = entries)
712+
return ObjectsMap(semantics = semantics, entries = entries, clearTimeserial = clearTimeserial)
692713
}
693714

694715
/**

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ internal class DefaultLiveMap private constructor(
3838
*/
3939
internal val data = ConcurrentHashMap<String, LiveMapEntry>()
4040

41+
/** @spec RTLM25 */
42+
internal var clearTimeserial: String? = null
43+
4144
/**
4245
* LiveMapManager instance for managing LiveMap operations
4346
*/
@@ -174,6 +177,7 @@ internal class DefaultLiveMap private constructor(
174177
}
175178

176179
override fun clearData(): LiveMapUpdate {
180+
clearTimeserial = null // RTLM4
177181
return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap())
178182
.apply { data.clear() }
179183
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.ably.lib.objects.objectError
1212
import io.ably.lib.objects.type.map.LiveMapUpdate
1313
import io.ably.lib.objects.type.noOp
1414
import io.ably.lib.util.Log
15+
import io.ably.lib.objects.MapClear
1516

1617
internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChangeCoordinator() {
1718

@@ -41,6 +42,8 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
4142
)
4243
} // RTLM6c
4344

45+
liveMap.clearTimeserial = objectState.map?.clearTimeserial // RTLM6i
46+
4447
// RTLM6d
4548
objectState.createOp?.let { createOp ->
4649
mergeInitialDataFromCreateOperation(createOp)
@@ -83,6 +86,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
8386
liveMap.notifyUpdated(update)
8487
true // RTLM15d5b
8588
}
89+
ObjectOperationAction.MapClear -> {
90+
val update = applyMapClear(serial) // RTLM15d8
91+
liveMap.notifyUpdated(update) // RTLM15d8a
92+
true // RTLM15d8b
93+
}
8694
else -> {
8795
Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
8896
false
@@ -118,6 +126,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
118126
mapSet: MapSet, // RTLM7d1
119127
timeSerial: String?, // RTLM7d2
120128
): LiveMapUpdate {
129+
// RTLM7h - skip if operation is older than the last MAP_CLEAR
130+
val clearSerial = liveMap.clearTimeserial
131+
if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) {
132+
Log.v(tag,
133+
"Skipping MAP_SET for key=\"${mapSet.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId")
134+
return noOpMapUpdate
135+
}
136+
121137
val existingEntry = liveMap.data[mapSet.key]
122138

123139
// RTLM7a
@@ -170,6 +186,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
170186
timeSerial: String?, // RTLM8c2
171187
timeStamp: Long?, // RTLM8c3
172188
): LiveMapUpdate {
189+
// skip if operation is older than the last MAP_CLEAR
190+
val clearSerial = liveMap.clearTimeserial
191+
if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) {
192+
Log.v(tag,
193+
"Skipping MAP_REMOVE for key=\"${mapRemove.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId")
194+
return noOpMapUpdate
195+
}
196+
173197
val existingEntry = liveMap.data[mapRemove.key]
174198

175199
// RTLM8a
@@ -212,6 +236,39 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
212236
return LiveMapUpdate(mapOf(mapRemove.key to LiveMapUpdate.Change.REMOVED))
213237
}
214238

239+
/**
240+
* @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap
241+
*/
242+
private fun applyMapClear(timeSerial: String?): LiveMapUpdate {
243+
val clearSerial = liveMap.clearTimeserial
244+
245+
// RTLM24a - skip if existing clear serial is strictly newer than incoming op serial
246+
if (clearSerial != null && (timeSerial == null || clearSerial > timeSerial)) {
247+
Log.v(tag,
248+
"Skipping MAP_CLEAR: op serial $timeSerial <= current clear serial $clearSerial; objectId=$objectId")
249+
return noOpMapUpdate
250+
}
251+
252+
Log.v(tag,
253+
"Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId")
254+
liveMap.clearTimeserial = timeSerial // RTLM25
255+
256+
val update = mutableMapOf<String, LiveMapUpdate.Change>()
257+
258+
// RTLM24b - remove all entries whose serial is older than (or equal to missing) the clear serial
259+
val iter = liveMap.data.entries.iterator()
260+
while (iter.hasNext()) {
261+
val (key, entry) = iter.next()
262+
val entrySerial = entry.timeserial
263+
if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) {
264+
iter.remove()
265+
update[key] = LiveMapUpdate.Change.REMOVED
266+
}
267+
}
268+
269+
return LiveMapUpdate(update)
270+
}
271+
215272
/**
216273
* For Lww CRDT semantics (the only supported LiveMap semantic) an operation
217274
* Should only be applied if incoming serial is strictly greater than existing entry's serial.

0 commit comments

Comments
 (0)