SpeculativeSequencer
A wrapper over TurnSequencer that applies local actions optimistically before their committed index arrives, then rolls back and replays if the committed order differs from what was predicted.
Mechanism
On propose:
The action is applied immediately to speculativeState (optimistic apply).
The action is appended to a pending-input buffer.
The underlying TurnSequencer.propose is called and suspends until a quorum commits.
If propose throws (e.g. us.tractat.kuilt.raft.LeadershipLostException), the speculative apply is rolled back — the action is removed from the pending buffer and speculativeState is restored to the authoritative snapshot + remaining pending.
On each TurnEvent.Committed emission received by the background collector:
Duplicate key (same DedupKey as a previously applied entry): the entry is skipped entirely — the ClientSessionTable gate prevents double-apply to the authoritative snapshot. The confirmed count still advances.
Match (committed action equals oldest pending, first-apply): the pending entry is confirmed and discarded. The authoritative snapshot advances to include the confirmed action. No rollback is needed.
Mismatch (foreign peer's action, or reorder, first-apply): the authoritative snapshot advances to include the committed action. All remaining pending inputs are replayed on top to produce the new speculativeState.
Constraints
SpeculativeGame.apply must be deterministic and pure — replay correctness depends on it. See SpeculativeGame KDoc.
Log compaction rehydrates. A snapshot install from Raft surfaces as a TurnEvent.Reset on the backing TurnSequencer.events stream; the pending buffer is discarded and the authoritative state is rebuilt via SpeculativeGame.fromSnapshot (which must be implemented for compaction- enabled sessions). See SpeculativeGame for the boundary note.
Single collector. The TurnSequencer.events flow must not be collected elsewhere — the collector backing speculativeState is the single consumer of turn events.
Usage
val speculative = SpeculativeSequencer(
sequencer = TurnSequencer(node, Move.serializer()),
game = myGame, // SpeculativeGame<GameState, Move>
initialState = state0,
scope = viewModelScope, // or a scope tied to session lifetime
)
// Observe for UI — always up to date with speculative state:
speculative.speculativeState.collect { render(it) }
// Propose a local player's move (suspends until quorum confirms):
try {
speculative.propose(myMove)
} catch (e: LeadershipLostException) { /* retry */}
// Durable propose with a caller-owned request ID for cross-crash exactly-once:
try {
speculative.propose(myMove, requestId = nextSerial)
} catch (e: LeadershipLostException) { /* retry with same requestId */}Parameters
The backing TurnSequencer. Lifetime is owned by the caller.
The consumer-owned state machine. Must be pure and deterministic.
The authoritative starting state (before any actions).
The CoroutineScope that owns the background committed-event collector. Cancel this scope to stop the collector.
Samples
runTest(timeout = 5.seconds) {
// A trivially pure game: state is a list of committed integers.
val counterGame = object : SpeculativeGame<List<Int>, Int> {
override fun apply(state: List<Int>, action: Int): List<Int> = state + action
override fun snapshot(state: List<Int>): List<Int> = state.toList()
override fun restore(snapshot: List<Int>): List<Int> = snapshot.toList()
}
val node = FakeRaftNode()
node.setRole(RaftRole.Leader)
val sequencer = TurnSequencer(node, Int.serializer())
val speculative = SpeculativeSequencer(
sequencer = sequencer,
game = counterGame,
initialState = emptyList(),
scope = backgroundScope,
)
// Optimistic apply: speculativeState reflects 42 immediately, before quorum.
val proposed = async { speculative.propose(42) }
assertEquals(listOf(42), speculative.speculativeState.value)
// Once quorum confirms, pending count drops to 0.
val indexed = proposed.await()
assertEquals(42, indexed.action)
speculative.awaitConfirmedCount(1)
assertEquals(0, speculative.pendingCount)
assertEquals(listOf(42), speculative.speculativeState.value)
}Constructors
Properties
The number of committed events processed by the background collector.
The number of locally proposed actions waiting for confirmation.
The current speculative game state, including all locally proposed but not-yet-confirmed actions on top of the last authoritative snapshot.
Functions
Suspends until at least count committed events have been processed by the background collector. Use in tests to synchronize assertions after pushing commits via FakeRaftNode.pushCommitted.
Proposes action for Raft replication and returns after a quorum commits it.