Async Patterns for Command Execution
Xec scripts are inherently asynchronous, leveraging JavaScript's async/await for elegant control flow. This guide covers advanced async patterns for efficient and scalable command execution.
Sequential vs Parallel Execution
Sequential Execution
Commands execute one after another:
import { $ } from '@xec-sh/core';
// Sequential - each command waits for the previous one to exit, so the total
// runtime is the sum of the individual commands (three 1s sleeps ≈ 3s).
async function sequential() {
  console.time('sequential');
  await $`sleep 1 && echo "Task 1"`;
  await $`sleep 1 && echo "Task 2"`;
  await $`sleep 1 && echo "Task 3"`;
  console.timeEnd('sequential'); // ~3 seconds
}
Parallel Execution
Execute multiple commands simultaneously:
// Parallel - every command starts immediately; total time ≈ the slowest one.
async function parallel() {
  console.time('parallel');
  // All three processes are launched before we start waiting.
  const tasks = [
    $`sleep 1 && echo "Task 1"`,
    $`sleep 1 && echo "Task 2"`,
    $`sleep 1 && echo "Task 3"`
  ];
  await Promise.all(tasks);
  console.timeEnd('parallel'); // ~1 second
}
// With result handling: Promise.all preserves input order, so each result
// lines up with the command that produced it.
async function parallelWithResults() {
  const results = await Promise.all([
    $`echo "Result 1"`,
    $`echo "Result 2"`,
    $`echo "Result 3"`
  ]);
  for (const [i, result] of results.entries()) {
    console.log(`Task ${i + 1}: ${result.stdout.trim()}`);
  }
}
Mixed Sequential and Parallel
/**
 * Three-phase workflow: parallel dependency installs, then sequential builds,
 * then all test suites in parallel. Resolves with the test results.
 */
async function mixed() {
  // Phase 1: independent installs overlap freely.
  const installs = [
    $`npm install`,
    $`pip install -r requirements.txt`,
    $`bundle install`
  ];
  await Promise.all(installs);

  // Phase 2: builds must run one after another.
  await $`npm run build`;
  await $`python setup.py build`;

  // Phase 3: test suites overlap again.
  const testResults = await Promise.all([
    $`npm test`,
    $`pytest`,
    $`rspec`
  ]);
  return testResults;
}
Promise Control Patterns
Promise.allSettled for Resilient Execution
/**
 * Launches four test suites in parallel and waits for every one to settle
 * (Promise.allSettled never fails fast), then reports a pass/fail summary.
 *
 * NOTE(review): `errors[].test` holds the command promise itself, not a
 * human-readable name — useful for identity only.
 *
 * @returns {Promise<{total: number, passed: number, failed: number, errors: Array}>}
 */
async function runAllTests() {
  const tests = [
    $`npm run test:unit`,
    $`npm run test:integration`,
    $`npm run test:e2e`,
    $`npm run test:performance`
  ];
  const results = await Promise.allSettled(tests);

  const summary = { total: results.length, passed: 0, failed: 0, errors: [] };
  for (const [index, outcome] of results.entries()) {
    // A suite passes only if its promise fulfilled AND the process exited 0.
    const ok = outcome.status === 'fulfilled' && outcome.value.exitCode === 0;
    if (ok) {
      summary.passed++;
    } else {
      summary.failed++;
      summary.errors.push({
        test: tests[index],
        reason: outcome.reason || outcome.value.stderr
      });
    }
  }
  console.log(`Tests: ${summary.passed}/${summary.total} passed`);
  return summary;
}
Promise.race for Timeouts
/**
 * Races `command` against a timer and rejects with 'Command timed out' if the
 * timer wins.
 *
 * Fixes vs. the original:
 * - the timeout timer is now cleared in `finally`; previously a fast command
 *   left the timer pending, keeping the event loop alive up to `timeoutMs`.
 * - `command.kill?.()` — plain promises have no kill(), so the original would
 *   throw a TypeError instead of the timeout error.
 *
 * @param {Promise} command - an in-flight command (e.g. a $`...` process promise)
 * @param {number} [timeoutMs=5000] - how long to wait before giving up
 * @returns {Promise<*>} the command's result if it finishes in time
 * @throws {Error} 'Command timed out', or whatever the command itself rejects with
 */
