Streaming
The Xec execution engine provides powerful streaming capabilities for handling real-time output, large data transfers, and continuous log monitoring across all adapters.
Overview
Streaming support (`packages/core/src/utils/stream.ts`) provides:
- Real-time output streaming from commands
- Pipe operations between commands
- Transform streams for data processing
- Backpressure handling for flow control
- Multi-stream management (stdout/stderr)
- Stream composition and splitting
Basic Streaming
Output Streaming
import { $ } from '@xec-sh/core';
// Stream output in real-time: run `tail -f` on the log file and register one
// callback per output stream. The stdout callback logs its argument with a
// 'LOG:' prefix; the stderr callback logs it through console.error with an
// 'ERROR:' prefix.
// NOTE(review): the callbacks appear to be invoked once per line of output —
// confirm against the @xec-sh/core API.
await $`tail -f /var/log/app.log`
.stdout((line) => {
console.log('LOG:', line);
})
.stderr((line) => {
console.error('ERROR:', line);
});
// Stream with line buffering: each callback re-emits its argument on the
// matching process stream, tagged '[OUT]' or '[ERR]' and followed by a newline.
await $`long-running-process`
.stdout((line) => process.stdout.write(`[OUT] ${line}\n`))
.stderr((line) => process.stderr.write(`[ERR] ${line}\n`));
Stream to File
import { createWriteStream } from 'fs';
// Stream output to file: pass Node.js write streams (instead of callbacks) to
// .stdout() and .stderr() so the two output streams of the build go to
// separate files.
const logFile = createWriteStream('output.log');
const errorFile = createWriteStream('error.log');
await $`npm run build`
.stdout(logFile)
.stderr(errorFile);
// Append mode: the 'a' flag opens app.log for appending rather than
// truncating it.
// NOTE(review): none of the write streams is closed explicitly in this
// snippet — presumably the library ends them when the command finishes; confirm.
const appendStream = createWriteStream('app.log', { flags: 'a' });
await $`echo "New log entry"`
.stdout(appendStream);
Pipe Operations
Command Piping
// Pipe between commands: the chain mirrors the shell pipeline
// `cat large-file.txt | grep "error" | sort | uniq -c`.
// NOTE(review): .pipe() presumably connects the previous command's stdout to
// the next command's stdin — confirm against the library's API.
await $`cat large-file.txt`
.pipe($`grep "error"`)
.pipe($`sort`)
.pipe($`uniq -c`);
// Store intermediate results: await the first pipeline, keep its value, then
// pipe from that value into a numeric sort.
// NOTE(review): `filtered` holds the awaited value, not the pending command —
// verify that the resolved result also exposes .pipe().
const filtered = await $`cat data.json`
.pipe($`jq '.items[]'`);
const sorted = await filtered
.pipe($`sort -n`);
Cross-Environment Piping
// Pipe from local to remote: a local `cat` is piped into a command created
// with $.ssh('server'), which redirects what it receives into remote-file.txt.
await $`cat local-file.txt`
.pipe($.ssh('server')`cat > remote-file.txt`);
// Pipe from container to local: a command created with $.docker('container')
// is piped into a local `jq '.'`, whose stdout is sent to process.stdout.
await $.docker('container')`cat /app/data.json`
.pipe($`jq '.'`)
.stdout(process.stdout);
// Chain across multiple environments: $.k8s('pod') -> $.docker('processor')
// -> local gzip, which writes processed.csv.gz through a shell redirection.
// NOTE(review): the string arguments ('server', 'container', 'pod',
// 'processor') are presumably host/container/pod identifiers — confirm.
await $.k8s('pod')`cat /data/export.csv`
.pipe($.docker('processor')`python process.py`)
.pipe($`gzip > processed.csv.gz`);
Transform Streams
Data Transformation
import { Transform } from 'stream';
// Transform stream that upper-cases every chunk written to it and passes the
// result downstream.
const uppercase = new Transform({
  transform(data, _encoding, done) {
    const text = data.toString();
    done(null, text.toUpperCase());
  },
});
// Apply transformation: both the `uppercase` Transform and process.stdout are
// handed to .stdout().
// NOTE(review): this relies on chained .stdout() calls forming a pipeline
// (command -> uppercase -> process.stdout) rather than the second call
// replacing the first — confirm against the library's API.
await $`echo "hello world"`
.stdout(uppercase)
.stdout(process.stdout); // Outputs: HELLO WORLD
// JSON transformation
// A stream delivers data in arbitrary chunks, so a JSON document is not
// guaranteed to arrive in one piece. Every chunk is collected and the document
// is parsed once the input has ended, instead of calling JSON.parse per chunk
// (which fails as soon as the payload spans more than one chunk).
const jsonChunks: Buffer[] = [];
const jsonParser = new Transform({
  transform(chunk, encoding, callback) {
    // Keep raw bytes so a multi-byte character split across chunks survives.
    jsonChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding));
    callback();
  },
  flush(callback) {
    // Empty input produces no output rather than a parse error.
    if (jsonChunks.length === 0) {
      callback();
      return;
    }
    try {
      const data: unknown = JSON.parse(Buffer.concat(jsonChunks).toString('utf8'));
      // Re-serialize with 2-space indentation.
      callback(null, JSON.stringify(data, null, 2));
    } catch (err) {
      // Invalid JSON surfaces as a stream 'error' event.
      callback(err instanceof Error ? err : new Error(String(err)));
    }
  },
});
// Fetch the endpoint with curl and hand its stdout to `jsonParser`, then to
// process.stdout.
// NOTE(review): assumes chained .stdout() calls compose into a pipeline
// (curl -> jsonParser -> process.stdout) — confirm against the library's API.
await $`curl api.example.com/data`
.stdout(jsonParser)
.stdout(process.stdout);
Line Processing
// Process lines individually
// Chunk boundaries do not line up with line boundaries, so the unterminated
// tail of each chunk is held back here and prepended to the next chunk.
let pendingLine = '';

