kuilt Help

Warp

Imagine a roomful of devices — phones, laptops, whatever is connected — with a pile of work to get through. You want them to share the pile: each device grabs a job, does it, and puts the answer back. When they're done, all the answers are in one place, no job was done twice, and nobody was in charge — there was no central server handing out assignments.

That's warp in one sentence. And the surprising part is that kuilt already has every piece needed to do it.

The pieces already ship

Nothing here had to be invented — warp just names a combination of things kuilt already provides:

  • A shared to-do list every device can add to and read, which merges cleanly when two devices reconnect after a split — that's an ORSet from Replicated Data.

  • An answers board that keeps one answer per job even if a job ran twice — an ORMap, where a duplicate just overwrites itself.

  • A way to decide who does which job without asking anyone — consistent hashing: arrange the connected devices in a ring, and each job has one natural owner (the nearest device clockwise). No vote, no lock, no round-trip.

  • The list of who's connected, kept fresh by Consensus or the room roster.

  • Cover for a device that drops out — kuilt's liveness detection notices, and the next device on the ring picks up the dropped jobs.

How you'd use it

You give each device an executor — the function that actually does a job — and then add jobs to the shared list. Each job runs on exactly its owner; the answers converge everywhere:

val nodeA = WarpNode( selfId = seamA.selfId, seam = seamA, rosterFlow = seamA.rosterSnapshot(), // who's connected scope = backgroundScope, quilterConfig = quilterConfig, clock = clock, executor = { taskId -> "result-${taskId.value}" }, // the work each job does ) // ...a second node on another peer, wired the same way... val tasks = (1..8).map { TaskId("conv-task-$it") } tasks.forEach { nodeA.enqueue(it) } // add work to the shared list advanceUntilIdle() // Each task ran on exactly one peer — its owner on the ring. // None lost, none run twice; the results board converges on every node. assertEquals(tasks.toSet(), nodeA.results.taskIds) assertEquals(tasks.toSet(), nodeB.results.taskIds)

The whole scheduler is just those parts tied together: a ring over the connected peers, a work queue, and a results board as the safety net.

The honest seam

The "no central boss" trick holds as long as the set of connected devices is stable. When a device joins or leaves, the jobs whose owner changed get reassigned — that costs one membership-change event per affected job, not a negotiation per job. At low churn this is much cheaper than coordinating every job; at very high churn the cost rises, and the answers board is what keeps the worst case to "a job ran twice" rather than "a job got the wrong answer." The spike measurements checked this boundary directly: roughly zero duplicates while membership is steady, and never a wrong result.

By default warp also trims those churn-window double-runs before they happen. In the brief moment right after a device joins or leaves, two devices can briefly disagree about who owns a job and both start it. So before running a job a device first quietly calls dibs — a small note that rides along with traffic already flowing — and, during that unsettled moment, waits a beat and runs the job only if it's the agreed owner. Two devices that would have raced now have one step aside. It costs nothing in the steady state — there's no note to wait on and no pause — and the answers board is still the final safety net for anything that slips through. (Prefer the simplest behaviour? Selecting ClaimStrategy.Ring turns the dibs step off and leans on the answers board alone.)

Choosing how to run a job

Every job is more than data — it's also a plan for how that data should flow before the answer is committed. Warp makes that plan something you can hold in your hand, inspect, and rewrite before anything moves.

A Draft is the plan

Warp.shuttle(opId) returns a Draft — an immutable description of a pipeline. You chain .map(), .filter(), and .embroider() on it and get a record of what should happen, with nothing executed yet:

val draft: Draft<ByteArray> = Warp.shuttle(OpId("docs")) .map(OpId("score")) .filter(OpId("above-threshold")) .embroider(OpId("rank")) check(draft.stages.size == 4) check(draft.isMonotone.not()) // has an Embroider stage check(draft.embroidery?.opId == OpId("rank"))

isMonotone tells you at a glance whether any consensus step is even needed. embroidery is the single coordination point — you can locate it, inspect it, or defer it without touching any data.

Making the plan better

Three pure rewrites can improve a draft without changing what it computes:

  • Defer the consensus step. Push the embroider as far right as possible so the agreement covers the smallest set of already-filtered elements.

  • Push filters early. Move filter stages ahead of map stages so less data flows into the heavier transforms. This assumes each filter operates on the source element, not on a map's derived value — the current model carries only symbolic names, not type annotations, so filter-before-map is a modelled assumption rather than a proved dependency check.

  • Fuse adjacent stages. Collapse a run of consecutive maps (or filters) into one, so the runtime applies them in a single pass.

optimize() applies all three to a fixpoint and returns a structurally equivalent draft:

val optimized = unoptimized.optimize() check(optimized.stages.last() is DraftStage.Embroider) // embroider deferred last check(unoptimized.isEquivalentTo(optimized)) // same convergent result

What it costs

coordinationCost(stats) scores a draft with three numbers:

  • rounds — how many consensus round-trips the plan needs.

  • coupling — the maximum batch size across all coordinated nodes (the blast-radius term — see below).

  • coordinatedVolume — how many elements will cross the consensus boundary.

For a single embroider, plan reduces coordinatedVolume by deferring the embroider past selective filters:

// Unplanned: embroider before filter → consensus sees ~1000 docs val unplannedCost = unplanned.coordinationCost(stats) check(unplannedCost.coordinatedVolume >= 900L) // Planned: embroider deferred → consensus sees only ~50 docs val planned = unplanned.plan(stats) val plannedCost = planned.coordinationCost(stats) check(plannedCost.coordinatedVolume < 100L) check(plannedCost < unplannedCost)

The cardinality estimates come from WarpStats — a CRDT map of per-source HyperLogLog sketches. Peers gossip these sketches on the same anti-entropy as everything else; each peer plans locally from the converged value, no round-trip needed.

CoordinationCost implements Comparable: fewer rounds first, then smaller coupling, then lower volume.

When a query has many agreements

A draft is a dependency graph, not just a pipeline. Call combine to merge two independent drafts into one plan:

val docs = Warp.shuttle(OpId("source.docs")) .map(OpId("map.score")) .embroider(OpId("embroider.rank")) val scores = Warp.shuttle(OpId("source.scores")) .filter(OpId("filter.nonzero")) .embroider(OpId("embroider.vote")) val combined: Draft<Unit> = docs.combine(scores) // Both branches' embroideries are present — independent, no edges connect them. check(combined.embroideries.size == 2) check(!combined.isMonotone)

The two branches share no ancestor path — their embroideries are independent. The planner can commit both in a single Raft round-trip instead of two. Calling consolidateEmbroideries() (included in plan) fuses them into one BatchedEmbroider:

val consolidated = combined.consolidateEmbroideries() // Two independent embroideries become one BatchedEmbroider — one consensus round. check(consolidated.nodes.any { it.stage is DraftStage.BatchedEmbroider }) check(consolidated.nodes.count { it.stage.coordinationKind == CoordinationKind.Coordinated } == 1) // Semantic equivalence: same sources, same embroider multiset, same free-op multiset. check(combined.isEquivalentTo(consolidated))

The round-count cut is analytically provable and holds at real Raft execution. The representative query has three independent agreements (level 0) plus one that depends on them (level 1) — four rounds unplanned, two after plan:

// Branch C chains two embroideries: embroider(C) must commit before embroider(D). val branchA = Warp.shuttle(OpId("source.a")).embroider(OpId("embroider.a")) val branchB = Warp.shuttle(OpId("source.b")).embroider(OpId("embroider.b")) val branchC = Warp.shuttle(OpId("source.c")) .embroider(OpId("embroider.c")) .map(OpId("map.m")) .embroider(OpId("embroider.d")) val unplanned: Draft<Unit> = branchA.combine(branchB).combine(branchC) val planned: Draft<Unit> = unplanned.plan(WarpStats.empty()) val stats = WarpStats.empty() // Unplanned: 4 separate Embroider nodes → 4 rounds (one per node). check(unplanned.coordinationCost(stats).rounds == 4) // Planned: BatchedEmbroider(A,B,C) at level 0 + Embroider(D) at level 1 → 2 rounds. check(planned.coordinationCost(stats).rounds == 2) // rounds is a real lever — the planner measurably cuts round count. check(planned.coordinationCost(stats) < unplanned.coordinationCost(stats)) // Coupling = 3: the level-0 batch bundles three agreements (blast-radius = 3). check(planned.coordinationCost(stats).coupling == 3)

The minimum number of rounds is the depth of the dependency graph, not the count of individual agreements. Independent agreements at the same depth collapse into one round; a sequential dependency (one embroidery's output feeds another's input) forces one round per level — this is a Brent-style critical-path argument.

The honest tradeoff. Batching K independent agreements into one round couples their failure domains: if the Raft proposal is rejected, all K must retry together. The coupling field records the maximum batch size; the planner minimises rounds first, then coupling. A caller who finds the retry unit too large can inspect coupling and split the draft before planning.

Results that converge

A monotone pipeline never "finishes" — it converges, refining as contributions arrive from distributed peers. IncrementalResult holds the running join: contribute a lattice delta and the state grows; it can never shrink. awaitThreshold suspends until a monotone predicate is first satisfied, then returns a snapshot that stays valid permanently:

val result = IncrementalResult(GCounter.ZERO) result.contribute(GCounter.of(alice to 3L)) result.contribute(GCounter.of(bob to 2L)) // result.state.value.value == 5 // In a coroutine: suspends until the predicate crosses, returns once, stays valid. val crossed = result.awaitThreshold { it.value >= 5L }

ConvergentExecution wires a Draft to an IncrementalResult and processes submitted deltas asynchronously. The scope is required — production wires a service scope; tests wire backgroundScope to share the virtual clock.

The dream

The scheduler above is the real, measured first step. The design docs below explore where it could go — all speculative, none scheduled:

  • The vision — the full walk: why a roomful of machines could feel like one computer.

  • Deeper waters — an index into the machinery below the vision.

  • Execution engine — how work travels as descriptors, and the caching that makes it cheap.

  • Query planning — choosing how to run a job to minimise coordination.

  • AI & modelling — training a shared model across many devices without gathering their data.

  • Observability — reading traces straight off the causality the data already carries (the shipped, non-speculative slice of this became Observability).

  • Epic slices — how the work would decompose, and which single slice is real today.

  • Foundation note — why :kuilt-warp stays experimental and outside the stable surface.

The fantasy, last

The furthest-out idea: jobs that are not just data but code — a function that travels to whichever device is free, runs there, and sends its answer back. That's code mobility, and it's a long way off. The honest first step toward it is exactly what ships today: a correct, measured way to spread work across a mesh, built from parts kuilt already had.

Last modified: 02 July 2026