async function raceWithTimeout(command, timeoutMs = 5000) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('Command timed out')), timeoutMs);
  });
  try {
    return await Promise.race([command, timeout]);
  } catch (error) {
    if (error.message === 'Command timed out') {
      // Kill the command if it's still running (no-op for plain promises).
      command.kill?.();
    }
    throw error;
  } finally {
    clearTimeout(timer);
  }
}
// Usage: give the curl command 3 seconds before it is timed out and killed.
const result = await raceWithTimeout(
  $`curl https://slow-api.example.com`,
  3000
);
Promise.any for Fallback Servers
/**
 * Fires a request at every server simultaneously and resolves with the first
 * success (Promise.any); rejects only when all of them have failed.
 *
 * @param {string[]} servers - hostnames to try
 * @returns {Promise<*>} the first successful response
 * @throws {Error} 'No servers available' when every attempt fails
 */
async function fetchFromAnyServer(servers) {
  const attempts = [];
  for (const server of servers) {
    attempts.push($`curl https://${server}/api/data`);
  }
  try {
    const winner = await Promise.any(attempts);
    console.log('Fetched from first available server');
    return winner;
  } catch (error) {
    // Promise.any rejects with an AggregateError once all attempts fail.
    console.error('All servers failed:', error);
    throw new Error('No servers available');
  }
}
// Usage: the first server to answer wins; the slower attempts still run but
// their results are discarded.
const data = await fetchFromAnyServer([
  'primary.example.com',
  'backup1.example.com',
  'backup2.example.com'
]);
Concurrency Control
Limited Parallelism
/**
 * Caps how many async functions run at once; callers over the limit wait in
 * FIFO order for a slot to open.
 */
class ConcurrencyLimiter {
  constructor(limit = 5) {
    this.limit = limit;   // max simultaneous tasks
    this.running = 0;     // tasks currently in flight
    this.queue = [];      // resolvers for blocked callers (FIFO)
  }

  /**
   * Executes `fn` once a slot is free; settles with fn's own outcome.
   * @param {Function} fn - async work to run under the limit
   */
  async run(fn) {
    // Re-check after every wake-up: another waiter may have taken the slot.
    while (this.running >= this.limit) {
      await new Promise((resolve) => this.queue.push(resolve));
    }
    this.running += 1;
    try {
      return await fn();
    } finally {
      this.running -= 1;
      // Wake exactly one waiter (if any) to retry the admission check.
      const wake = this.queue.shift();
      if (wake) {
        wake();
      }
    }
  }
}
// Usage: no matter how many servers there are, at most three deployments
// run concurrently.
const limiter = new ConcurrencyLimiter(3);

async function deployToManyServers(servers) {
  const inFlight = [];
  for (const server of servers) {
    inFlight.push(
      limiter.run(async () => {
        console.log(`Deploying to ${server}...`);
        await $`ssh ${server} "cd /app && git pull && npm install"`;
        console.log(`Deployed to ${server}`);
      })
    );
  }
  await Promise.all(inFlight);
}
Batch Processing
/**
 * Processes `items` in sequential batches of `batchSize`, running the items
 * within each batch in parallel.
 *
 * Generalization: the inter-batch pause was hard-coded at 1000 ms; it is now
 * the trailing `delayMs` parameter (default 1000, so existing callers are
 * unaffected; pass 0 to disable).
 *
 * @param {Array} items - work items
 * @param {number} [batchSize=10] - max items processed concurrently
 * @param {Function} processor - async (item) => result
 * @param {number} [delayMs=1000] - pause between batches; 0 disables it
 * @returns {Promise<Array>} results in input order
 */
