Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 99 additions & 76 deletions app/src/main/java/to/bitkit/repositories/LightningRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.tasks.await
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
Expand Down Expand Up @@ -108,6 +109,7 @@ class LightningRepo @Inject constructor(
private val syncMutex = Mutex()
private val syncPending = AtomicBoolean(false)
private val syncRetryJob = AtomicReference<Job?>(null)
private val lifecycleMutex = Mutex()

init {
observeConnectivityForSyncRetry()
Expand Down Expand Up @@ -269,88 +271,98 @@ class LightningRepo @Inject constructor(

eventHandler?.let { _eventHandlers.add(it) }

val initialLifecycleState = _lightningState.value.nodeLifecycleState
if (initialLifecycleState.isRunningOrStarting()) {
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
return@withContext Result.success(Unit)
}
// Track retry state outside mutex to avoid deadlock (Mutex is non-reentrant)
var shouldRetryStart = false
var initialLifecycleState: NodeLifecycleState = NodeLifecycleState.Stopped

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }

// Setup if needed
if (lightningService.node == null) {
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
if (setupResult.isFailure) {
_lightningState.update {
it.copy(
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
setupResult.exceptionOrNull() ?: NodeSetupError()
val result = lifecycleMutex.withLock {
initialLifecycleState = _lightningState.value.nodeLifecycleState
if (initialLifecycleState.isRunningOrStarting()) {
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
return@withLock Result.success(Unit)
}

runCatching {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }

// Setup if needed
if (lightningService.node == null) {
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
if (setupResult.isFailure) {
_lightningState.update {
it.copy(
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
setupResult.exceptionOrNull() ?: NodeSetupError()
)
)
)
}
return@withLock setupResult
}
return@withContext setupResult
}
}

if (getStatus()?.isRunning == true) {
Logger.info("LDK node already running", context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
lightningService.startEventListener(::onEvent).onFailure {
Logger.warn("Failed to start event listener", it, context = TAG)
return@withContext Result.failure(it)
if (getStatus()?.isRunning == true) {
Logger.info("LDK node already running", context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
lightningService.startEventListener(::onEvent).onFailure {
Logger.warn("Failed to start event listener", it, context = TAG)
return@withLock Result.failure(it)
}
return@withLock Result.success(Unit)
}
return@withContext Result.success(Unit)
}

// Start node
lightningService.start(timeout, ::onEvent)
// Start node
lightningService.start(timeout, ::onEvent)

_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }

// Initial state sync
syncState()
updateGeoBlockState()
refreshChannelCache()
// Initial state sync
syncState()
updateGeoBlockState()
refreshChannelCache()

// Post-startup tasks (non-blocking)
connectToTrustedPeers().onFailure {
Logger.error("Failed to connect to trusted peers", it, context = TAG)
}
// Post-startup tasks (non-blocking)
connectToTrustedPeers().onFailure {
Logger.error("Failed to connect to trusted peers", it, context = TAG)
}

sync().onFailure { e ->
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
}
scope.launch { registerForNotifications() }
Unit
}.onFailure { e ->
val currentLifecycleState = _lightningState.value.nodeLifecycleState
if (currentLifecycleState.isRunning()) {
Logger.warn("Start error occurred but node is $currentLifecycleState, skipping retry", e, context = TAG)
return@withContext Result.success(Unit)
}
sync().onFailure { e ->
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
}
scope.launch { registerForNotifications() }
Result.success(Unit)
}.getOrElse { e ->
val currentState = _lightningState.value.nodeLifecycleState
if (currentState.isRunning()) {
Logger.warn("Start error but node is $currentState, skipping retry", e, context = TAG)
return@withLock Result.success(Unit)
}

if (shouldRetry) {
val retryDelay = 2.seconds
Logger.warn("Start error, retrying after $retryDelay...", e, context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }

delay(retryDelay)
return@withContext start(
walletIndex = walletIndex,
timeout = timeout,
shouldRetry = false,
customServerUrl = customServerUrl,
customRgsServerUrl = customRgsServerUrl,
channelMigration = channelMigration,
)
} else {
_lightningState.update {
it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e))
if (shouldRetry) {
Logger.warn("Start error, will retry...", e, context = TAG)
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
shouldRetryStart = true
Result.failure(e)
} else {
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) }
Result.failure(e)
}
return@withContext Result.failure(e)
}
}

// Retry OUTSIDE the mutex to avoid deadlock (Kotlin Mutex is non-reentrant)
if (shouldRetryStart) {
delay(2.seconds)
return@withContext start(
walletIndex = walletIndex,
timeout = timeout,
shouldRetry = false,
customServerUrl = customServerUrl,
customRgsServerUrl = customRgsServerUrl,
channelMigration = channelMigration,
)
}

result
}

private suspend fun onEvent(event: Event) {
Expand All @@ -374,16 +386,27 @@ class LightningRepo @Inject constructor(
}

/**
 * Stops the Lightning node and records the outcome in [_lightningState].
 *
 * The whole operation runs on [bgDispatcher] while holding [lifecycleMutex], so the
 * lifecycle check and the state transitions below cannot interleave with another
 * caller that takes the same mutex.
 *
 * Behaviour:
 * - If the lifecycle state already reports stopped/stopping, nothing is done and
 *   success is returned.
 * - Otherwise the state is set to `Stopping`, [lightningService] is asked to stop, and
 *   the state is reset to a fresh [LightningState] marked `Stopped`.
 * - If any of that throws, the error is logged and the state is reconciled with what
 *   the service reports: `Running` when a node exists and its status says it is still
 *   running (so a later stop attempt is not short-circuited), `Stopped` otherwise.
 *
 * @return success when the node was stopped or was already stopped/stopping; a failure
 *   wrapping the thrown exception when the stop attempt failed (even if the state was
 *   subsequently reconciled to `Stopped`).
 */
suspend fun stop(): Result<Unit> = withContext(bgDispatcher) {
    lifecycleMutex.withLock {
        // Checked inside the lock so the decision is based on a settled lifecycle state.
        if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
            return@withLock Result.success(Unit)
        }

        runCatching {
            _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
            lightningService.stop()
            // Replace the whole state object so no data from the stopped node is kept.
            _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
        }.onFailure { e ->
            Logger.error("Node stop error", e, context = TAG)

            // Do not leave the state stuck at Stopping: derive it from the service instead.
            val nodeStillRunning = lightningService.node != null && lightningService.status?.isRunning == true
            if (nodeStillRunning) {
                Logger.warn("Stop failed but node is still running, reverting to Running state", context = TAG)
                _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
            } else {
                _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
            }
        }
    }
}

Expand Down
Loading