import net, { Socket } from "node:net";
import type { SessionState } from "../state";
import { TextEncoder } from "node:util";

const encoder = new TextEncoder();
const decoder = new TextDecoder();

/** Byte markers that delimit frames and tag message kinds on the wire. */
const Headers = {
  BEGIN_FRAME: Buffer.from(encoder.encode("BEGIN_FRAME")),
  END_FRAME: Buffer.from(encoder.encode("END_FRAME")),
  STATE_MSG: Buffer.from(encoder.encode("BEGIN_STATE_MSG")),
};

/** Size of the little-endian uint32 length field that follows BEGIN_FRAME. */
const LENGTH_FIELD_BYTES = 4;

/**
 * Framed message channel on top of a socket.
 *
 * Wire layout of one frame:
 * `BEGIN_FRAME | uint32 LE payload length | payload | END_FRAME`.
 * A state message payload is `BEGIN_STATE_MSG` followed by the UTF-8 JSON of
 * the state.
 */
export class Connection {
  /** Bytes received from the socket that have not yet formed a full frame. */
  private readBuffer: Buffer = Buffer.alloc(0);
  /** Payloads of complete frames that no reader has consumed yet. */
  private frameBuffer: Uint8Array[] = [];

  /**
   * @param socket Socket to read frames from and write frames to. A "data"
   *   listener is attached immediately.
   */
  constructor(private socket: Socket) {
    socket.on("data", this.handleData.bind(this));
  }

  /**
   * Appends a received chunk to the read buffer and extracts every complete
   * frame it now contains; an incomplete trailing frame stays buffered until
   * more data arrives.
   *
   * @throws Error if the buffered bytes do not start with BEGIN_FRAME or a
   *   complete frame is not terminated by END_FRAME. The error is thrown from
   *   inside the socket's "data" listener.
   */
  private handleData(data: Buffer) {
    this.readBuffer = Buffer.concat([this.readBuffer, data]);

    const headerLength = Headers.BEGIN_FRAME.length + LENGTH_FIELD_BYTES;

    // Keep going while at least a full header (marker + length) is buffered.
    while (this.readBuffer.length >= headerLength) {
      const marker = this.readBuffer.subarray(0, Headers.BEGIN_FRAME.length);
      if (!marker.equals(Headers.BEGIN_FRAME)) {
        throw new Error("Missing BEGIN_FRAME!");
      }

      const messageLength = this.readBuffer.readUInt32LE(
        Headers.BEGIN_FRAME.length
      );
      const totalFrameLength =
        headerLength + messageLength + Headers.END_FRAME.length;

      if (this.readBuffer.length < totalFrameLength) {
        // The rest of this frame has not arrived yet.
        return;
      }

      const messageEnd = headerLength + messageLength;
      const message = this.readBuffer.subarray(headerLength, messageEnd);
      const trailer = this.readBuffer.subarray(messageEnd, totalFrameLength);

      if (!trailer.equals(Headers.END_FRAME)) {
        // Dump the buffer and the offending trailer before failing.
        console.log(decoder.decode(this.readBuffer));
        console.log(trailer);
        throw new Error("Invalid END_FRAME!");
      }

      this.frameBuffer.push(message);
      this.readBuffer = this.readBuffer.subarray(totalFrameLength);
    }
  }

  /** Encodes a state as UTF-8 JSON bytes. */
  private serializeState(state: SessionState): Uint8Array {
    return encoder.encode(JSON.stringify(state));
  }

  /**
   * Decodes UTF-8 JSON bytes into a state. The parsed value is not validated
   * against the SessionState shape.
   */
  private deserializeState(state: Uint8Array): SessionState {
    return JSON.parse(decoder.decode(state)) as SessionState;
  }

  /**
   * Serializes `state`, wraps it in a state-message frame and writes the
   * frame to the socket in a single write.
   */
  public sendState(state: SessionState) {
    const payload = Buffer.concat([
      Headers.STATE_MSG,
      this.serializeState(state),
    ]);

    const len = Buffer.alloc(LENGTH_FIELD_BYTES);
    len.writeUInt32LE(payload.length);

    // Buffer.concat copies in bulk instead of spreading every byte through
    // an intermediate number[]; the bytes written are the same.
    this.socket.write(
      Buffer.concat([Headers.BEGIN_FRAME, len, payload, Headers.END_FRAME])
    );
  }