async function processBatches(items, batchSize = 10, processor, delayMs = 1000) {
  const results = [];
  const totalBatches = Math.ceil(items.length / batchSize); // hoisted loop invariant
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    console.log(`Processing batch ${i / batchSize + 1}/${totalBatches}`);
    const batchResults = await Promise.all(
      batch.map((item) => processor(item))
    );
    results.push(...batchResults);
    // Optional throttle between batches; skipped after the final batch.
    if (delayMs > 0 && i + batchSize < items.length) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return results;
}
// Usage: compress log files five at a time.
// NOTE(review): `glob` is assumed to be in scope from surrounding context —
// it is not imported in this excerpt; verify.
const files = await glob('**/*.log');
await processBatches(files, 5, async (file) => {
  return await $`gzip ${file}`;
});
Async Iterators and Generators
Async Generator for Streaming Results
/**
 * Async generator that awaits each command in order and yields a tagged
 * outcome ({success, result} or {success, error}) instead of throwing,
 * so a single failure does not stop the stream.
 */
async function* executeCommands(commands) {
  for (const command of commands) {
    try {
      yield { success: true, result: await command };
    } catch (error) {
      yield { success: false, error };
    }
  }
}
// Usage
// NOTE(review): the commands presumably start executing as soon as they are
// created here, not when the generator reaches them — confirm against xec's
// lazy/eager execution semantics.
const commands = [
  $`echo "Command 1"`,
  $`echo "Command 2"`,
  $`false`, // This will fail
  $`echo "Command 4"`
];
// Stream outcomes as each command settles; a failure does not stop the loop.
for await (const outcome of executeCommands(commands)) {
  if (outcome.success) {
    console.log('✅', outcome.result.stdout.trim());
  } else {
    console.log('❌', outcome.error.message);
  }
}
Async Iterator for Progress Updates
/**
 * Async generator that drives a list of {name, command} steps, yielding
 * 'progress' before each step, 'complete' after success, or 'error' on
 * failure (then rethrowing so the consumer's loop aborts).
 *
 * Event shape: { type, current, total, message, [result|error] } where
 * `current` counts completed steps.
 */
async function* deployWithProgress(steps) {
  const total = steps.length;
  let index = 0; // completed-step counter
  for (const step of steps) {
    yield {
      type: 'progress',
      current: index,
      total,
      message: `Starting: ${step.name}`
    };
    try {
      const result = await step.command();
      index += 1;
      yield {
        type: 'complete',
        current: index,
        total,
        message: `Completed: ${step.name}`,
        result
      };
    } catch (error) {
      // `current` stays at the pre-step count: this step did not complete.
      yield {
        type: 'error',
        current: index,
        total,
        message: `Failed: ${step.name}`,
        error
      };
      throw error;
    }
  }
}
// Usage
const deploymentSteps = [
  { name: 'Build', command: () => $`npm run build` },
  { name: 'Test', command: () => $`npm test` },
  { name: 'Deploy', command: () => $`npm run deploy` }
];
// Render each update as the generator yields it.
for await (const update of deployWithProgress(deploymentSteps)) {
  const progress = `[${update.current}/${update.total}]`;
  switch (update.type) {
    case 'progress':
      console.log(`${progress} 🔄 ${update.message}`);
      break;
    case 'complete':
      console.log(`${progress} ✅ ${update.message}`);
      break;
    case 'error':
      console.log(`${progress} ❌ ${update.message}`);
      break;
  }
}
Event-Driven Async Patterns
Event Emitter for Long Operations
import { EventEmitter } from 'events';
/**
 * EventEmitter-based runner that executes tasks one at a time, emitting
 * lifecycle events: 'start', 'taskStart', 'taskComplete', 'taskError',
 * 'abort', 'complete'. A failing task marked `optional` is skipped; a
 * failing non-optional task emits 'abort' and rethrows.
 */
