Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/port-forward-deployment.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.js [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.js default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to deployment ${namespace}/${deploymentName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/port-forward-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.js [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.js default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
46 changes: 46 additions & 0 deletions examples/typescript/port-forward/port-forward-deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const deploymentName = process.argv[3] ?? 'demo-deployment';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a deployment in Kubernetes
// by resolving the deployment to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-deployment.ts [namespace] [deploymentName] [localPort] [remotePort]
// Example: node port-forward-deployment.ts default my-app 8080 3000
// This is equivalent to: kubectl port-forward deployment/my-app 8080:3000 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardDeployment(namespace, deploymentName, [remotePort], socket, null, socket);
} catch (error) {
console.error(
`Error port-forwarding to deployment ${namespace}/${deploymentName}:`,
(error as Error).message,
);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to deployment: ${namespace}/${deploymentName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
43 changes: 43 additions & 0 deletions examples/typescript/port-forward/port-forward-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import * as k8s from '@kubernetes/client-node';
import net from 'node:net';

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const forward = new k8s.PortForward(kc);

const namespace = process.argv[2] ?? 'default';
const serviceName = process.argv[3] ?? 'demo-service';
const localPort = parseInt(process.argv[4] ?? '8080', 10);
const remotePort = parseInt(process.argv[5] ?? '8080', 10);

// This creates a local server that forwards traffic to a service in Kubernetes
// by resolving the service to its first ready pod and port-forwarding to that pod.
// Usage: node port-forward-service.ts [namespace] [serviceName] [localPort] [remotePort]
// Example: node port-forward-service.ts default my-service 8080 80
// This is equivalent to: kubectl port-forward svc/my-service 8080:80 -n default

const server = net.createServer(async (socket) => {
try {
await forward.portForwardService(namespace, serviceName, [remotePort], socket, null, socket);
} catch (error) {
console.error(`Error port-forwarding to service ${namespace}/${serviceName}:`, error.message);
socket.destroy();
}
});

server.listen(localPort, '127.0.0.1', () => {
console.log(`Port forward server listening on http://127.0.0.1:${localPort}`);
console.log(`Forwarding to service: ${namespace}/${serviceName}:${remotePort}`);
console.log(`Press Ctrl+C to stop`);
});

server.on('error', (error: NodeJS.ErrnoException) => {
console.error('Server error:', error);
});

process.on('SIGINT', () => {
console.log('\nShutting down port-forward server...');
server.close();
process.exit(0);
});
61 changes: 30 additions & 31 deletions examples/typescript/watch/watch-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,36 @@ const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const watch = new k8s.Watch(kc);
const req = await watch
.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally')
}
},
)
const req = await watch.watch(
'/api/v1/namespaces',
// optional query parameters can go here.
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
console.log('new object:');
} else if (type === 'MODIFIED') {
console.log('changed object:');
} else if (type === 'DELETED') {
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
console.log('unknown type: ' + type);
}
console.log(apiObj);
},
// done callback is called if the watch terminates either normally or with an error
(err) => {
if (err) {
console.error(err);
} else {
console.log('watch finished normally');
}
},
);
// watch returns a request object which you can use to abort the watch.
setTimeout(() => {
req.abort();
Expand Down
127 changes: 127 additions & 0 deletions src/portforward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ import WebSocket from 'isomorphic-ws';
import querystring from 'node:querystring';
import stream from 'node:stream';

import { AppsV1Api, CoreV1Api, V1Pod } from './gen/index.js';
import { KubeConfig } from './config.js';
import { WebSocketHandler, WebSocketInterface } from './web-socket-handler.js';

export class PortForward {
private readonly config: KubeConfig;
private readonly handler: WebSocketInterface;
private readonly disconnectOnErr: boolean;

// handler is a parameter really only for injecting for testing.
constructor(config: KubeConfig, disconnectOnErr?: boolean, handler?: WebSocketInterface) {
this.config = config;
this.handler = handler || new WebSocketHandler(config);
this.disconnectOnErr = disconnectOnErr === undefined ? true : disconnectOnErr;
}
Expand Down Expand Up @@ -70,4 +73,128 @@ export class PortForward {

return WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0, retryCount);
}

/**
* Port forward to a service by resolving to the first ready pod selected by the service's selector.
*
* @param namespace - The namespace of the service
* @param serviceName - The name of the service
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the service is not found or has no ready pods
*/
public async portForwardService(
namespace: string,
serviceName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const service = await coreApi.readNamespacedService({ name: serviceName, namespace });

if (!service.spec?.selector || Object.keys(service.spec.selector).length === 0) {
throw new Error(`Service ${namespace}/${serviceName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(service.spec.selector);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Port forward to a deployment by resolving to the first ready pod selected by the deployment's selector.
*
* @param namespace - The namespace of the deployment
* @param deploymentName - The name of the deployment
* @param targetPorts - The target ports to forward to
* @param output - The writable stream for output
* @param err - The writable stream for error output (can be null)
* @param input - The readable stream for input
* @param retryCount - The number of times to retry the connection
* @throws Will throw an error if the deployment is not found or has no ready pods
*/
public async portForwardDeployment(
namespace: string,
deploymentName: string,
targetPorts: number[],
output: stream.Writable,
err: stream.Writable | null,
input: stream.Readable,
retryCount: number = 0,
): Promise<WebSocket.WebSocket | (() => WebSocket.WebSocket | null)> {
const appsApi = this.config.makeApiClient(AppsV1Api);
const deployment = await appsApi.readNamespacedDeployment({ name: deploymentName, namespace });

if (
!deployment.spec?.selector?.matchLabels ||
Object.keys(deployment.spec.selector.matchLabels).length === 0
) {
throw new Error(`Deployment ${namespace}/${deploymentName} has no selector defined`);
}

const labelSelector = this.buildLabelSelector(deployment.spec.selector.matchLabels);
const pod = await this.getFirstReadyPod(namespace, labelSelector);

return this.portForward(namespace, pod.metadata!.name!, targetPorts, output, err, input, retryCount);
}

/**
* Get the first ready pod matching the label selector.
*
* @param namespace - The namespace to query
* @param labelSelector - The label selector to filter pods
* @returns The first ready pod
* @throws Will throw an error if no ready pods are found
*/
private async getFirstReadyPod(namespace: string, labelSelector: string): Promise<V1Pod> {
const coreApi = this.config.makeApiClient(CoreV1Api);
const podList = await coreApi.listNamespacedPod({ namespace, labelSelector });

if (!podList.items || podList.items.length === 0) {
throw new Error(`No pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

// Find the first pod with Ready status
for (const pod of podList.items) {
if (this.isPodReady(pod)) {
return pod;
}
}

throw new Error(`No ready pods found with selector "${labelSelector}" in namespace ${namespace}`);
}

/**
* Check if a pod is ready by looking at its status conditions.
*
* @param pod - The pod to check
* @returns True if the pod has a Ready condition with status True
*/
private isPodReady(pod: V1Pod): boolean {
if (!pod.status?.conditions) {
return false;
}
return pod.status.conditions.some(
(condition) => condition.type === 'Ready' && condition.status === 'True',
);
}

/**
* Build a Kubernetes label selector string from a label object.
*
* @param labels - An object of label key-value pairs
* @returns A Kubernetes label selector string
*/
private buildLabelSelector(labels: { [key: string]: string }): string {
return Object.entries(labels)
.map(([key, value]) => `${key}=${value}`)
.join(',');
}
}
Loading