Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
}
}

/// Returns the number of buffered object operations if in the SYNCING state, or nil otherwise.
internal var testsOnly_bufferedObjectOperationsCount: Int? {
mutableStateMutex.withSync { mutableState in
if case let .syncing(syncingData) = mutableState.state {
syncingData.bufferedObjectOperations.count
} else {
nil
}
}
}

// These drive the testsOnly_waitingForSyncEvents property that informs the test suite when `getRoot()` is waiting for the object sync sequence to complete per RTO1c.
private let waitingForSyncEvents: AsyncStream<Void>
private let waitingForSyncEventsContinuation: AsyncStream<Void>.Continuation
Expand Down Expand Up @@ -728,8 +739,12 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
onChannelAttachedHasObjects = hasObjects

// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
if state.toObjectsSyncState != .syncing {
// RTO4c
switch state {
case let .syncing(syncingData):
// RTO4d
syncingData.bufferedObjectOperations = []
case .initialized, .synced:
// RTO4c, RTO4d
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
}

Expand All @@ -743,7 +758,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO

// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.

// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8
// RTO4b3, RTO4b4, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8
appliedOnAckSerials.removeAll()
transition(to: .synced, userCallbackQueue: userCallbackQueue)

Expand Down Expand Up @@ -784,9 +799,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
// Figure out whether to continue any existing sync sequence or start a new one
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
if isNewSyncSequence {
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
// RTO5a2a: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
syncingData.syncSequence = nil
syncingData.bufferedObjectOperations = []
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests {

// @spec RTO5a2
// @spec RTO5a2a
// @spec RTO5a2b
@Test
func newSequenceIdDiscardsInFlightSync() async throws {
let internalQueue = TestFactories.createInternalQueue()
Expand All @@ -145,13 +144,6 @@ struct InternalDefaultRealtimeObjectsTests {

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
])
}

// Start new sequence with different ID (RTO5a2)
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
internalQueue.ably_syncNoDeadlock {
Expand All @@ -175,7 +167,6 @@ struct InternalDefaultRealtimeObjectsTests {
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}
Expand Down Expand Up @@ -366,9 +357,10 @@ struct InternalDefaultRealtimeObjectsTests {
struct OnChannelAttachedTests {
// MARK: - RTO4a Tests

// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify any internal state
// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify the objects pool or sync sequence
// @specOneOf(1/2) RTO4d
@Test
func doesNotModifyStateWhenHasObjectsIsTrue() {
func handlesHasObjectTrue() {
let internalQueue = TestFactories.createInternalQueue()
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)

Expand All @@ -389,12 +381,21 @@ struct InternalDefaultRealtimeObjectsTests {

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject a buffered OBJECT operation
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@789"),
])
}

#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 1)

// When: onChannelAttached is called with hasObjects = true
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
}

// Then: Nothing should be modified
// Then:
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)

// Verify ObjectsPool is unchanged
Expand All @@ -405,6 +406,9 @@ struct InternalDefaultRealtimeObjectsTests {

// Verify sync sequence is still active
#expect(realtimeObjects.testsOnly_hasSyncSequence)

// RTO4d: Verify buffered object operations were cleared
#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 0)
}

// MARK: - RTO4b Tests
Expand All @@ -414,7 +418,7 @@ struct InternalDefaultRealtimeObjectsTests {
// @spec RTO4b2a
// @spec RTO4b3
// @spec RTO4b4
// @spec RTO4b5
// @specOneOf(2/2) RTO4d
@available(iOS 17.0.0, tvOS 17.0.0, *)
@Test
func handlesHasObjectsFalse() async throws {
Expand Down Expand Up @@ -490,7 +494,7 @@ struct InternalDefaultRealtimeObjectsTests {
#expect(newRoot as AnyObject === originalPool.root as AnyObject) // Should be same instance
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)

// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared, appliedOnAckSerials cleared
// RTO4b3, RTO4b4, RTO4d: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared, appliedOnAckSerials cleared
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
#expect(realtimeObjects.testsOnly_appliedOnAckSerials.isEmpty)
}
Expand Down
Loading
Loading