diff options
author | Kai Stevenson <kai@kaistevenson.com> | 2025-06-29 15:56:06 -0700 |
---|---|---|
committer | Kai Stevenson <kai@kaistevenson.com> | 2025-06-29 16:36:20 -0700 |
commit | 3df0abc4b0a374c57845e96339bf5a46d8189364 (patch) | |
tree | e11dcea578948fbd134de93149f5576ab7ff2c8c /src/connection | |
parent | ffcdb8b126c5267040ddc8f0304b153fa88755e5 (diff) |
Diffstat (limited to 'src/connection')
-rw-r--r-- | src/connection/index.ts | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/src/connection/index.ts b/src/connection/index.ts new file mode 100644 index 0000000..828fcb5 --- /dev/null +++ b/src/connection/index.ts @@ -0,0 +1,197 @@ +import net, { Socket } from "node:net"; +import { SessionState } from "../state"; +import { TextEncoder } from "node:util"; + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +const Headers = { + BEGIN_FRAME: Buffer.from(encoder.encode("BEGIN_FRAME")), + END_FRAME: Buffer.from(encoder.encode("END_FRAME")), + STATE_MSG: Buffer.from(encoder.encode("BEGIN_STATE_MSG")), +}; + +export class Connection { + private readBuffer: Buffer = Buffer.alloc(0); + private frameBuffer: Uint8Array[] = []; + + private handleData(data: Buffer) { + // append to buffer + this.readBuffer = Buffer.concat([this.readBuffer, data]); + + while (true) { + if (this.readBuffer.length < Headers.BEGIN_FRAME.length + 4) { + return; + } + + if ( + !this.readBuffer + .subarray(0, Headers.BEGIN_FRAME.length) + .equals(Headers.BEGIN_FRAME) + ) { + throw new Error("Missing BEGIN_FRAME!"); + } + + const lengthOffset = Headers.BEGIN_FRAME.length; + const messageLength = this.readBuffer.readUInt32LE(lengthOffset); + + const totalFrameLength = + Headers.BEGIN_FRAME.length + + 4 + + messageLength + + Headers.END_FRAME.length; + + if (this.readBuffer.length < totalFrameLength) { + return; + } + + const messageStart = lengthOffset + 4; + const messageEnd = messageStart + messageLength; + const message = this.readBuffer.subarray(messageStart, messageEnd); + + const endFrameStart = messageEnd; + if ( + !this.readBuffer + .subarray(endFrameStart, endFrameStart + Headers.END_FRAME.length) + .equals(Headers.END_FRAME) + ) { + console.log(decoder.decode(this.readBuffer)); + console.log( + this.readBuffer.subarray( + endFrameStart, + endFrameStart + Headers.END_FRAME.length + ) + ); + throw new Error("Invalid END_FRAME!"); + } + + this.frameBuffer.push(message); + + this.readBuffer = this.readBuffer.subarray(totalFrameLength); + } + } + + constructor(private socket: Socket) { + socket.on("data", this.handleData.bind(this)); + } + + private serializeState(state: SessionState): Uint8Array { + const json = JSON.stringify(state); + return encoder.encode(json); + } + + private deserializeState(state: Uint8Array): SessionState { + const json = decoder.decode(state); + return JSON.parse(json); + } + + public sendState(state: SessionState) { + const serializedState = this.serializeState(state); + const message = new Uint8Array([...Headers.STATE_MSG, ...serializedState]); + + const len = Buffer.alloc(4); + len.writeUInt32LE(message.length); + + const msg = new Uint8Array([ + ...Headers.BEGIN_FRAME, + ...len, + ...message, + ...Headers.END_FRAME, + ]); + + this.socket.write(msg); + } + + public tryReadState(): SessionState | undefined { + const frame = this.frameBuffer.find((frame) => { + return Headers.STATE_MSG.every((v, i) => frame[i] === v); + }); + + if (!frame) { + return; + } + + this.frameBuffer.splice(this.frameBuffer.indexOf(frame), 1); + + const msg = frame.slice(Headers.STATE_MSG.length); + const state = this.deserializeState(msg); + + return state; + } +} + +export const advanceStateRemote = async ( + connection: Connection, + sessionState: SessionState +): Promise<SessionState> => { + const remoteState = await new Promise<SessionState>((res, rej) => { + const timeout = setTimeout(rej, 2500); + const interval = setInterval(async () => { + const state = connection.tryReadState(); + if (state) { + clearInterval(timeout); + clearInterval(interval); + res(state); + } + }); + }); + + if (remoteState.seqno !== sessionState.seqno) { + throw new Error(`Misaligned seqno!`); + } + return { + ...sessionState, + inboundEventQueue: remoteState.outboundEventQueue, + remotePlayerGameState: remoteState.localPlayerGameState, + }; +}; + +export const getConnection = async ({ + localPort, + remotePort, + hostname, +}: { + localPort?: number; + remotePort?: number; + hostname?: string; +}) => { + return new Promise<Connection>(async (res, rej) => { + let serverSocket: Socket | undefined; + let clientSocket: Socket | undefined; + + const handleConnection = () => { + if (serverSocket) { + res(new Connection(serverSocket)); + return; + } else if (clientSocket) { + res(new Connection(clientSocket)); + return; + } + + throw new Error("No valid connection!"); + }; + + if (remotePort) { + console.log(`Trying to connect on ${remotePort}`); + if (!hostname) { + throw new Error( + `A hostname is required when connecting to a remote machine` + ); + } + clientSocket = net.createConnection( + remotePort, + hostname, + handleConnection + ); + } else if (localPort) { + console.log(`Listening on ${localPort}`); + const server = net.createServer((socket) => { + serverSocket = socket; + handleConnection(); + }); + server.listen(localPort); + } else { + throw new Error(`Must provide localPort OR remotePort`); + } + }); +}; |