STOMP parser
🏷️ state data
🏷️ conditional transitions
🏷️ immediate (always) transitions
🏷️ events
This machine parses STOMP messages, as defined by the spec. This parser roughly mirrors the Augmented BNF.
The machine starts in the `idle` state, and when it receives a `PARSE` event, it consumes input until landing in either the `error` state or one of the `command:client` or `command:server` states. If the raw text data contains more messages, it loops around until all input is consumed.
As it finds valid tokens it keeps advancing the state’s `currentIndex`, which is the current parse position in the `raw` message string.
The machine parses the input entirely with conditional immediate (always) transitions, and therefore consumes all given input synchronously.
It’s safe to send the machine multiple `PARSE` events in a row, subscribing to its state and extracting each valid frame as the state changes.
import { defineMachine } from "yay-machine";import { BODY, CLIENT_COMMANDS, COMMAND, type Command, EOL, HEADER, NULL, SERVER_COMMANDS,} from "./stompGrammar";
/* * Parser state-machine */
/**
 * The machine's initial state; it carries no parse data.
 */
interface IdleState {
  readonly name: "idle";
}
/**
 * Data shared by every state that is reached once parsing has begun.
 */
interface ParseState {
  /** The raw unparsed data. */
  readonly raw: string;
  /** The current parse position within `raw`. */
  readonly currentIndex: number;
}
/**
 * Entry state for parsing one frame (or heartbeat) at `currentIndex`.
 *
 * `raw` and `currentIndex` restate the fields of `ParseState`.
 */
interface ParseStartState extends ParseState {
  readonly name: "parse:start";
  /** The raw unparsed data. */
  readonly raw: string;
  /** The current parse position within `raw`. */
  readonly currentIndex: number;
}
/**
 * State in which the machine expects a command token at `currentIndex`.
 *
 * `raw` and `currentIndex` restate the fields of `ParseState`.
 */
interface ParseCommandState extends ParseState {
  readonly name: "parse:command";
  /** The raw unparsed data. */
  readonly raw: string;
  /** The current parse position within `raw`. */
  readonly currentIndex: number;
}
/**
 * State in which the command has been recognised and header lines are
 * being collected.
 */
interface ParseHeadersState extends ParseState {
  readonly name: "parse:headers";
  /** The command matched for the frame being parsed. */
  readonly command: Command;
  /** Headers collected so far, keyed by header name. */
  readonly headers: Record<string, string>;
}
/**
 * State reached after the blank line that ends the headers; the body is
 * expected at `currentIndex`.
 */
interface ParseBodyState extends ParseState {
  readonly name: "parse:body";
  /** The command matched for the frame being parsed. */
  readonly command: Command;
  /** All headers collected for the frame, keyed by header name. */
  readonly headers: Record<string, string>;
}
/**
 * A fully parsed frame: everything `ParseBodyState` holds (except its
 * name) plus the body text.
 *
 * `"frame?"` is the intermediate state from which the machine moves to
 * `"command:client"` or `"command:server"` depending on the command.
 */
interface FrameState extends Omit<ParseBodyState, "name"> {
  readonly name: "frame?" | "command:client" | "command:server";
  /** The frame body. */
  readonly body: string;
}
/**
 * State entered when an end-of-line is found where a command was expected.
 */
interface HeartbeatState extends ParseState {
  readonly name: "heartbeat";
}
/**
 * State entered when the input at `currentIndex` cannot be parsed.
 */
interface ErrorState extends ParseState {
  readonly name: "error";
  /** Description of what was expected and a short excerpt of the input. */
  readonly errorMessage: string;
}
/**
 * Every state the parser machine can be in, discriminated by `name`.
 */
type ParserState =
  | IdleState
  | ParseStartState
  | ParseCommandState
  | ParseHeadersState
  | ParseBodyState
  | FrameState
  | HeartbeatState
  | ErrorState;
/**
 * The only event the parser accepts: a request to parse `raw`.
 */