class AsyncTaskRunner extends EventEmitter {
  async run(tasks) {
    this.emit('start', { total: tasks.length });
    let index = 0;
    for (const task of tasks) {
      this.emit('taskStart', { index, task });
      try {
        const result = await task.execute();
        this.emit('taskComplete', { index, task, result });
      } catch (error) {
        this.emit('taskError', { index, task, error });
        // Optional tasks may fail quietly; anything else is fatal.
        if (!task.optional) {
          this.emit('abort', { reason: `Critical task failed: ${task.name}` });
          throw error;
        }
      }
      index += 1;
    }
    this.emit('complete');
  }
}
// Usage: attach listeners before calling run() so no events are missed.
const runner = new AsyncTaskRunner();
runner.on('start', ({ total }) => {
  console.log(`Starting ${total} tasks...`);
});
runner.on('taskStart', ({ task }) => {
  console.log(` ⏳ ${task.name}`);
});
runner.on('taskComplete', ({ task }) => {
  console.log(` ✅ ${task.name}`);
});
runner.on('taskError', ({ task, error }) => {
  console.log(` ❌ ${task.name}: ${error.message}`);
});
// Lint may fail without aborting; Build and Test are mandatory.
const tasks = [
  { name: 'Lint', execute: () => $`npm run lint`, optional: true },
  { name: 'Build', execute: () => $`npm run build`, optional: false },
  { name: 'Test', execute: () => $`npm test`, optional: false }
];
await runner.run(tasks);
Queue Management
Priority Queue
/**
 * Task queue that runs up to `concurrency` tasks at once, always starting
 * the highest-priority pending task next (larger number = sooner).
 */
class PriorityQueue {
  constructor(concurrency = 5) {
    this.concurrency = concurrency; // max tasks in flight
    this.running = 0;
    this.queue = [];                // pending entries, highest priority first
  }

  /**
   * Enqueues `task` (an async function) and resolves with its result.
   * @param {Function} task - async work
   * @param {number} [priority=0] - larger values are dequeued first
   */
  async add(task, priority = 0) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, priority, resolve, reject });
      // Keep the backlog ordered by descending priority.
      this.queue.sort((a, b) => b.priority - a.priority);
      this.process();
    });
  }

  /** Starts the next pending task if a concurrency slot is free. */
  async process() {
    const slotFree = this.running < this.concurrency;
    if (!slotFree || this.queue.length === 0) {
      return;
    }
    this.running += 1;
    const entry = this.queue.shift();
    try {
      entry.resolve(await entry.task());
    } catch (error) {
      entry.reject(error);
    } finally {
      this.running -= 1;
      this.process(); // pull the next entry, if any
    }
  }
}
// Usage
const queue = new PriorityQueue(3);
// NOTE(review): each `await` below waits for its task to finish before the
// next add() runs, so at most one task is ever queued and the priorities have
// no observable effect here. Add all tasks without awaiting (then Promise.all)
// to see prioritization in action.
// High priority tasks
await queue.add(() => $`npm run critical-fix`, 10);
// Normal priority tasks
await queue.add(() => $`npm run build`, 5);
// Low priority tasks
await queue.add(() => $`npm run cleanup`, 1);
Async Coordination Patterns
Semaphore for Resource Limiting
/**
 * Counting semaphore limiting how many callers hold a permit at once.
 *
 * Fix vs. the original: release() now hands the permit directly to the next
 * waiter instead of decrementing `current` and letting the woken waiter
 * re-increment it. The old decrement-then-wake scheme opened a window in
 * which a fresh acquire() could take the fast path (current < max) before
 * the woken waiter resumed, admitting max + 1 concurrent holders.
 */
class Semaphore {
  constructor(max) {
    this.max = max;       // maximum concurrent permit holders
    this.current = 0;     // permits currently held
    this.waiting = [];    // resolvers for queued acquire() calls (FIFO)
  }

  /** Resolves once the caller holds a permit. */
  async acquire() {
    if (this.current < this.max) {
      this.current++;
      return;
    }
    // Queue up; release() transfers its permit to us without touching
    // `current`, so there is no gap for queue-jumpers to exploit.
    await new Promise((resolve) => this.waiting.push(resolve));
  }

  /** Returns a permit; wakes the oldest waiter if one is queued. */
  release() {
    const next = this.waiting.shift();
    if (next) {
      next(); // ownership moves to the waiter; `current` stays unchanged
    } else {
      this.current--;
    }
  }

