Stock tickers
🏷️ state data
🏷️ event payload
🏷️ machine start side-effect
🏷️ state entry side-effect
🏷️ transition side-effect
🏷️ any state + event transition
🏷️ delayed transition
🏷️ send event to self
🏷️ composing machines
🏷️ web sockets
About
This example has two machines:
- a per stock-symbol price machine, and
- a ticker machine composing zero or more price machines.
Price machine
Models the real-time price of a single stock. The machine’s state name tells us if the price is pending, live or stale.
When the machine receives a TICK event it updates its state and starts a timer in the live state's onEnter() side-effect. If a new price is not received within the current price’s timeValid ms, the machine sends itself a STALE event and transitions to stale (trading terminology: “stale” means the price is too old and cannot be used to place a trade).
import { type MachineInstance, defineMachine } from "yay-machine";

/*
 * Models a stock price moving up/down and whether it is fresh (live) or old (stale)
 */

export interface PricePendingState {
  readonly name: "pending";
}

export interface PriceLiveState {
  readonly name: "live";
  readonly price: number;
  readonly priceTime: number;
  readonly timeValid: number;
  readonly change: "up" | "down" | "none";
}

export interface PriceStaleState {
  readonly name: "stale";
  readonly price: number;
  readonly priceTime: number;
}

export type PriceState = PricePendingState | PriceLiveState | PriceStaleState;

export interface TickEvent {
  readonly type: "TICK";
  readonly price: number;
  readonly timeValid: number; // aka TTL (Time To Live) - how long the price is valid for
}

export interface StaleEvent {
  readonly type: "STALE";
}

export type PriceEvent = TickEvent | StaleEvent;

export type PriceMachine = MachineInstance<PriceState, PriceEvent>;

const getChange = (
  previous: number,
  current: number,
): PriceLiveState["change"] => {
  if (current > previous) {
    return "up";
  }
  if (current < previous) {
    return "down";
  }
  return "none";
};

const updateState = (
  state: PriceState,
  event: TickEvent,
): Omit<PriceLiveState, "name"> => ({
  price: event.price,
  priceTime: Date.now(),
  timeValid: event.timeValid,
  change: getChange(
    state.name !== "pending" ? state.price : event.price,
    event.price,
  ),
});

export const priceMachine = defineMachine<PriceState, PriceEvent>({
  initialState: { name: "pending" },
  states: {
    pending: {
      on: {
        TICK: {
          to: "live",
          data: ({ state, event }) => updateState(state, event),
        },
      },
    },
    live: {
      onEnter: ({ state, send }) => {
        const timer = setTimeout(
          () => send({ type: "STALE" }),
          state.timeValid,
        );
        return () => clearTimeout(timer);
      },
      on: {
        TICK: {
          to: "live",
          data: ({ state, event }) => updateState(state, event),
        },
        STALE: {
          to: "stale",
          data: ({ state }) => ({
            price: state.price,
            priceTime: state.priceTime,
          }),
        },
      },
    },
    stale: {
      on: {
        TICK: {
          to: "live",
          data: ({ state, event }) => updateState(state, event),
        },
      },
    },
  },
});
Usage
import assert from "assert";
import { type PriceLiveState, priceMachine } from "./priceMachine";

const price = priceMachine.newInstance().start();
price.send({ type: "TICK", price: 41, timeValid: 5_000 });
price.send({ type: "TICK", price: 42, timeValid: 5_000 });
const { priceTime: _, ...state } = price.state as PriceLiveState;
assert.deepStrictEqual(state, {
  name: "live",
  price: 42,
  change: "up",
  timeValid: 5_000,
});
Ticker machine
Models a stock price ticker, with ticking prices for zero or more symbols.
The state name (connecting, connected and connectionError) tells us about the status of the connection to the fictional WebSocket price service.
A machine onStart() side-effect initiates the WebSocket client connection and sets up event listeners for the lifecycle callbacks (onopen, onerror and onmessage). Each of these three WebSocket events triggers the machine to send itself an equivalent event. The side-effect returns a cleanup function that closes the connection when the machine is stopped.
Client code can add/remove price tickers by sending ADD_TICKER / REMOVE_TICKER events respectively. These events are handled in any state, and if the machine is currently connected it sends “subscribe” or “unsubscribe” messages for the symbol to the remote service.
As symbols are added/removed, the ticker machine creates/destroys price machines and adds/removes them to/from its own state data.
Multiple clients can request symbols to be added/removed, and the machine uses reference-counting to share the price machines and subscribe-to/unsubscribe-from the underlying WebSocket service just once per symbol.
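The reference-counting rule can be sketched as a pair of pure helpers that do not depend on yay-machine. This is a minimal illustration, not the machine's actual code: the names countSubscribers and isLastSubscriber are hypothetical, and the subscriptions map is assumed to have the same shape as in the machine's state data (subscription ID to symbol).

```typescript
// Maps subscriptionId -> symbol, mirroring the `subscriptions` state data.
type Subscriptions = Readonly<Record<string, string>>;

// Hypothetical helper: how many subscriptions currently reference `symbol`.
const countSubscribers = (
  subscriptions: Subscriptions,
  symbol: string,
): number =>
  Object.values(subscriptions).filter((it) => it === symbol).length;

// Hypothetical helper: true when `subscriptionId` is the only remaining
// subscription for its symbol, i.e. removing it should also stop the price
// machine and unsubscribe from the remote service.
const isLastSubscriber = (
  subscriptions: Subscriptions,
  subscriptionId: string,
): boolean => {
  const symbol = subscriptions[subscriptionId];
  if (symbol === undefined) {
    return false; // unknown subscription: nothing to remove
  }
  return countSubscribers(subscriptions, symbol) === 1;
};

const subscriptions: Subscriptions = {
  "sub-1": "YAAY",
  "sub-2": "YAAY",
  "sub-3": "MCHN",
};
console.log(isLastSubscriber(subscriptions, "sub-1")); // false: "sub-2" still wants YAAY
console.log(isLastSubscriber(subscriptions, "sub-3")); // true: last subscriber for MCHN
```

Counting on demand keeps the state data small (no separate counter to keep in sync), at the cost of a scan over all subscriptions for each removal.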
Establishing a connection to a WebSocket service is an async operation, so clients may add tickers before the connection is ready. To cover this, the machine subscribes for all current symbols when it enters the connected state, but only if the triggering event is CONNECTED. (If we didn’t check the event, we would re-subscribe for all previous symbols every time a new symbol was added.)
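The effect of that event check can be shown in isolation. The helper below is a hypothetical sketch (subscribeOnConnect is not part of the example or of yay-machine); it only assumes the "subscribe:<symbols>" message format used by the fictional service.

```typescript
// Hypothetical helper: the message (if any) to send on entering "connected".
// Only the CONNECTED event should trigger a bulk subscribe.
const subscribeOnConnect = (
  eventType: string | undefined,
  symbols: readonly string[],
): string | undefined =>
  eventType === "CONNECTED" && symbols.length > 0
    ? `subscribe:${symbols.join(",")}`
    : undefined;

console.log(subscribeOnConnect("CONNECTED", ["YAAY", "MCHN"])); // "subscribe:YAAY,MCHN"
console.log(subscribeOnConnect("ADD_TICKER", ["YAAY", "MCHN"])); // undefined
console.log(subscribeOnConnect("CONNECTED", [])); // undefined
```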
When the machine receives data from the WebSocket service, it parses the string, extracts the price for each symbol, and sends a TICK event to the relevant price machine.
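The parsing step can be sketched as a standalone function. This is an illustration under assumptions: parseTicks and the Tick type are hypothetical names, the message format is the "<symbol>:<price>[,<symbol>:<price>]*" format used by the fictional service, and skipping malformed entries is an added choice (the machine in this example does not validate its input).

```typescript
interface Tick {
  readonly symbol: string;
  readonly price: number;
}

// Hypothetical helper: parse "<symbol>:<price>[,<symbol>:<price>]*" into ticks.
// Entries that are not exactly "<symbol>:<number>" are skipped (an assumption).
const parseTicks = (data: string): Tick[] => {
  const ticks: Tick[] = [];
  for (const entry of data.split(",")) {
    const [symbol, rawPrice, ...rest] = entry.split(":");
    if (!symbol || rawPrice === undefined || rest.length > 0) {
      continue; // missing symbol, missing price, or too many parts
    }
    const price = Number.parseFloat(rawPrice);
    if (Number.isNaN(price)) {
      continue; // price is not numeric
    }
    ticks.push({ symbol, price });
  }
  return ticks;
};

const ticks = parseTicks("BBBB:23.4,CCCC:234.1,BBBB:19.7,DDDD:256.1");
console.log(ticks.length); // 4
console.log(parseTicks("BBBB:23.4,oops,:1,CCCC:x")); // [ { symbol: "BBBB", price: 23.4 } ]
```

A symbol can appear more than once in a message; the ticks are returned in order, so sending them one by one leaves each price machine on the latest price.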
import { defineMachine } from "yay-machine";
import { type PriceMachine, priceMachine } from "../price/priceMachine";

interface CommonState {
  readonly url: string;
  readonly symbols: Record</* symbol */ string, PriceMachine>;
  readonly subscriptions: Record<
    /* subscriptionId */ string,
    /* symbol */ string
  >;
}

interface ConnectingState extends CommonState {
  readonly name: "connecting";
}

interface ConnectedState extends CommonState {
  readonly name: "connected";
  readonly socket: WebSocket;
}

interface ConnectionErrorState extends CommonState {
  readonly name: "connectionError";
  readonly errorMessage: string;
}

export type TickersState =
  | ConnectingState
  | ConnectedState
  | ConnectionErrorState;

interface ConnectedEvent {
  readonly type: "CONNECTED";
  readonly socket: WebSocket;
}

interface ConnectionErrorEvent {
  readonly type: "CONNECTION_ERROR";
  readonly errorMessage: string;
}

interface ReceivedDataEvent {
  readonly type: "RECEIVED_DATA";
  readonly data: string;
}

interface AddTickerEvent {
  readonly type: "ADD_TICKER";
  readonly symbol: string;
  readonly subscriptionId: string;
}

interface RemoveTickerEvent {
  readonly type: "REMOVE_TICKER";
  readonly subscriptionId: string;
}

export type TickersEvent =
  | ConnectedEvent
  | ConnectionErrorEvent
  | ReceivedDataEvent
  | AddTickerEvent
  | RemoveTickerEvent;

/**
 * A multi-symbol, multi-client stock tickers machine modelling active subscriptions and API integration.
 */
export const tickerMachine = defineMachine<TickersState, TickersEvent>({
  initialState: {
    name: "connecting",
    url: undefined!,
    symbols: {},
    subscriptions: {},
  },
  onStart: ({ state, send }) => {
    // connect to remote service and set up event handlers
    const socket = new WebSocket(state.url);
    socket.onopen = () => send({ type: "CONNECTED", socket });
    socket.onerror = (e) =>
      send({ type: "CONNECTION_ERROR", errorMessage: String(e) });
    socket.onmessage = (e) => send({ type: "RECEIVED_DATA", data: e.data });

    // cleanup: close the connection when the machine is stopped
    return () => socket.close();
  },
  states: {
    connecting: {
      on: {
        CONNECTED: {
          to: "connected",
          data: ({ state, event }) => ({ ...state, socket: event.socket }),
        },
      },
    },
    connected: {
      onEnter: ({ state, event }) => {
        // subscribe for all symbols added so far
        if (event?.type === "CONNECTED") {
          const symbols = Object.keys(state.symbols);
          if (symbols.length) {
            state.socket.send(`subscribe:${symbols.join(",")}`);
          }
        }
      },
    },
  },
  // transitions handled in any state
  on: {
    ADD_TICKER: [
      {
        // first subscription for this symbol: create and start a price machine
        when: ({ state, event }) => !(event.symbol in state.symbols),
        data: ({ state, event }) => ({
          ...state,
          symbols: {
            ...state.symbols,
            [event.symbol]: priceMachine.newInstance().start(),
          },
          subscriptions: {
            ...state.subscriptions,
            [event.subscriptionId]: event.symbol,
          },
        }),
        onTransition: ({ state, event }) => {
          if (state.name === "connected") {
            // subscribe for the new symbol
            state.socket.send(`subscribe:${event.symbol}`);
          }
        },
      },
      {
        // symbol already tracked: only record the new subscription
        data: ({ state, event }) => ({
          ...state,
          subscriptions: {
            ...state.subscriptions,
            [event.subscriptionId]: event.symbol,
          },
        }),
      },
    ],
    REMOVE_TICKER: [
      {
        // last subscription for the symbol: stop and remove its price machine
        when: ({ state, event }) => {
          const symbol = state.subscriptions[event.subscriptionId];
          const symbols = Object.values(state.subscriptions);
          return (
            event.subscriptionId in state.subscriptions &&
            symbols.filter((it) => it === symbol).length === 1
          );
        },
        data: ({ state, event }) => {
          const { [event.subscriptionId]: symbol, ...subscriptions } =
            state.subscriptions;
          const { [symbol]: ticker, ...symbols } = state.symbols;
          ticker.stop();
          return {
            ...state,
            symbols,
            subscriptions,
          };
        },
        onTransition: ({ state, event }) => {
          if (state.name === "connected") {
            // unsubscribe for the symbol
            state.socket.send(
              `unsubscribe:${state.subscriptions[event.subscriptionId]}`,
            );
          }
        },
      },
      {
        // other subscriptions for the symbol remain: only drop this one
        when: ({ state, event }) => event.subscriptionId in state.subscriptions,
        data: ({ state, event }) => {
          const { [event.subscriptionId]: symbol, ...subscriptions } =
            state.subscriptions;
          return {
            ...state,
            subscriptions,
          };
        },
      },
    ],
    RECEIVED_DATA: {
      data: ({ state }) => state,
      onTransition: ({ state, event }) => {
        // event.data looks something like this:
        //   "BBBB:23.4,CCCC:234.1,BBBB:19.7,DDDD:256.1"
        // format: "<symbol>:<price>[,<symbol>:<price>]*"
        const ticks = event.data.split(",").map((it) => it.split(":"));
        for (const [symbol, price] of ticks) {
          state.symbols[symbol]?.send({
            type: "TICK",
            price: Number.parseFloat(price),
            timeValid: 5_000,
          });
        }
      },
    },
    CONNECTION_ERROR: {
      to: "connectionError",
      data: ({ state, event }) => ({
        ...state,
        errorMessage: event.errorMessage,
      }),
    },
  },
});
Usage
import { tickerMachine } from "./tickerMachine";

const ticker = tickerMachine
  .newInstance({
    initialState: {
      name: "connecting",
      url: "wss://yay-machine.js.org/prices",
      symbols: {},
      subscriptions: {},
    },
  })
  .start();

ticker.send({ type: "ADD_TICKER", symbol: "YAAY", subscriptionId: "sub-1" });
ticker.send({ type: "ADD_TICKER", symbol: "MCHN", subscriptionId: "sub-2" });

ticker.state.symbols["YAAY"].subscribe(({ state }) => {
  if (state.name === "live") {
    console.log("YAAY price went %s and is now %s", state.change, state.price);
  } else if (state.name === "stale") {
    console.log("YAAY price is stale %s", state.price);
  }
});