Reactive flows
Every read on a KeyValueStorage<T> returns a Flow. Writes that touch the
same entity_name trigger active observers to re-execute their queries and
emit fresh results — no manual refresh, no event bus.
- Every read is a Flow
- When does it re-emit?
- Compose integration
- Lifecycle and avoiding leaks
- Verbatim Turbine pattern
- See also
Every read is a Flow
val all: Flow<List<Merchant>> = merchants.selectAll()
val food: Flow<List<Merchant>> = merchants.select(where = Merchant::category eq "Food")
val one: Flow<Merchant?> = merchants.selectByKey("m_1")
val rows: Flow<List<ResultRow<Merchant>>> = merchants.selectResult()
val total: Flow<Int> = merchants.count()
val meta: Flow<Metadata> = merchants.metadata()
These flows are cold. They start no work until something collects them and they cancel cleanly when the collector cancels. The first collector triggers the first SQL execution; subsequent emissions are driven by SQLDelight’s table notifications.
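The cold behavior described above can be seen without Sqkon at all. The following is a minimal sketch using only kotlinx.coroutines; `fakeQuery` and `executions` are hypothetical stand-ins for a store read and its SQL execution, not part of the library's API.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the "query" body has run.
var executions = 0

// Hypothetical stand-in for a store read such as selectAll().
fun fakeQuery(): Flow<List<String>> = flow {
    executions++                     // runs once per collector, not at construction
    emit(listOf("m_1", "m_2"))
}

fun coldFlowDemo(): List<Int> {
    executions = 0
    val observed = mutableListOf<Int>()
    val query = fakeQuery()          // building the flow does no work
    observed += executions
    runBlocking { query.toList() }   // the first collector triggers the body
    observed += executions
    runBlocking { query.toList() }   // a second collector runs it again
    observed += executions
    return observed                  // [0, 1, 2]
}
```

The counter stays at zero until something collects, which is why holding a reference to a read flow is free until a collector attaches.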
When does it re-emit?
Any insert, insertAll, update, updateAll, upsert, upsertAll,
delete, deleteByKey, deleteByKeys, deleteAll, deleteExpired, or
deleteStale against the same entity_name invalidates every active select
flow for that store. Each invalidated flow re-runs its query and emits the
new result.
sequenceDiagram
participant Writer
participant SqkonStore as KeyValueStorage<T>
participant SQLDelight
participant Reader as Flow consumer
Writer->>SqkonStore: insert(...)
SqkonStore->>SQLDelight: INSERT
SQLDelight->>SQLDelight: notify(entity)
SQLDelight->>SqkonStore: re-execute
SqkonStore->>Reader: emit(updatedList)
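The write, notify, re-execute, emit sequence can be modelled in a few lines. This is a conceptual sketch only: `FakeStore` is a hypothetical in-memory class in which a `MutableStateFlow` version counter plays the role of the table notification. It is not how Sqkon or SQLDelight implement it.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical in-memory model; NOT Sqkon's implementation.
class FakeStore {
    private val rows = linkedMapOf<String, String>()
    private val version = MutableStateFlow(0)   // stands in for the notification

    // Each version bump re-runs the "query" for every active collector.
    fun selectAll(): Flow<List<String>> = version.map { rows.values.toList() }

    fun insert(key: String, value: String) {
        rows[key] = value
        version.update { it + 1 }               // notify observers after the write
    }
}

fun reEmitDemo(): List<List<String>> = runBlocking {
    val store = FakeStore()
    val seen = mutableListOf<List<String>>()
    // Reader: collect the initial result plus one re-emission, then stop.
    val reader = launch { store.selectAll().take(2).toList(seen) }
    yield()                                     // let the reader collect the initial result
    store.insert("m_1", "Coffee Shop")          // writer triggers the re-emission
    reader.join()
    seen
}
```

The reader receives an empty list first and the one-row list after the insert, without ever asking for a refresh.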
Notifications are scoped to one entity_name within one Sqkon instance.
Two KeyValueStorage<Merchant>("merchants") references built from the same
Sqkon share notifications; two separate Sqkon instances do not.
A few useful guarantees:
- Single emission per transaction. A bulk insertAll or upsertAll wraps every write in one transaction, so observers see one emission for the batch, not N.
- distinctUntilChanged on selectResult and metadata. Re-emissions with identical content are dropped.
- selectAll / select do not dedup. If you need that, append .distinctUntilChanged() yourself.
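To illustrate what appending the operator does, here is a small sketch that uses a plain `flowOf` in place of a store read (the input lists are made up for the example). `distinctUntilChanged()` compares consecutive emissions with `equals`, and Kotlin lists compare by content.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun dedupDemo(): List<List<String>> = runBlocking {
    // Three emissions; the second repeats the first and is dropped.
    flowOf(listOf("a"), listOf("a"), listOf("a", "b"))
        .distinctUntilChanged()
        .toList()
}
```

With a real store the equivalent call would presumably be `merchants.selectAll().distinctUntilChanged()`, which relies on the stored type having a meaningful `equals` (a data class, for instance).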
Compose integration
Collect with collectAsState for snapshot reads, or collectAsStateWithLifecycle
on Android for lifecycle-aware behavior:
@Composable
fun MerchantList(merchants: KeyValueStorage<Merchant>) {
    val list by merchants.selectAll().collectAsState(initial = emptyList())
    LazyColumn {
        items(list, key = { it.id }) { MerchantRow(it) }
    }
}
For anything heavier than a screen-scoped read — sorting, mapping, derived
state — promote the flow into a ViewModel:
class MerchantsViewModel(merchants: KeyValueStorage<Merchant>) : ViewModel() {
    val foodMerchants: StateFlow<List<Merchant>> =
        merchants.select(where = Merchant::category eq "Food")
            .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), emptyList())
}
stateIn shares a single upstream subscription across all UI collectors and
keeps the cache warm during config changes.
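The sharing claim can be checked in isolation. In this sketch a hand-written cold flow stands in for the store read, and a plain `CoroutineScope` stands in for `viewModelScope`; both are assumptions made so the example runs without Android or Sqkon.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun sharedUpstreamDemo(): Int = runBlocking {
    var upstreamStarts = 0
    // Hypothetical stand-in for a store read: counts how often it is started.
    val upstream = flow<List<String>> {
        upstreamStarts++
        emit(listOf("Food"))
        awaitCancellation()          // stay active like an observed query
    }
    // Stand-in for viewModelScope; same dispatcher, independent Job.
    val scope = CoroutineScope(coroutineContext + Job())
    val state = upstream.stateIn(scope, SharingStarted.WhileSubscribed(5_000), emptyList())

    val a = launch { state.collect { } }     // first UI collector
    val b = launch { state.collect { } }     // second UI collector
    state.first { it.isNotEmpty() }          // wait for the upstream value

    a.cancel(); b.cancel(); scope.cancel()
    upstreamStarts                           // upstream started once for all collectors
}
```

Three subscribers attach (two collectors plus the `first` call) yet the upstream body runs a single time. The 5-second `WhileSubscribed` timeout is what lets the upstream survive a brief gap with no subscribers.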
Lifecycle and avoiding leaks
Sqkon flows do not leak by themselves — they’re cold and cancel when the collector cancels — but you must give them a structured scope to attach to:
- ViewModels: viewModelScope (Android) / your platform equivalent.
- Composables: LaunchedEffect(key) { flow.collect { … } } or collectAsState.
- Tests: the test’s MainScope(), and always cancel it in tearDown() or pager-based tests will flake. See Testing.
The pattern this codebase uses everywhere:
private val mainScope = MainScope()
// ...
@After fun tearDown() { mainScope.cancel() }
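What cancelling the scope buys you can be shown with a small sketch. It uses a plain `CoroutineScope` rather than `MainScope()` (which needs a Main dispatcher), and the `observed` flow is a hypothetical stand-in for a store read whose `finally` block represents releasing the underlying query listener.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun scopeCancelDemo(): Boolean = runBlocking {
    var released = false
    val started = CompletableDeferred<Unit>()
    // Hypothetical stand-in for an observed query that never completes on its own.
    val observed = flow<Unit> {
        try {
            emit(Unit)
            awaitCancellation()
        } finally {
            released = true          // upstream cleanup runs on cancellation
        }
    }
    val scope = CoroutineScope(coroutineContext + Job())
    val collector = scope.launch { observed.collect { started.complete(Unit) } }

    started.await()                  // the collector is attached and has seen one item
    scope.cancel()                   // what tearDown() does with mainScope
    collector.join()
    released
}
```

Cancelling the scope cancels the collector, and the cold flow's cleanup runs as a consequence; skipping the cancel leaves the collector, and whatever it observes, alive.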
Verbatim Turbine pattern
Turbine is the recommended way to test flow emissions. From
library/src/commonTest/kotlin/com/mercury/sqkon/db/KeyValueStorageTest.kt:
@Test
fun selectCount_flowUpdatesOnChange() = runTest {
    testObjectStorage.count().test {
        // Wait for first result
        val first = awaitItem()
        assertEquals(expected = 0, first)
        TestObject().also { testObjectStorage.insert(it.id, it) }
        val second = awaitItem()
        assertEquals(expected = 1, second)
        testObjectStorage.deleteAll()
        val third = awaitItem()
        assertEquals(expected = 0, third)
        expectNoEvents()
    }
}
Three patterns to copy:
- awaitItem() once for the initial emission, then once per write.
- expectNoEvents() at the end asserts no spurious re-emissions.
- A bulk write (insertAll, upsertAll) produces one emission, not N; see selectCount_flowUpdatesOnUpsertAllOnce in the same file.
See also
- Paging — paging sources hook into the same notification stream.
- Transactions — emissions fire after a transaction commits, not mid-transaction.
- Testing — Turbine, MainScope teardown, in-memory drivers.
- Source: library/src/commonMain/kotlin/com/mercury/sqkon/db/KeyValueStorage.kt (select* methods).