name: kotlin-coroutines
description: Advanced Kotlin coroutines patterns for AmethystMultiplatform. Use when working with: (1) Structured concurrency (supervisorScope, coroutineScope), (2) Advanced Flow operators (flatMapLatest, combine, merge, shareIn, stateIn), (3) Channels and callbackFlow, (4) Dispatcher management and context switching, (5) Exception handling (CoroutineExceptionHandler, SupervisorJob), (6) Testing async code (runTest, Turbine), (7) Nostr relay connection pools and subscriptions, (8) Backpressure handling in event streams. Delegates to kotlin-expert for basic StateFlow/SharedFlow patterns. Complements nostr-expert for relay communication.
Kotlin Coroutines - Advanced Async Patterns
Expert guidance for complex async operations in Amethyst: relay pools, event streams, structured concurrency, and testing.
Mental Model
Async Architecture in Amethyst:
Relay Pool (supervisorScope)
├── Relay 1 (launch) → callbackFlow → Events
├── Relay 2 (launch) → callbackFlow → Events
└── Relay 3 (launch) → callbackFlow → Events
↓
merge() → distinctBy(id) → shareIn
↓
Multiple Collectors (ViewModels, Services)
Key principles:
- supervisorScope - Children fail independently
- callbackFlow - Bridge callbacks to Flow
- shareIn/stateIn - Hot flows from cold
- Backpressure - buffer(), conflate(), DROP_OLDEST
When to Use This Skill
Use for advanced async patterns:
- Multi-relay subscriptions with supervisorScope
- Complex Flow operators (flatMapLatest, combine, merge)
- callbackFlow for Android callbacks (connectivity, location)
- Backpressure handling in high-frequency streams
- Exception handling with CoroutineExceptionHandler
- Testing coroutines with runTest and Turbine
Delegate to kotlin-expert for:
- Basic StateFlow/SharedFlow patterns
- Simple viewModelScope.launch
- MutableStateFlow → asStateFlow()
Core Patterns
Pattern: callbackFlow for Relay Subscriptions
// Real pattern from NostrClientStaticReqAsStateFlow.kt
fun INostrClient.reqAsFlow(
relay: NormalizedRelayUrl,
filters: List<Filter>,
): Flow<List<Event>> = callbackFlow {
val subId = RandomInstance.randomChars(10)
var hasBeenLive = false
val eventIds = mutableSetOf<HexKey>()
var currentEvents = listOf<Event>()
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.id !in eventIds) {
currentEvents = if (hasBeenLive) {
// After EOSE: prepend
listOf(event) + currentEvents
} else {
// Before EOSE: append
currentEvents + event
}
eventIds.add(event.id)
trySend(currentEvents)
}
}
override fun onEose(...) {
hasBeenLive = true
}
}
openReqSubscription(subId, mapOf(relay to filters), listener)
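// Closes the relay subscription through the INostrClient receiver, not the flow's own channel
awaitClose { this@reqAsFlow.close(subId) }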
}
Key techniques:
- Deduplication with Set
- EOSE handling (append → prepend strategy)
- trySend (non-blocking from callback)
- awaitClose for cleanup
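The techniques above can be tried without a relay. The following is a minimal sketch, assuming a hypothetical `FakeSource` with `register`/`unregister` methods (neither is part of Amethyst); it shows Set-based deduplication, `trySend` from a callback, and cleanup in `awaitClose`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based source standing in for a relay client.
class FakeSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount get() = listeners.size
    fun register(l: (String) -> Unit) { listeners += l }
    fun unregister(l: (String) -> Unit) { listeners -= l }
    fun push(id: String) = listeners.toList().forEach { it(id) }
}

fun FakeSource.idsAsFlow(): Flow<String> = callbackFlow {
    val seen = mutableSetOf<String>()
    val listener: (String) -> Unit = { id ->
        // Set.add returns false for duplicates, so each id is sent once.
        if (seen.add(id)) trySend(id)
    }
    register(listener)
    // Runs when the collector stops or is cancelled.
    awaitClose { unregister(listener) }
}

fun demoCallbackFlow(): Pair<List<String>, Int> = runBlocking {
    val source = FakeSource()
    val collected = async { source.idsAsFlow().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until the flow has registered
    source.push("a"); source.push("a"); source.push("b")
    collected.await() to source.listenerCount
}
```

`demoCallbackFlow()` returns the collected ids together with the number of listeners still registered once collection has ended, which makes the cleanup visible.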
Pattern: Structured Concurrency for Relays
suspend fun connectToRelays(relays: List<Relay>) = supervisorScope {
relays.forEach { relay ->
launch {
try {
relay.connect()
relay.subscribe(filters).collect { event ->
eventChannel.send(event)
}
} catch (e: IOException) {
Log.e("Relay", "Connection failed: ${relay.url}", e)
// Other relays continue
}
}
}
}
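Why supervisorScope:
- A failure in one relay's coroutine does not cancel its siblings
- Cancelling the caller cancels every relay coroutine together
- supervisorScope returns only after all children finish, so no relay coroutine outlives the call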
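The sibling-independence behaviour can be seen in isolation. This is a small sketch with made-up relay names: the first child throws an uncaught exception, which is routed to a `CoroutineExceptionHandler` passed to `launch`, while the second child keeps running.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

fun demoSupervisorScope(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Children of supervisorScope handle their own uncaught exceptions.
    val handler = CoroutineExceptionHandler { _, e -> log += "handled: ${e.message}" }
    supervisorScope {
        launch(handler) { throw IllegalStateException("relay A failed") }
        launch {
            delay(50)
            log += "relay B still running" // not cancelled by A's failure
        }
    }
    // supervisorScope has returned, so both children are finished here.
    log
}
```

With `coroutineScope` in place of `supervisorScope`, the failure of the first child would cancel the second and be rethrown to the caller.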
Pattern: Exception Handling for Services
// Real pattern from PushNotificationReceiverService.kt
class MyService : Service() {
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
Log.e("Service", "Caught: ${throwable.message}", throwable)
}
private val scope = CoroutineScope(
Dispatchers.IO + SupervisorJob() + exceptionHandler
)
override fun onDestroy() {
scope.cancel()
super.onDestroy()
}
}
Pattern benefits:
- SupervisorJob: children fail independently
- CoroutineExceptionHandler: uncaught exceptions are logged instead of crashing the process
- Scoped lifecycle: cancel all on destroy
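The same three properties can be checked outside an Android `Service`. The sketch below is an illustration only, using `Dispatchers.Default` and an in-memory log instead of `Log.e`: the scope stays usable after a child fails, and `cancel()` stops whatever is still running.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

fun demoServiceScope(): List<String> = runBlocking {
    val log = CopyOnWriteArrayList<String>() // written from background threads
    val handler = CoroutineExceptionHandler { _, e -> log += "caught: ${e.message}" }
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + handler)

    // A failing child is reported to the handler and does not kill the scope.
    scope.launch { throw IllegalStateException("boom") }.join()
    // With a plain Job() this second launch would be cancelled immediately.
    scope.launch { log += "second job ran" }.join()

    // Equivalent of onDestroy(): everything still running is cancelled.
    val longRunning = scope.launch { delay(60_000) }
    scope.cancel()
    longRunning.join()
    log += "cancelled=${longRunning.isCancelled}"
    log.toList()
}
```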
Pattern: Network Connectivity as Flow
// Real pattern from ConnectivityFlow.kt
val status = callbackFlow {
val networkCallback = object : NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(ConnectivityStatus.Active(...))
}
override fun onLost(network: Network) {
trySend(ConnectivityStatus.Off)
}
}
connectivityManager.registerDefaultNetworkCallback(networkCallback)
// Initial state
activeNetwork?.let { trySend(ConnectivityStatus.Active(...)) }
awaitClose {
connectivityManager.unregisterNetworkCallback(networkCallback)
}
}
.distinctUntilChanged()
.debounce(200) // Stabilize flapping
.flowOn(Dispatchers.IO)
Key patterns:
- Emit initial state immediately
- Register callback in flow body
- Cleanup in awaitClose
- Stabilize with debounce + distinctUntilChanged
Pattern: Merge Events from Multiple Relays
fun observeFromRelays(
relays: List<NormalizedRelayUrl>,
filters: List<Filter>
): Flow<Event> =
relays.map { relay ->
client.reqAsFlow(relay, filters)
.flatMapConcat { it.asFlow() }
}.merge()
.distinctBy { it.id }
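Flow:
- Each relay: Flow<List<Event>>, where every emission is the full list received so far
- flatMapConcat: flatten each list to Flow<Event> (earlier events repeat in later lists)
- merge(): combine all relays
- distinctBy: deduplicate across relays and across repeated lists (not a kotlinx.coroutines operator, so it has to come from a project-level extension)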
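Since `kotlinx.coroutines` does not provide a `distinctBy` operator for `Flow`, the pipeline above needs a helper. The following is one possible sketch, not necessarily how Amethyst defines it; `Ev` is a hypothetical stand-in for `Event`.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Assumed helper: emits a value only the first time its key is seen.
fun <T, K> Flow<T>.distinctBy(selector: (T) -> K): Flow<T> = flow {
    val seen = HashSet<K>() // one set per collection; grows for the flow's lifetime
    collect { value ->
        if (seen.add(selector(value))) emit(value)
    }
}

// Hypothetical event type for the demo.
data class Ev(val id: String, val relay: String)

fun demoDistinctBy(): List<String> = runBlocking {
    val relay1 = flowOf(Ev("a", "r1"), Ev("b", "r1"))
    val relay2 = flowOf(Ev("b", "r2"), Ev("c", "r2")) // "b" arrives from both relays
    listOf(relay1, relay2)
        .merge()
        .distinctBy { it.id }
        .toList()
        .map { it.id }
}
```

The set of seen keys is never trimmed, so for a long-lived subscription a bounded structure (for example an LRU of recent ids) may be a better fit.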
Advanced Operators
For comprehensive coverage of Flow operators:
- flatMapLatest, combine, zip, merge → See references/advanced-flow-operators.md
- shareIn, stateIn → Conversion to hot flows
- buffer, conflate → Backpressure strategies
- debounce, sample → Rate limiting
Quick Reference
| Operator | Use Case | Example |
|---|---|---|
| flatMapLatest | Cancel previous, switch to new | Search (cancel old query) |
| combine | Latest from ALL flows | combine(account, settings, connectivity) |
| merge | Single stream from multiple | merge(relay1, relay2, relay3) |
| shareIn | Multiple collectors, single upstream | Share expensive computation |
| stateIn | StateFlow from Flow | ViewModel state |
| buffer(DROP_OLDEST) | High-frequency streams | Real-time event feed |
| conflate | Latest only | UI updates |
| debounce | Wait for quiet period | Search input |
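As an illustration of the first row, here is a minimal sketch of `flatMapLatest` cancelling stale work. The query strings and the 100 ms delay are made up; the delay stands in for a slow lookup.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun demoFlatMapLatest(): List<String> = runBlocking {
    flowOf("a", "ab", "abc") // three queries arriving back to back
        .flatMapLatest { query ->
            flow {
                delay(100) // simulated lookup; cancelled when a newer query arrives
                emit("results for $query")
            }
        }
        .toList()
}
```

Only the lookup for the last query completes, because each new upstream value cancels the inner flow started for the previous one.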
Nostr Relay Patterns
For complete relay-specific patterns: → See references/relay-patterns.md
Covers:
- Multi-relay subscription management
- Connection lifecycle and reconnection
- Event deduplication strategies
- Backpressure for high-frequency events
- EOSE handling patterns
Testing
For comprehensive testing patterns: → See references/testing-coroutines.md
Quick testing pattern:
@Test
fun `relay subscription receives events`() = runTest {
val client = FakeNostrClient()
client.reqAsFlow(relay, filters).test {
// reqAsFlow as shown above emits only after the first event, so there is no initial empty list to await
client.sendEvent(event1)
assertEquals(listOf(event1), awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
Testing tools:
- runTest - Virtual time, auto cleanup
- Turbine .test {} - Flow assertions
- advanceTimeBy() - Control time
- Fake implementations over mocks
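Virtual time is easiest to see with a delayed job. The sketch below assumes the `kotlinx-coroutines-test` artifact is on the classpath; the five-second "reconnect" delay is invented for the example and finishes without real waiting.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

@OptIn(ExperimentalCoroutinesApi::class)
fun demoVirtualTime(): List<String> {
    val log = mutableListOf<String>()
    runTest {
        launch {
            delay(5_000) // hypothetical reconnect backoff
            log += "reconnected at ${currentTime}ms"
        }
        advanceTimeBy(4_999) // one millisecond short of the delay
        log += "pending at ${currentTime}ms"
        advanceUntilIdle() // runs everything that is still scheduled
    }
    return log
}
```

In a real test class the body of `runTest` would sit directly in a `@Test` function; wrapping it in a regular function here only keeps the sketch self-contained.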
Common Scenarios
Scenario: Implement New Relay Feature
Steps:
- callbackFlow for subscription
- Deduplication (Set of event IDs)
- awaitClose for cleanup
- Test with FakeNostrClient
Example: Add subscription for specific event kind
fun observeKind(kind: Int): Flow<Event> = callbackFlow {
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.kind == kind) {
trySend(event)
}
}
}
client.subscribe(listener)
awaitClose { client.unsubscribe(listener) }
}
Scenario: Handle Network Connectivity Changes
Steps:
- callbackFlow for connectivity
- flatMapLatest to reconnect
- debounce to stabilize
- Exception handling for failures
Example: Reconnect relays on connectivity
connectivityFlow
.flatMapLatest { status ->
when (status) {
is ConnectivityStatus.Active -> relayPool.observeEvents()
else -> emptyFlow()
}
}
.catch { e -> Log.e("Relay", "Event stream failed", e) }
.collect { event -> handleEvent(event) }
Scenario: Optimize Multi-Collector Performance
Steps:
- Use shareIn for expensive upstream
- Configure SharingStarted strategy
- Set replay buffer size
- Test with multiple collectors
Example: Share relay subscription
val events: SharedFlow<Event> = client
.reqAsFlow(relay, filters)
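// Each emission is the full list so far, so flattening repeats earlier events; deduplicate downstream if needed
.flatMapConcat { it.asFlow() }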
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
replay = 0
)
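To check that `shareIn` really runs the upstream once for several collectors, a counter in the upstream is enough. This is a sketch under stated assumptions: the upstream is a plain `flow` instead of a relay subscription, a standalone scope replaces `viewModelScope`, and the `bothSubscribed` gate exists only to make the demo deterministic.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

fun demoShareIn(): Pair<Int, List<List<Int>>> = runBlocking {
    val upstreamStarts = AtomicInteger(0)
    val subscribers = AtomicInteger(0)
    val bothSubscribed = CompletableDeferred<Unit>()

    val upstream = flow {
        upstreamStarts.incrementAndGet() // counts how often the cold flow is started
        bothSubscribed.await()           // hold values until both collectors are attached
        emit(1); emit(2); emit(3)
    }

    val sharingScope = CoroutineScope(Dispatchers.Default + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed(5_000), replay = 0)

    val collectors = List(2) {
        async {
            shared
                .onSubscription { if (subscribers.incrementAndGet() == 2) bothSubscribed.complete(Unit) }
                .take(3)
                .toList()
        }
    }
    val results = collectors.map { it.await() }
    sharingScope.cancel() // stop sharing, as a ViewModel being cleared would
    upstreamStarts.get() to results
}
```

With `replay = 0`, a collector that subscribes after a value was emitted does not see it, which is why the demo waits for both subscriptions before emitting.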
Anti-Patterns
❌ Using GlobalScope
GlobalScope.launch { /* Leaks, no structured concurrency */ }
✅ Use scoped coroutines
viewModelScope.launch { /* Cancelled with ViewModel */ }
❌ Forgetting awaitClose
callbackFlow {
registerCallback()
// Missing cleanup!
}
✅ Always cleanup
callbackFlow {
registerCallback()
awaitClose { unregisterCallback() }
}
❌ Blocking in Flow
flow.map { Thread.sleep(1000); process(it) }
✅ Suspend, don't block
flow.map { delay(1000); process(it) }.flowOn(Dispatchers.Default)
❌ Ignoring backpressure
fastProducer.collect { slowConsumer(it) } // Producer waits for every slowConsumer call
✅ Handle backpressure
fastProducer
.buffer(64, BufferOverflow.DROP_OLDEST)
.collect { slowConsumer(it) }
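The effect of `DROP_OLDEST` can be observed with a producer that never waits and a deliberately slow collector. This is an illustrative sketch; the capacity of 2 and the 10 ms delay are arbitrary.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun demoDropOldest(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    flow { for (i in 1..100) emit(i) } // fast producer
        .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .collect { value ->
            delay(10) // slow consumer
            received += value
        }
    // Only a handful of values survive; the newest one is always among them.
    received
}
```

The producer is never suspended, at the cost of silently losing intermediate values, so this strategy fits feeds where only recent events matter.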
Delegation
Use kotlin-expert for:
- Basic StateFlow/SharedFlow patterns
- viewModelScope.launch usage
- Simple MutableStateFlow → asStateFlow()
Use nostr-expert for:
- Nostr protocol details (NIPs, event structure)
- Event creation and signing
- Cryptographic operations
This skill provides:
- Advanced async patterns
- Structured concurrency
- Complex Flow operators
- Testing strategies
- Relay-specific async patterns
Resources
- references/advanced-flow-operators.md - All Flow operators with examples
- references/relay-patterns.md - Nostr relay async patterns from codebase
- references/testing-coroutines.md - Complete testing guide
Quick Decision Tree
Need async operation?
├─ Simple ViewModel state update → kotlin-expert (StateFlow)
├─ Android callback → This skill (callbackFlow)
├─ Multiple concurrent operations → This skill (supervisorScope)
├─ Complex Flow transformation → This skill (references/advanced-flow-operators.md)
├─ Relay subscription → This skill (references/relay-patterns.md)
└─ Testing async code → This skill (references/testing-coroutines.md)