// Keep only the lines containing 'ERROR', prefix each with an ISO-8601
// timestamp, and terminate each with '\n' so that output produced from
// successive chunks does not run together.
function stampErrorLines(lines: string[]): string {
  return lines
    .filter((line) => line.includes('ERROR'))
    .map((line) => `[${new Date().toISOString()}] ${line}\n`)
    .join('');
}

const lineProcessor = new Transform({
  transform(chunk, _encoding, callback) {
    const text = pendingLine + chunk.toString();
    const lastBreak = text.lastIndexOf('\n');
    if (lastBreak === -1) {
      // No complete line yet: keep buffering.
      pendingLine = text;
      callback();
      return;
    }
    // Everything after the last newline is an incomplete line.
    pendingLine = text.slice(lastBreak + 1);
    callback(null, stampErrorLines(text.slice(0, lastBreak).split('\n')));
  },
  flush(callback) {
    // Emit a final line that was not newline-terminated.
    const rest = pendingLine;
    pendingLine = '';
    callback(null, stampErrorLines(rest === '' ? [] : [rest]));
  },
});
// Follow app.log and hand the command's stdout to `lineProcessor`, then to
// process.stdout.
// NOTE(review): assumes chained .stdout() calls compose into a pipeline —
// confirm against the library's API.
await $`tail -f app.log`
.stdout(lineProcessor)
.stdout(process.stdout);
Stream Control
Backpressure Handling
// Resolve after the given number of milliseconds.
const pause = (ms: number): Promise<void> =>
  new Promise((wake) => setTimeout(wake, ms));

