STOMP parser
🏷️ state data
🏷️ conditional transitions
🏷️ immediate (always) transitions
🏷️ events
This machine parses STOMP messages, as defined by the spec. This parser roughly mirrors the Augmented BNF.
The machine starts in the idle state, and when it receives a PARSE event, it consumes input until landing in either the error state or one of the command:client or command:server states. If the raw text data contains more messages, it loops around until all input is consumed.
As it finds valid tokens it keeps advancing the state’s currentIndex, which is the current parse position in the raw message string.
The machine parses the input entirely with conditional immediate (always) transitions, and therefore consumes all given input synchronously.
It’s safe to send the machine multiple PARSE events in a row: subscribe to its state and extract each valid frame as the state changes.
Definition
Section titled “Definition”import { defineMachine } from "yay-machine";import { BODY, CLIENT_COMMANDS, COMMAND, type Command, EOL, HEADER, NULL, SERVER_COMMANDS,} from "./stompGrammar";
/* * Parser state-machine */
/** Initial state of the machine, before any PARSE event has been received. */
interface IdleState {
  readonly name: "idle";
}

/** Fields shared by every state that is reached after a PARSE event. */
interface ParseState {
  /** The raw unparsed data. */
  readonly raw: string;
  /** The current parse position in `raw`. */
  readonly currentIndex: number;
}

/** Entry point for parsing one frame (or heartbeat) at `currentIndex`. */
interface ParseStartState extends ParseState {
  readonly name: "parse:start";
}

/** A command token is expected at `currentIndex`. */
interface ParseCommandState extends ParseState {
  readonly name: "parse:command";
}

/** The command has been read; header lines are being accumulated. */
interface ParseHeadersState extends ParseState {
  readonly name: "parse:headers";
  readonly command: Command;
  readonly headers: Record<string, string>;
}

/** Headers are complete; the body and its terminator are expected next. */
interface ParseBodyState extends ParseState {
  readonly name: "parse:body";
  readonly command: Command;
  readonly headers: Record<string, string>;
}

/**
 * A fully parsed frame. "frame?" is the intermediate state from which the
 * machine routes to "command:client" or "command:server" based on `command`.
 */
interface FrameState extends Omit<ParseBodyState, "name"> {
  readonly name: "frame?" | "command:client" | "command:server";
  readonly body: string;
}

/** An end-of-line was found where a frame could have started. */
interface HeartbeatState extends ParseState {
  readonly name: "heartbeat";
}

/** Parsing failed; `errorMessage` describes what was expected. */
interface ErrorState extends ParseState {
  readonly name: "error";
  readonly errorMessage: string;
}

/** Every state the parser machine can be in. */
type ParserState =
  | IdleState
  | ParseStartState
  | ParseCommandState
  | ParseHeadersState
  | ParseBodyState
  | FrameState
  | HeartbeatState
  | ErrorState;

/** The only event the machine handles: (re)start parsing the given raw text. */
interface ParserEvent {
  readonly type: "PARSE";
  readonly raw: string;
}
/**
 * A parser for STOMP 1.2 frames.
 *
 * An example message from their docs looks like
 *
 * "MESSAGE
 * subscription:0
 * message-id:007
 * destination:/queue/a
 * content-type:text/plain
 *
 * hello queue a^@"
 *
 * The machine is driven entirely by immediate (`always`) transitions: a PARSE
 * event moves it to "parse:start" with `currentIndex` 0, and it then advances
 * `currentIndex` token by token until it rests in "heartbeat", "error",
 * "command:client" or "command:server".
 *
 * @see https://stomp.github.io/stomp-specification-1.2.html
 * @see https://stomp.github.io/stomp-specification-1.2.html#Augmented_BNF
 */
