SSH Batch Operations
Implementation Reference
Source Files:
packages/core/src/ssh/batch.ts
- Batch execution logicpackages/core/src/utils/parallel.ts
- Parallel execution utilitiespackages/core/src/core/multi-target.ts
- Multi-target coordinationapps/xec/src/commands/on.ts
- On command implementation
Key Functions:
executeBatch()
- Execute on multiple hostsexecuteParallel()
- Parallel execution with limitsexecuteSequential()
- Sequential executionaggregateResults()
- Combine results from multiple hosts
Multi-Host Execution
Basic Batch Operations
// Execute on multiple hosts
const hosts = ['host1', 'host2', 'host3'];
for (const host of hosts) {
await $.ssh(`user@${host}`)`uptime`;
}
// Using target patterns
await $.target('hosts.*')`systemctl status nginx`;
Configuration-Based Batching
# .xec/config.yaml
targets:
web-servers:
type: group
members:
- web1
- web2
- web3
web1:
type: ssh
host: web1.example.com
user: deploy
web2:
type: ssh
host: web2.example.com
user: deploy
web3:
type: ssh
host: web3.example.com
user: deploy
// Execute on group
await $.target('web-servers')`sudo systemctl restart nginx`;
Parallel Execution
Unlimited Parallelism
// Execute on all hosts simultaneously
const hosts = ['host1', 'host2', 'host3', 'host4', 'host5'];
const results = await Promise.all(
hosts.map(host =>
$.ssh(`user@${host}`)`df -h`
)
);
// Process results
results.forEach((result, i) => {
console.log(`${hosts[i]}:`, result.stdout);
});
Controlled Parallelism
import pLimit from 'p-limit';
// Limit to 3 concurrent connections
const limit = pLimit(3);
const hosts = Array.from({ length: 20 }, (_, i) => `host${i + 1}`);
const results = await Promise.all(
hosts.map(host =>
limit(() => $.ssh(`user@${host}`)`apt-get update`)
)
);
Batch with Progress
class BatchExecutor {
async execute(hosts: string[], command: string, options: BatchOptions = {}) {
const {
parallel = 5,
onProgress,
onError
} = options;
const limit = pLimit(parallel);
let completed = 0;
const total = hosts.length;
const results: BatchResult[] = [];
await Promise.all(
hosts.map(host =>
limit(async () => {
try {
const start = Date.now();
const result = await $.ssh(`user@${host}`)`${command}`;
completed++;
const progress = {
host,
completed,
total,
percentage: (completed / total) * 100,
duration: Date.now() - start,
success: true
};
onProgress?.(progress);
results.push({
host,
success: true,
stdout: result.stdout,
stderr: result.stderr,
duration: progress.duration
});
} catch (error) {
completed++;
onError?.({ host, error });
results.push({
host,
success: false,
error: error.message
});
}
})
)
);
return results;
}
}
// Usage
const executor = new BatchExecutor();
const results = await executor.execute(
hosts,
'sudo apt-get upgrade -y',
{
parallel: 3,
onProgress: (p) => console.log(`Progress: ${p.percentage.toFixed(1)}%`),
onError: (e) => console.error(`Failed on ${e.host}: ${e.error.message}`)
}
);
Sequential Execution
Simple Sequential
// Execute one by one
const hosts = ['critical1', 'critical2', 'critical3'];
for (const host of hosts) {
console.log(`Updating ${host}...`);
try {
await $.ssh(`user@${host}`)`
sudo apt-get update &&
sudo apt-get upgrade -y &&
sudo systemctl restart app
`;
console.log(`✓ ${host} updated successfully`);
} catch (error) {
console.error(`✗ ${host} failed: ${error.message}`);
// Decide whether to continue or abort
if (options.stopOnError) {
throw error;
}
}
}
Rolling Updates
class RollingUpdater {
async update(hosts: string[], options: RollingOptions = {}) {
const {
batchSize = 1,
delay = 0,
healthCheck,
rollback
} = options;
const batches = this.chunk(hosts, batchSize);
const updated: string[] = [];
for (const batch of batches) {
console.log(`Updating batch: ${batch.join(', ')}`);
try {
// Update batch
await Promise.all(
batch.map(host => this.updateHost(host))
);
// Health check
if (healthCheck) {
await this.waitForHealth(batch, healthCheck);
}
updated.push(...batch);
// Delay between batches
if (delay > 0) {
await new Promise(resolve => setTimeout(resolve, delay));
}
} catch (error) {
console.error(`Batch failed: ${batch.join(', ')}`);
// Rollback if needed
if (rollback) {
await this.rollbackHosts(updated);
}
throw error;
}
}
}
private chunk<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
private async updateHost(host: string): Promise<void> {
await $.ssh(`user@${host}`)`
sudo systemctl stop app &&
sudo cp /tmp/app.new /usr/local/bin/app &&
sudo systemctl start app
`;
}
private async waitForHealth(hosts: string[], check: HealthCheck): Promise<void> {
const maxAttempts = 30;
const delay = 2000;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
const healthy = await Promise.all(
hosts.map(host => check(host))
);
if (healthy.every(h => h)) {
return; // All healthy
}
await new Promise(resolve => setTimeout(resolve, delay));
}
throw new Error('Health check timeout');
}
}
Pattern-Based Execution
Wildcard Patterns
# Configuration with patterns
targets:
web1:
type: ssh
host: web1.example.com
tags: [web, production]
web2:
type: ssh
host: web2.example.com
tags: [web, production]
db1:
type: ssh
host: db1.example.com
tags: [database, production]
// Execute on pattern
await $.target('web*')`systemctl status nginx`;
// Execute on tagged hosts
await $.target('tags:production')`uptime`;
// Complex patterns
await $.target('web* && tags:production')`deploy.sh`;
Dynamic Host Selection
// Select hosts dynamically
async function selectHosts(criteria: HostCriteria): Promise<string[]> {
const allHosts = await $.config.getTargets();
return allHosts.filter(host => {
// Match by name pattern
if (criteria.pattern) {
const regex = new RegExp(criteria.pattern.replace('*', '.*'));
if (!regex.test(host.name)) return false;
}
// Match by tags
if (criteria.tags) {
const hostTags = host.tags || [];
if (!criteria.tags.every(tag => hostTags.includes(tag))) {
return false;
}
}
// Match by custom predicate
if (criteria.predicate) {
if (!criteria.predicate(host)) return false;
}
return true;
}).map(h => h.name);
}
// Usage
const webServers = await selectHosts({
pattern: 'web*',
tags: ['production'],
predicate: (host) => host.region === 'us-east-1'
});
await $.batch(webServers)`deploy.sh`;
Result Aggregation
Collecting Results
interface BatchResults {
successful: Map<string, CommandResult>;
failed: Map<string, Error>;
summary: {
total: number;
succeeded: number;
failed: number;
duration: number;
};
}
async function executeBatchCommand(
hosts: string[],
command: string
): Promise<BatchResults> {
const startTime = Date.now();
const successful = new Map<string, CommandResult>();
const failed = new Map<string, Error>();
await Promise.all(
hosts.map(async (host) => {
try {
const result = await $.ssh(`user@${host}`)`${command}`;
successful.set(host, result);
} catch (error) {
failed.set(host, error);
}
})
);
return {
successful,
failed,
summary: {
total: hosts.length,
succeeded: successful.size,
failed: failed.size,
duration: Date.now() - startTime
}
};
}
// Usage
const results = await executeBatchCommand(hosts, 'uptime');
// Process results
console.log(`Success: ${results.summary.succeeded}/${results.summary.total}`);
for (const [host, result] of results.successful) {
console.log(`${host}: ${result.stdout.trim()}`);
}
for (const [host, error] of results.failed) {
console.error(`${host} failed: ${error.message}`);
}
Aggregating Output
// Aggregate similar output
function aggregateOutput(results: Map<string, string>): Map<string, string[]> {
const aggregated = new Map<string, string[]>();
for (const [host, output] of results) {
const normalized = output.trim();
if (!aggregated.has(normalized)) {
aggregated.set(normalized, []);
}
aggregated.get(normalized)!.push(host);
}
return aggregated;
}
// Usage
const outputs = new Map([
['web1', 'nginx is running'],
['web2', 'nginx is running'],
['web3', 'nginx is stopped']
]);
const aggregated = aggregateOutput(outputs);
// Map {
// 'nginx is running' => ['web1', 'web2'],
// 'nginx is stopped' => ['web3']
// }
Error Handling Strategies
Fail Fast
// Stop on first error
async function failFast(hosts: string[], command: string): Promise<void> {
for (const host of hosts) {
await $.ssh(`user@${host}`)`${command}`; // Throws on error
}
}
Fail Soft
// Continue on errors
async function failSoft(hosts: string[], command: string): Promise<BatchResults> {
const results: BatchResults = {
successful: [],
failed: []
};
for (const host of hosts) {
try {
const result = await $.ssh(`user@${host}`)`${command}`;
results.successful.push({ host, result });
} catch (error) {
results.failed.push({ host, error });
// Continue to next host
}
}
return results;
}
Retry Logic
// Retry failed hosts
async function withRetry(
hosts: string[],
command: string,
maxRetries = 3
): Promise<BatchResults> {
let remainingHosts = [...hosts];
const successful: string[] = [];
const failed: Map<string, Error> = new Map();
for (let attempt = 0; attempt < maxRetries && remainingHosts.length > 0; attempt++) {
if (attempt > 0) {
console.log(`Retry attempt ${attempt} for ${remainingHosts.length} hosts`);
await new Promise(resolve => setTimeout(resolve, 2000 * attempt));
}
const retryHosts = [...remainingHosts];
remainingHosts = [];
for (const host of retryHosts) {
try {
await $.ssh(`user@${host}`)`${command}`;
successful.push(host);
} catch (error) {
if (attempt === maxRetries - 1) {
failed.set(host, error);
} else {
remainingHosts.push(host);
}
}
}
}
return { successful, failed };
}
Performance Optimization
Connection Pooling
// Reuse connections for batch operations
class BatchSSHExecutor {
private pools = new Map<string, SSHConnectionPool>();
async execute(host: string, command: string): Promise<CommandResult> {
if (!this.pools.has(host)) {
this.pools.set(host, new SSHConnectionPool({
host,
maxConnections: 5
}));
}
const pool = this.pools.get(host)!;
const connection = await pool.acquire();
try {
return await connection.exec(command);
} finally {
pool.release(connection);
}
}
async cleanup(): Promise<void> {
for (const pool of this.pools.values()) {
await pool.destroy();
}
this.pools.clear();
}
}
Batch Command Optimization
// Combine multiple commands
async function batchCommands(host: string, commands: string[]): Promise<void> {
// Instead of multiple round trips
// for (const cmd of commands) {
// await $.ssh(host)`${cmd}`;
// }
// Single round trip
const combinedCommand = commands.join(' && ');
await $.ssh(host)`${combinedCommand}`;
}
Monitoring and Reporting
Progress Reporting
class BatchReporter {
private startTime = Date.now();
private completed = 0;
private failed = 0;
report(host: string, success: boolean, output?: string): void {
if (success) {
this.completed++;
console.log(`✓ ${host} completed`);
} else {
this.failed++;
console.error(`✗ ${host} failed`);
}
this.printProgress();
}
private printProgress(): void {
const elapsed = (Date.now() - this.startTime) / 1000;
const rate = this.completed / elapsed;
console.log(
`Progress: ${this.completed} completed, ${this.failed} failed ` +
`(${rate.toFixed(1)} hosts/sec)`
);
}
summary(): void {
const elapsed = (Date.now() - this.startTime) / 1000;
console.log('\n=== Batch Execution Summary ===');
console.log(`Total: ${this.completed + this.failed}`);
console.log(`Succeeded: ${this.completed}`);
console.log(`Failed: ${this.failed}`);
console.log(`Duration: ${elapsed.toFixed(1)}s`);
console.log(`Success Rate: ${(this.completed / (this.completed + this.failed) * 100).toFixed(1)}%`);
}
}
Detailed Logging
// Log batch operations
class BatchLogger {
private logFile: string;
constructor(logFile = 'batch-execution.log') {
this.logFile = logFile;
}
async log(entry: LogEntry): Promise<void> {
const timestamp = new Date().toISOString();
const logLine = JSON.stringify({
timestamp,
...entry
}) + '\n';
await fs.appendFile(this.logFile, logLine);
}
async logExecution(host: string, command: string, result: any): Promise<void> {
await this.log({
type: 'execution',
host,
command,
success: result.success,
stdout: result.stdout?.substring(0, 1000), // Truncate
stderr: result.stderr?.substring(0, 1000),
duration: result.duration
});
}
}
Use Cases
Deployment
async function deployToServers(servers: string[], version: string) {
const deployer = new RollingUpdater();
await deployer.update(servers, {
batchSize: 2, // Deploy 2 at a time
delay: 5000, // 5 seconds between batches
async updateHost(host: string) {
await $.ssh(`user@${host}`)`
cd /app &&
git fetch &&
git checkout ${version} &&
npm install &&
npm run build &&
pm2 reload app
`;
},
async healthCheck(host: string) {
try {
const response = await fetch(`http://${host}/health`);
return response.ok;
} catch {
return false;
}
}
});
}