Quilter
Quilter<S> keeps a CRDT replica live over a Seam. It collects the Seam's incoming flow, merges inbound deltas, and broadcasts outbound deltas as you apply mutations. Its state property is a StateFlow<S> that always holds the current converged value.
Basic setup
/**
 * Basic [Quilter] setup over a [us.tractat.kuilt.core.Seam].
 *
 * Host a session, build a replicator, apply a mutation, and read the live
 * converged state via `state`. The mutation is broadcast to all current peers
 * automatically; every peer's `state` flow reflects the merge.
 */
@Suppress("unused")
internal fun sampleQuilterSetup() = runTest(
    StandardTestDispatcher(),
    timeout = 5.seconds,
) {
    val loom = InMemoryLoom()
    val seam = loom.host(Pattern("my-session"))
    val cfg = QuilterConfig(expectVirtualTime = true)
    val replicator = Quilter(
        replica = ReplicaId(seam.selfId.value),
        seam = seam,
        initial = GCounter.ZERO,
        messageSerializer = QuiltMessage.serializer(GCounter.serializer()),
        scope = backgroundScope,
        config = cfg,
    )
    // Apply a mutation; the delta is broadcast to all current peers automatically.
    replicator.apply(replicator.state.value.inc(replicator.replica, 1L))
    // state is a StateFlow that always holds the current converged value.
    assertEquals(1L, replicator.state.value.value)
}
Two-peer GCounter convergence
/**
 * Two peers independently increment their GCounter slots; after round-trip
 * delta exchange both replicas must agree on the total.
 */
@Test
fun twoPeerGCounterConverges() = runTest(UnconfinedTestDispatcher()) {
    val loom = InMemoryLoom()
    val seamA = loom.host(Pattern("test"))
    val seamB = loom.join(InMemoryTag("b"))
    val repA = gcounterReplicator(seamA, backgroundScope)
    val repB = gcounterReplicator(seamB, backgroundScope)
    repA.apply(repA.state.value.inc(repA.replica, 3L))
    repA.apply(repA.state.value.inc(repA.replica, 2L))
    repB.apply(repB.state.value.inc(repB.replica, 4L))
    testScheduler.advanceUntilIdle()
    assertEquals(9L, repA.state.value.value)
    assertEquals(9L, repB.state.value.value)
}
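To see why both replicas land on 9 regardless of delivery order, here is a minimal, self-contained sketch of a grow-only counter. SketchGCounter is a hypothetical stand-in written for this illustration, not kuilt's GCounter; the per-slot maximum merge is the usual textbook rule and is assumed here rather than taken from the library.

```kotlin
// Hypothetical stand-in for a grow-only counter; not kuilt's GCounter.
data class SketchGCounter(val slots: Map<String, Long> = emptyMap()) {
    // The counter's value is the sum of every replica's slot.
    val value: Long get() = slots.values.sum()

    // Incrementing only ever raises the caller's own slot.
    fun inc(replica: String, by: Long): SketchGCounter {
        require(by >= 0) { "a grow-only counter cannot decrease" }
        val next = (slots[replica] ?: 0L) + by
        return SketchGCounter(slots + (replica to next))
    }

    // Merge keeps the per-slot maximum, so it is commutative and idempotent.
    fun merge(other: SketchGCounter): SketchGCounter =
        SketchGCounter(
            (slots.keys + other.slots.keys).associateWith { key ->
                maxOf(slots[key] ?: 0L, other.slots[key] ?: 0L)
            }
        )
}

fun main() {
    val a = SketchGCounter().inc("a", 3L).inc("a", 2L)
    val b = SketchGCounter().inc("b", 4L)
    println(a.merge(b).value)          // 9
    println(b.merge(a).value)          // 9
    println(a.merge(b).merge(b).value) // 9: re-delivering a state changes nothing
}
```

Because merge order and duplicates do not matter, the replicator is free to broadcast deltas without coordinating with its peers.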
Late-joiner full-state sync
When a peer joins after others have accumulated state, Quilter sends a FullState message rather than replaying the delta history. The late joiner converges in one round-trip:
/**
 * A and B accumulate state; C joins late and should converge via FullState
 * without replaying any delta history.
 */
@Test
fun lateJoinerReceivesFullState() = runTest(UnconfinedTestDispatcher()) {
    val loom = InMemoryLoom()
    val seamA = loom.host(Pattern("test"))
    val seamB = loom.join(InMemoryTag("b"))
    val repA = gcounterReplicator(seamA, backgroundScope)
    val repB = gcounterReplicator(seamB, backgroundScope)
    repA.apply(repA.state.value.inc(repA.replica, 10L))
    repB.apply(repB.state.value.inc(repB.replica, 5L))
    testScheduler.advanceUntilIdle()
    // C joins after A and B have already accumulated state.
    val seamC = loom.join(InMemoryTag("c"))
    val repC = gcounterReplicator(seamC, backgroundScope)
    testScheduler.advanceUntilIdle()
    // C must have received FullState from A and B and converged to 15.
    assertEquals(15L, repA.state.value.value)
    assertEquals(15L, repC.state.value.value)
}
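The reason a snapshot can replace the delta history is that merging the accumulated state once gives the same result as merging every delta in turn. The sketch below shows this with a grow-only counter held as a plain map; mergeSlots is a hypothetical helper written for this example and does not reflect kuilt's internals.

```kotlin
// Hypothetical helper: per-slot maximum merge for a grow-only counter held as a map.
fun mergeSlots(a: Map<String, Long>, b: Map<String, Long>): Map<String, Long> =
    (a.keys + b.keys).associateWith { k -> maxOf(a[k] ?: 0L, b[k] ?: 0L) }

fun main() {
    // Deltas that A and B produced before C joined.
    val history = listOf(mapOf("a" to 10L), mapOf("b" to 5L))

    // What an existing peer holds after merging that history.
    val fullState = history.fold(emptyMap<String, Long>(), ::mergeSlots)

    // C starts empty and needs only one merge with the snapshot.
    val lateJoiner = mergeSlots(emptyMap(), fullState)
    println(lateJoiner.values.sum()) // 15

    // Receiving the same snapshot from a second peer is harmless.
    println(mergeSlots(lateJoiner, fullState) == lateJoiner) // true
}
```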
Multiplexing multiple replicators over one Seam
Seam.incoming is single-collection per the kuilt contract. If two replicators tried to collect the same Seam independently, one would starve. MuxSeam (kuilt-core) solves this: it wraps the underlying seam, owns the single collection via shareIn, and prefixes frames with a 1-byte channel tag:
/**
 * Split one [Seam] into N independent logical channels via [MuxSeam].
 *
 * [Seam.incoming] is single-collection per the kuilt contract. [MuxSeam] takes
 * sole ownership of that collection and fans the stream out to per-channel views,
 * each prefixed with a 1-byte tag. Use this whenever two independent consumers
 * (e.g. a [us.tractat.kuilt.quilter.Quilter] and a Raft transport) must share
 * one underlying seam.
 */
@Suppress("unused")
internal fun sampleMuxSeamChannels() = runTest {
    val loom = InMemoryLoom()
    val seam = loom.host(Pattern("mux-demo"))
    val mux = MuxSeam(seam, this)
    // Each channel gets a typed Seam view that strips the tag on reads and
    // prepends it on writes, so the rest of your code sees a plain Seam.
    val replicatorSeam: Seam = mux.channel(0x00.toByte())
    val coordinatorSeam: Seam = mux.channel(0x01.toByte())
    // channel() is idempotent: calling it again with the same tag returns the same Seam.
    check(mux.channel(0x00.toByte()) === replicatorSeam)
    check(replicatorSeam !== coordinatorSeam)
}
Each consumer gets a typed Seam view that strips the tag on reads and prepends it on writes. This is how BoundedCounterTransferCoordinator and Quilter share one transport (see BoundedCounter). It is also how kuilt-session's Room.channel(id) provides scoped sub-channels.
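The tag-prefix idea can be sketched without any kuilt types. The helpers below are hypothetical and only illustrate prepending and stripping a 1-byte channel tag; MuxSeam's actual frame layout beyond that tag is not described in this section and is not assumed here.

```kotlin
// Hypothetical channel tags, mirroring the 0x00 and 0x01 used in the sample.
val REPLICATOR_TAG: Byte = 0x00
val COORDINATOR_TAG: Byte = 0x01

// Write path: prepend the channel tag to the payload.
fun tagFrame(tag: Byte, payload: ByteArray): ByteArray = byteArrayOf(tag) + payload

// Read path: split a frame back into its tag and payload.
fun untagFrame(frame: ByteArray): Pair<Byte, ByteArray> {
    require(frame.isNotEmpty()) { "a muxed frame needs at least the tag byte" }
    return frame[0] to frame.copyOfRange(1, frame.size)
}

fun main() {
    // Frames from two logical channels interleaved on one underlying stream.
    val wire = listOf(
        tagFrame(REPLICATOR_TAG, "delta".encodeToByteArray()),
        tagFrame(COORDINATOR_TAG, "vote".encodeToByteArray()),
        tagFrame(REPLICATOR_TAG, "ack".encodeToByteArray()),
    )

    // A single reader routes each payload to its channel by tag.
    val byChannel = wire.map(::untagFrame)
        .groupBy({ it.first }, { it.second.decodeToString() })

    println(byChannel[REPLICATOR_TAG])  // [delta, ack]
    println(byChannel[COORDINATOR_TAG]) // [vote]
}
```

Having one reader that owns the stream and routes by tag is what lets each consumer behave as if it had a private Seam.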
Quilter + LWWMap is the standard pattern for live-converging session metadata (display names, preferences):
/**
 * [Quilter] + [LWWMap] for live-converging session metadata (display names).
 *
 * `LWWMap<PeerId, String>` gives each key per-entry last-writer-wins semantics.
 * Backed by a `room.channel("member-metadata")` in production; shown here over
 * a plain seam for brevity.
 */
@Suppress("unused")
internal fun sampleQuilterSessionMetadata() = runTest(
    StandardTestDispatcher(),
    timeout = 5.seconds,
) {
    val loom = InMemoryLoom()
    val seamAlice = loom.host(Pattern("session"))
    val seamBob = loom.join(InMemoryTag("bob"))
    val cfg = QuilterConfig(expectVirtualTime = true)
    val msgSer = QuiltMessage.serializer(
        LWWMap.serializer(PeerId.serializer(), serializer<String>()),
    )
    val aliceRep = Quilter(
        replica = ReplicaId(seamAlice.selfId.value),
        seam = seamAlice,
        initial = LWWMap.empty<PeerId, String>(),
        messageSerializer = msgSer,
        scope = backgroundScope,
        config = cfg,
    )
    val bobRep = Quilter(
        replica = ReplicaId(seamBob.selfId.value),
        seam = seamBob,
        initial = LWWMap.empty<PeerId, String>(),
        messageSerializer = msgSer,
        scope = backgroundScope,
        config = cfg,
    )
    // Suspend briefly so the background collectors can start before any write.
    kotlinx.coroutines.delay(1)
    // Each peer writes its own display name. LWWMap gives per-key last-writer-wins.
    // LWWMap.set returns a new LWWMap (the merged state), so wrap it in Patch.
    aliceRep.apply(
        Patch(aliceRep.state.value.set(aliceRep.replica, timestamp = 1L, key = seamAlice.selfId, value = "Alice")),
    )
    bobRep.apply(
        Patch(bobRep.state.value.set(bobRep.replica, timestamp = 1L, key = seamBob.selfId, value = "Bob")),
    )
    // Advance virtual time so the deltas can propagate between the peers.
    kotlinx.coroutines.delay(10)
    // rep.state is the live-converging display-name map; both peers converge.
    assertEquals("Alice", aliceRep.state.value[seamAlice.selfId])
    assertEquals("Alice", bobRep.state.value[seamAlice.selfId])
}
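Per-key last-writer-wins can be illustrated with a small standalone map. SketchLwwMap and SketchEntry are hypothetical types written for this example; in particular, breaking timestamp ties by comparing replica ids is an assumption, and kuilt's LWWMap may resolve ties differently.

```kotlin
// Hypothetical entry: a value plus the (timestamp, replica) stamp of its writer.
data class SketchEntry<V>(val value: V, val timestamp: Long, val replica: String)

// Hypothetical last-writer-wins map; not kuilt's LWWMap.
data class SketchLwwMap<K, V>(val cells: Map<K, SketchEntry<V>> = emptyMap()) {
    operator fun get(key: K): V? = cells[key]?.value

    // Writing is a merge with a single-entry map, so set returns the merged state.
    fun set(replica: String, timestamp: Long, key: K, value: V): SketchLwwMap<K, V> =
        merge(SketchLwwMap(mapOf(key to SketchEntry(value, timestamp, replica))))

    // For each key, keep whichever entry carries the later stamp.
    fun merge(other: SketchLwwMap<K, V>): SketchLwwMap<K, V> =
        SketchLwwMap(
            other.cells.entries.fold(cells) { acc, (key, theirs) ->
                val mine = acc[key]
                if (mine == null || wins(theirs, mine)) acc + (key to theirs) else acc
            }
        )

    // Assumed tie-break: higher timestamp first, then higher replica id.
    private fun wins(a: SketchEntry<V>, b: SketchEntry<V>): Boolean =
        a.timestamp > b.timestamp || (a.timestamp == b.timestamp && a.replica > b.replica)
}

fun main() {
    val alice = SketchLwwMap<String, String>().set("alice", 1L, "alice", "Alice")
    val bob = SketchLwwMap<String, String>().set("bob", 1L, "bob", "Bob")
    val merged = alice.merge(bob)
    println(merged["alice"]) // Alice
    println(merged["bob"])   // Bob

    // A later write to the same key wins in either merge order.
    val renamed = merged.set("alice", 2L, "alice", "Alicia")
    println(alice.merge(renamed)["alice"]) // Alicia
    println(renamed.merge(alice)["alice"]) // Alicia
}
```

Since each peer writes only under its own key in the sample above, the two writes never conflict and both names survive the merge.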
AutoCloseable lifecycle
Quilter implements AutoCloseable. Call close() to cancel the background collection and release resources. The replicator also shuts down cleanly at the end of a use {} block or when its scope is cancelled.
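The use {} guarantee comes from the Kotlin standard library, not from Quilter itself. The sketch below uses a hypothetical SketchReplicator that only records whether close() ran; it assumes a JVM target with a Kotlin version that provides use for AutoCloseable (1.8 or later).

```kotlin
// Hypothetical stand-in for a replicator; it only records whether close() ran.
class SketchReplicator : AutoCloseable {
    var closed = false
        private set
    var applied = 0
        private set

    fun apply(delta: Long) {
        applied++
    }

    override fun close() {
        closed = true
    }
}

fun main() {
    val rep = SketchReplicator()
    // use {} calls close() when the block finishes.
    rep.use { it.apply(1L) }
    println(rep.closed) // true

    // close() also runs when the block throws.
    val failing = SketchReplicator()
    runCatching { failing.use { error("boom") } }
    println(failing.closed) // true
}
```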
Wire protocol
Quilter serialises messages with CBOR by default (via Cbor from kotlinx-serialization). Messages are QuiltMessage<S>:
Delta(seq, patch) — an incremental update.
FullState(state) — the complete current state, sent to new peers and as a retry on gap detection.
Ack(seq) — acknowledgement, used to clear the pending-delta buffer.
Resend(fromSeq) — request to re-send deltas from fromSeq when a gap is detected.
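The four message kinds above can be mirrored in a small receiver sketch. Everything here is hypothetical: the Sketch-prefixed types, the grow-only set used as state, sequence numbers starting at 0, and the rule that a delta ahead of the expected sequence triggers a Resend are assumptions made for illustration, not Quilter's actual implementation.

```kotlin
// Hypothetical mirror of the four message kinds; field types are assumptions.
sealed interface SketchMessage
data class SketchDelta(val seq: Long, val patch: Set<String>) : SketchMessage
data class SketchFullState(val state: Set<String>) : SketchMessage
data class SketchAck(val seq: Long) : SketchMessage
data class SketchResend(val fromSeq: Long) : SketchMessage

// Hypothetical receiver over a grow-only set, where merge is set union.
class SketchReceiver {
    var state: Set<String> = emptySet()
        private set
    private var nextSeq = 0L

    // Returns the reply to send back, or null when no reply is needed.
    fun onMessage(msg: SketchMessage): SketchMessage? = when (msg) {
        is SketchDelta -> when {
            // A delta ahead of the expected sequence means something was missed.
            msg.seq > nextSeq -> SketchResend(nextSeq)
            else -> {
                // Merging is idempotent, so a duplicate delta is safe to merge again.
                state = state + msg.patch
                if (msg.seq == nextSeq) nextSeq++
                SketchAck(msg.seq)
            }
        }
        is SketchFullState -> {
            state = state + msg.state
            null
        }
        is SketchAck, is SketchResend -> null
    }
}

fun main() {
    val rx = SketchReceiver()
    println(rx.onMessage(SketchDelta(0L, setOf("x")))) // SketchAck(seq=0)
    println(rx.onMessage(SketchDelta(2L, setOf("z")))) // SketchResend(fromSeq=1)
    println(rx.onMessage(SketchDelta(1L, setOf("y")))) // SketchAck(seq=1)
    println(rx.onMessage(SketchDelta(2L, setOf("z")))) // SketchAck(seq=2)
    println(rx.state)                                  // [x, y, z]
}
```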
Last modified: 22 June 2026