export const stompParserMachine = defineMachine<ParserState, ParserEvent>({
  initialState: { name: "idle" },
  states: {
    "parse:start": {
      always: [
        {
          // A bare EOL where a frame could start is a heartbeat.
          to: "heartbeat",
          when: ({ state: { raw, currentIndex } }) =>
            EOL.at(raw, currentIndex),
          data: ({ state }) => state,
        },
        {
          to: "parse:command",
          data: ({ state }) => state,
        },
      ],
    },
    "parse:command": {
      always: [
        {
          // COMMAND EOL
          to: "parse:headers",
          when: ({ state: { currentIndex, raw } }) =>
            COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
          data: ({ state }) => ({
            ...state,
            currentIndex: EOL.newIndex(),
            command: COMMAND.match(),
            headers: {},
          }),
        },
        {
          to: "heartbeat",
          when: ({ state: { raw, currentIndex } }) =>
            EOL.at(raw, currentIndex),
          data: ({ state }) => state,
        },
        {
          to: "error",
          data: ({ state }) => ({
            ...state,
            // Quote the text at the failing position, not the start of the buffer.
            errorMessage: `Command expected, found: "${state.raw.slice(state.currentIndex, state.currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "parse:headers": {
      always: [
        {
          // HEADER EOL: consume one header line and stay in this state.
          to: "parse:headers",
          when: ({ state: { currentIndex, raw } }) =>
            HEADER.at(raw, currentIndex) && EOL.at(raw, HEADER.newIndex()),
          data: ({ state }) => ({
            ...state,
            currentIndex: EOL.newIndex(),
            // STOMP 1.2: for repeated header entries only the first one should
            // be used. Spreading state.headers again after the new entry keeps
            // the earlier value while preserving key insertion order.
            headers: { ...state.headers, ...HEADER.match(), ...state.headers },
          }),
        },
        {
          // The blank line that separates headers from the body.
          to: "parse:body",
          when: ({ state: { raw, currentIndex } }) =>
            EOL.at(raw, currentIndex),
          data: ({ state }) => ({ ...state, currentIndex: EOL.newIndex() }),
        },
        {
          to: "error",
          data: ({ state: { raw, currentIndex } }) => ({
            raw,
            currentIndex,
            errorMessage: `Invalid headers, at: "${raw.slice(currentIndex, currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "parse:body": {
      always: [
        {
          // BODY NULL *EOL: trailing EOLs after the terminator are consumed too.
          to: "frame?",
          when: ({ state: { raw, currentIndex } }) =>
            BODY.at(raw, currentIndex) &&
            NULL.at(raw, BODY.newIndex()) &&
            EOL.anyAt(raw, NULL.newIndex()),
          data: ({ state }) => ({
            ...state,
            body: BODY.match(),
            currentIndex: EOL.newIndex(),
          }),
        },
        {
          to: "error",
          data: ({ state: { raw, currentIndex } }) => ({
            raw,
            currentIndex,
            // The slice end is relative to currentIndex; a fixed end of 15
            // produced wrong or empty excerpts once currentIndex passed 0.
            errorMessage: `Invalid body/missing null, at: "${raw.slice(currentIndex, currentIndex + 15)}..."`,
          }),
        },
      ],
    },
    "frame?": {
      always: [
        {
          to: "command:client",
          when: ({ state: { command } }) =>
            CLIENT_COMMANDS.includes(
              command as (typeof CLIENT_COMMANDS)[number],
            ),
          data: ({ state }) => state,
        },
        {
          to: "command:server",
          when: ({ state: { command } }) =>
            SERVER_COMMANDS.includes(
              command as (typeof SERVER_COMMANDS)[number],
            ),
          data: ({ state }) => state,
        },
      ],
    },
    "command:client": {
      always: {
        // More input follows: loop around and parse the next frame.
        to: "parse:start",
        when: ({ state: { raw, currentIndex } }) =>
          COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
        // Carry only the parse position forward so the finished frame's
        // command/headers/body do not leak into the next parse or its error state.
        data: ({ state: { raw, currentIndex } }) => ({ raw, currentIndex }),
      },
    },
    "command:server": {
      always: {
        // More input follows: loop around and parse the next frame.
        to: "parse:start",
        when: ({ state: { raw, currentIndex } }) =>
          COMMAND.at(raw, currentIndex) && EOL.at(raw, COMMAND.newIndex()),
        // Carry only the parse position forward so the finished frame's
        // command/headers/body do not leak into the next parse or its error state.
        data: ({ state: { raw, currentIndex } }) => ({ raw, currentIndex }),
      },
    },
  },
  on: {
    // PARSE is handled from any state and restarts parsing at index 0.
    PARSE: {
      to: "parse:start",
      data: ({ event: { raw } }) => ({ raw, currentIndex: 0 }),
    },
  },
});
import { Matcher } from "./parseUtils";
/* * STOMP message grammar parsers */
/**
 * Matches a single NUL character (char code 0) at the current position.
 * The matched value is `null`; the new index is one past the NUL.
 */
export const NULL = new Matcher<null>((raw, currentIndex) => {
  if (raw.charCodeAt(currentIndex) !== 0) {
    return false;
  }
  return [null, currentIndex + 1];
});
/**
 * Matches one end-of-line token at the current position: either a line feed
 * ("\n") or a carriage return immediately followed by a line feed ("\r\n").
 * The matched value is the terminator text; the new index is one past it.
 */
export const EOL = new Matcher((raw, currentIndex) => {
  if (raw[currentIndex] === "\n") {
    return ["\n", currentIndex + 1];
  }
  if (raw[currentIndex] === "\r" && raw[currentIndex + 1] === "\n") {
    // CRLF is two characters wide. Advancing by only one would leave the
    // index on the "\n", which would then match again as a second EOL.
    return ["\r\n", currentIndex + 2];
  }
  return false;
});
/**
 * Commands classified as client commands; the parser rests in the
 * "command:client" state after a frame carrying one of these.
 */
export const CLIENT_COMMANDS = [
  "SEND",
  "SUBSCRIBE",
  "UNSUBSCRIBE",
  "BEGIN",
  "COMMIT",
  "ABORT",
  "ACK",
  "NACK",
  "DISCONNECT",
  "CONNECT",
  "STOMP",
] as const;

/**
 * Commands classified as server commands; the parser rests in the
 * "command:server" state after a frame carrying one of these.
 */
export const SERVER_COMMANDS = [
  "CONNECTED",
  "MESSAGE",
  "RECEIPT",
  "ERROR",
] as const;

/** Every known command: the client commands followed by the server commands. */
export const ALL_COMMANDS = [
  ...CLIENT_COMMANDS,
  ...SERVER_COMMANDS,
] as const;
/**
 * Matches one of the known commands (see ALL_COMMANDS) starting at the
 * current position. The matched value is the command name; the new index is
 * one past its last character.
 *
 * When several commands match at the same position (e.g. "CONNECT" is a
 * prefix of "CONNECTED") the longest one wins, so the result does not depend
 * on the order of ALL_COMMANDS.
 */
export const COMMAND = new Matcher((raw, currentIndex) => {
  let longest: (typeof ALL_COMMANDS)[number] | undefined;
  for (const candidate of ALL_COMMANDS) {
    // Test at currentIndex, not at the start of the buffer, so that frames
    // after the first one are recognised correctly.
    if (!raw.startsWith(candidate, currentIndex)) {
      continue;
    }
    if (longest === undefined || candidate.length > longest.length) {
      longest = candidate;
    }
  }
  if (longest === undefined) {
    return false;
  }
  return [longest, currentIndex + longest.length];
});
/** Union of the string literals listed in ALL_COMMANDS. */
export type Command = (typeof ALL_COMMANDS)[number];
/**
 * Matches a single `name:value` header starting at the current position and
 * running up to (but not including) the next end-of-line.
 *
 * The matched value is a one-entry record `{ [name]: value }`; the new index
 * is the position of the end-of-line, which the caller is expected to consume.
 * The match fails when no end-of-line is found, when there is no colon, when
 * the name would be empty, or when a second colon appears before the
 * end-of-line.
 */
export const HEADER = new Matcher<Record<string, string>>(
  (raw, currentIndex) => {
    let separatorAt = -1;
    let lineEnd = -1;

    let cursor = currentIndex;
    while (cursor < raw.length) {
      if (EOL.at(raw, cursor)) {
        lineEnd = cursor;
        break;
      }
      if (raw[cursor] === ":") {
        if (separatorAt !== -1) {
          // A second colon on the same line is not accepted.
          return false;
        }
        separatorAt = cursor;
      }
      cursor += 1;
    }

    const hasName = separatorAt > currentIndex;
    const isTerminated = lineEnd > separatorAt;
    if (!hasName || !isTerminated) {
      return false;
    }

    const name = raw.slice(currentIndex, separatorAt);
    const value = raw.slice(separatorAt + 1, lineEnd);
    return [{ [name]: value }, lineEnd];
  },
);
/**
 * Matches the frame body: everything from the current position up to (but not
 * including) the first NUL character. The matched value is the body text
 * (possibly empty); the new index is the position of the NUL. The match fails
 * when no NUL is found.
 */
export const BODY = new Matcher<string>((raw, currentIndex) => {
  let cursor = currentIndex;
  while (cursor < raw.length) {
    if (NULL.at(raw, cursor)) {
      return [raw.slice(currentIndex, cursor), cursor];
    }
    cursor += 1;
  }
  return false;
});

/*
 * Parser utils to define token parsers and consume input
 */
/**
 * A token-matching function: given the raw text and a position, returns
 * `false` when nothing matches there, or a tuple of the matched value and the
 * index at which parsing should continue.
 */
export type Match<Value> = ( raw: string, currentIndex: number,) => false | [value: Value, currentIndex: number];
/**
 * Wraps a {@link Match} function and remembers the outcome of the most recent
 * attempt, so a caller can first test for a match (`at` / `anyAt`) and then
 * read the matched value (`match`) and the continuation index (`newIndex`).
 *
 * Note that the remembered outcome is per instance and is overwritten by every
 * call to `at` or `anyAt`.
 */
export class Matcher<Value> {
  private readonly doMatch: Match<Value>;
  private lastMatch: ReturnType<Match<Value>> = false;

  /**
   * @param doMatch the matching function to run on each attempt
   */
  constructor(doMatch: Match<Value>) {
    this.doMatch = doMatch;
  }

  /**
   * @returns true if the matcher matches once at the current position
   */
  at(raw: string, currentIndex: number): boolean {
    this.lastMatch = this.doMatch(raw, currentIndex);
    return !!this.lastMatch;
  }

  /**
   * Always succeeds. Afterwards `newIndex()` is the index following the final
   * repetition (or `currentIndex` when there were none) and `match()` is the
   * value of the final repetition (or `undefined` when there were none).
   *
   * @returns true if the matcher matches zero or more times at the current position
   */
  anyAt(raw: string, currentIndex: number): boolean {
    // Placeholder used when there are zero repetitions: no value, index unchanged.
    let latest: [value: Value, currentIndex: number] = [
      undefined!,
      currentIndex,
    ];
    let index = currentIndex;
    for (;;) {
      const attempt = this.doMatch(raw, index);
      if (!attempt) {
        break;
      }
      // Keep the last successful repetition so match() can return its value.
      latest = attempt;
      if (attempt[1] <= index) {
        // A match that does not advance would repeat forever; stop here.
        break;
      }
      index = attempt[1];
    }
    this.lastMatch = latest;
    return true;
  }

  /**
   * @returns the last matched value
   * @throws Error if the most recent attempt did not match
   */
  match(): Value {
    if (!Array.isArray(this.lastMatch)) {
      throw new Error("Last match was not success");
    }
    return this.lastMatch[0];
  }

  /**
   * @returns the new index after the last matched value
   * @throws Error if the most recent attempt did not match
   */
  newIndex(): number {
    if (!Array.isArray(this.lastMatch)) {
      throw new Error("Last match was not success");
    }
    return this.lastMatch[1];
  }
}
import assert from "assert";
import { stompParserMachine } from "./stompParserMachine";
// Usage example: feed one MESSAGE frame to a fresh parser instance and check
// the state it comes to rest in.
// NOTE(review): the command and header lines inside this template literal
// appear to have lost their line breaks (the parser requires an EOL after the
// command and after each header) — restore them from the original example and
// re-check the expected currentIndex before relying on this assertion.
const raw = `MESSAGEsubscription:0message-id:007destination:/queue/acontent-type:text/plain
hello queue a\u0000`;
// Parsing is synchronous, so the final state can be inspected right after send().
const machine = stompParserMachine.newInstance().start();machine.send({ type: "PARSE", raw });assert.deepStrictEqual(machine.state, { name: "command:server", raw, currentIndex: 99, command: "MESSAGE", headers: { subscription: "0", "message-id": "007", destination: "/queue/a", "content-type": "text/plain", }, body: "hello queue a",});