Quilter

class Quilter<S : Quilted<S>>(val replica: ReplicaId, seam: Seam, initial: S, messageSerializer: KSerializer<QuiltMessage<S>>, scope: CoroutineScope, config: QuilterConfig = QuilterConfig(), clock: MonotonicMillis = SystemMonotonicMillis, binaryFormat: BinaryFormat = Cbor, deltaTargets: (Set<PeerId>) -> Set<PeerId> = { it }, random: Random = kotlin.random.Random.Default) : ScopedCloseable

Runs any Quilted CRDT live over a Seam, providing eventually-consistent multi-peer replication via a simple delta-propagation protocol.

Precondition — one instance per (replica, CRDT type) pair. Running two Quilter<S> instances with the same replica concurrently in the same process breaks the delta GC protocol: both will mint deltas starting at seq = 1, colliding on sequence numbers. The recipient cannot distinguish them and will silently drop or misorder deltas, leaving replicas permanently diverged. This is the same class of collision that the BoundedCounter single-dimension fix addressed in a prior release. Create exactly one Quilter<S> per (replica, CRDT type) per process.
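One way to catch a violation of this precondition early is a process-wide guard that refuses a second claim on the same pair. The sketch below is only an illustration: ReplicaRegistry and claim are hypothetical names that are not part of this library, and the replica is modelled as a plain String.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass

// Hypothetical guard, NOT part of the library: fails fast when a second
// replicator is created for the same (replica, CRDT type) pair.
object ReplicaRegistry {
    private val claimed = ConcurrentHashMap.newKeySet<Pair<String, KClass<*>>>()

    /** Claims the pair; close the returned handle to release it. */
    fun claim(replica: String, type: KClass<*>): AutoCloseable {
        val key = replica to type
        check(claimed.add(key)) { "A replicator for $key is already running in this process" }
        return AutoCloseable { claimed.remove(key) }
    }
}
```

A caller would claim the pair before constructing the replicator and close the handle when the replicator is closed.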

Protocol

  • apply applies a local mutation, updates state, and broadcasts a QuiltMessage.Delta to all current peers. Each delta is tagged with a monotonic seq.

  • On receiving a QuiltMessage.Delta, the delta is joined into the local state and a QuiltMessage.Ack is sent back to the original sender.

  • On receiving a QuiltMessage.Ack, the acker's progress is recorded; once every known peer has acked through a seq, all deltas at or below that seq are GC'd.

  • On first contact with a new peer, a QuiltMessage.FullState is sent so the late joiner converges immediately without waiting for a delta replay.
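The ack-driven GC rule above can be sketched as follows. This is a minimal illustration under stated assumptions, not the class's actual implementation: PendingDeltas and its members are hypothetical names, peers are modelled as Strings, and seq is assumed to start at 1.

```kotlin
// Hypothetical sketch, NOT the library's implementation: buffers local deltas
// by seq and drops those that every known peer has acknowledged.
class PendingDeltas<D> {
    private val pending = sortedMapOf<Long, D>()            // seq -> delta awaiting acks
    private val ackedThrough = mutableMapOf<String, Long>() // peer -> highest acked seq
    private var nextSeq = 1L

    /** Registers a peer; a new peer has acknowledged nothing yet. */
    fun addPeer(peer: String) {
        ackedThrough.getOrPut(peer) { 0L }
    }

    /** Buffers a new local delta and returns its monotonic seq. */
    fun mint(delta: D): Long {
        val seq = nextSeq++
        pending[seq] = delta
        return seq
    }

    /** Records an ack, then GCs every delta at or below the lowest ack across peers. */
    fun onAck(peer: String, seq: Long) {
        ackedThrough[peer] = maxOf(ackedThrough[peer] ?: 0L, seq)
        val universal = ackedThrough.values.minOrNull() ?: return
        pending.headMap(universal + 1).clear() // headMap is exclusive of its bound
    }

    fun pendingSeqs(): List<Long> = pending.keys.toList()
}
```

In this sketch a single slow peer pins the whole buffer, because nothing is dropped until the slowest peer's ack catches up.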

Gap detection (Rung 12b)

Per-sender receive-sequence tracking detects dropped or reordered deltas:

  • Out-of-order deltas are buffered and applied in order once the gap is filled.

  • Missing ranges trigger a QuiltMessage.Resend to the original sender.

  • Duplicate or stale deltas are re-acked and silently dropped.

  • QuiltMessage.Resend causes this replica to re-broadcast buffered pending deltas for the requested range (if they haven't been GC'd yet).
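The receive-side rules above can be sketched for a single sender as follows. The names ReceiveWindow, Outcome, Applied, Buffered and Duplicate are hypothetical and only illustrate the idea; the sketch assumes seq starts at 1 and reports only the first missing range.

```kotlin
// Hypothetical sketch, NOT the library's implementation: in-order delivery
// with gap detection for the deltas of one sender.
sealed interface Outcome
data class Applied(val seqs: List<Long>) : Outcome     // applied in order; ack these
data class Buffered(val missing: LongRange) : Outcome  // gap found; request a resend
object Duplicate : Outcome                             // stale; re-ack and drop

class ReceiveWindow<D : Any>(private val applyDelta: (D) -> Unit) {
    private var nextExpected = 1L
    private val outOfOrder = sortedMapOf<Long, D>()

    fun onDelta(seq: Long, delta: D): Outcome {
        if (seq < nextExpected || seq in outOfOrder) return Duplicate
        if (seq > nextExpected) {
            outOfOrder[seq] = delta
            // The first gap runs from the next expected seq up to the lowest buffered one.
            return Buffered(missing = nextExpected until outOfOrder.firstKey())
        }
        applyDelta(delta)
        val applied = mutableListOf(seq)
        nextExpected = seq + 1
        // Drain any buffered deltas that are now contiguous.
        while (true) {
            val buffered = outOfOrder.remove(nextExpected) ?: break
            applyDelta(buffered)
            applied += nextExpected
            nextExpected++
        }
        return Applied(applied)
    }
}
```

A caller would map Buffered to a resend request for the missing range and Duplicate to a repeated ack.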

Peer eviction

Peers absent from Seam.peers beyond QuilterConfig.evictionAfter are evicted from the known-peer set, releasing their buffer pin. They receive a fresh QuiltMessage.FullState if they rejoin.
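A minimal sketch of TTL-based eviction, assuming that "beyond" means strictly longer than the TTL and that absence is measured from the last time the peer was observed. PeerTracker and its members are hypothetical names, not the library's API.

```kotlin
// Hypothetical sketch, NOT the library's implementation: evicts peers that
// have been absent from the visible set for longer than a TTL.
class PeerTracker(private val evictionAfterMillis: Long) {
    private val lastSeen = mutableMapOf<String, Long>() // peer -> last time it was visible

    /** Feed the currently visible peers and a monotonic timestamp; returns the peers evicted now. */
    fun observe(visible: Set<String>, nowMillis: Long): Set<String> {
        visible.forEach { lastSeen[it] = nowMillis }
        val evicted = lastSeen.filter { (peer, seen) ->
            peer !in visible && nowMillis - seen > evictionAfterMillis
        }.keys
        evicted.forEach { lastSeen.remove(it) }
        return evicted
    }

    fun known(): Set<String> = lastSeen.keys.toSet()
}
```

Because an evicted peer is forgotten entirely, it looks like a brand-new peer when it reappears, which matches the full-state resend on rejoin described above.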

Parameters

replica

this peer's ReplicaId.

seam

the Seam to ride. Seam.incoming must be collected exactly once, and this class takes sole ownership of that stream, so do not collect it anywhere else.

initial

the starting state (typically the CRDT's zero/empty value).

messageSerializer

a KSerializer for QuiltMessage<S>, obtained via QuiltMessage.serializer(stateSerializer).

scope

the CoroutineScope whose Job becomes the parent of the replicator's owned child job. In tests, pass backgroundScope from kotlinx.coroutines.test.TestScope so infinite-running collectors are cancelled cleanly at test end without raising kotlinx.coroutines.test.UncompletedCoroutinesError.

config

replication behaviour tuning (eviction TTL, anti-entropy interval).

clock

monotonic time source; override in tests to inject a fake clock.

Test-dispatcher guard. If the scope contains a kotlinx.coroutines.test.TestDispatcher, a diagnostic is emitted because runAntiEntropy uses real-clock kotlinx.coroutines.delay — under virtual time those delays never advance automatically, causing tests to deadlock silently. Either use UnconfinedTestDispatcher (delays execute eagerly) or advance virtual time via testScheduler.advanceTimeBy(…) if you must use StandardTestDispatcher. Set QuilterConfig.strictTestGuard to true to throw rather than warn.

Constructors

constructor(replica: ReplicaId, seam: Seam, initial: S, messageSerializer: KSerializer<QuiltMessage<S>>, scope: CoroutineScope, config: QuilterConfig = QuilterConfig(), clock: MonotonicMillis = SystemMonotonicMillis, binaryFormat: BinaryFormat = Cbor, deltaTargets: (Set<PeerId>) -> Set<PeerId> = { it }, random: Random = kotlin.random.Random.Default)

Properties

val cutFrontier: StateFlow<CutFrontier>

The causal-stability cut + frontier, recomputed on every matrix change (local apply, inbound delta, inbound QuiltMessage.Delivered, join, eviction) and published atomically as a single CutFrontier (W1 of ADR §4.6). A us.tractat.kuilt.crdt.Rga GC coordinator (#270) consumes this together with deliveredLocal and feeds them to Rga.compact(stableCut, frontierMax, delivered). For CRDTs that expose no dots (the delta-state zoo) it stays at CutFrontier.EMPTY.

This replica's delivered version vector: author → highest contiguous (gap-free) seq this replica has applied, derived from the current merged state's Quilted.causalDots. Recomputed on every state change (local apply and inbound delta), so it never carries an incremental-contiguity bug — a gap in an author's dots truncates that author's high-water at the gap.
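The "highest contiguous seq" rule can be illustrated with a small sketch. Dot and deliveredVector are hypothetical stand-ins, not the library's types, and the sketch assumes each author's seq starts at 1.

```kotlin
// Hypothetical sketch, NOT the library's implementation: derives
// author -> highest gap-free seq from a set of causal dots.
data class Dot(val author: String, val seq: Long)

fun deliveredVector(dots: Set<Dot>): Map<String, Long> =
    dots.groupBy({ it.author }, { it.seq }).mapValues { (_, seqs) ->
        val seen = seqs.toSet()
        var high = 0L
        // Walk upward from 1 and stop at the first missing seq.
        while ((high + 1) in seen) high++
        high
    }
```

With dots a:1, a:2, a:4 and b:2, this yields a = 2 (truncated at the missing a:3) and b = 0 (b:1 was never seen). Recomputing from the full dot set each time is what makes the result independent of arrival order.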

val state: StateFlow<S>

val universalAckFlow: StateFlow<Long>

The causal-stability watermark: the highest sequence number that every currently known peer has acknowledged. Advances monotonically — it never decreases.

Functions

fun apply(patch: Patch<S>)

Apply a local mutation. Updates state synchronously; broadcasts a QuiltMessage.Delta to all current peers asynchronously (fire-and-forget within scope).

override fun close()

fun mutate(transform: (S) -> Patch<S>)

Apply a local mutation expressed as a transform on the current state. Equivalent to apply(state.value.let(transform)) but avoids reading state.value at every call site.
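The relationship between the two calls can be sketched with stand-in types. Everything here is hypothetical: the real Patch is not defined on this page, so the sketch models it as a function from the old state to the new one, and Holder omits replication entirely.

```kotlin
// Hypothetical stand-ins, NOT the library's types: Patch is modelled as a
// plain state transformer, and Holder keeps only the local state.
fun interface Patch<S> {
    fun applyTo(state: S): S
}

class Holder<S>(initial: S) {
    var state: S = initial
        private set

    /** Applies a ready-made patch to the current state. */
    fun apply(patch: Patch<S>) {
        state = patch.applyTo(state)
    }

    /** Builds the patch from the current state, then applies it. */
    fun mutate(transform: (S) -> Patch<S>) = apply(transform(state))
}
```

In this sketch, mutate is useful when the patch depends on the current value, for example an increment that is skipped once a limit has been reached.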