kuilt Help

Quilter

Quilter<S> keeps a CRDT replica live over a Seam. It collects the Seam's incoming flow, merges inbound deltas, and broadcasts outbound deltas as you apply mutations. Its state property is a StateFlow<S> that always holds the current converged value.

Basic setup

    /**
     * Basic [Quilter] setup over a [us.tractat.kuilt.core.Seam].
     *
     * Host a session, build a replicator, apply a mutation, and read the live
     * converged state via `state`. The mutation is broadcast to all current peers
     * automatically; every peer's `state` flow reflects the merge.
     */
    @Suppress("unused")
    internal fun sampleQuilterSetup() = runTest(
        StandardTestDispatcher(),
        timeout = 5.seconds,
    ) {
        val loom = InMemoryLoom()
        val seam = loom.host(Pattern("my-session"))
        val cfg = QuilterConfig(expectVirtualTime = true)
        val replicator = Quilter(
            replica = ReplicaId(seam.selfId.value),
            seam = seam,
            initial = GCounter.ZERO,
            messageSerializer = QuiltMessage.serializer(GCounter.serializer()),
            scope = backgroundScope,
            config = cfg,
        )

        // Apply a mutation; the delta is broadcast to all current peers automatically.
        replicator.apply(replicator.state.value.inc(replicator.replica, 1L))

        // state is a StateFlow: always the current converged value.
        assertEquals(1L, replicator.state.value.value)
    }

Two-peer GCounter convergence

    /**
     * Two peers independently increment their GCounter slots; after round-trip
     * delta exchange both replicas must agree on the total.
     */
    @Test
    fun twoPeerGCounterConverges() = runTest(UnconfinedTestDispatcher()) {
        val loom = InMemoryLoom()
        val seamA = loom.host(Pattern("test"))
        val seamB = loom.join(InMemoryTag("b"))
        val repA = gcounterReplicator(seamA, backgroundScope)
        val repB = gcounterReplicator(seamB, backgroundScope)

        repA.apply(repA.state.value.inc(repA.replica, 3L))
        repA.apply(repA.state.value.inc(repA.replica, 2L))
        repB.apply(repB.state.value.inc(repB.replica, 4L))

        testScheduler.advanceUntilIdle()

        assertEquals(9L, repA.state.value.value)
        assertEquals(9L, repB.state.value.value)
    }
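The total of 9 asserted in the test above follows from how a grow-only counter merges. The following is a minimal standalone sketch of that idea, not kuilt's GCounter: the class ToyGCounter, its String replica ids, and its method names are all hypothetical, and only the Kotlin standard library is used.

```kotlin
// Toy grow-only counter for illustration; NOT kuilt's GCounter implementation.
data class ToyGCounter(val slots: Map<String, Long> = emptyMap()) {
    // The counter's value is the sum of every replica's slot.
    val value: Long get() = slots.values.sum()

    // A replica only ever increments its own slot.
    fun inc(replica: String, by: Long): ToyGCounter {
        val updated = (slots[replica] ?: 0L) + by
        return ToyGCounter(slots + (replica to updated))
    }

    // Merge takes the per-slot maximum, so it is commutative, associative and idempotent.
    fun merge(other: ToyGCounter): ToyGCounter =
        ToyGCounter((slots.keys + other.slots.keys).associateWith { k ->
            maxOf(slots[k] ?: 0L, other.slots[k] ?: 0L)
        })
}

fun main() {
    val a = ToyGCounter().inc("a", 3L).inc("a", 2L)
    val b = ToyGCounter().inc("b", 4L)

    val mergedAtA = a.merge(b)
    val mergedAtB = b.merge(a).merge(a) // duplicate delivery changes nothing

    println(mergedAtA.value)        // 9
    println(mergedAtA == mergedAtB) // true
}
```

Because the merge is order-independent and idempotent, the two replicas agree regardless of which delta arrives first or whether one arrives twice.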

Late-joiner full-state sync

When a peer joins after others have accumulated state, Quilter sends a FullState message rather than replaying the delta history. The late joiner converges in one round-trip:

    /**
     * A and B accumulate state; C joins late and should converge via FullState
     * without replaying any delta history.
     */
    @Test
    fun lateJoinerReceivesFullState() = runTest(UnconfinedTestDispatcher()) {
        val loom = InMemoryLoom()
        val seamA = loom.host(Pattern("test"))
        val seamB = loom.join(InMemoryTag("b"))
        val repA = gcounterReplicator(seamA, backgroundScope)
        val repB = gcounterReplicator(seamB, backgroundScope)

        repA.apply(repA.state.value.inc(repA.replica, 10L))
        repB.apply(repB.state.value.inc(repB.replica, 5L))
        testScheduler.advanceUntilIdle()

        // C joins after A and B have already accumulated state.
        val seamC = loom.join(InMemoryTag("c"))
        val repC = gcounterReplicator(seamC, backgroundScope)
        testScheduler.advanceUntilIdle()

        // C must have received FullState from A and B and converged to 15.
        assertEquals(15L, repA.state.value.value)
        assertEquals(15L, repC.state.value.value)
    }
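To see why a single snapshot is enough for a late joiner, here is a small standalone sketch. It models counter state as a plain map from replica name to count; the Slots alias and mergeSlots function are hypothetical stand-ins and say nothing about how kuilt represents state internally.

```kotlin
// Hypothetical stand-in for grow-only counter state: replica name -> count.
typealias Slots = Map<String, Long>

// Per-slot maximum, the usual grow-only counter merge.
fun mergeSlots(x: Slots, y: Slots): Slots =
    (x.keys + y.keys).associateWith { k -> maxOf(x[k] ?: 0L, y[k] ?: 0L) }

fun main() {
    // Every delta A and B produced before C joined.
    val history: List<Slots> = listOf(
        mapOf("a" to 4L),
        mapOf("a" to 10L),
        mapOf("b" to 5L),
    )

    // What an existing peer holds after merging the whole history.
    val fullState = history.fold(emptyMap<String, Long>(), ::mergeSlots)

    // Late joiner C starts empty and merges one snapshot instead of three deltas.
    val c = mergeSlots(emptyMap(), fullState)

    println(c.values.sum()) // 15
    println(c == fullState) // true
}
```

The snapshot already contains the join of every earlier delta, so merging it once leaves C in the same state as replaying the history would.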

Multiplexing multiple replicators over one Seam

Seam.incoming is single-collection per the kuilt contract. If two replicators tried to collect the same Seam independently, one would starve. MuxSeam (kuilt-core) solves this: it wraps the underlying seam, owns the single collection via shareIn, and prefixes frames with a 1-byte channel tag:

    /**
     * Split one [Seam] into N independent logical channels via [MuxSeam].
     *
     * [Seam.incoming] is single-collection per the kuilt contract. [MuxSeam] takes
     * sole ownership of that collection and fans the stream out to per-channel views,
     * each prefixed with a 1-byte tag. Use this whenever two independent consumers
     * (e.g. a [us.tractat.kuilt.quilter.Quilter] and a Raft transport) must share
     * one underlying seam.
     */
    @Suppress("unused")
    internal fun sampleMuxSeamChannels() = runTest {
        val loom = InMemoryLoom()
        val seam = loom.host(Pattern("mux-demo"))
        val mux = MuxSeam(seam, this)

        // Each channel gets a typed Seam view that strips the tag on reads and
        // prepends it on writes; the rest of your code sees a plain Seam.
        val replicatorSeam: Seam = mux.channel(0x00.toByte())
        val coordinatorSeam: Seam = mux.channel(0x01.toByte())

        // channel() is idempotent: calling it again with the same tag returns the same Seam.
        check(mux.channel(0x00.toByte()) === replicatorSeam)
        check(replicatorSeam !== coordinatorSeam)
    }

Each consumer gets a typed Seam view that strips the tag on reads and prepends it on writes. This is how BoundedCounterTransferCoordinator and Quilter share one transport (see BoundedCounter). It is also how kuilt-session's Room.channel(id) provides scoped sub-channels.
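The tag-on-write, strip-on-read behaviour can be pictured with a few lines of byte handling. This is only a sketch of the general technique: tagFrame and untagFrame are hypothetical helpers, and MuxSeam's actual frame layout beyond the 1-byte tag described above is not assumed.

```kotlin
// Hypothetical framing helpers; MuxSeam's real implementation may differ.

// Prepend the 1-byte channel tag to an outgoing payload.
fun tagFrame(channel: Byte, payload: ByteArray): ByteArray =
    byteArrayOf(channel) + payload

// Split an incoming frame into (channel, payload); null for an empty frame.
fun untagFrame(frame: ByteArray): Pair<Byte, ByteArray>? =
    if (frame.isEmpty()) null else (frame[0] to frame.copyOfRange(1, frame.size))

fun main() {
    // Frames from two logical channels interleaved on one underlying stream.
    val wire = listOf(
        tagFrame(0x00, "delta".encodeToByteArray()),
        tagFrame(0x01, "transfer".encodeToByteArray()),
        tagFrame(0x00, "ack".encodeToByteArray()),
    )

    // A single reader consumes the stream once and routes each frame by its tag.
    val byChannel = wire.mapNotNull(::untagFrame)
        .groupBy({ it.first }, { it.second.decodeToString() })

    println(byChannel[0x00.toByte()]) // [delta, ack]
    println(byChannel[0x01.toByte()]) // [transfer]
}
```

Routing in one place is what lets the underlying stream be collected exactly once while each consumer still sees only its own frames.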

Session metadata convergence

Quilter + LWWMap is the standard pattern for live-converging session metadata (display names, preferences):

    /**
     * [Quilter] + [LWWMap] for live-converging session metadata (display names).
     *
     * `LWWMap<PeerId, String>` gives each key per-entry last-writer-wins semantics.
     * Backed by a `room.channel("member-metadata")` in production; shown here over
     * a plain seam for brevity.
     */
    @Suppress("unused")
    internal fun sampleQuilterSessionMetadata() = runTest(
        StandardTestDispatcher(),
        timeout = 5.seconds,
    ) {
        val loom = InMemoryLoom()
        val seamAlice = loom.host(Pattern("session"))
        val seamBob = loom.join(InMemoryTag("bob"))
        val cfg = QuilterConfig(expectVirtualTime = true)
        val msgSer = QuiltMessage.serializer(LWWMap.serializer(PeerId.serializer(), serializer<String>()))

        val aliceRep = Quilter(
            replica = ReplicaId(seamAlice.selfId.value),
            seam = seamAlice,
            initial = LWWMap.empty<PeerId, String>(),
            messageSerializer = msgSer,
            scope = backgroundScope,
            config = cfg,
        )
        val bobRep = Quilter(
            replica = ReplicaId(seamBob.selfId.value),
            seam = seamBob,
            initial = LWWMap.empty<PeerId, String>(),
            messageSerializer = msgSer,
            scope = backgroundScope,
            config = cfg,
        )
        kotlinx.coroutines.delay(1)

        // Each peer writes its own display name. LWWMap gives per-key last-writer-wins.
        // LWWMap.set returns a new LWWMap (the merged state), so wrap it in Patch.
        aliceRep.apply(
            Patch(aliceRep.state.value.set(aliceRep.replica, timestamp = 1L, key = seamAlice.selfId, value = "Alice")),
        )
        bobRep.apply(
            Patch(bobRep.state.value.set(bobRep.replica, timestamp = 1L, key = seamBob.selfId, value = "Bob")),
        )
        kotlinx.coroutines.delay(10)

        // rep.state is the live-converging display-name map; both peers converge.
        assertEquals("Alice", aliceRep.state.value[seamAlice.selfId])
        assertEquals("Alice", bobRep.state.value[seamAlice.selfId])
    }
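Per-key last-writer-wins can be sketched in isolation as follows. The Stamped class, the mergeLww function, and the tie-break on replica id are assumptions made for illustration; kuilt's LWWMap may order concurrent writes differently.

```kotlin
// Toy last-writer-wins entry; NOT kuilt's LWWMap representation.
data class Stamped(val value: String, val timestamp: Long, val replica: String)

// The later timestamp wins; equal timestamps fall back to the replica id so that
// every peer picks the same winner (an assumed tie-break rule).
fun newer(x: Stamped, y: Stamped): Stamped =
    if (compareValuesBy(x, y, { it.timestamp }, { it.replica }) >= 0) x else y

// Merge two maps key by key, keeping the newer entry where both sides have one.
fun mergeLww(a: Map<String, Stamped>, b: Map<String, Stamped>): Map<String, Stamped> =
    (a.keys + b.keys).associateWith { k ->
        val x = a[k]
        val y = b[k]
        if (x == null) y!! else if (y == null) x else newer(x, y)
    }

fun main() {
    val alice = mapOf("alice" to Stamped("Alice", 1L, "alice"))
    val bob = mapOf(
        "bob" to Stamped("Bob", 1L, "bob"),
        "alice" to Stamped("Al", 0L, "bob"), // older write to Alice's key loses
    )

    val atAlice = mergeLww(alice, bob)
    val atBob = mergeLww(bob, alice)

    println(atAlice["alice"]?.value) // Alice
    println(atAlice == atBob)        // true
}
```

Since each key is resolved independently, peers writing to different keys never overwrite one another.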

AutoCloseable lifecycle

Quilter implements AutoCloseable. Call close() to cancel the background collection and release resources. The replicator also shuts down cleanly when a use {} block exits or when its scope is cancelled.
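The use {} pattern looks like this with any AutoCloseable on the JVM. ToyReplicator is a hypothetical stand-in that only records whether it was closed; it does not show what Quilter's close() does internally.

```kotlin
// Hypothetical resource standing in for a replicator.
class ToyReplicator : AutoCloseable {
    var closed = false
        private set
    var total = 0L
        private set

    fun applyDelta(delta: Long) {
        check(!closed) { "replicator is closed" }
        total += delta
    }

    override fun close() {
        // A real replicator would cancel its background collection here.
        closed = true
    }
}

fun main() {
    val rep = ToyReplicator()

    // close() runs when the block exits, whether normally or by exception.
    rep.use { it.applyDelta(1L) }

    println(rep.total)  // 1
    println(rep.closed) // true
}
```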

Wire protocol

Quilter serialises messages with CBOR by default (via Cbor from kotlinx-serialization). Each message is a QuiltMessage<S>, one of:

  • Delta(seq, patch) — an incremental update.

  • FullState(state) — the complete current state, sent to new peers and as a retry on gap detection.

  • Ack(seq) — acknowledgement, used to clear the pending-delta buffer.

  • Resend(fromSeq) — request to re-send deltas from fromSeq when a gap is detected.
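One way these four messages could interact on the receiving side is sketched below. Everything here is hypothetical: the Toy-prefixed types, the use of a Set<String> as state with set union as the merge, and the single-sender sequence tracking are assumptions for illustration, not Quilter's actual receive logic.

```kotlin
// Hypothetical model of the four message kinds; field types are assumptions.
sealed interface ToyMessage
data class ToyDelta(val seq: Long, val patch: Set<String>) : ToyMessage
data class ToyFullState(val state: Set<String>) : ToyMessage
data class ToyAck(val seq: Long) : ToyMessage
data class ToyResend(val fromSeq: Long) : ToyMessage

// Receiver for a single sender: merges in-order deltas and asks for a resend on a gap.
class ToyReceiver {
    var state: Set<String> = emptySet()
        private set
    private var nextSeq = 0L

    // Returns the reply to send back, or null if none is needed.
    fun onMessage(msg: ToyMessage): ToyMessage? = when (msg) {
        is ToyDelta -> when {
            msg.seq == nextSeq -> {
                state = state + msg.patch // merge is set union in this toy
                nextSeq++
                ToyAck(msg.seq)
            }
            msg.seq > nextSeq -> ToyResend(nextSeq) // gap: an earlier delta is missing
            else -> ToyAck(msg.seq)                 // duplicate: already merged
        }
        is ToyFullState -> {
            state = state + msg.state // merging a snapshot is always safe
            null
        }
        is ToyAck, is ToyResend -> null // sender-side concerns, ignored in this sketch
    }
}

fun main() {
    val rx = ToyReceiver()
    println(rx.onMessage(ToyDelta(0L, setOf("x")))) // ToyAck(seq=0)
    println(rx.onMessage(ToyDelta(2L, setOf("z")))) // ToyResend(fromSeq=1)
    println(rx.onMessage(ToyDelta(1L, setOf("y")))) // ToyAck(seq=1)
    println(rx.onMessage(ToyDelta(2L, setOf("z")))) // ToyAck(seq=2)
    println(rx.state)                               // [x, y, z]
}
```

A sender following the description above would drop acknowledged deltas from its pending buffer and answer a resend request either with the missing deltas or with a full-state snapshot.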

Last modified: 22 June 2026