summaryrefslogtreecommitdiff
path: root/src/connection
diff options
context:
space:
mode:
authorKai Stevenson <kai@kaistevenson.com>2025-06-29 15:56:06 -0700
committerKai Stevenson <kai@kaistevenson.com>2025-06-29 16:36:20 -0700
commit3df0abc4b0a374c57845e96339bf5a46d8189364 (patch)
treee11dcea578948fbd134de93149f5576ab7ff2c8c /src/connection
parentffcdb8b126c5267040ddc8f0304b153fa88755e5 (diff)
networking worksHEADmaster
Diffstat (limited to 'src/connection')
-rw-r--r--src/connection/index.ts197
1 files changed, 197 insertions, 0 deletions
diff --git a/src/connection/index.ts b/src/connection/index.ts
new file mode 100644
index 0000000..828fcb5
--- /dev/null
+++ b/src/connection/index.ts
@@ -0,0 +1,197 @@
+import net, { Socket } from "node:net";
+import { SessionState } from "../state";
+import { TextEncoder } from "node:util";
+
+const encoder = new TextEncoder();
+const decoder = new TextDecoder();
+
+const Headers = {
+ BEGIN_FRAME: Buffer.from(encoder.encode("BEGIN_FRAME")),
+ END_FRAME: Buffer.from(encoder.encode("END_FRAME")),
+ STATE_MSG: Buffer.from(encoder.encode("BEGIN_STATE_MSG")),
+};
+
+export class Connection {
+ private readBuffer: Buffer = Buffer.alloc(0);
+ private frameBuffer: Uint8Array[] = [];
+
+ private handleData(data: Buffer) {
+ // append to buffer
+ this.readBuffer = Buffer.concat([this.readBuffer, data]);
+
+ while (true) {
+ if (this.readBuffer.length < Headers.BEGIN_FRAME.length + 4) {
+ return;
+ }
+
+ if (
+ !this.readBuffer
+ .subarray(0, Headers.BEGIN_FRAME.length)
+ .equals(Headers.BEGIN_FRAME)
+ ) {
+ throw new Error("Missing BEGIN_FRAME!");
+ }
+
+ const lengthOffset = Headers.BEGIN_FRAME.length;
+ const messageLength = this.readBuffer.readUInt32LE(lengthOffset);
+
+ const totalFrameLength =
+ Headers.BEGIN_FRAME.length +
+ 4 +
+ messageLength +
+ Headers.END_FRAME.length;
+
+ if (this.readBuffer.length < totalFrameLength) {
+ return;
+ }
+
+ const messageStart = lengthOffset + 4;
+ const messageEnd = messageStart + messageLength;
+ const message = this.readBuffer.subarray(messageStart, messageEnd);
+
+ const endFrameStart = messageEnd;
+ if (
+ !this.readBuffer
+ .subarray(endFrameStart, endFrameStart + Headers.END_FRAME.length)
+ .equals(Headers.END_FRAME)
+ ) {
+ console.log(decoder.decode(this.readBuffer));
+ console.log(
+ this.readBuffer.subarray(
+ endFrameStart,
+ endFrameStart + Headers.END_FRAME.length
+ )
+ );
+ throw new Error("Invalid END_FRAME!");
+ }
+
+ this.frameBuffer.push(message);
+
+ this.readBuffer = this.readBuffer.subarray(totalFrameLength);
+ }
+ }
+
+ constructor(private socket: Socket) {
+ socket.on("data", this.handleData.bind(this));
+ }
+
+ private serializeState(state: SessionState): Uint8Array {
+ const json = JSON.stringify(state);
+ return encoder.encode(json);
+ }
+
+ private deserializeState(state: Uint8Array): SessionState {
+ const json = decoder.decode(state);
+ return JSON.parse(json);
+ }
+
+ public sendState(state: SessionState) {
+ const serializedState = this.serializeState(state);
+ const message = new Uint8Array([...Headers.STATE_MSG, ...serializedState]);
+
+ const len = Buffer.alloc(4);
+ len.writeUInt32LE(message.length);
+
+ const msg = new Uint8Array([
+ ...Headers.BEGIN_FRAME,
+ ...len,
+ ...message,
+ ...Headers.END_FRAME,
+ ]);
+
+ this.socket.write(msg);
+ }
+
+ public tryReadState(): SessionState | undefined {
+ const frame = this.frameBuffer.find((frame) => {
+ return Headers.STATE_MSG.every((v, i) => frame[i] === v);
+ });
+
+ if (!frame) {
+ return;
+ }
+
+ this.frameBuffer.splice(this.frameBuffer.indexOf(frame), 1);
+
+ const msg = frame.slice(Headers.STATE_MSG.length);
+ const state = this.deserializeState(msg);
+
+ return state;
+ }
+}
+
+export const advanceStateRemote = async (
+ connection: Connection,
+ sessionState: SessionState
+): Promise<SessionState> => {
+ const remoteState = await new Promise<SessionState>((res, rej) => {
+ const timeout = setTimeout(rej, 2500);
+ const interval = setInterval(async () => {
+ const state = connection.tryReadState();
+ if (state) {
+ clearInterval(timeout);
+ clearInterval(interval);
+ res(state);
+ }
+ });
+ });
+
+ if (remoteState.seqno !== sessionState.seqno) {
+ throw new Error(`Misaligned seqno!`);
+ }
+ return {
+ ...sessionState,
+ inboundEventQueue: remoteState.outboundEventQueue,
+ remotePlayerGameState: remoteState.localPlayerGameState,
+ };
+};
+
+export const getConnection = async ({
+ localPort,
+ remotePort,
+ hostname,
+}: {
+ localPort?: number;
+ remotePort?: number;
+ hostname?: string;
+}) => {
+ return new Promise<Connection>(async (res, rej) => {
+ let serverSocket: Socket | undefined;
+ let clientSocket: Socket | undefined;
+
+ const handleConnection = () => {
+ if (serverSocket) {
+ res(new Connection(serverSocket));
+ return;
+ } else if (clientSocket) {
+ res(new Connection(clientSocket));
+ return;
+ }
+
+ throw new Error("No valid connection!");
+ };
+
+ if (remotePort) {
+ console.log(`Trying to connect on ${remotePort}`);
+ if (!hostname) {
+ throw new Error(
+ `A hostname is required when connecting to a remote machine`
+ );
+ }
+ clientSocket = net.createConnection(
+ remotePort,
+ hostname,
+ handleConnection
+ );
+ } else if (localPort) {
+ console.log(`Listening on ${localPort}`);
+ const server = net.createServer((socket) => {
+ serverSocket = socket;
+ handleConnection();
+ });
+ server.listen(localPort);
+ } else {
+ throw new Error(`Must provide localPort OR remotePort`);
+ }
+ });
+};