SpeculativeSequencer

class SpeculativeSequencer<S, A>(sequencer: TurnSequencer<A>, game: SpeculativeGame<S, A>, initialState: S, scope: CoroutineScope)

A wrapper over TurnSequencer that applies local actions optimistically before their committed index arrives, then rolls back and replays if the committed order differs from what was predicted.

Mechanism

On propose:

  1. The action is applied immediately to speculativeState (optimistic apply).

  2. The action is appended to a pending-input buffer.

  3. The underlying TurnSequencer.propose is called and suspends until a quorum commits.

  4. If propose throws (e.g. us.tractat.kuilt.raft.LeadershipLostException), the speculative apply is rolled back: the action is removed from the pending buffer, and speculativeState is recomputed as the authoritative snapshot with the remaining pending actions replayed on top.
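
The four propose steps can be sketched with a small synchronous model. This is an illustration only, not the library's implementation: ProposeModel, applyAction, and the commit callback (standing in for TurnSequencer.propose) are hypothetical names, and the real class suspends rather than blocking.

```kotlin
// Hypothetical, simplified model of the propose path (not the library's code).
class ProposeModel<S, A>(
    initial: S,
    private val applyAction: (S, A) -> S,
) {
    private val authoritative: S = initial     // last confirmed state (never advanced in this sketch)
    private val pending = ArrayDeque<A>()      // proposed but unconfirmed actions
    var speculative: S = initial
        private set

    // `commit` stands in for TurnSequencer.propose and throws on failure.
    fun propose(action: A, commit: (A) -> Unit) {
        speculative = applyAction(speculative, action)  // step 1: optimistic apply
        pending.addLast(action)                         // step 2: buffer the input
        try {
            commit(action)                              // step 3: wait for the commit
        } catch (e: Exception) {
            // Step 4: roll back. In this synchronous model the failed action
            // is always the newest pending entry.
            pending.removeLast()
            speculative = pending.fold(authoritative, applyAction)
            throw e
        }
    }
}

fun main() {
    val model = ProposeModel<List<Int>, Int>(emptyList()) { s, a -> s + a }
    model.propose(1) { /* commit succeeds */ }
    runCatching { model.propose(2) { error("leadership lost") } }
    println(model.speculative) // [1]
}
```

The failed proposal of 2 is visible in the speculative state only until the exception is handled; afterwards the state is rebuilt from the snapshot plus the surviving pending action.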

On each TurnEvent.Committed emission received by the background collector:

  • Duplicate key (same DedupKey as a previously applied entry): the entry is skipped entirely — the ClientSessionTable gate prevents double-apply to the authoritative snapshot. The confirmed count still advances.

  • Match (committed action equals oldest pending, first-apply): the pending entry is confirmed and discarded. The authoritative snapshot advances to include the confirmed action. No rollback is needed.

  • Mismatch (foreign peer's action, or reorder, first-apply): the authoritative snapshot advances to include the committed action. All remaining pending inputs are replayed on top to produce the new speculativeState.
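
The three cases above can be sketched as a small reconciliation model. Everything here is an assumption made for illustration: ReconcileModel, the Committed class, the String dedup key, and the set used in place of ClientSessionTable are hypothetical, and matching the oldest pending entry by action equality is a simplification.

```kotlin
// Hypothetical model of committed-event handling; names are illustrative only.
data class Committed<A>(val dedupKey: String, val action: A)

class ReconcileModel<S, A>(initial: S, private val applyAction: (S, A) -> S) {
    private var authoritative: S = initial
    private val seenKeys = mutableSetOf<String>()  // stands in for ClientSessionTable
    private val pending = ArrayDeque<A>()
    var speculative: S = initial
        private set
    var confirmedCount = 0
        private set

    fun proposeLocally(action: A) {
        pending.addLast(action)
        speculative = applyAction(speculative, action)
    }

    fun onCommitted(event: Committed<A>) {
        confirmedCount++                            // advances in every case
        if (!seenKeys.add(event.dedupKey)) return   // duplicate key: skip entirely
        authoritative = applyAction(authoritative, event.action)
        if (pending.firstOrNull() == event.action) {
            pending.removeFirst()                   // match: speculative state already correct
        } else {
            // Mismatch: replay the remaining pending inputs on the new snapshot.
            speculative = pending.fold(authoritative, applyAction)
        }
    }
}

fun main() {
    val m = ReconcileModel<List<Int>, Int>(emptyList()) { s, a -> s + a }
    m.proposeLocally(1)
    m.onCommitted(Committed("peer-7", 9))  // a foreign action commits first
    println(m.speculative)                 // [9, 1]
    m.onCommitted(Committed("me-1", 1))    // our action is confirmed
    m.onCommitted(Committed("me-1", 1))    // duplicate delivery is skipped
    println(m.speculative)                 // [9, 1]
    println(m.confirmedCount)              // 3
}
```

In the match case nothing is recomputed, because the snapshot plus the remaining pending actions equals the state that was already being shown.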

Constraints

Usage

val speculative = SpeculativeSequencer(
    sequencer = TurnSequencer(node, Move.serializer()),
    game = myGame, // SpeculativeGame<GameState, Move>
    initialState = state0,
    scope = viewModelScope, // or a scope tied to session lifetime
)

// Observe for the UI; always up to date with the speculative state.
// collect on a StateFlow never completes, so run it in its own coroutine:
viewModelScope.launch {
    speculative.speculativeState.collect { render(it) }
}

// Propose a local player's move (suspends until quorum confirms):
try {
    speculative.propose(myMove)
} catch (e: LeadershipLostException) { /* retry */ }

// Durable propose with a caller-owned request ID for cross-crash exactly-once:
try {
    speculative.propose(myMove, requestId = nextSerial)
} catch (e: LeadershipLostException) { /* retry with same requestId */ }

Parameters

sequencer

The backing TurnSequencer. Lifetime is owned by the caller.

game

The consumer-owned state machine. Must be pure and deterministic.

initialState

The authoritative starting state (before any actions).

scope

The CoroutineScope that owns the background committed-event collector. Cancel this scope to stop the collector.
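
The requirement that game be pure and deterministic can be illustrated with a stand-alone sketch. The Game interface below is a local stand-in that mirrors only the three members used in the Samples section (apply, snapshot, restore); the real SpeculativeGame interface may differ, and Score and ScoreGame are hypothetical.

```kotlin
// Local stand-in mirroring the members used in the Samples section;
// the real SpeculativeGame interface may declare them differently.
interface Game<S, A> {
    fun apply(state: S, action: A): S
    fun snapshot(state: S): S
    fun restore(snapshot: S): S
}

data class Score(val points: Int, val moves: Int)

object ScoreGame : Game<Score, Int> {
    // Pure: returns a new value and reads nothing but its arguments
    // (no clocks, no random numbers, no shared mutable state).
    override fun apply(state: Score, action: Int): Score =
        Score(state.points + action, state.moves + 1)

    override fun snapshot(state: Score): Score = state.copy()
    override fun restore(snapshot: Score): Score = snapshot.copy()
}

fun main() {
    val actions = listOf(3, 5, -2)
    val start = Score(0, 0)
    val first = actions.fold(start) { s, a -> ScoreGame.apply(s, a) }
    // Replaying the same actions from a restored snapshot must give the same state.
    val restored = ScoreGame.restore(ScoreGame.snapshot(start))
    val replay = actions.fold(restored) { s, a -> ScoreGame.apply(s, a) }
    println(first == replay) // true
    println(first)           // Score(points=6, moves=3)
}
```

Rollback and replay rely on this property: if apply depended on anything other than its arguments, replaying pending inputs could produce a state that differs from what other peers compute.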

Samples

runTest(timeout = 5.seconds) {
    // A trivially pure game: state is a list of committed integers.
    val counterGame = object : SpeculativeGame<List<Int>, Int> {
        override fun apply(state: List<Int>, action: Int): List<Int> = state + action
        override fun snapshot(state: List<Int>): List<Int> = state.toList()
        override fun restore(snapshot: List<Int>): List<Int> = snapshot.toList()
    }

    val node = FakeRaftNode()
    node.setRole(RaftRole.Leader)
    val sequencer = TurnSequencer(node, Int.serializer())

    val speculative = SpeculativeSequencer(
        sequencer = sequencer,
        game = counterGame,
        initialState = emptyList(),
        scope = backgroundScope,
    )

    // Optimistic apply: speculativeState reflects 42 immediately, before quorum.
    val proposed = async { speculative.propose(42) }
    assertEquals(listOf(42), speculative.speculativeState.value)

    // Once quorum confirms, pending count drops to 0.
    val indexed = proposed.await()
    assertEquals(42, indexed.action)
    speculative.awaitConfirmedCount(1)
    assertEquals(0, speculative.pendingCount)
    assertEquals(listOf(42), speculative.speculativeState.value)
}

Constructors

constructor(sequencer: TurnSequencer<A>, game: SpeculativeGame<S, A>, initialState: S, scope: CoroutineScope)

Properties

val confirmedCount: StateFlow<Int>

The number of committed events processed by the background collector.

val pendingCount: Int

The number of locally proposed actions waiting for confirmation.

val speculativeState: StateFlow<S>

The current speculative game state, including all locally proposed but not-yet-confirmed actions on top of the last authoritative snapshot.

Functions

suspend fun awaitConfirmedCount(count: Int)

Suspends until at least count committed events have been processed by the background collector. Use in tests to synchronize assertions after pushing commits via FakeRaftNode.pushCommitted.

suspend fun propose(action: A): IndexedAction<A>

Proposes action for Raft replication and returns after a quorum commits it.

suspend fun propose(action: A, requestId: Long): IndexedAction<A>

Proposes action with a caller-pinned requestId for cross-crash exactly-once, then returns after a quorum commits it (same semantics as the single-argument propose overload).
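
Why a caller-pinned requestId helps can be shown with a toy deduplication log. This is a sketch under stated assumptions, not the library's ClientSessionTable: DedupLog and its commit function are hypothetical, and a real table would typically key on a client identity as well as the request ID.

```kotlin
// Hypothetical illustration of request-ID deduplication (not the library's code).
class DedupLog {
    private val applied = mutableSetOf<Long>()
    val state = mutableListOf<String>()

    // Returns true if the action was applied, false if this request ID was already seen.
    fun commit(requestId: Long, action: String): Boolean {
        if (!applied.add(requestId)) return false
        state += action
        return true
    }
}

fun main() {
    val log = DedupLog()
    val requestId = 17L                // persisted by the caller before proposing
    log.commit(requestId, "move-e4")   // first attempt commits, but the reply is lost
    log.commit(requestId, "move-e4")   // the retry after a restart reuses the same ID
    println(log.state)                 // [move-e4]
}
```

Because the ID is chosen and stored by the caller, a retry after a crash carries the same ID and is recognized as a duplicate instead of being applied a second time.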