Log Streaming
Implementation Referenceβ
Source Files:
packages/core/src/adapters/kubernetes-adapter.ts
- Log streaming implementationpackages/core/src/utils/kubernetes-api.ts
- Pod-level log methods
Key Functions:
KubernetesAdapter.streamLogs()
- Core log streaming functionalityK8sPod.logs()
- Get static log contentK8sPod.streamLogs()
- Stream logs with full optionsK8sPod.follow()
- Convenient log following (streamLogs with follow: true)
Overviewβ
Xec provides comprehensive log streaming capabilities for Kubernetes pods through kubectl logs
. This enables real-time monitoring, debugging, and log aggregation from containerized applications with support for filtering, tailing, and multi-container pods.
Basic Log Operationsβ
Static Log Retrievalβ
Get log content without streaming:
import { $ } from '@xec-sh/core';
const k8s = $.k8s({ namespace: 'production' });
const pod = k8s.pod('web-server');
// Get recent logs
const logs = await pod.logs({ tail: 100 });
console.log('Recent logs:');
console.log(logs);
// Get logs with timestamps
const timestampedLogs = await pod.logs({
tail: 50,
timestamps: true
});
console.log('Timestamped logs:');
console.log(timestampedLogs);
Previous Container Logsβ
Access logs from previous container instances:
// Get logs from previous container (after restart)
const previousLogs = await pod.logs({
previous: true,
tail: 200
});
console.log('Logs from previous container:');
console.log(previousLogs);
Real-time Log Streamingβ
Basic Streamingβ
Stream logs in real-time:
const pod = k8s.pod('api-server');
// Stream logs with callback
const stream = await pod.streamLogs(
(line) => {
console.log(`[${new Date().toISOString()}] ${line.trim()}`);
},
{
follow: true,
tail: 10 // Start with last 10 lines
}
);
// Let it stream for 30 seconds
setTimeout(() => {
stream.stop();
console.log('Log streaming stopped');
}, 30000);
Follow Logs (Simplified)β
Use the convenient follow()
method:
// Simplified log following
const followStream = await pod.follow(
(line) => {
// Parse structured logs
try {
const logEntry = JSON.parse(line);
console.log(`[${logEntry.level}] ${logEntry.message}`);
} catch {
// Handle plain text logs
console.log(`RAW: ${line.trim()}`);
}
},
{
tail: 20,
timestamps: true
}
);
// Stop after some time
setTimeout(() => followStream.stop(), 60000);
Container-Specific Loggingβ
Multi-Container Podsβ
Stream logs from specific containers:
const multiPod = k8s.pod('multi-container-pod');
// Stream from nginx container
const nginxStream = await multiPod.streamLogs(
(line) => console.log(`[nginx] ${line.trim()}`),
{
container: 'nginx',
follow: true,
tail: 50
}
);
// Stream from app container
const appStream = await multiPod.streamLogs(
(line) => console.log(`[app] ${line.trim()}`),
{
container: 'app',
follow: true,
tail: 50
}
);
// Monitor both containers
console.log('Monitoring both containers...');
setTimeout(() => {
nginxStream.stop();
appStream.stop();
}, 30000);
Container Selectionβ
Target specific containers in complex pods:
// Get list of containers first (if needed)
const podInfo = await $.k8s({
pod: 'complex-pod',
namespace: 'production'
})`echo "Container: $HOSTNAME"`;
// Stream from sidecar container
const sidecarLogs = await pod.streamLogs(
(line) => {
if (line.includes('ERROR') || line.includes('WARN')) {
console.log(`π¨ SIDECAR: ${line.trim()}`);
}
},
{
container: 'istio-proxy',
follow: true
}
);
Advanced Log Filteringβ
Log Processingβ
Process and filter logs during streaming:
const pod = k8s.pod('application-server');
const logStream = await pod.follow(
(line) => {
const trimmedLine = line.trim();
// Filter and categorize logs
if (trimmedLine.includes('ERROR')) {
console.error(`β ERROR: ${trimmedLine}`);
} else if (trimmedLine.includes('WARN')) {
console.warn(`β οΈ WARN: ${trimmedLine}`);
} else if (trimmedLine.includes('INFO')) {
console.info(`βΉοΈ INFO: ${trimmedLine}`);
} else if (trimmedLine.includes('DEBUG')) {
// Skip debug logs in production
return;
} else {
console.log(`π LOG: ${trimmedLine}`);
}
},
{
tail: 0, // Only new logs
timestamps: true
}
);
Structured Log Parsingβ
Parse and format structured logs:
interface LogEntry {
timestamp: string;
level: string;
message: string;
service?: string;
traceId?: string;
}
const stream = await pod.follow(
(line) => {
try {
const entry: LogEntry = JSON.parse(line);
const timestamp = new Date(entry.timestamp).toLocaleTimeString();
const level = entry.level.padEnd(5);
const service = entry.service || 'unknown';
const traceId = entry.traceId ? ` [${entry.traceId.slice(0, 8)}]` : '';
console.log(`${timestamp} ${level} [${service}]${traceId} ${entry.message}`);
} catch {
// Fallback for non-JSON logs
console.log(`RAW: ${line.trim()}`);
}
},
{ follow: true }
);
Log Aggregationβ
Multiple Pod Monitoringβ
Aggregate logs from multiple pods:
async function aggregateLogs(podNames: string[], namespace: string) {
const k8s = $.k8s({ namespace });
const streams: Array<{ pod: string; stream: { stop: () => void } }> = [];
console.log(`Starting log aggregation for ${podNames.length} pods...\n`);
// Set up streaming for each pod
for (const podName of podNames) {
const pod = k8s.pod(podName);
const stream = await pod.follow(
(line) => {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [${podName}] ${line.trim()}`);
},
{
tail: 5, // Start with recent logs
timestamps: false // We add our own timestamps
}
);
streams.push({ pod: podName, stream });
}
console.log(`Aggregating logs from ${streams.length} pods. Press Ctrl+C to stop...\n`);
// Return cleanup function
return () => {
console.log('\nStopping log aggregation...');
streams.forEach(({ pod, stream }) => {
stream.stop();
console.log(` Stopped ${pod}`);
});
};
}
// Usage
const stopAggregation = await aggregateLogs(
['web-1', 'web-2', 'web-3'],
'production'
);
// Stop after 2 minutes
setTimeout(stopAggregation, 120000);
Deployment Monitoringβ
Monitor all pods in a deployment:
async function monitorDeployment(deploymentName: string, namespace: string) {
// Get pods for deployment (this would require kubectl integration)
const labelSelector = `app=${deploymentName}`;
// For this example, assume we have pod names
const pods = ['web-deploy-abc', 'web-deploy-def', 'web-deploy-ghi'];
const k8s = $.k8s({ namespace });
const activeStreams = new Map();
console.log(`Monitoring deployment: ${deploymentName}`);
for (const podName of pods) {
const pod = k8s.pod(podName);
const stream = await pod.follow(
(line) => {
// Add pod identifier and format
const timestamp = new Date().toISOString().slice(11, 23);
console.log(`${timestamp} [${podName.slice(-3)}] ${line.trim()}`);
},
{
tail: 0, // Only new logs
timestamps: false
}
);
activeStreams.set(podName, stream);
}
return {
stop: () => {
activeStreams.forEach((stream, podName) => {
stream.stop();
console.log(`Stopped monitoring ${podName}`);
});
activeStreams.clear();
},
getActiveCount: () => activeStreams.size
};
}
Error Handling and Monitoringβ
Connection Resilienceβ
Handle streaming interruptions:
async function resilientLogStreaming(podName: string, maxRetries = 3) {
const pod = k8s.pod(podName);
let retryCount = 0;
let currentStream: { stop: () => void } | null = null;
const startStream = async (): Promise<void> => {
try {
currentStream = await pod.follow(
(line) => {
console.log(`[${podName}] ${line.trim()}`);
retryCount = 0; // Reset on successful data
},
{ follow: true, tail: 10 }
);
} catch (error) {
console.error(`Stream error for ${podName}:`, error.message);
if (retryCount < maxRetries) {
retryCount++;
console.log(`Retrying in 5 seconds... (${retryCount}/${maxRetries})`);
setTimeout(startStream, 5000);
} else {
console.error(`Max retries exceeded for ${podName}`);
}
}
};
await startStream();
return {
stop: () => {
if (currentStream) {
currentStream.stop();
currentStream = null;
}
}
};
}
Log Quality Monitoringβ
Monitor log quality and detect issues:
async function monitorLogQuality(podName: string) {
const pod = k8s.pod(podName);
let errorCount = 0;
let warningCount = 0;
let totalLines = 0;
let lastLogTime = Date.now();
const stream = await pod.follow(
(line) => {
totalLines++;
lastLogTime = Date.now();
const trimmedLine = line.trim();
if (trimmedLine.includes('ERROR')) {
errorCount++;
console.error(`π₯ ERROR [${errorCount}]: ${trimmedLine}`);
} else if (trimmedLine.includes('WARN')) {
warningCount++;
console.warn(`β οΈ WARN [${warningCount}]: ${trimmedLine}`);
}
// Alert on high error rate
if (errorCount > 10 && totalLines > 50) {
const errorRate = (errorCount / totalLines) * 100;
if (errorRate > 20) {
console.error(`π¨ HIGH ERROR RATE: ${errorRate.toFixed(1)}% in ${podName}`);
}
}
},
{ follow: true }
);
// Monitor for log silence
const silenceCheck = setInterval(() => {
const timeSinceLastLog = Date.now() - lastLogTime;
if (timeSinceLastLog > 60000) { // 1 minute
console.warn(`β° No logs from ${podName} for ${Math.round(timeSinceLastLog/1000)}s`);
}
}, 30000);
return {
stop: () => {
stream.stop();
clearInterval(silenceCheck);
},
getStats: () => ({
totalLines,
errorCount,
warningCount,
errorRate: totalLines > 0 ? (errorCount / totalLines) * 100 : 0
})
};
}
Debugging Workflowsβ
Application Debuggingβ
Set up comprehensive debugging log monitoring:
async function debugApplication(appPod: string) {
const pod = k8s.pod(appPod);
console.log(`π Starting debug session for ${appPod}\n`);
// Get recent error context
const recentLogs = await pod.logs({
tail: 100,
timestamps: true
});
const errorLines = recentLogs.split('\n').filter(line =>
line.includes('ERROR') || line.includes('FATAL')
);
if (errorLines.length > 0) {
console.log('Recent errors found:');
errorLines.forEach(line => console.log(` ${line}`));
console.log();
}
// Start real-time monitoring
const debugStream = await pod.follow(
(line) => {
const timestamp = new Date().toISOString();
if (line.includes('ERROR') || line.includes('FATAL')) {
console.error(`${timestamp} π₯ ${line.trim()}`);
} else if (line.includes('WARN')) {
console.warn(`${timestamp} β οΈ ${line.trim()}`);
} else if (line.includes('DEBUG')) {
console.debug(`${timestamp} π ${line.trim()}`);
} else {
console.log(`${timestamp} βΉοΈ ${line.trim()}`);
}
},
{
tail: 0,
timestamps: false
}
);
return {
stop: () => {
debugStream.stop();
console.log('\nπ Debug session ended');
}
};
}
Performance Monitoringβ
Monitor application performance through logs:
async function monitorPerformance(appPod: string) {
const pod = k8s.pod(appPod);
const metrics = {
requestCount: 0,
errorCount: 0,
slowRequests: 0,
responseTimes: [] as number[]
};
const stream = await pod.follow(
(line) => {
// Parse access logs (assume nginx-style)
const httpMatch = line.match(/(\d{3})\s+(\d+\.\d+)/);
if (httpMatch) {
const [, status, responseTime] = httpMatch;
const statusCode = parseInt(status);
const time = parseFloat(responseTime);
metrics.requestCount++;
metrics.responseTimes.push(time);
if (statusCode >= 400) {
metrics.errorCount++;
}
if (time > 1.0) { // Slow request threshold
metrics.slowRequests++;
console.warn(`π Slow request: ${time}s (${statusCode})`);
}
// Report metrics every 100 requests
if (metrics.requestCount % 100 === 0) {
const avgTime = metrics.responseTimes.reduce((a, b) => a + b, 0) / metrics.responseTimes.length;
const errorRate = (metrics.errorCount / metrics.requestCount) * 100;
console.log(`π Metrics (${metrics.requestCount} requests):`);
console.log(` Average response time: ${avgTime.toFixed(2)}s`);
console.log(` Error rate: ${errorRate.toFixed(1)}%`);
console.log(` Slow requests: ${metrics.slowRequests}`);
// Reset for next period
metrics.responseTimes = [];
}
}
},
{ follow: true }
);
return { stream, metrics };
}
Performance Considerationsβ
Stream Managementβ
- Memory Usage: Log streams buffer data in memory
- CPU Impact: Log parsing can be CPU intensive
- Network: Continuous data transfer from cluster
Best Practicesβ
// Good: Limit tail size for initial logs
const stream = await pod.follow(onData, { tail: 50 });
// Good: Use structured logging for better parsing
const parseStructuredLog = (line: string) => {
try {
return JSON.parse(line);
} catch {
return { message: line, level: 'unknown' };
}
};
// Good: Always clean up streams
const cleanup = () => {
activeStreams.forEach(stream => stream.stop());
activeStreams.clear();
};
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);
Resource Optimizationβ
// Batch log processing for efficiency
let logBuffer: string[] = [];
const batchedStream = await pod.follow(
(line) => {
logBuffer.push(line);
// Process in batches of 10
if (logBuffer.length >= 10) {
processBatch(logBuffer);
logBuffer = [];
}
},
{ follow: true }
);
function processBatch(lines: string[]) {
// Process multiple lines together
const errors = lines.filter(line => line.includes('ERROR'));
if (errors.length > 0) {
console.log(`Batch contains ${errors.length} errors`);
}
}