Skip to content

Commit 67c2b62

Browse files
committed
[ECO-5457] Refactored ObjectsStateSubscription to ObjectsSubscription
- Updated usage doc for the same
1 parent 33ff9e2 commit 67c2b62

5 files changed

Lines changed: 26 additions & 16 deletions

File tree

lib/src/main/java/io/ably/lib/objects/state/ObjectsStateSubscription.java renamed to lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
package io.ably.lib.objects.state;
1+
package io.ably.lib.objects;
22

33
/**
4-
* Represents a subscription that can be unsubscribed from.
5-
* This interface provides a way to clean up and remove subscriptions when they
6-
* are no longer needed.
4+
* Represents a objects subscription that can be unsubscribed from.
5+
* This interface provides a way to clean up and remove subscriptions when they are no longer needed.
76
* Example usage:
8-
* ```java
9-
* ObjectsStateSubscription s = objects.subscribe(ObjectsStateEvent.SYNCING, new ObjectsStateListener() {});
7+
* <pre>
8+
* {@code
9+
* ObjectsSubscription s = objects.subscribe(ObjectsStateEvent.SYNCING, new ObjectsStateListener() {});
1010
* // Later when done with the subscription
1111
* s.unsubscribe();
12+
* }
13+
* </pre>
1214
*/
13-
public interface ObjectsStateSubscription {
15+
public interface ObjectsSubscription {
1416
/**
1517
* This method should be called when the subscription is no longer needed,
1618
* it will make sure no further events will be sent to the subscriber and

lib/src/main/java/io/ably/lib/objects/state/ObjectsStateChange.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.ably.lib.objects.state;
22

3+
import io.ably.lib.objects.ObjectsSubscription;
34
import org.jetbrains.annotations.NonBlocking;
45
import org.jetbrains.annotations.NotNull;
56

@@ -16,7 +17,7 @@ public interface ObjectsStateChange {
1617
* @return a subscription object that can be used to unsubscribe from the event
1718
*/
1819
@NonBlocking
19-
ObjectsStateSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateChange.Listener listener);
20+
ObjectsSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateChange.Listener listener);
2021

2122
/**
2223
* Unsubscribes the specified listener from all synchronization state events.

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package io.ably.lib.objects
22

33
import io.ably.lib.objects.state.ObjectsStateChange
44
import io.ably.lib.objects.state.ObjectsStateEvent
5-
import io.ably.lib.objects.state.ObjectsStateSubscription
65
import io.ably.lib.realtime.ChannelState
76
import io.ably.lib.types.Callback
87
import io.ably.lib.types.ProtocolMessage
@@ -38,7 +37,8 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
3837
/**
3938
* Coroutine scope for handling callbacks asynchronously.
4039
*/
41-
private val callbackScope = CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjectsCallback-$channelName"))
40+
private val callbackScope =
41+
CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjectsCallback-$channelName") + SupervisorJob())
4242

4343
/**
4444
* Event bus for handling incoming object messages sequentially.
@@ -90,7 +90,7 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
9090
TODO("Not yet implemented")
9191
}
9292

93-
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsStateSubscription =
93+
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsSubscription =
9494
objectsManager.on(event, listener)
9595

9696
override fun off(listener: ObjectsStateChange.Listener) = objectsManager.off(listener)

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsState.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package io.ably.lib.objects
22

33
import io.ably.lib.objects.state.ObjectsStateChange
44
import io.ably.lib.objects.state.ObjectsStateEvent
5-
import io.ably.lib.objects.state.ObjectsStateSubscription
65
import io.ably.lib.util.EventEmitter
76
import io.ably.lib.util.Log
87
import kotlinx.coroutines.*
@@ -66,9 +65,9 @@ internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObj
6665

6766
private val emitterScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
6867

69-
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsStateSubscription {
68+
override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsSubscription {
7069
externalObjectStateEmitter.on(event, listener)
71-
return ObjectsStateSubscription {
70+
return ObjectsSubscription {
7271
externalObjectStateEmitter.off(event, listener)
7372
}
7473
}
@@ -105,7 +104,12 @@ internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObj
105104
}
106105

107106
private class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsStateChange.Listener>() {
107+
private val tag = "ObjectsStateEmitter"
108108
override fun apply(listener: ObjectsStateChange.Listener?, event: ObjectsStateEvent?, vararg args: Any?) {
109-
listener?.onStateChanged(event!!)
109+
try {
110+
listener?.onStateChanged(event!!)
111+
} catch (t: Throwable) {
112+
Log.e(tag, "Error occurred while executing listener callback for event: $event", t)
113+
}
110114
}
111115
}

live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.ably.lib.objects
33
import io.ably.lib.types.AblyException
44
import io.ably.lib.types.Callback
55
import io.ably.lib.types.ErrorInfo
6+
import io.ably.lib.util.Log
67
import kotlinx.coroutines.*
78

89
internal fun ablyException(
@@ -62,7 +63,9 @@ internal fun <T> CoroutineScope.launchWithCallback(callback: Callback<T>, block:
6263
launch {
6364
try {
6465
val result = block()
65-
callback.onSuccess(result)
66+
try { callback.onSuccess(result) } catch (t: Throwable) {
67+
Log.e("asyncCallback", "Error occurred while executing callback's onSuccess handler", t)
68+
} // catch and don't rethrow error from callback
6669
} catch (throwable: Throwable) {
6770
val exception = throwable as? AblyException
6871
callback.onError(exception?.errorInfo)

0 commit comments

Comments
 (0)