diff --git a/examples/port-forward-deployment.js b/examples/port-forward-deployment.js new file mode 100644 index 0000000000..58c701a388 --- /dev/null +++ b/examples/port-forward-deployment.js @@ -0,0 +1,43 @@ +import * as k8s from '@kubernetes/client-node'; +import net from 'node:net'; + +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +const forward = new k8s.PortForward(kc); + +const namespace = process.argv[2] ?? 'default'; +const deploymentName = process.argv[3] ?? 'demo-deployment'; +const localPort = parseInt(process.argv[4] ?? '8080', 10); +const remotePort = parseInt(process.argv[5] ?? '8080', 10); + +// This creates a local server that forwards traffic to a deployment in Kubernetes +// by resolving the deployment to its first ready pod and port-forwarding to that pod. +// Usage: node port-forward-deployment.js [namespace] [deploymentName] [localPort] [remotePort] +// Example: node port-forward-deployment.js default my-app 8080 3000 +// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default + +const server = net.createServer(async (socket) => { + try { + await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket); + } catch (error) { + console.error(`Error port-forwarding to deployment ${namespace}/${deploymentName}:`, error.message); + socket.destroy(); + } +}); + +server.listen(localPort, '127.0.0.1', () => { + console.log(`Port forward server listening on http://127.0.0.1:${localPort}`); + console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`); + console.log(`Press Ctrl+C to stop`); +}); + +server.on('error', (error) => { + console.error('Server error:', error); +}); + +process.on('SIGINT', () => { + console.log('\nShutting down port-forward server...'); + server.close(); + process.exit(0); +}); diff --git a/examples/port-forward-service.js b/examples/port-forward-service.js new file mode 100644 index 0000000000..69b8057951 --- /dev/null +++ b/examples/port-forward-service.js @@ -0,0 +1,43 @@ +import * as k8s from '@kubernetes/client-node'; +import net from 'node:net'; + +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +const forward = new k8s.PortForward(kc); + +const namespace = process.argv[2] ?? 'default'; +const serviceName = process.argv[3] ?? 'demo-service'; +const localPort = parseInt(process.argv[4] ?? '8080', 10); +const remotePort = parseInt(process.argv[5] ?? '8080', 10); + +// This creates a local server that forwards traffic to a service in Kubernetes +// by resolving the service to its first ready pod and port-forwarding to that pod. +// Usage: node port-forward-service.js [namespace] [serviceName] [localPort] [remotePort] +// Example: node port-forward-service.js default my-service 8080 80 +// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default + +const server = net.createServer(async (socket) => { + try { + await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket); + } catch (error) { + console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message); + socket.destroy(); + } +}); + +server.listen(localPort, '127.0.0.1', () => { + console.log(`Port forward server listening on http://127.0.0.1:${localPort}`); + console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`); + console.log(`Press Ctrl+C to stop`); +}); + +server.on('error', (error) => { + console.error('Server error:', error); +}); + +process.on('SIGINT', () => { + console.log('\nShutting down port-forward server...'); + server.close(); + process.exit(0); +}); diff --git a/examples/typescript/port-forward/port-forward-deployment.ts b/examples/typescript/port-forward/port-forward-deployment.ts new file mode 100644 index 0000000000..6f04a9841a --- /dev/null +++ b/examples/typescript/port-forward/port-forward-deployment.ts @@ -0,0 +1,46 @@ +import * as k8s from '@kubernetes/client-node'; +import net from 'node:net'; + +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +const forward = new k8s.PortForward(kc); + +const namespace = process.argv[2] ?? 'default'; +const deploymentName = process.argv[3] ?? 'demo-deployment'; +const localPort = parseInt(process.argv[4] ?? '8080', 10); +const remotePort = parseInt(process.argv[5] ?? '8080', 10); + +// This creates a local server that forwards traffic to a deployment in Kubernetes +// by resolving the deployment to its first ready pod and port-forwarding to that pod. +// Usage: node port-forward-deployment.ts [namespace] [deploymentName] [localPort] [remotePort] +// Example: node port-forward-deployment.ts default my-app 8080 3000 +// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default + +const server = net.createServer(async (socket) => { + try { + await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket); + } catch (error) { + console.error( + `Error port-forwarding to deployment ${namespace}/${deploymentName}:`, + (error as Error).message, + ); + socket.destroy(); + } +}); + +server.listen(localPort, '127.0.0.1', () => { + console.log(`Port forward server listening on http://127.0.0.1:${localPort}`); + console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`); + console.log(`Press Ctrl+C to stop`); +}); + +server.on('error', (error: NodeJS.ErrnoException) => { + console.error('Server error:', error); +}); + +process.on('SIGINT', () => { + console.log('\nShutting down port-forward server...'); + server.close(); + process.exit(0); +}); diff --git a/examples/typescript/port-forward/port-forward-service.ts b/examples/typescript/port-forward/port-forward-service.ts new file mode 100644 index 0000000000..6753fbfa5a --- /dev/null +++ b/examples/typescript/port-forward/port-forward-service.ts @@ -0,0 +1,43 @@ +import * as k8s from '@kubernetes/client-node'; +import net from 'node:net'; + +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +const forward = new k8s.PortForward(kc); + +const namespace = process.argv[2] ?? 'default'; +const serviceName = process.argv[3] ?? 'demo-service'; +const localPort = parseInt(process.argv[4] ?? '8080', 10); +const remotePort = parseInt(process.argv[5] ?? '8080', 10); + +// This creates a local server that forwards traffic to a service in Kubernetes +// by resolving the service to its first ready pod and port-forwarding to that pod. +// Usage: node port-forward-service.ts [namespace] [serviceName] [localPort] [remotePort] +// Example: node port-forward-service.ts default my-service 8080 80 +// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default + +const server = net.createServer(async (socket) => { + try { + await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket); + } catch (error) { + console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message); + socket.destroy(); + } +}); + +server.listen(localPort, '127.0.0.1', () => { + console.log(`Port forward server listening on http://127.0.0.1:${localPort}`); + console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`); + console.log(`Press Ctrl+C to stop`); +}); + +server.on('error', (error: NodeJS.ErrnoException) => { + console.error('Server error:', error); +}); + +process.on('SIGINT', () => { + console.log('\nShutting down port-forward server...'); + server.close(); + process.exit(0); +}); diff --git a/examples/typescript/watch/watch-example.ts b/examples/typescript/watch/watch-example.ts index b33d71f653..3bda64a0d3 100644 --- a/examples/typescript/watch/watch-example.ts +++ b/examples/typescript/watch/watch-example.ts @@ -4,37 +4,36 @@ const kc = new k8s.KubeConfig(); kc.loadFromDefault(); const watch = new k8s.Watch(kc); -const req = await watch - .watch( - '/api/v1/namespaces', - // optional query parameters can go here. - { - allowWatchBookmarks: true, - }, - // callback is called for each received object. - (type, apiObj, watchObj) => { - if (type === 'ADDED') { - console.log('new object:'); - } else if (type === 'MODIFIED') { - console.log('changed object:'); - } else if (type === 'DELETED') { - console.log('deleted object:'); - } else if (type === 'BOOKMARK') { - console.log(`bookmark: ${watchObj.metadata.resourceVersion}`); - } else { - console.log('unknown type: ' + type); - } - console.log(apiObj); - }, - // done callback is called if the watch terminates either normally or with an error - (err) => { - if (err) { - console.error(err); - } else { - console.log('watch finished normally') - } - }, - ) +const req = await watch.watch( + '/api/v1/namespaces', + // optional query parameters can go here. + { + allowWatchBookmarks: true, + }, + // callback is called for each received object. + (type, apiObj, watchObj) => { + if (type === 'ADDED') { + console.log('new object:'); + } else if (type === 'MODIFIED') { + console.log('changed object:'); + } else if (type === 'DELETED') { + console.log('deleted object:'); + } else if (type === 'BOOKMARK') { + console.log(`bookmark: ${watchObj.metadata.resourceVersion}`); + } else { + console.log('unknown type: ' + type); + } + console.log(apiObj); + }, + // done callback is called if the watch terminates either normally or with an error + (err) => { + if (err) { + console.error(err); + } else { + console.log('watch finished normally'); + } + }, +); // watch returns a request object which you can use to abort the watch. setTimeout(() => { req.abort(); diff --git a/src/portforward.ts b/src/portforward.ts index 89f546b70e..07ddd872fc 100644 --- a/src/portforward.ts +++ b/src/portforward.ts @@ -2,15 +2,18 @@ import WebSocket from 'isomorphic-ws'; import querystring from 'node:querystring'; import stream from 'node:stream'; +import { AppsV1Api, CoreV1Api, V1Pod } from './gen/index.js'; import { KubeConfig } from './config.js'; import { WebSocketHandler, WebSocketInterface } from './web-socket-handler.js'; export class PortForward { + private readonly config: KubeConfig; private readonly handler: WebSocketInterface; private readonly disconnectOnErr: boolean; // handler is a parameter really only for injecting for testing. constructor(config: KubeConfig, disconnectOnErr?: boolean, handler?: WebSocketInterface) { + this.config = config; this.handler = handler || new WebSocketHandler(config); this.disconnectOnErr = disconnectOnErr === undefined ? true : disconnectOnErr; } @@ -70,4 +73,128 @@ export class PortForward { return WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0, retryCount); } + + /** + * Port forward to a service by resolving to the first ready pod selected by the service's selector. + * + * @param namespace - The namespace of the service + * @param serviceName - The name of the service + * @param targetPorts - The target ports to forward to + * @param output - The writable stream for output + * @param err - The writable stream for error output (can be null) + * @param input - The readable stream for input + * @param retryCount - The number of times to retry the connection + * @throws Will throw an error if the service is not found or has no ready pods + */ + public async portForwardService( + namespace: string, + serviceName: string, + targetPorts: number[], + output: stream.Writable, + err: stream.Writable | null, + input: stream.Readable, + retryCount: number = 0, + ): Promise WebSocket.WebSocket | null)> { + const coreApi = this.config.makeApiClient(CoreV1Api); + const service = await coreApi.readNamespacedService({ name: serviceName, namespace }); + + if (!service.spec?.selector || Object.keys(service.spec.selector).length === 0) { + throw new Error(`Service ${namespace}/${serviceName} has no selector defined`); + } + + const labelSelector = this.buildLabelSelector(service.spec.selector); + const pod = await this.getFirstReadyPod(namespace, labelSelector); + + return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount); + } + + /** + * Port forward to a deployment by resolving to the first ready pod selected by the deployment's selector. + * + * @param namespace - The namespace of the deployment + * @param deploymentName - The name of the deployment + * @param targetPorts - The target ports to forward to + * @param output - The writable stream for output + * @param err - The writable stream for error output (can be null) + * @param input - The readable stream for input + * @param retryCount - The number of times to retry the connection + * @throws Will throw an error if the deployment is not found or has no ready pods + */ + public async portForwardDeployment( + namespace: string, + deploymentName: string, + targetPorts: number[], + output: stream.Writable, + err: stream.Writable | null, + input: stream.Readable, + retryCount: number = 0, + ): Promise WebSocket.WebSocket | null)> { + const appsApi = this.config.makeApiClient(AppsV1Api); + const deployment = await appsApi.readNamespacedDeployment({ name: deploymentName, namespace }); + + if ( + !deployment.spec?.selector?.matchLabels || + Object.keys(deployment.spec.selector.matchLabels).length === 0 + ) { + throw new Error(`Deployment ${namespace}/${deploymentName} has no selector defined`); + } + + const labelSelector = this.buildLabelSelector(deployment.spec.selector.matchLabels); + const pod = await this.getFirstReadyPod(namespace, labelSelector); + + return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount); + } + + /** + * Get the first ready pod matching the label selector. + * + * @param namespace - The namespace to query + * @param labelSelector - The label selector to filter pods + * @returns The first ready pod + * @throws Will throw an error if no ready pods are found + */ + private async getFirstReadyPod(namespace: string, labelSelector: string): Promise { + const coreApi = this.config.makeApiClient(CoreV1Api); + const podList = await coreApi.listNamespacedPod({ namespace, labelSelector }); + + if (!podList.items || podList.items.length === 0) { + throw new Error(`No pods found with selector "${labelSelector}" in namespace ${namespace}`); + } + + // Find the first pod with Ready status + for (const pod of podList.items) { + if (this.isPodReady(pod)) { + return pod; + } + } + + throw new Error(`No ready pods found with selector "${labelSelector}" in namespace ${namespace}`); + } + + /** + * Check if a pod is ready by looking at its status conditions. + * + * @param pod - The pod to check + * @returns True if the pod has a Ready condition with status True + */ + private isPodReady(pod: V1Pod): boolean { + if (!pod.status?.conditions) { + return false; + } + return pod.status.conditions.some( + (condition) => condition.type === 'Ready' && condition.status === 'True', + ); + } + + /** + * Build a Kubernetes label selector string from a label object. + * + * @param labels - An object of label key-value pairs + * @returns A Kubernetes label selector string + */ + private buildLabelSelector(labels: { [key: string]: string }): string { + return Object.entries(labels) + .map(([key, value]) => `${key}=${value}`) + .join(','); + } } diff --git a/src/portforward_test.ts b/src/portforward_test.ts index b71603fa23..eca634062c 100644 --- a/src/portforward_test.ts +++ b/src/portforward_test.ts @@ -1,11 +1,12 @@ import { describe, it } from 'node:test'; import { strictEqual, rejects } from 'node:assert'; import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers'; -import { anyFunction, capture, instance, mock, verify } from 'ts-mockito'; +import { anyFunction, anything, capture, instance, mock, verify, when } from 'ts-mockito'; import { KubeConfig } from './config.js'; import { PortForward } from './portforward.js'; import { WebSocketHandler, WebSocketInterface } from './web-socket-handler.js'; +import { CoreV1Api, AppsV1Api, V1Service, V1Deployment, V1Pod, V1PodStatus } from './gen/index.js'; describe('PortForward', () => { it('should correctly port-forward to a url', async () => { @@ -130,4 +131,384 @@ describe('PortForward', () => { message: 'Only one port is currently supported for port-forward', }); }); + + it('should port-forward to a service by resolving to the first ready pod', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + const mockWebSocket: WebSocketInterface = mock(WebSocketHandler); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: { + app: 'test-app', + version: 'v1', + }, + }, + }; + + const podMock: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'test-app-pod-1', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'True', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [podMock], + }); + + const portForward = new PortForward(instance(kc), true, instance(mockWebSocket)); + const osStream = new WritableStreamBuffer(); + const errStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await portForward.portForwardService( + 'default', + 'test-service', + [8080], + osStream, + errStream, + isStream, + ); + + const path = `/api/v1/namespaces/default/pods/test-app-pod-1/portforward?ports=8080`; + verify(mockWebSocket.connect(path, null, anyFunction())).called(); + }); + + it('should throw error when service has no selector', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: undefined, + }, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + + const portForward = new PortForward(instance(kc)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await rejects( + portForward.portForwardService('default', 'test-service', [8080], osStream, null, isStream), + { + name: 'Error', + message: 'Service default/test-service has no selector defined', + }, + ); + }); + + it('should throw error when no pods match the service selector', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: { + app: 'test-app', + }, + }, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [], + }); + + const portForward = new PortForward(instance(kc)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await rejects( + portForward.portForwardService('default', 'test-service', [8080], osStream, null, isStream), + { + name: 'Error', + message: 'No pods found with selector "app=test-app" in namespace default', + }, + ); + }); + + it('should throw error when no pods are ready for service selector', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: { + app: 'test-app', + }, + }, + }; + + const notReadyPod: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'test-app-pod-1', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'False', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [notReadyPod], + }); + + const portForward = new PortForward(instance(kc)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await rejects( + portForward.portForwardService('default', 'test-service', [8080], osStream, null, isStream), + { + name: 'Error', + message: 'No ready pods found with selector "app=test-app" in namespace default', + }, + ); + }); + + it('should port-forward to a deployment by resolving to the first ready pod', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockAppsApi: AppsV1Api = mock(AppsV1Api); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + const mockWebSocket: WebSocketInterface = mock(WebSocketHandler); + + when(kc.makeApiClient(AppsV1Api)).thenReturn(instance(mockAppsApi)); + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const deploymentMock: V1Deployment = { + apiVersion: 'apps/v1', + kind: 'Deployment', + metadata: { name: 'test-deployment', namespace: 'default' }, + spec: { + selector: { + matchLabels: { + app: 'test-app', + env: 'prod', + }, + }, + template: { + metadata: { labels: { app: 'test-app', env: 'prod' } }, + spec: { containers: [] }, + }, + }, + }; + + const podMock: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'test-app-deploy-1', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'True', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + when(mockAppsApi.readNamespacedDeployment(anything())).thenResolve(deploymentMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [podMock], + }); + + const portForward = new PortForward(instance(kc), true, instance(mockWebSocket)); + const osStream = new WritableStreamBuffer(); + const errStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await portForward.portForwardDeployment( + 'default', + 'test-deployment', + [8080], + osStream, + errStream, + isStream, + ); + + const path = `/api/v1/namespaces/default/pods/test-app-deploy-1/portforward?ports=8080`; + verify(mockWebSocket.connect(path, null, anyFunction())).called(); + }); + + it('should throw error when deployment has no selector', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockAppsApi: AppsV1Api = mock(AppsV1Api); + + when(kc.makeApiClient(AppsV1Api)).thenReturn(instance(mockAppsApi)); + + const deploymentMock: V1Deployment = { + apiVersion: 'apps/v1', + kind: 'Deployment', + metadata: { name: 'test-deployment', namespace: 'default' }, + spec: { + selector: { + matchLabels: undefined, + }, + template: { + metadata: { labels: {} }, + spec: { containers: [] }, + }, + }, + }; + + when(mockAppsApi.readNamespacedDeployment(anything())).thenResolve(deploymentMock); + + const portForward = new PortForward(instance(kc)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await rejects( + portForward.portForwardDeployment('default', 'test-deployment', [8080], osStream, null, isStream), + { + name: 'Error', + message: 'Deployment default/test-deployment has no selector defined', + }, + ); + }); + + it('should support full kubernetes label selector syntax for services', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + const mockWebSocket: WebSocketInterface = mock(WebSocketHandler); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: { + 'app.kubernetes.io/name': 'myapp', + 'app.kubernetes.io/version': '1.0', + }, + }, + }; + + const podMock: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'myapp-pod-1', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'True', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [podMock], + }); + + const portForward = new PortForward(instance(kc), true, instance(mockWebSocket)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await portForward.portForwardService('default', 'test-service', [8080], osStream, null, isStream); + + const path = `/api/v1/namespaces/default/pods/myapp-pod-1/portforward?ports=8080`; + verify(mockWebSocket.connect(path, null, anyFunction())).called(); + }); + + it('should use the first ready pod when multiple pods match the selector', async () => { + const kc: KubeConfig = mock(KubeConfig); + const mockCoreApi: CoreV1Api = mock(CoreV1Api); + const mockWebSocket: WebSocketInterface = mock(WebSocketHandler); + + when(kc.makeApiClient(CoreV1Api)).thenReturn(instance(mockCoreApi)); + + const serviceMock: V1Service = { + apiVersion: 'v1', + kind: 'Service', + metadata: { name: 'test-service', namespace: 'default' }, + spec: { + selector: { + app: 'test-app', + }, + }, + }; + + const pod1: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'test-app-pod-1', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'True', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + const pod2: V1Pod = { + apiVersion: 'v1', + kind: 'Pod', + metadata: { name: 'test-app-pod-2', namespace: 'default' }, + status: { + conditions: [ + { + type: 'Ready', + status: 'True', + } as { type: string; status: string }, + ], + } as V1PodStatus, + }; + + when(mockCoreApi.readNamespacedService(anything())).thenResolve(serviceMock); + when(mockCoreApi.listNamespacedPod(anything())).thenResolve({ + items: [pod1, pod2], + }); + + const portForward = new PortForward(instance(kc), true, instance(mockWebSocket)); + const osStream = new WritableStreamBuffer(); + const isStream = new ReadableStreamBuffer(); + + await portForward.portForwardService('default', 'test-service', [8080], osStream, null, isStream); + + // Should use the first pod (pod1) + const path = `/api/v1/namespaces/default/pods/test-app-pod-1/portforward?ports=8080`; + verify(mockWebSocket.connect(path, null, anyFunction())).called(); + }); }); diff --git a/src/test/integration/index.ts b/src/test/integration/index.ts index ee26fa03df..176f01bc3b 100644 --- a/src/test/integration/index.ts +++ b/src/test/integration/index.ts @@ -1,7 +1,9 @@ import patchNamespace from './patchNamespace.js'; import cpFromPod from './cpFromPod.js'; +import portForwardIntegration from './portForward.js'; console.log('Integration testing'); await patchNamespace(); await cpFromPod(); +await portForwardIntegration(); diff --git a/src/test/integration/portForward.ts b/src/test/integration/portForward.ts new file mode 100644 index 0000000000..ebe1cb1e6f --- /dev/null +++ b/src/test/integration/portForward.ts @@ -0,0 +1,240 @@ +import assert from 'node:assert'; +import net from 'node:net'; +import { setTimeout } from 'node:timers/promises'; +import { CoreV1Api, AppsV1Api, KubeConfig, V1Service, V1Deployment } from '../../index.js'; +import { PortForward } from '../../portforward.js'; +import { generateName } from './name.js'; + +export default async function portForwardIntegration() { + const kc = new KubeConfig(); + kc.loadFromDefault(); + + const coreV1Client = kc.makeApiClient(CoreV1Api); + const appsV1Client = kc.makeApiClient(AppsV1Api); + const portForward = new PortForward(kc); + + const namespace = 'default'; + const appName = generateName('pf-test'); + const serviceName = `${appName}-service`; + const deploymentName = `${appName}-deployment`; + const containerPort = 8080; + const testPort = 18080; // Local port for testing + + console.log(`\n=== Port Forward Integration Test ===`); + console.log(`Creating test resources in namespace: ${namespace}`); + + // Create a deployment with a simple HTTP server + const deployment = new V1Deployment(); + deployment.metadata = { name: deploymentName, namespace }; + deployment.spec = { + replicas: 2, + selector: { matchLabels: { app: appName } }, + template: { + metadata: { labels: { app: appName } }, + spec: { + containers: [ + { + name: 'test-server', + image: 'busybox', + command: [ + 'sh', + '-c', + `while true; do echo -e "HTTP/1.1 200 OK\\r\\nContent-Length: 13\\r\\n\\r\\nHello World\\n" | nc -l -p ${containerPort} || true; done`, + ], + ports: [{ containerPort }], + }, + ], + }, + }, + }; + + console.log(`Creating deployment ${deploymentName}...`); + await appsV1Client.createNamespacedDeployment({ namespace, body: deployment }); + + // Create a service for the deployment + const service = new V1Service(); + service.metadata = { name: serviceName, namespace }; + service.spec = { + selector: { app: appName }, + ports: [{ port: containerPort, targetPort: containerPort, protocol: 'TCP' }], + type: 'ClusterIP', + }; + + console.log(`Creating service ${serviceName}...`); + await coreV1Client.createNamespacedService({ namespace, body: service }); + + // Wait for pods to be ready + console.log('Waiting for deployment pods to be ready...'); + let podsReady = false; + for (let i = 0; i < 60; i++) { + const deployment = await appsV1Client.readNamespacedDeployment({ name: deploymentName, namespace }); + if ( + deployment.status?.readyReplicas === deployment.spec?.replicas && + (deployment.status?.readyReplicas ?? 0) > 0 + ) { + podsReady = true; + console.log(`Deployment is ready with ${deployment.status?.readyReplicas} replicas`); + break; + } + await setTimeout(1000); + } + + assert.strictEqual(podsReady, true, 'Deployment pods did not become ready in time'); + + try { + // Test 1: Port forward to deployment + console.log(`\nTest 1: Port forwarding to deployment ${deploymentName}...`); + let deploymentTestPassed = false; + + const deploymentServer = net.createServer(async (socket) => { + try { + await portForward.portForwardDeployment( + namespace, + deploymentName, + [containerPort], + socket, + null, + socket, + ); + } catch (error) { + console.error('Deployment port forward error:', error); + socket.destroy(); + } + }); + + await new Promise((resolve) => { + deploymentServer.listen(testPort, '127.0.0.1', () => { + console.log(`Local server listening on port ${testPort} for deployment test`); + resolve(); + }); + }); + + deploymentTestPassed = await testPortForwardConnection(testPort, 'deployment'); + deploymentServer.close(); + assert.strictEqual(deploymentTestPassed, true, 'Failed to connect to deployment via port-forward'); + + // Test 2: Port forward to service + console.log(`\nTest 2: Port forwarding to service ${serviceName}...`); + let serviceTestPassed = false; + + const serviceServer = net.createServer(async (socket) => { + try { + await portForward.portForwardService( + namespace, + serviceName, + [containerPort], + socket, + null, + socket, + ); + } catch (error) { + console.error('Service port forward error:', error); + socket.destroy(); + } + }); + + await new Promise((resolve) => { + serviceServer.listen(testPort, '127.0.0.1', () => { + console.log(`Local server listening on port ${testPort} for service test`); + resolve(); + }); + }); + + serviceTestPassed = await testPortForwardConnection(testPort, 'service'); + serviceServer.close(); + assert.strictEqual(serviceTestPassed, true, 'Failed to connect to service via port-forward'); + + // Test 3: Error handling - service with no selector + console.log(`\nTest 3: Error handling - service with no matching pods...`); + const badServiceName = `${appName}-bad-service`; + const badService = new V1Service(); + badService.metadata = { name: badServiceName, namespace }; + badService.spec = { + selector: { app: 'nonexistent-app' }, // No pods match this selector + ports: [{ port: containerPort, targetPort: containerPort, protocol: 'TCP' }], + type: 'ClusterIP', + }; + + await coreV1Client.createNamespacedService({ namespace, body: badService }); + + try { + // Try to port forward to service with no ready pods + let errorThrown = false; + try { + const dummySocket = new net.Socket(); + await portForward.portForwardService( + namespace, + badServiceName, + [containerPort], + dummySocket, + null, + dummySocket, + ); + } catch (error) { + errorThrown = true; + console.log(`āœ“ Correctly threw error: ${(error as Error).message}`); + } + assert.strictEqual(errorThrown, true, 'Should have thrown error for service with no ready pods'); + } finally { + await coreV1Client.deleteNamespacedService({ name: badServiceName, namespace }); + } + + console.log('\nāœ“ Port forward integration tests passed!'); + } finally { + console.log('\nCleaning up test resources...'); + try { + await appsV1Client.deleteNamespacedDeployment({ name: deploymentName, namespace }); + await coreV1Client.deleteNamespacedService({ name: serviceName, namespace }); + } catch (error) { + console.warn('Cleanup warning:', (error as Error).message); + } + } +} + +async function testPortForwardConnection(testPort: number, label: string): Promise { + for (let i = 0; i < 5; i++) { + try { + const response = await new Promise((resolve, reject) => { + const socket = net.createConnection({ port: testPort, host: '127.0.0.1' }); + let data = ''; + + const timeout = globalThis.setTimeout(() => { + socket.destroy(); + reject(new Error('Connection timeout')); + }, 5000); + + socket.on('data', (chunk) => { + data += chunk.toString(); + if (data.includes('Hello World')) { + clearTimeout(timeout); + socket.end(); + } + }); + + socket.on('end', () => { + clearTimeout(timeout); + resolve(data); + }); + + socket.on('error', (error) => { + clearTimeout(timeout); + reject(error); + }); + + socket.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n'); + }); + + if (response.includes('Hello World')) { + console.log(`āœ“ Successfully connected to ${label} via port-forward`); + return true; + } + } catch (error) { + console.log('error:', error); + if (i < 4) { + console.log(`Attempt ${i + 1} failed, retrying...`); + await setTimeout(1000); + } + } + } + return false; +}