// Pass-through stage that simulates a slow consumer: each chunk is forwarded
// unchanged after a 100 ms delay.
const slowConsumer = new Transform({
  async transform(data, _encoding, done) {
    await pause(100);
    done(null, data);
  },
});
// Execution automatically handles backpressure
// The command's stdout is handed to `slowConsumer`, then to process.stdout.
// NOTE(review): that the producer is throttled to the consumer's pace is the
// library's documented claim; it cannot be verified from this snippet.
await $`cat large-file.txt`
.stdout(slowConsumer)
.stdout(process.stdout);
Stream Pausing/Resuming
// Manual stream control: keep a handle on the command so that the stream
// obtained from it can be paused and resumed while the command runs.
const command = $`tail -f /var/log/syslog`;
const stream = command.stream();
// Pause after 5 seconds
setTimeout(() => {
stream.pause();
console.log('Stream paused');
}, 5000);
// Resume after 10 seconds. Both timers are scheduled at the same moment, so
// the stream stays paused for roughly 5 seconds.
setTimeout(() => {
stream.resume();
console.log('Stream resumed');
}, 10000);
// Wait for the command itself.
// NOTE(review): `tail -f` does not exit on its own, so this await presumably
// settles only when the command is stopped or fails — confirm.
await command;
Multi-Stream Management
Separate Stream Handling
// Classify combined output: the shell redirection `2>&1` merges stderr into
// stdout, so a single stdout handler receives both and routes each line to
// logger.error or logger.info depending on whether it starts with 'ERROR:'.
// NOTE(review): `logger` is not defined in this snippet — it is assumed to be
// provided by the surrounding application.
await $`command 2>&1`
.stdout((line) => {
if (line.startsWith('ERROR:')) {
logger.error(line);
} else {
logger.info(line);
}
});
// Different handlers for each stream: stdout goes to console.log with a '✓'
// prefix, stderr goes to console.error with a '✗' prefix.
await $`npm test`
.stdout((line) => console.log(`✓ ${line}`))
.stderr((line) => console.error(`✗ ${line}`));
Stream Merging
import { PassThrough } from 'stream';
// Merge multiple streams
const merged = new PassThrough();
// Process merged stream
// The consumer is attached BEFORE the commands are awaited: `tail -f` does not
// exit on its own, so code placed after the await would not run while the
// commands are alive, and a PassThrough that nobody reads stops accepting
// data once its internal buffer is full.
merged.pipe(process.stdout);
// Merge outputs from multiple commands
await Promise.all([
  $`tail -f app1.log`.stdout(merged),
  $`tail -f app2.log`.stdout(merged),
  $`tail -f app3.log`.stdout(merged),
]);
Log Streaming
Real-time Logs
// Stream Docker logs: request the container's logs with the follow, tail and
// timestamps options, then split every line at its first space — the first
// token is reported as `timestamp`, the remainder (re-joined with spaces) as
// `message`.
// NOTE(review): this assumes each line begins with a timestamp that contains
// no spaces, presumably because `timestamps: true` is set — confirm.
await $.docker('container').logs({
follow: true,
tail: 100,
timestamps: true
}).stdout((line) => {
const [timestamp, ...message] = line.split(' ');
console.log({
timestamp,
message: message.join(' ')
});
});
// Stream Kubernetes logs: request logs from $.k8s('pod', 'namespace') with the
// follow, container and since options, and print each line with a '[K8S]' prefix.
// NOTE(review): 'pod' and 'namespace' are presumably the pod name and its
// namespace; `since: '10m'` presumably limits output to the last 10 minutes —
// confirm against the adapter's API.
await $.k8s('pod', 'namespace').logs({
follow: true,
container: 'app',
since: '10m'
}).stdout((line) => {
console.log(`[K8S] ${line}`);
});
Multi-Source Log Aggregation
// Aggregate logs from multiple sources
// Follows /var/log/app.log on every source over SSH and writes each line,
// prefixed with `[source]`, to a shared stream that is piped to process.stdout.
// The returned promise settles only when all of the followed commands have
// settled.
async function aggregateLogs(sources: readonly string[]): Promise<void> {
  const logStream = new PassThrough();
  // Attach the consumer first: `tail -f` does not exit on its own, so a pipe()
  // placed after the await below would not run while the commands are alive,
  // and the unread PassThrough would simply fill up.
  logStream.pipe(process.stdout);
  // Start all log streams
  await Promise.all(
    sources.map((source) =>
      $.ssh(source)`tail -f /var/log/app.log`.stdout((line) => {
        logStream.write(`[${source}] ${line}\n`);
      }),
    ),
  );
}
await aggregateLogs(['server1', 'server2', 'server3']);
Stream Composition
Pipeline Creation
import { pipeline } from 'stream/promises';
// Create processing pipeline
// Connects gunzip -> python process.py -> gzip -> output file with
// stream/promises `pipeline`, which resolves when the last stage has finished
// and rejects if any stage fails.
// NOTE(review): assumes the object returned by .stream() is a duplex stream
// (writable stdin, readable stdout) that `pipeline` can use as a middle
// stage — confirm against the library's API.
async function processPipeline(input: string, output: string): Promise<void> {
  const gunzip = $`gunzip -c ${input}`.stream();
  // Not named `process`: that would shadow Node's global `process` for the
  // rest of the function body.
  const processor = $`python process.py`.stream();
  const compress = $`gzip -c`.stream();
  const outputFile = createWriteStream(output);
  await pipeline(gunzip, processor, compress, outputFile);
}
Stream Splitting
// Split stream to multiple destinations: a single PassThrough is piped to
// three destinations (two files and process.stdout), so whatever is written
// to `splitter` is delivered to each of them.
const splitter = new PassThrough();
const file1 = createWriteStream('output1.log');
const file2 = createWriteStream('output2.log');
splitter.pipe(file1);
splitter.pipe(file2);
splitter.pipe(process.stdout);
// The command's stdout is handed to the splitter, which fans it out.
await $`generate-data`
.stdout(splitter);
Progress Tracking
Stream Progress
// Track streaming progress
let bytesProcessed = 0;
let linesProcessed = 0;
// Report progress every 100 lines. A single chunk can contain many lines, so
// the counter can jump straight past a multiple of 100; the next threshold is
// tracked explicitly instead of testing `linesProcessed % 100 === 0`, which
// would miss those reports (and would fire at 0 lines).
const REPORT_EVERY_LINES = 100;
let nextReportAt = REPORT_EVERY_LINES;
const progressStream = new Transform({
  transform(chunk, _encoding, callback) {
    bytesProcessed += chunk.length;
    // Each '\n' in the chunk counts as one completed line.
    linesProcessed += chunk.toString().split('\n').length - 1;
    if (linesProcessed >= nextReportAt) {
      console.log(`Processed: ${linesProcessed} lines, ${bytesProcessed} bytes`);
      // Advance to the first multiple of the interval above the current count.
      nextReportAt =
        (Math.floor(linesProcessed / REPORT_EVERY_LINES) + 1) * REPORT_EVERY_LINES;
    }
    // Forward the data unchanged.
    callback(null, chunk);
  },
});
// Hand the command's stdout to `progressStream`, then to process.stdout.
// NOTE(review): assumes chained .stdout() calls compose into a pipeline —
// confirm against the library's API.
await $`cat large-file.txt`
.stdout(progressStream)
.stdout(process.stdout);
Download Progress
// Track download progress
// The callback receives the amount transferred so far and the expected total,
// and rewrites the current terminal line ('\r') with the completion percentage.
// NOTE(review): the units of `transferred`/`total` and whether `total` is
// always known are not visible here — confirm against the library's API.
await $.ssh('server')`cat large-file.tar.gz`
  .progress((transferred, total) => {
    if (total > 0) {
      const percent = ((transferred / total) * 100).toFixed(2);
      process.stdout.write(`\rDownloading: ${percent}%`);
    } else {
      // An unknown or zero total would otherwise print 'NaN%' or 'Infinity%'.
      process.stdout.write(`\rDownloading: ${transferred}`);
    }
  })
  .stdout(createWriteStream('downloaded.tar.gz'));