  /**
   * Removes and decodes the oldest buffered frame that carries a state
   * message.
   *
   * @returns The decoded state, or `undefined` if no state frame is buffered.
   * @throws SyntaxError if the frame body is not valid JSON.
   */
  public tryReadState(): SessionState | undefined {
    const index = this.frameBuffer.findIndex((candidate) =>
      Headers.STATE_MSG.every((byte, i) => candidate[i] === byte)
    );
    if (index === -1) {
      return undefined;
    }

    const [frame] = this.frameBuffer.splice(index, 1);
    if (frame === undefined) {
      return undefined;
    }

    return this.deserializeState(frame.subarray(Headers.STATE_MSG.length));
  }
}

/**
 * Waits for the next state message on `connection` and merges it into the
 * local session state.
 *
 * @param connection Connection polled for an incoming state message.
 * @param sessionState Local state; returned as a copy with
 *   `inboundEventQueue` and `remotePlayerGameState` taken from the remote
 *   state's `outboundEventQueue` and `localPlayerGameState`.
 * @param timeoutMs How long to wait for the remote state, in milliseconds.
 * @returns The merged session state.
 * @throws Error if no state arrives within `timeoutMs`, if reading the state
 *   fails, or if the remote `seqno` differs from the local one.
 */
export const advanceStateRemote = async (
  connection: Connection,
  sessionState: SessionState,
  timeoutMs = 2500
): Promise<SessionState> => {
  const remoteState = await new Promise<SessionState>((res, rej) => {
    // Both timers are stopped on every exit path so neither outlives the
    // promise and keeps the event loop alive.
    const stop = () => {
      clearTimeout(timeout);
      clearInterval(interval);
    };

    const timeout = setTimeout(() => {
      stop();
      rej(new Error(`Timed out after ${timeoutMs}ms waiting for remote state`));
    }, timeoutMs);

    // No delay argument: poll as often as the timer system allows.
    const interval = setInterval(() => {
      try {
        const state = connection.tryReadState();
        if (state) {
          stop();
          res(state);
        }
      } catch (err: unknown) {
        // A malformed frame rejects the wait instead of escaping the timer
        // callback as an uncaught exception.
        stop();
        rej(err);
      }
    });
  });

  if (remoteState.seqno !== sessionState.seqno) {
    throw new Error(`Misaligned seqno!`);
  }

  return {
    ...sessionState,
    inboundEventQueue: remoteState.outboundEventQueue,
    remotePlayerGameState: remoteState.localPlayerGameState,
  };
};

/**
 * Establishes a single connection, either by dialing out or by listening.
 *
 * If `remotePort` is given, connects to `hostname:remotePort`. Otherwise, if
 * `localPort` is given, listens on it and resolves with the first inbound
 * connection, after which the listener is closed.
 *
 * @returns A promise for the established connection.
 * @throws Error (as a rejection) if neither port is provided, if `hostname`
 *   is missing when dialing out, or if connecting/listening fails.
 */
export const getConnection = async ({
  localPort,
  remotePort,
  hostname,
}: {
  localPort?: number;
  remotePort?: number;
  hostname?: string;
}): Promise<Connection> => {
  if (remotePort) {
    console.log(`Trying to connect on ${remotePort}`);
    if (!hostname) {
      throw new Error(
        `A hostname is required when connecting to a remote machine`
      );
    }

    return new Promise<Connection>((res, rej) => {
      const clientSocket = net.createConnection(remotePort, hostname, () => {
        // Only connection-phase errors belong to this promise.
        clientSocket.off("error", rej);
        res(new Connection(clientSocket));
      });
      clientSocket.once("error", rej);
    });
  }

  if (localPort) {
    console.log(`Listening on ${localPort}`);

    return new Promise<Connection>((res, rej) => {
      const server = net.createServer((socket) => {
        server.off("error", rej);
        // Only one peer is expected: stop accepting further connections.
        // The accepted socket stays open.
        server.close();
        res(new Connection(socket));
      });
      server.once("error", rej);
      server.listen(localPort);
    });
  }

  throw new Error(`Must provide localPort OR remotePort`);
};