
Stream Processing Patterns

Xec provides powerful streaming capabilities for handling large outputs, real-time data, and efficient I/O operations. This guide covers stream processing patterns for command execution.

Basic Output Streaming

Streaming to Console

import { $ } from '@xec-sh/core';

// Stream output directly to console
await $`npm install`.pipe(process.stdout);

// Stream both stdout and stderr
await $`npm test`
.pipe(process.stdout)
.stderr(process.stderr);

// Stream with prefix
import { Transform } from 'stream';

const prefixer = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const prefixed = lines.map(line => line ? `[LOG] ${line}` : '').join('\n');
callback(null, prefixed);
}
});

await $`npm run dev`.pipe(prefixer).pipe(process.stdout);
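
Note that data chunks are not aligned to line boundaries, so a line can be split across two chunks and end up prefixed twice. A small line-buffering transform (a sketch, not a Xec API) holds back the trailing partial line until the rest arrives:

import { Transform } from 'stream';

// Buffers the last partial line so downstream transforms always see whole lines
const lineSplitter = () => {
  let remainder = '';
  return new Transform({
    transform(chunk, encoding, callback) {
      const lines = (remainder + chunk.toString()).split('\n');
      remainder = lines.pop(); // hold back the incomplete tail
      callback(null, lines.map(line => line + '\n').join(''));
    },
    flush(callback) {
      callback(null, remainder); // emit whatever is left at end of stream
    }
  });
};

await $`npm run dev`.pipe(lineSplitter()).pipe(prefixer).pipe(process.stdout);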

Streaming to Files

import { createWriteStream, createReadStream } from 'fs';

// Stream to file
const logFile = createWriteStream('build.log');
await $`npm run build`.pipe(logFile);

// Append to file
const appendLog = createWriteStream('app.log', { flags: 'a' });
await $`echo "New log entry"`.pipe(appendLog);

// Tee - stream to multiple destinations
import { PassThrough } from 'stream';

const tee = new PassThrough();
const file1 = createWriteStream('output1.log');
const file2 = createWriteStream('output2.log');

tee.pipe(file1);
tee.pipe(file2);
tee.pipe(process.stdout);

await $`npm test`.pipe(tee);
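
Output can also be transformed on the way to disk with standard Node streams, for example compressed with zlib (a sketch; `pg_dump mydb` stands in for any long-running command):

import { createGzip } from 'zlib';
import { createWriteStream } from 'fs';

// Compress command output while streaming it to disk
await $`pg_dump mydb`
  .pipe(createGzip())
  .pipe(createWriteStream('backup.sql.gz'));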

Real-Time Log Processing

Log Parsing and Filtering

import { Transform } from 'stream';

class LogParser extends Transform {
constructor(options = {}) {
super(options);
this.filters = options.filters || [];
this.formatter = options.formatter || (log => JSON.stringify(log));
}

_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');

for (const line of lines) {
if (!line.trim()) continue;

try {
const log = this.parseLine(line);

// Apply filters
if (this.filters.some(filter => !filter(log))) {
continue;
}

// Format and output
this.push(this.formatter(log) + '\n');
} catch (error) {
// Pass through unparseable lines
this.push(line + '\n');
}
}

callback();
}

parseLine(line) {
// Try JSON format
if (line.startsWith('{')) {
return JSON.parse(line);
}

// Try common log format
const match = line.match(/^\[([^\]]+)\] \[([^\]]+)\] (.+)$/);
if (match) {
return {
timestamp: match[1],
level: match[2],
message: match[3]
};
}

// Default format
return { message: line };
}
}

// Usage
const parser = new LogParser({
filters: [
log => log.level !== 'DEBUG', // Filter out DEBUG logs
log => !log.message.includes('deprecated') // Filter deprecation warnings
],
formatter: log => `${log.timestamp || new Date().toISOString()} - ${log.message}`
});

await $`tail -f /var/log/app.log`
.pipe(parser)
.pipe(process.stdout);

Multi-Source Log Aggregation

import { Transform } from 'stream';

class LogAggregator extends Transform {
constructor() {
super();
this.sources = new Map();
}

addSource(name, stream) {
stream.on('data', chunk => {
const lines = chunk.toString().split('\n');
lines.forEach(line => {
if (line.trim()) {
this.push(`[${name}] ${line}\n`);
}
});
});

stream.on('error', error => {
this.push(`[${name}] ERROR: ${error.message}\n`);
});

this.sources.set(name, stream);
}

removeSource(name) {
const stream = this.sources.get(name);
if (stream) {
stream.destroy();
this.sources.delete(name);
}
}
}

// Usage
const aggregator = new LogAggregator();

// Add multiple log sources
aggregator.addSource('app', $`tail -f /var/log/app.log`);
aggregator.addSource('nginx', $`tail -f /var/log/nginx/access.log`);
aggregator.addSource('system', $`journalctl -f`);

// Output aggregated logs
aggregator.pipe(process.stdout);
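
`tail -f` and `journalctl -f` never exit on their own, so tear the aggregator down explicitly when shutting down. A minimal sketch, assuming the source streams expose `destroy()` (which `removeSource` relies on):

// Tear down every source and close the aggregated stream on Ctrl+C
process.on('SIGINT', () => {
  for (const name of [...aggregator.sources.keys()]) {
    aggregator.removeSource(name);
  }
  aggregator.end();
});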

Stream Transformation

Data Transformation Pipeline

import { PassThrough, Transform, compose } from 'node:stream';

class StreamPipeline {
  constructor() {
    this.transforms = [];
  }

  add(transform) {
    this.transforms.push(transform);
    return this;
  }

  createStream() {
    if (this.transforms.length === 0) {
      return new PassThrough();
    }

    // compose() (Node 16.9+) combines the transforms into a single duplex:
    // writes enter the first transform, reads come from the last.
    // Returning the result of chained .pipe() calls instead would hand back
    // only the final transform, so piped-in data would skip the earlier stages.
    return compose(...this.transforms);
  }
}

// Create transformation pipeline
const pipeline = new StreamPipeline()
.add(new Transform({
transform(chunk, encoding, callback) {
// Convert to uppercase
callback(null, chunk.toString().toUpperCase());
}
}))
.add(new Transform({
transform(chunk, encoding, callback) {
// Add timestamp
const lines = chunk.toString().split('\n');
const timestamped = lines.map(line =>
line ? `[${new Date().toISOString()}] ${line}` : ''
).join('\n');
callback(null, timestamped);
}
}))
.add(new Transform({
transform(chunk, encoding, callback) {
// Colorize
const colored = chunk.toString()
.replace(/ERROR/g, '\x1b[31mERROR\x1b[0m')
.replace(/WARNING/g, '\x1b[33mWARNING\x1b[0m')
.replace(/INFO/g, '\x1b[32mINFO\x1b[0m');
callback(null, colored);
}
}));

await $`npm test`
.pipe(pipeline.createStream())
.pipe(process.stdout);

JSON Stream Processing

import { Transform } from 'stream';

class JSONStreamProcessor extends Transform {
constructor(processor) {
super();
this.processor = processor;
this.buffer = '';
}

_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();

// Try to extract complete JSON objects
let startIndex = 0;
let braceCount = 0;
let inString = false;
let escapeNext = false;

for (let i = 0; i < this.buffer.length; i++) {
const char = this.buffer[i];

if (escapeNext) {
escapeNext = false;
continue;
}

if (char === '\\') {
escapeNext = true;
continue;
}

if (char === '"') {
inString = !inString;
continue;
}

if (!inString) {
if (char === '{') braceCount++;
if (char === '}') braceCount--;

if (braceCount === 0 && i > startIndex) {
const jsonStr = this.buffer.substring(startIndex, i + 1);

try {
const json = JSON.parse(jsonStr);
const processed = this.processor(json);
this.push(JSON.stringify(processed) + '\n');
startIndex = i + 1;
} catch (error) {
// Invalid JSON, skip
}
}
}
}

this.buffer = this.buffer.substring(startIndex);
callback();
}

_flush(callback) {
if (this.buffer.trim()) {
try {
const json = JSON.parse(this.buffer);
const processed = this.processor(json);
this.push(JSON.stringify(processed) + '\n');
} catch (error) {
// Invalid JSON at end
}
}
callback();
}
}

// Usage - process JSON logs
const processor = new JSONStreamProcessor(log => ({
...log,
processed: true,
timestamp: new Date().toISOString()
}));

await $`docker logs -f container_name`
.pipe(processor)
.pipe(process.stdout);
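
The brace-counting approach above handles JSON objects that span multiple lines. When the producer emits newline-delimited JSON (one object per line), a simpler line-buffered parser is enough — a sketch, assuming NDJSON input:

import { Transform } from 'stream';

// Minimal NDJSON processor: buffers partial lines across chunks,
// parses one JSON object per line, and re-emits the processed object
function ndjsonProcessor(processor) {
  let buffer = '';
  return new Transform({
    transform(chunk, encoding, callback) {
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (!line.trim()) continue;
        try {
          this.push(JSON.stringify(processor(JSON.parse(line))) + '\n');
        } catch (error) {
          // Ignore lines that are not valid JSON
        }
      }
      callback();
    },
    flush(callback) {
      if (buffer.trim()) {
        try {
          this.push(JSON.stringify(processor(JSON.parse(buffer))) + '\n');
        } catch (error) {
          // Incomplete trailing object, drop it
        }
      }
      callback();
    }
  });
}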

Progress Monitoring

Progress Stream

class ProgressStream extends Transform {
constructor(options = {}) {
super();
this.total = options.total || 0;
this.current = 0;
this.label = options.label || 'Progress';
this.updateInterval = options.updateInterval || 100;
this.lastUpdate = 0;
}

_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');

for (const line of lines) {
// Look for progress indicators
const progressMatch = line.match(/(\d+)\/(\d+)/);
if (progressMatch) {
this.current = parseInt(progressMatch[1]);
this.total = parseInt(progressMatch[2]);
this.updateProgress();
}

const percentMatch = line.match(/(\d+)%/);
if (percentMatch) {
const percent = parseInt(percentMatch[1]);
this.current = Math.floor(this.total * percent / 100);
this.updateProgress();
}
}

// Pass through original data
callback(null, chunk);
}

updateProgress() {
const now = Date.now();
if (now - this.lastUpdate < this.updateInterval) return;

this.lastUpdate = now;
const percent = this.total > 0 ? Math.floor(this.current / this.total * 100) : 0;
const bar = this.createProgressBar(percent);

process.stdout.write(`\r${this.label}: ${bar} ${percent}% (${this.current}/${this.total})`);

if (this.current >= this.total) {
process.stdout.write('\n');
}
}

createProgressBar(percent) {
const width = 30;
const filled = Math.floor(width * percent / 100);
const empty = width - filled;
return '█'.repeat(filled) + '░'.repeat(empty);
}
}

// Usage
const progress = new ProgressStream({
label: 'Building',
total: 100
});

await $`npm run build:verbose`
.pipe(progress)
.pipe(createWriteStream('build.log'));

Parallel Stream Processing

Multi-Stream Processor

class ParallelStreamProcessor {
constructor(workerCount = 4) {
this.workerCount = workerCount;
this.workers = [];
this.currentWorker = 0;
}

async process(inputStream, processor) {
// Create worker streams
for (let i = 0; i < this.workerCount; i++) {
this.workers.push(this.createWorker(processor));
}

// Distribute input to workers
return new Promise((resolve, reject) => {
inputStream.on('data', chunk => {
const lines = chunk.toString().split('\n');

lines.forEach(line => {
if (line.trim()) {
// Round-robin distribution
this.workers[this.currentWorker].write(line + '\n');
this.currentWorker = (this.currentWorker + 1) % this.workerCount;
}
});
});

inputStream.on('end', () => {
this.workers.forEach(worker => worker.end());
resolve();
});

inputStream.on('error', reject);
});
}

createWorker(processor) {
  const worker = new Transform({
    async transform(chunk, encoding, callback) {
      try {
        const result = await processor(chunk.toString());
        callback(null, result);
      } catch (error) {
        callback(error);
      }
    }
  });

  // Send each worker's output to stdout, but return the transform itself;
  // returning the result of .pipe() would hand back process.stdout, and
  // writes to it would bypass the processor entirely.
  worker.pipe(process.stdout);
  return worker;
}
}

// Usage
const parallelProcessor = new ParallelStreamProcessor(4);

await parallelProcessor.process(
$`find . -name "*.log"`,
async (filename) => {
const result = await $`wc -l ${filename.trim()}`.nothrow();
return result.stdout;
}
);
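
Round-robin `write()` calls ignore backpressure: if one worker falls behind, its internal buffer grows without bound. A hedged refinement is to honor the boolean returned by `write()` and wait for `drain` before reading more input (sketch only; it assumes the input stream supports `pause()`/`resume()`, as fs and child-process streams do):

// Respect backpressure when distributing lines: if write() returns false,
// pause the input until the busy worker drains
function writeWithBackpressure(inputStream, worker, line) {
  if (!worker.write(line + '\n')) {
    inputStream.pause();
    worker.once('drain', () => inputStream.resume());
  }
}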

Buffering and Batching

Batch Stream Processor

class BatchStream extends Transform {
constructor(options = {}) {
super({ objectMode: true });
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 1000;
this.processor = options.processor || (batch => batch);

this.batch = [];
this.timer = null;

this.startTimer();
}

_transform(chunk, encoding, callback) {
this.batch.push(chunk);

if (this.batch.length >= this.batchSize) {
this.flush();
}

callback();
}

async _flush(callback) {
  clearInterval(this.timer);
  // Wait for the last partial batch before signalling end-of-stream
  await this.flush();
  callback();
}

async flush() {
if (this.batch.length === 0) return;

const currentBatch = this.batch;
this.batch = [];

try {
const result = await this.processor(currentBatch);
this.push(result);
} catch (error) {
this.emit('error', error);
}
}

startTimer() {
this.timer = setInterval(() => {
this.flush();
}, this.flushInterval);
}
}

// Usage - batch database inserts
const batcher = new BatchStream({
batchSize: 1000,
flushInterval: 5000,
processor: async (batch) => {
// Insert the batch into the database (illustrative; assumes each item
// has already been formatted as a SQL values tuple)
await $`psql -c "INSERT INTO logs VALUES ${batch.join(',')}"`;
return `Inserted ${batch.length} records\n`;
}
});

await $`tail -f /var/log/app.log`
.pipe(new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach(line => {
if (line.trim()) {
this.push(line);
}
});
callback();
},
objectMode: true
}))
.pipe(batcher)
.pipe(process.stdout);

Error Handling in Streams

Resilient Stream Pipeline

class ResilientStream extends Transform {
constructor(options = {}) {
super();
this.maxRetries = options.maxRetries || 3;
this.retryDelay = options.retryDelay || 1000;
this.errorHandler = options.errorHandler || (() => {});
}

async _transform(chunk, encoding, callback) {
let retries = 0;

while (retries < this.maxRetries) {
try {
const result = await this.process(chunk);
callback(null, result);
return;
} catch (error) {
retries++;

if (retries >= this.maxRetries) {
this.errorHandler(error, chunk);
// Skip this chunk but continue processing
callback();
return;
}

await new Promise(resolve =>
setTimeout(resolve, this.retryDelay * retries)
);
}
}
}

async process(chunk) {
// Override in subclass
return chunk;
}
}

// Usage
import fs from 'node:fs';

class APIStream extends ResilientStream {
async process(chunk) {
const data = JSON.parse(chunk.toString());
const response = await $`curl -X POST https://api.example.com/data -d '${JSON.stringify(data)}'`;
return response.stdout;
}
}

const apiStream = new APIStream({
maxRetries: 5,
retryDelay: 2000,
errorHandler: (error, chunk) => {
console.error('Failed to process chunk:', error.message);
// Log to dead letter queue
fs.appendFileSync('failed.log', chunk + '\n');
}
});
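
Note that `.pipe()` does not forward errors between stages, so a failure in the middle of a chain surfaces as an unhandled 'error' event. One way to get propagation and cleanup is `pipeline()` from node:stream/promises — a sketch, with placeholder file names:

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'fs';

try {
  // pipeline() connects the stages, forwards errors from any of them,
  // and destroys the remaining streams when one fails
  await pipeline(
    createReadStream('events.ndjson'), // hypothetical input file
    apiStream,
    createWriteStream('api-responses.log')
  );
} catch (error) {
  console.error('Stream pipeline failed:', error.message);
}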

Complete Streaming Example

// streaming-log-processor.js
import { $ } from '@xec-sh/core';
import { Transform, PassThrough } from 'stream';
import { createWriteStream } from 'fs';
import chalk from 'chalk';

class LogProcessingPipeline {
constructor(config) {
this.config = config;
this.stats = {
total: 0,
errors: 0,
warnings: 0,
processed: 0
};
}

async start() {
console.log(chalk.blue('Starting log processing pipeline...'));

// Create pipeline stages
const source = this.createSource();
const parser = this.createParser();
const filter = this.createFilter();
const enricher = this.createEnricher();
const aggregator = this.createAggregator();
const writer = this.createWriter();

// Connect pipeline
source
.pipe(parser)
.pipe(filter)
.pipe(enricher)
.pipe(aggregator)
.pipe(writer);

// Monitor pipeline
this.monitorPipeline([source, parser, filter, enricher, aggregator, writer]);

// Start stats reporting
this.startStatsReporting();

return new Promise((resolve, reject) => {
writer.on('finish', resolve);
writer.on('error', reject);
});
}

createSource() {
if (this.config.follow) {
return $`tail -f ${this.config.logFile}`;
} else {
return $`cat ${this.config.logFile}`;
}
}

createParser() {
  return new Transform({
    transform: (chunk, encoding, callback) => {
      const lines = chunk.toString().split('\n');
      const output = [];

      for (const line of lines) {
        if (!line.trim()) continue;

        try {
          // Parse log line
          const log = this.parseLogLine(line);
          this.stats.total++;

          // Track log levels
          if (log.level === 'ERROR') this.stats.errors++;
          if (log.level === 'WARNING') this.stats.warnings++;

          output.push(JSON.stringify(log) + '\n');
        } catch (error) {
          // Pass through unparseable lines
          output.push(line + '\n');
        }
      }

      // The callback must be invoked exactly once per chunk,
      // so collect the parsed lines and emit them together
      callback(null, output.join(''));
    }
  });
}

createFilter() {
  return new Transform({
    transform: (chunk, encoding, callback) => {
      const output = [];

      // The parser emits one JSON object per line, so filter line by line
      for (const line of chunk.toString().split('\n')) {
        if (!line.trim()) continue;

        try {
          const log = JSON.parse(line);

          // Apply filters
          if (this.config.filters.level &&
              log.level !== this.config.filters.level) {
            continue; // Skip
          }

          if (this.config.filters.timeRange) {
            const logTime = new Date(log.timestamp);
            const { start, end } = this.config.filters.timeRange;

            if (logTime < start || logTime > end) {
              continue; // Skip
            }
          }

          if (this.config.filters.pattern &&
              !log.message.match(this.config.filters.pattern)) {
            continue; // Skip
          }

          output.push(JSON.stringify(log) + '\n');
        } catch (error) {
          // Skip invalid entries
        }
      }

      callback(null, output.join(''));
    }
  });
}

createEnricher() {
  return new Transform({
    transform: async (chunk, encoding, callback) => {
      const output = [];

      for (const line of chunk.toString().split('\n')) {
        if (!line.trim()) continue;

        try {
          const log = JSON.parse(line);

          // Enrich with additional data
          if (log.userId) {
            // Lookup user info (example)
            log.user = await this.getUserInfo(log.userId);
          }

          if (log.ip) {
            // Geo-locate IP (example)
            log.location = await this.geoLocate(log.ip);
          }

          // Add processing metadata
          log.processedAt = new Date().toISOString();
          log.pipeline = this.config.pipelineName;

          this.stats.processed++;

          output.push(JSON.stringify(log) + '\n');
        } catch (error) {
          // Pass through entries that cannot be enriched
          output.push(line + '\n');
        }
      }

      callback(null, output.join(''));
    }
  });
}

createAggregator() {
const aggregator = new Transform({
transform: function(chunk, encoding, callback) {
// Collect for batching
if (!this.buffer) this.buffer = [];
this.buffer.push(chunk);

if (this.buffer.length >= 100) {
const batch = this.buffer.join('');
this.buffer = [];
callback(null, batch);
} else {
callback();
}
},
flush: function(callback) {
if (this.buffer && this.buffer.length > 0) {
callback(null, this.buffer.join(''));
} else {
callback();
}
}
});

return aggregator;
}

createWriter() {
const outputs = [];

// File output
if (this.config.output.file) {
outputs.push(createWriteStream(this.config.output.file, { flags: 'a' }));
}

// Database output
if (this.config.output.database) {
outputs.push(this.createDatabaseStream());
}

// Console output
if (this.config.output.console) {
outputs.push(this.createConsoleStream());
}

// Create tee for multiple outputs
const tee = new PassThrough();
outputs.forEach(output => tee.pipe(output));

return tee;
}

createDatabaseStream() {
return new Transform({
transform: async (chunk, encoding, callback) => {
const logs = chunk.toString()
.split('\n')
.filter(line => line.trim())
.map(line => JSON.parse(line));

if (logs.length > 0) {
// Insert into database
const values = logs.map(log =>
`('${log.timestamp}', '${log.level}', '${log.message}')`
).join(',');

await $`psql -c "INSERT INTO logs (timestamp, level, message) VALUES ${values}"`;
}

callback();
}
});
}

createConsoleStream() {
return new Transform({
transform: (chunk, encoding, callback) => {
const logs = chunk.toString()
.split('\n')
.filter(line => line.trim())
.map(line => JSON.parse(line));

for (const log of logs) {
const color = {
'ERROR': chalk.red,
'WARNING': chalk.yellow,
'INFO': chalk.blue,
'DEBUG': chalk.gray
}[log.level] || chalk.white;

console.log(color(`[${log.timestamp}] [${log.level}] ${log.message}`));
}

callback();
}
});
}

parseLogLine(line) {
// Try different log formats

// JSON format
if (line.startsWith('{')) {
return JSON.parse(line);
}

// Apache/Nginx format
const apacheMatch = line.match(/^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]+)" (\d+) (\d+)/);
if (apacheMatch) {
return {
ip: apacheMatch[1],
timestamp: apacheMatch[2],
request: apacheMatch[3],
status: apacheMatch[4],
bytes: apacheMatch[5],
level: 'INFO'
};
}

// Syslog format
const syslogMatch = line.match(/^(\w+\s+\d+\s+\d+:\d+:\d+)\s+(\S+)\s+(\S+)\[(\d+)\]:\s+(.+)$/);
if (syslogMatch) {
return {
timestamp: syslogMatch[1],
host: syslogMatch[2],
process: syslogMatch[3],
pid: syslogMatch[4],
message: syslogMatch[5],
level: 'INFO'
};
}

// Default format
return {
timestamp: new Date().toISOString(),
message: line,
level: 'INFO'
};
}

async getUserInfo(userId) {
// Mock user lookup
return { id: userId, name: 'User ' + userId };
}

async geoLocate(ip) {
// Mock geo-location
return { country: 'US', city: 'New York' };
}

monitorPipeline(stages) {
stages.forEach((stage, index) => {
stage.on('error', error => {
console.error(chalk.red(`Error in stage ${index}:`), error.message);
});
});
}

startStatsReporting() {
setInterval(() => {
console.log(chalk.gray(
`Stats: Total=${this.stats.total} Processed=${this.stats.processed} ` +
`Errors=${this.stats.errors} Warnings=${this.stats.warnings}`
));
}, 5000);
}
}

// Usage
const pipeline = new LogProcessingPipeline({
logFile: '/var/log/app.log',
follow: true,
pipelineName: 'main',
filters: {
level: 'ERROR',
timeRange: {
start: new Date(Date.now() - 3600000), // Last hour
end: new Date()
},
pattern: /database|connection/i
},
output: {
file: 'processed.log',
database: true,
console: true
}
});

await pipeline.start();

This comprehensive example demonstrates:

  • Multi-stage streaming pipeline
  • Log parsing and filtering
  • Data enrichment
  • Batch processing
  • Multiple output destinations
  • Error handling and monitoring
  • Real-time statistics
  • Format detection
  • Performance optimization through batching