From b56b9fcaaa922869848650c15fc0384556a0b84d Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 27 Aug 2025 18:53:40 +0530 Subject: [PATCH 1/5] [ECO-5056] use server-provided GC grace period for garbage collection - Add support for server-configured objectsGCGracePeriod from connection details, with fallback to default 24-hour period. Updates ObjectsPool and all LiveObjects - Implementations to use the server-provided value for tombstone cleanup timing. --- .../java/io/ably/lib/objects/Adapter.java | 6 ++-- .../io/ably/lib/objects/ObjectsAdapter.java | 8 ++--- .../ably/lib/transport/ConnectionManager.java | 2 ++ .../io/ably/lib/types/ConnectionDetails.java | 8 +++++ .../kotlin/io/ably/lib/objects/Helpers.kt | 20 ++++++++++--- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 18 ++++++++++-- .../lib/objects/type/BaseRealtimeObject.kt | 29 +++++++++++++++---- .../type/livecounter/DefaultLiveCounter.kt | 2 +- .../objects/type/livemap/DefaultLiveMap.kt | 4 +-- .../lib/objects/type/livemap/LiveMapEntry.kt | 5 ++-- .../io/ably/lib/objects/unit/HelpersTest.kt | 20 +++++++++---- .../lib/objects/unit/ObjectMessageSizeTest.kt | 18 +++++++++--- 12 files changed, 106 insertions(+), 34 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index e9a084ae7..76c35cc37 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -2,7 +2,7 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) { } @Override - public @NotNull ConnectionManager getConnectionManager() { - return ably.connection.connectionManager; + public @NotNull Connection getConnection() { + return ably.connection; } @Override diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java index 21262942a..b6054e71a 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java @@ -1,7 +1,7 @@ package io.ably.lib.objects; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import org.jetbrains.annotations.Blocking; @@ -18,13 +18,13 @@ public interface ObjectsAdapter { @NotNull ClientOptions getClientOptions(); /** - * Retrieves the connection manager for handling connection state and operations. + * Retrieves the connection instance for handling connection state and operations. * Used to check connection status, obtain error information, and manage * message transmission across the Ably connection. * - * @return the connection manager instance + * @return the connection instance */ - @NotNull ConnectionManager getConnectionManager(); + @NotNull Connection getConnection(); /** * Retrieves the current time in milliseconds from the Ably server. diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 3fab010cb..94ca2b822 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -104,6 +104,7 @@ public class ConnectionManager implements ConnectListener { * This field is initialized only if the LiveObjects plugin is present in the classpath. */ private final LiveObjectsPlugin liveObjectsPlugin; + public Long objectsGCGracePeriod = null; /** * Methods on the channels map owned by the {@link AblyRealtime} instance @@ -1319,6 +1320,7 @@ private synchronized void onConnected(ProtocolMessage message) { connectionStateTtl = connectionDetails.connectionStateTtl; maxMessageSize = connectionDetails.maxMessageSize; siteCode = connectionDetails.siteCode; // CD2j + objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod; /* set the clientId resolved from token, if any */ String clientId = connectionDetails.clientId; diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java index 587b9241f..6a557a12a 100644 --- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java +++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java @@ -81,6 +81,11 @@ public class ConnectionDetails { */ public String siteCode; + /** + * The duration in milliseconds used to retain tombstoned objects at client side. + */ + public Long objectsGCGracePeriod; + ConnectionDetails() { maxIdleInterval = Defaults.maxIdleInterval; connectionStateTtl = Defaults.connectionStateTtl; @@ -124,6 +129,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException { case "siteCode": siteCode = unpacker.unpackString(); break; + case "objectsGCGracePeriod": + objectsGCGracePeriod = unpacker.unpackLong(); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 6a855868c..a3a1138e1 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback +import io.ably.lib.realtime.ConnectionEvent import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -17,7 +18,7 @@ import kotlin.coroutines.resumeWithException */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage): PublishResult = suspendCancellableCoroutine { continuation -> try { - connectionManager.send(message, clientOptions.queueMessages, object : Callback { + connection.connectionManager.send(message, clientOptions.queueMessages, object : Callback { override fun onSuccess(result: PublishResult) { continuation.resume(result) } @@ -47,6 +48,17 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } +internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { + val connectionManager = connection.connectionManager + if (connectionManager.objectsGCGracePeriod != null) { + block(connectionManager.objectsGCGracePeriod) + return + } + connection.once(ConnectionEvent.connected) { + block(connectionManager.objectsGCGracePeriod) + } +} + /** * Retrieves the channel modes for a specific channel. * This method returns the modes that are set for the specified channel. @@ -78,7 +90,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connectionManager.maxMessageSize + val maximumAllowedSize = connection.connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -133,8 +145,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connectionManager.isActive) { - throw ablyException(connectionManager.stateErrorInfo) + if (!connection.connectionManager.isActive) { + throw ablyException(connection.connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 28ee839e0..43cab31c2 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap internal object ObjectsPoolDefaults { const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes /** + * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` + * object of the `CONNECTED` event. + * If the server does not provide this value, the SDK will fall back to this default value. * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation * with an earlier serial that would not have been applied if the tombstone still existed. * @@ -49,10 +52,19 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine + @Volatile + private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) - // Start garbage collection coroutine + // Start garbage collection coroutine with server-provided grace period if available + realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + period?.let { + gcGracePeriod = it + Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") + } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms") + } gcJob = startGCJob() } @@ -123,9 +135,9 @@ internal class ObjectsPool( */ private fun onGCInterval() { pool.entries.removeIf { (_, obj) -> - if (obj.isEligibleForGc()) { true } // Remove from pool + if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool else { - obj.onGCInterval() + obj.onGCInterval(gcGracePeriod) false // Keep in pool } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index c6f602b5c..2eca29b55 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -4,7 +4,6 @@ import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectsOperationSource -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError import io.ably.lib.objects.type.livecounter.noOpCounterUpdate import io.ably.lib.objects.type.livemap.noOpMapUpdate @@ -138,10 +137,20 @@ internal abstract class BaseRealtimeObject( /** * Checks if the object is eligible for garbage collection. + * + * An object is eligible for garbage collection if it has been tombstoned and + * the time since tombstoning exceeds the specified grace period. + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned objects + * should be kept before being eligible for collection. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * @return true if the object is tombstoned and the grace period has elapsed, + * false otherwise */ - internal fun isEligibleForGc(): Boolean { + internal fun isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } /** @@ -198,12 +207,22 @@ internal abstract class BaseRealtimeObject( /** * Called during garbage collection intervals to clean up expired entries. * + * This method is invoked periodically (every 5 minutes) by the ObjectsPool + * to perform cleanup of tombstoned data that has exceeded the grace period. + * * This method should identify and remove entries that: * - Have been marked as tombstoned - * - Have a tombstone timestamp older than the configured grace period + * - Have a tombstone timestamp older than the specified grace period + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned entries + * should be kept before being eligible for removal. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * Must be greater than 2 minutes to ensure proper operation + * ordering and avoid issues with delayed operations. * * Implementations typically use single-pass removal techniques to * efficiently clean up expired data without creating temporary collections. */ - abstract fun onGCInterval() + abstract fun onGCInterval(gcGracePeriod: Long) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index d242a9bd3..4f1ef28e5 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -111,7 +111,7 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.notify(update as LiveCounterUpdate) } - override fun onGCInterval() { + override fun onGCInterval(gcGracePeriod: Long) { // Nothing to GC for a counter object return } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 493a74f07..8e9746d6e 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -190,8 +190,8 @@ internal class DefaultLiveMap private constructor( liveMapManager.notify(update as LiveMapUpdate) } - override fun onGCInterval() { - data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } + override fun onGCInterval(gcGracePeriod: Long) { + data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) } } companion object { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt index df2259583..f12e88d88 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt @@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectsPool -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.type.BaseRealtimeObject import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.counter.LiveCounter @@ -73,9 +72,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal /** * Extension function to check if a LiveMapEntry is expired and ready for garbage collection */ -internal fun LiveMapEntry.isEligibleForGc(): Boolean { +internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } private fun fromRealtimeObject(realtimeObject: BaseRealtimeObject): LiveMapValue { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 5750046b0..222d1cb25 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -25,7 +25,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -48,7 +50,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -69,7 +73,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -346,7 +352,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateInactiveConnection() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -359,7 +367,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateChannelFailed() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 57e8f45ea..be6f6e797 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -14,8 +14,10 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size +import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException +import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test @@ -27,8 +29,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeWithinLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -158,8 +164,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( From 8f1c5e2a0a5506f93193046bdb6b007489979cbc Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 14:44:49 +0530 Subject: [PATCH 2/5] [ECO-5056] Updated test helper getMockObjectsAdapter with static mocck 1. Encouraged use of global mocck for Adapter 2. Updated failing tests accordingly 3. Added unit tests covering all cases for retrieveObjectsGCGracePeriod --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++- .../io/ably/lib/objects/unit/HelpersTest.kt | 128 +++++++++++------- .../lib/objects/unit/ObjectMessageSizeTest.kt | 24 +--- .../io/ably/lib/objects/unit/TestHelpers.kt | 10 +- .../objects/unit/objects/ObjectsPoolTest.kt | 8 +- 5 files changed, 109 insertions(+), 80 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index a3a1138e1..520dfe557 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -4,6 +4,7 @@ import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionState import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -13,12 +14,14 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +internal val ObjectsAdapter.connectionManager get() = connection.connectionManager + /** * Spec: RTO15g */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage): PublishResult = suspendCancellableCoroutine { continuation -> try { - connection.connectionManager.send(message, clientOptions.queueMessages, object : Callback { + connectionManager.send(message, clientOptions.queueMessages, object : Callback { override fun onSuccess(result: PublishResult) { continuation.resume(result) } @@ -49,8 +52,12 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - val connectionManager = connection.connectionManager - if (connectionManager.objectsGCGracePeriod != null) { + connectionManager.objectsGCGracePeriod?.let { + block(it) + return + } + // If already connected, no further `connected` event is guaranteed; return immediately. + if (connection.state == ConnectionState.connected) { block(connectionManager.objectsGCGracePeriod) return } @@ -90,7 +97,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connection.connectionManager.maxMessageSize + val maximumAllowedSize = connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -145,8 +152,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connection.connectionManager.isActive) { - throw ablyException(connection.connectionManager.stateErrorInfo) + if (!connectionManager.isActive) { + throw ablyException(connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 222d1cb25..b47db1a53 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -5,12 +5,10 @@ import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.CompletionListener -import io.ably.lib.transport.ConnectionManager +import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.* -import io.mockk.every -import io.mockk.mockk -import io.mockk.slot -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.test.runTest import org.junit.Assert.* import org.junit.Test @@ -21,13 +19,10 @@ class HelpersTest { // sendAsync @Test fun testSendAsyncShouldQueueAccordingToClientOptions() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -46,13 +41,10 @@ class HelpersTest { @Test fun testSendAsyncErrorPropagatesAblyException() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -67,15 +59,59 @@ class HelpersTest { assertEquals(40000, ex.errorInfo.code) } + @Test + fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + connManager.setPrivateField("objectsGCGracePeriod", 123L) + + var value: Long? = null + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(123L, value) + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + connManager.setPrivateField("objectsGCGracePeriod", 456L) + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(456L, value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + val adapter = getMockObjectsAdapter() + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertNull(value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + @Test fun testSendAsyncThrowsWhenConnectionManagerThrows() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -251,25 +287,25 @@ class HelpersTest { verify(exactly = 1) { channel.once(any()) } } - @Test - fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { - val adapter = mockk(relaxed = true) - val channel = mockk(relaxed = true) - every { adapter.getChannel("ch") } returns channel - channel.state = ChannelState.attaching - every { channel.once(any()) } answers { - val listener = firstArg() - val stateChange = mockk(relaxed = true) { - setPrivateField("current", ChannelState.suspended) - setPrivateField("reason", clientError("Not attached").errorInfo) - } - listener.onChannelStateChanged(stateChange) - } - val ex = assertFailsWith { adapter.ensureAttached("ch") } - assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) - assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } - } + @Test + fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.suspended) + setPrivateField("reason", clientError("Not attached").errorInfo) + } + listener.onChannelStateChanged(stateChange) + } + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("Not attached")) + verify(exactly = 1) { channel.once(any()) } + } @Test fun testEnsureAttachedThrowsForInvalidState() = runTest { @@ -350,11 +386,8 @@ class HelpersTest { // throwIfUnpublishableState @Test fun testThrowIfUnpublishableStateInactiveConnection() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -365,11 +398,8 @@ class HelpersTest { @Test fun testThrowIfUnpublishableStateChannelFailed() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index be6f6e797..4c413649e 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -14,27 +14,19 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size -import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith class ObjectMessageSizeTest { - @Test fun testObjectMessageSizeWithinLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -163,13 +155,9 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 56e4b2d20..17be76951 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -11,11 +11,13 @@ import io.ably.lib.objects.type.livemap.LiveMapManager import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState +import io.ably.lib.transport.ConnectionManager import io.ably.lib.types.ChannelMode import io.ably.lib.types.ChannelOptions import io.ably.lib.types.ClientOptions import io.mockk.every import io.mockk.mockk +import io.mockk.mockkStatic import io.mockk.spyk import kotlinx.coroutines.CompletableDeferred @@ -46,9 +48,11 @@ internal fun getMockRealtimeChannel( } internal fun getMockObjectsAdapter(): ObjectsAdapter { - val mockkAdapter = mockk(relaxed = true) - every { mockkAdapter.getChannel(any()) } returns getMockRealtimeChannel("testChannelName") - return mockkAdapter + mockkStatic("io.ably.lib.objects.HelpersKt") + return mockk(relaxed = true) { + every { getChannel(any()) } returns getMockRealtimeChannel("testChannelName") + every { connectionManager } returns mockk(relaxed = true) + } } internal fun getMockObjectsPool(): ObjectsPool { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt index 656b1e7c1..aff4f9d1a 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt @@ -19,7 +19,7 @@ class ObjectsPoolTest { @Test fun `(RTO3, RTO3a, RTO3b) An internal ObjectsPool should be used to maintain the list of objects present on a channel`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertNotNull(objectsPool) @@ -44,7 +44,7 @@ class ObjectsPoolTest { @Test fun `(RTO6) ObjectsPool should create zero-value objects if not exists`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = spyk(defaultRealtimeObjects.objectsPool) assertEquals(1, objectsPool.size(), "RTO3 - Should only contain the root object initially") @@ -78,7 +78,7 @@ class ObjectsPoolTest { @Test fun `(RTO4b1, RTO4b2) ObjectsPool should reset to initial pool retaining original root map`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertEquals(1, objectsPool.size()) val rootMap = objectsPool.get(ROOT_OBJECT_ID) as DefaultLiveMap @@ -107,7 +107,7 @@ class ObjectsPoolTest { @Test fun `(RTO5c2, RTO5c2a) ObjectsPool should delete extra object IDs`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool // Add some objects From 5d6d3bcb6ab1aae5fa29ce1867a868b0e7043b67 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 16:25:01 +0530 Subject: [PATCH 3/5] [ECO-5056] Added test for getObjects when liveobjects plugin is not installed --- .../test/realtime/RealtimeChannelTest.java | 23 +++++++++++++++++++ .../io/ably/lib/objects/unit/HelpersTest.kt | 10 ++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index a64f96daa..5350f0515 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -2617,6 +2617,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException { } } + @Test + public void channel_get_objects_throws_exception() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + AblyException exception = assertThrows(AblyException.class, channel::getObjects); + assertNotNull(exception); + assertEquals(40019, exception.errorInfo.code); + assertEquals(400, exception.errorInfo.statusCode); + assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed")); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel; diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index b47db1a53..95795ca0f 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -76,9 +76,10 @@ class HelpersTest { fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -87,15 +88,16 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -103,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test From ed77613bd880f364da67514c328974c3cabf40f2 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Aug 2025 16:32:43 +0530 Subject: [PATCH 4/5] [ECO-5056] Refactored HelpersTest.kt assertions with concrete types --- .../io/ably/lib/objects/unit/HelpersTest.kt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 95795ca0f..bd05e0579 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -69,7 +69,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } } @Test @@ -79,7 +79,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -88,7 +88,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -97,7 +97,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -105,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -277,7 +277,7 @@ class HelpersTest { every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.attached) @@ -286,7 +286,7 @@ class HelpersTest { } adapter.ensureAttached("ch") - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test @@ -295,7 +295,7 @@ class HelpersTest { val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.suspended) @@ -306,7 +306,7 @@ class HelpersTest { val ex = assertFailsWith { adapter.ensureAttached("ch") } assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test From 06a2157bf3bf1f4e869bed9f34df54c99b29e908 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 25 Sep 2025 17:57:13 +0530 Subject: [PATCH 5/5] [ECO-5056] Updated retrieveObjectsGCGracePeriod to return gcperiod every time CONNECTED message is received, updated tests for the same --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++++++---------- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 7 +++--- .../io/ably/lib/objects/unit/HelpersTest.kt | 22 +++++++++---------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 520dfe557..683971510 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -4,7 +4,7 @@ import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback import io.ably.lib.realtime.ConnectionEvent -import io.ably.lib.realtime.ConnectionState +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -51,19 +51,14 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } -internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - connectionManager.objectsGCGracePeriod?.let { - block(it) - return - } - // If already connected, no further `connected` event is guaranteed; return immediately. - if (connection.state == ConnectionState.connected) { - block(connectionManager.objectsGCGracePeriod) - return - } - connection.once(ConnectionEvent.connected) { +internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription { + connectionManager.objectsGCGracePeriod?.let { block(it) } + // Return new objectsGCGracePeriod whenever connection state changes to connected + val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = { block(connectionManager.objectsGCGracePeriod) } + connection.on(ConnectionEvent.connected, listener) + return ObjectsSubscription { connection.off(listener) } } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 43cab31c2..224cd606f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -52,14 +52,14 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine - @Volatile - private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + @Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + private var gcPeriodSubscription: ObjectsSubscription init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) // Start garbage collection coroutine with server-provided grace period if available - realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period -> period?.let { gcGracePeriod = it Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") @@ -164,6 +164,7 @@ internal class ObjectsPool( * Should be called when the pool is no longer needed. */ fun dispose() { + gcPeriodSubscription.unsubscribe() gcJob.cancel() gcScope.cancel() pool.clear() diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index bd05e0579..21f5c6792 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -60,52 +60,52 @@ class HelpersTest { } @Test - fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + fun testOnGCGracePeriodImmediateInvokesBlock() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager connManager.setPrivateField("objectsGCGracePeriod", 123L) var value: Long? = null - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { adapter.connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test