Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions src/web-socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import stream = require('stream');
import { V1Status } from './api';
import { KubeConfig } from './config';

const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
const protocols = [
'v5.channel.k8s.io',
'v4.channel.k8s.io',
'v3.channel.k8s.io',
'v2.channel.k8s.io',
'channel.k8s.io',
];

export type TextHandler = (text: string) => boolean;
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;
Expand All @@ -17,12 +23,39 @@ export interface WebSocketInterface {
): Promise<WebSocket.WebSocket>;
}

export interface StreamInterface {
stdin: stream.Readable;
stdout: stream.Writable;
stderr: stream.Writable;
}

export class WebSocketHandler implements WebSocketInterface {
public static readonly StdinStream: number = 0;
public static readonly StdoutStream: number = 1;
public static readonly StderrStream: number = 2;
public static readonly StatusStream: number = 3;
public static readonly ResizeStream: number = 4;
public static readonly CloseStream: number = 255;

public static supportsClose(protocol: string): boolean {
return protocol === 'v5.channel.k8s.io';
}

public static closeStream(streamNum: number, streams: StreamInterface): void {
console.log('Closing stream: ' + streamNum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three console.log()s left over.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh, so embarrassing to leave test printfs in the PR :)

fixed.

switch (streamNum) {
case WebSocketHandler.StdinStream:
streams.stdin.pause();
break;
case WebSocketHandler.StdoutStream:
console.log('closing stdout');
streams.stdout.end();
break;
case WebSocketHandler.StderrStream:
streams.stderr.end();
break;
}
}

public static handleStandardStreams(
streamNum: number,
Expand All @@ -39,6 +72,7 @@ export class WebSocketHandler implements WebSocketInterface {
stderr.write(buff);
} else if (streamNum === WebSocketHandler.StatusStream) {
// stream closing.
// Hacky, change tests to use the stream interface
if (stdout && stdout !== process.stdout) {
stdout.end();
}
Expand Down Expand Up @@ -69,6 +103,12 @@ export class WebSocketHandler implements WebSocketInterface {
});

stdin.on('end', () => {
if (WebSocketHandler.supportsClose(ws.protocol)) {
const buff = Buffer.alloc(2);
buff.writeUint8(this.CloseStream, 0);
buff.writeUint8(this.StdinStream, 1);
ws.send(buff);
}
ws.close();
});
// Keep the stream open
Expand Down Expand Up @@ -141,7 +181,16 @@ export class WebSocketHandler implements WebSocketInterface {
// factory is really just for test injection
public constructor(
readonly config: KubeConfig,
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
readonly socketFactory?: (
uri: string,
protocols: string[],
opts: WebSocket.ClientOptions,
) => WebSocket.WebSocket,
readonly streams: StreamInterface = {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
},
) {}

/**
Expand Down Expand Up @@ -173,7 +222,7 @@ export class WebSocketHandler implements WebSocketInterface {

return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
const client = this.socketFactory
? this.socketFactory(uri, opts)
? this.socketFactory(uri, protocols, opts)
: new WebSocket(uri, protocols, opts);
let resolved = false;

Expand All @@ -191,11 +240,18 @@ export class WebSocketHandler implements WebSocketInterface {
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
// TODO: support ArrayBuffer and Buffer[] data types?
if (typeof data === 'string') {
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
}
if (textHandler && !textHandler(data)) {
client.close();
}
} else if (data instanceof Buffer) {
const streamNum = data.readInt8(0);
const streamNum = data.readUint8(0);
if (streamNum === WebSocketHandler.CloseStream) {
console.log('Closing stream!');
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
}
if (binaryHandler && !binaryHandler(streamNum, data.subarray(1))) {
client.close();
}
Expand Down
108 changes: 105 additions & 3 deletions src/web-socket-handler_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { promisify } from 'util';
import { expect } from 'chai';
import WebSocket = require('isomorphic-ws');
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
import stream = require('stream');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import stream = require('stream');
import stream from 'node:stream';

Copy link
Contributor Author

@brendandburns brendandburns Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


import { V1Status } from './api';
import { KubeConfig } from './config';
Expand Down Expand Up @@ -119,7 +120,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -170,7 +171,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -239,7 +240,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -303,6 +304,107 @@ describe('WebSocket', () => {
});
});

describe('V5 protocol support', () => {
it('should handle close', async () => {
const kc = new KubeConfig();
const host = 'foo.company.com';
const server = `https://${host}`;
kc.clusters = [
{
name: 'cluster',
server,
} as Cluster,
] as Cluster[];
kc.contexts = [
{
cluster: 'cluster',
user: 'user',
} as Context,
] as Context[];
kc.users = [
{
name: 'user',
} as User,
];

const mockWs = {
protocol: 'v5.channel.k8s.io',
} as WebSocket.WebSocket;
let uriOut = '';
let endCalled = false;
const handler = new WebSocketHandler(
kc,
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
{
stdin: process.stdin,
stderr: process.stderr,
stdout: {
end: () => {
endCalled = true;
},
} as stream.Writable,
},
);
const path = '/some/path';

const promise = handler.connect(path, null, null);
await setImmediatePromise();

expect(uriOut).to.equal(`wss://${host}${path}`);

const event = {
target: mockWs,
type: 'open',
};
mockWs.onopen!(event);
const errEvt = {
error: {},
message: 'some message',
type: 'some type',
target: mockWs,
};
const closeBuff = Buffer.alloc(2);
closeBuff.writeUint8(255, 0);
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);

mockWs.onmessage!({
data: closeBuff,
type: 'type',
target: mockWs,
});
await promise;
expect(endCalled).to.be.true;
});
it('should handle closing stdin < v4 protocol', () => {
const ws = {
// send is not defined, so this will throw if we try to send the close message.
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
});
it('should handle closing stdin v5 protocol', () => {
let sent: Buffer | null = null;
const ws = {
protocol: 'v5.channel.k8s.io',
send: (data) => {
sent = data;
},
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
expect(sent).to.not.be.null;
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
expect(sent!.readUInt8(1)).to.equal(0); // Stdin stream is #0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason one of these uses readUint8() and the other uses readUInt8() (note difference in casing of the i).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converted all to readUint8 The Internet seems to be ambiguous on which is the right casing. Both appear to work...

});
});

describe('Restartable Handle Standard Input', () => {
it('should throw on negative retry', () => {
const p = new Promise<WebSocket.WebSocket>(() => {});
Expand Down