  /**
   * Runs `fn` while holding a permit; the permit is always released,
   * even when `fn` throws.
   */
  async use(fn) {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// Usage - limit concurrent SSH connections
const sshSemaphore = new Semaphore(10);

// Runs `command` on `server`, holding a semaphore permit for the duration
// of the SSH call.
async function executeOnServer(server, command) {
  return sshSemaphore.use(async () => {
    return await $`ssh ${server} "${command}"`;
  });
}

// Can have many concurrent calls but only 10 SSH connections
// NOTE(review): `servers` is assumed to be defined by surrounding context —
// it is not declared in this excerpt.
const results = await Promise.all(
  servers.map(server =>
    executeOnServer(server, 'uptime')
  )
);
Barrier Synchronization
/**
 * One-shot barrier: every wait() call resolves once `count` callers have
 * arrived. Not reusable after it trips — the shared promise stays resolved.
 */
class Barrier {
  constructor(count) {
    this.count = count;     // arrivals required to open the barrier
    this.current = 0;       // arrivals so far
    // One shared promise handed to every waiter; resolved when full.
    this.promise = new Promise((resolve) => {
      this.resolve = resolve;
    });
  }

  /** Registers an arrival and resolves when the barrier is full. */
  async wait() {
    this.current += 1;
    if (this.current >= this.count) {
      this.resolve();
    }
    return this.promise;
  }
}
// Usage - wait for all services to be ready
async function startMicroservices() {
  const barrier = new Barrier(3);
  // Start services in parallel; each joins the barrier once it is ready.
  const services = [
    startDatabase(barrier),
    startCache(barrier),
    startAPI(barrier)
  ];
  await Promise.all(services);
  console.log('All services started and ready');
}

// Brings up Postgres, waits for its port, then joins the barrier.
async function startDatabase(barrier) {
  await $`docker-compose up -d postgres`;
  await $`wait-for-it postgres:5432`;
  console.log('Database ready');
  await barrier.wait();
}

// Brings up Redis, waits for its port, then joins the barrier.
async function startCache(barrier) {
  await $`docker-compose up -d redis`;
  await $`wait-for-it redis:6379`;
  console.log('Cache ready');
  await barrier.wait();
}

// Starts the API and joins the barrier.
// NOTE(review): `npm run start:dev` typically launches a long-lived dev
// server; awaiting it may never resolve — confirm the script exits once
// the API is up (or run it detached).
async function startAPI(barrier) {
  await $`npm run start:dev`;
  console.log('API ready');
  await barrier.wait();
}
Async Pipelines
Pipeline Builder
/**
 * Fluent builder for a chain of async steps. Each step's function receives
 * the previous step's return value; the first failure is logged and aborts
 * the pipeline by rethrowing.
 */
class AsyncPipeline {
  constructor() {
    this.steps = [];
  }

  /**
   * Appends a named step; returns `this` for chaining.
   * @param {string} name - label used in progress/error logging
   * @param {Function} fn - async (previousResult) => nextResult
   */
  add(name, fn) {
    this.steps.push({ name, fn });
    return this;
  }

  /**
   * Threads `input` through every step in order.
   * @returns {Promise<*>} the final step's return value
   */
  async execute(input) {
    let value = input;
    for (const { name, fn } of this.steps) {
      console.log(`⚙️ ${name}`);
      try {
        value = await fn(value);
      } catch (error) {
        console.error(`❌ ${name} failed:`, error.message);
        throw error;
      }
    }
    return value;
  }
}
// Usage: a five-step build pipeline. The context object accumulates flags as
// it flows through the steps; Package refuses to run when tests failed.
const buildPipeline = new AsyncPipeline()
  .add('Clean', async () => {
    await $`rm -rf dist`;
    return { cleaned: true };
  })
  .add('Install', async (ctx) => {
    await $`npm ci`;
    return { ...ctx, installed: true };
  })
  .add('Build', async (ctx) => {
    await $`npm run build`;
    return { ...ctx, built: true };
  })
  .add('Test', async (ctx) => {
    // .nothrow() keeps a failing test run from aborting the pipeline here;
    // the exit code is recorded instead and checked by the Package step.
    const result = await $`npm test`.nothrow();
    return { ...ctx, tested: true, testsPassed: result.exitCode === 0 };
  })
  .add('Package', async (ctx) => {
    if (!ctx.testsPassed) {
      throw new Error('Cannot package: tests failed');
    }
    await $`npm pack`;
    return { ...ctx, packaged: true };
  });
const result = await buildPipeline.execute({});
console.log('Pipeline result:', result);