interface ParserEvent {
  readonly type: "PARSE";
  /** The raw text to parse. */
  readonly raw: string;
}
/**
 * A parser for STOMP 1.2 frames.
 *
 * An example message from their docs looks like
 *
 * ```
 * MESSAGE
 * subscription:0
 * message-id:007
 * destination:/queue/a
 * content-type:text/plain
 *
 * hello queue a^@
 * ```
 *
 * A `PARSE` event moves the machine to `parse:start` with `currentIndex` 0.
 * From there it advances using only immediate (`always`) transitions.
 *
 * The matchers are stateful: each `when` guard runs them, and the paired
 * `data` callback reads the result back through `newIndex()` / `match()`.
 * The order of calls inside a guard therefore matters.
 *
 * @see https://stomp.github.io/stomp-specification-1.2.html
 * @see https://stomp.github.io/stomp-specification-1.2.html#Augmented_BNF
 */
export const stompParserMachine = defineMachine<ParserState, ParserEvent>({
  initialState: { name: "idle" },
  states: {
    "parse:start": {
      always: [
        {
          // A bare end-of-line where a frame could start is a heartbeat.
          to: "heartbeat",
          when: ({ state: { raw, currentIndex } }) => EOL.at(raw, currentIndex),
          data: ({ state }) => state,
        },
        {
          to: "parse:command",
          data: ({ state }) => state,
        },
      ],
    },
    "parse:command": {
      always: [
        {
          // A command must be followed directly by an end-of-line.
          to: "parse:headers",
          when: ({ state: { currentIndex, raw } }) =>
            COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
          data: ({ state }) => ({
            ...state,
            currentIndex: EOL.newIndex(),
            command: COMMAND.match(),
            headers: {},
          }),
        },
        {
          to: "heartbeat",
          when: ({ state: { raw, currentIndex } }) => EOL.at(raw, currentIndex),
          data: ({ state }) => state,
        },
        {
          to: "error",
          data: ({ state }) => ({
            ...state,
            // Excerpt from the current position, so later frames in the same
            // input report the text that actually failed.
            errorMessage: `Command expected, found: "${state.raw.slice(state.currentIndex, state.currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "parse:headers": {
      always: [
        {
          // One "name:value" line; loops back to collect the next header.
          to: "parse:headers",
          when: ({ state: { currentIndex, raw } }) =>
            HEADER.at(raw, currentIndex) && EOL.at(raw, HEADER.newIndex()),
          data: ({ state }) => ({
            ...state,
            currentIndex: EOL.newIndex(),
            // STOMP 1.2: when a header is repeated, only the first entry
            // should be used, so existing entries take precedence.
            headers: { ...HEADER.match(), ...state.headers },
          }),
        },
        {
          // A blank line ends the headers.
          to: "parse:body",
          when: ({ state: { raw, currentIndex } }) => EOL.at(raw, currentIndex),
          data: ({ state }) => ({ ...state, currentIndex: EOL.newIndex() }),
        },
        {
          to: "error",
          data: ({ state: { raw, currentIndex } }) => ({
            raw,
            currentIndex,
            errorMessage: `Invalid headers, at: "${raw.slice(currentIndex, currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "parse:body": {
      always: [
        {
          // Body, then the terminating NULL, then any number of end-of-lines.
          to: "frame?",
          when: ({ state: { raw, currentIndex } }) =>
            BODY.at(raw, currentIndex) &&
            NULL.at(raw, BODY.newIndex()) &&
            EOL.anyAt(raw, NULL.newIndex()),
          data: ({ state }) => ({
            ...state,
            body: BODY.match(),
            currentIndex: EOL.newIndex(),
          }),
        },
        {
          to: "error",
          data: ({ state: { raw, currentIndex } }) => ({
            raw,
            currentIndex,
            // The slice end is relative to currentIndex; a fixed end of 15
            // yields an empty excerpt once currentIndex reaches 15.
            errorMessage: `Invalid body/missing null, at: "${raw.slice(currentIndex, currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "frame?": {
      always: [
        {
          to: "command:client",
          when: ({ state: { command } }) =>
            CLIENT_COMMANDS.includes(
              command as (typeof CLIENT_COMMANDS)[number],
            ),
          data: ({ state }) => state,
        },
        {
          to: "command:server",
          when: ({ state: { command } }) =>
            SERVER_COMMANDS.includes(
              command as (typeof SERVER_COMMANDS)[number],
            ),
          data: ({ state }) => state,
        },
      ],
    },
    "command:client": {
      always: {
        // Continue only when another frame follows in the same input.
        to: "parse:start",
        when: ({ state: { raw, currentIndex } }) =>
          COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
        data: ({ state }) => state,
      },
    },
    "command:server": {
      always: {
        // Continue only when another frame follows in the same input.
        to: "parse:start",
        when: ({ state: { raw, currentIndex } }) =>
          COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
        data: ({ state }) => state,
      },
    },
  },
  on: {
    PARSE: {
      to: "parse:start",
      data: ({ event: { raw } }) => ({ raw, currentIndex: 0 }),
    },
  },
});
import { Matcher } from "./parseUtils";
/* * STOMP message grammar parsers */
/**
 * Matches a single NUL character (char code 0) and advances past it.
 */
export const NULL = new Matcher<null>((raw, currentIndex) => {
  // charCodeAt yields NaN past the end of the string, which is never 0.
  if (raw.charCodeAt(currentIndex) !== 0) {
    return false;
  }
  return [null, currentIndex + 1];
});
/**
 * Matches one end-of-line: either "\n" or "\r\n".
 *
 * The returned index points just past the whole line ending, so a "\r\n"
 * advances by two characters.
 */
export const EOL = new Matcher((raw, currentIndex) => {
  if (raw[currentIndex] === "\n") {
    return ["\n", currentIndex + 1];
  }
  if (raw[currentIndex] === "\r" && raw[currentIndex + 1] === "\n") {
    // Skip both characters; advancing by one would leave the parser on the
    // "\n", which then reads as a second, empty line.
    return ["\r\n", currentIndex + 2];
  }
  return false;
});
/**
 * Commands that classify a parsed frame as a server frame.
 */
export const SERVER_COMMANDS = [
  "CONNECTED",
  "MESSAGE",
  "RECEIPT",
  "ERROR",
] as const;
/** Every recognised command: the client commands followed by the server commands. */
export const ALL_COMMANDS = [...CLIENT_COMMANDS, ...SERVER_COMMANDS] as const;
/**
 * Matches one of `ALL_COMMANDS` at `currentIndex` and advances past it.
 *
 * The comparison is anchored at `currentIndex`, not at the start of `raw`,
 * so frames after the first in the same input are matched correctly.
 * The longest matching command wins, so a command that is a prefix of
 * another (STOMP defines both CONNECT and CONNECTED) cannot shadow it.
 */
export const COMMAND = new Matcher((raw, currentIndex) => {
  let command: (typeof ALL_COMMANDS)[number] | undefined;
  for (const candidate of ALL_COMMANDS) {
    if (
      raw.startsWith(candidate, currentIndex) &&
      (command === undefined || candidate.length > command.length)
    ) {
      command = candidate;
    }
  }
  if (command === undefined) {
    return false;
  }
  return [command, currentIndex + command.length];
});
/** Union of the string literals listed in `ALL_COMMANDS`. */
export type Command = (typeof ALL_COMMANDS)[number];
/**
 * Matches a single `name:value` header starting at `currentIndex`.
 *
 * On success the value is a one-entry record `{ [name]: value }` and the
 * returned index is the position of the end-of-line that terminates the
 * header; the end-of-line itself is not consumed.
 *
 * The match fails when the line has no colon, an empty name, a second
 * colon, or no terminating end-of-line.
 */
export const HEADER = new Matcher<Record<string, string>>(
  (raw, currentIndex) => {
    let colon = -1;
    let end = -1;
    for (let i = currentIndex; i < raw.length; i++) {
      if (EOL.at(raw, i)) {
        end = i;
        break;
      }
      if (raw[i] === ":") {
        if (colon !== -1) {
          // A second colon on the same line is rejected.
          return false;
        }
        colon = i;
      }
    }
    // colon > currentIndex rejects an empty name; end > colon requires the
    // colon to come before the end-of-line (the value may be empty).
    if (colon > currentIndex && end > colon) {
      const headerName = raw.slice(currentIndex, colon);
      const headerValue = raw.slice(colon + 1, end);
      return [{ [headerName]: headerValue }, end];
    }
    return false;
  },
);
/**
 * Matches the frame body: everything from `currentIndex` up to, but not
 * including, the next NUL character (U+0000).
 *
 * The returned index is the position of that NUL, which is not consumed.
 * The match fails when no NUL follows. Headers such as `content-length`
 * are not consulted here.
 */
export const BODY = new Matcher<string>((raw, currentIndex) => {
  const end = raw.indexOf("\u0000", currentIndex);
  if (end === -1) {
    return false;
  }
  return [raw.slice(currentIndex, end), end];
});
/* * Parser utils to define token parsers and consume input */
/**
 * A token-matching function.
 *
 * Given the raw input and a position, it returns `false` when the token is
 * not present there, or a `[value, currentIndex]` tuple holding the matched
 * value and the index at which parsing should continue.
 */
export type Match<Value> = (
  raw: string,
  currentIndex: number,
) => false | [value: Value, currentIndex: number];

/**
 * Wraps a {@link Match} function and remembers the result of the most
 * recent attempt, so callers can test for a match first and read the value
 * and new index afterwards.
 */
export class Matcher<Value> {
  /** Result of the most recent `at()` / `anyAt()` call; `false` until one succeeds. */
  private lastMatch: ReturnType<Match<Value>> = false;

  /**
   * @param doMatch the function that recognises the token
   */
  constructor(readonly doMatch: Match<Value>) {}

  /**
   * @returns true if the matcher matches once at the current position
   */
  at(raw: string, currentIndex: number): boolean {
    this.lastMatch = this.doMatch(raw, currentIndex);
    return !!this.lastMatch;
  }

  /**
   * Matches zero or more times in a row, starting at the current position.
   *
   * Afterwards `newIndex()` is the position after the last successful match,
   * or `currentIndex` if there was none, and `match()` yields `undefined`.
   *
   * @returns always true, since zero matches is a success
   */
  anyAt(raw: string, currentIndex: number): boolean {
    let index = currentIndex;
    for (;;) {
      const result = this.doMatch(raw, index);
      // Stop on the first failure. Also stop on a match that does not move
      // forward, which would otherwise repeat forever at the same index.
      if (!result || result[1] <= index) {
        break;
      }
      index = result[1];
    }
    // Zero-or-more carries no single value; only the index is meaningful.
    this.lastMatch = [undefined!, index];
    return true;
  }

  /**
   * @returns the last matched value
   * @throws Error if the most recent attempt did not match
   */
  match(): Value {
    if (!Array.isArray(this.lastMatch)) {
      throw new Error("Last match was not success");
    }
    return this.lastMatch[0];
  }

  /**
   * @returns the new index after the last matched value
   * @throws Error if the most recent attempt did not match
   */
  newIndex(): number {
    if (!Array.isArray(this.lastMatch)) {
      throw new Error("Last match was not success");
    }
    return this.lastMatch[1];
  }
}
import assert from "assert";import { stompParserMachine } from "./stompParserMachine";
// A complete MESSAGE frame: command line, four header lines, a blank line,
// the body terminated by NUL, and one trailing end-of-line. The lines are
// joined with explicit "\n" so the frame does not depend on source layout.
const raw = [
  "MESSAGE",
  "subscription:0",
  "message-id:007",
  "destination:/queue/a",
  "content-type:text/plain",
  "",
  "hello queue a\u0000",
  "",
].join("\n");

const machine = stompParserMachine.newInstance().start();
machine.send({ type: "PARSE", raw });

// The whole input is consumed, so currentIndex equals raw.length (99).
assert.deepStrictEqual(machine.state, {
  name: "command:server",
  raw,
  currentIndex: 99,
  command: "MESSAGE",
  headers: {
    subscription: "0",
    "message-id": "007",
    destination: "/queue/a",
    "content-type": "text/plain",
  },
  body: "hello queue a",
});