Initial commit

This commit is contained in:
Zhongwei Li
2025-11-30 09:04:14 +08:00
commit 70c36b5eff
248 changed files with 47482 additions and 0 deletions

View File

@@ -0,0 +1,175 @@
/**
* Jest CLI Test Template
*
* Complete test suite for CLI tools using Jest and child_process.execSync
* Tests command execution, exit codes, stdout/stderr output
*/
import { execSync } from 'child_process';
import fs from 'fs';
import path from 'path';
describe('CLI Tool Tests', () => {
const CLI_PATH = path.join(__dirname, '../bin/mycli');
/**
* Helper function to execute CLI commands and capture output
* @param args - Command line arguments as string
* @returns Object with stdout, stderr, and exit code
*/
function runCLI(args: string): {
stdout: string;
stderr: string;
code: number;
} {
try {
const stdout = execSync(`${CLI_PATH} ${args}`, {
encoding: 'utf8',
stdio: 'pipe',
});
return { stdout, stderr: '', code: 0 };
} catch (error: any) {
return {
stdout: error.stdout || '',
stderr: error.stderr || '',
code: error.status || 1,
};
}
}
// Version Testing
describe('version command', () => {
test('should display version with --version', () => {
const { stdout, code } = runCLI('--version');
expect(code).toBe(0);
expect(stdout).toContain('1.0.0');
});
test('should display version with -v', () => {
const { stdout, code } = runCLI('-v');
expect(code).toBe(0);
expect(stdout).toMatch(/\d+\.\d+\.\d+/);
});
});
// Help Testing
describe('help command', () => {
test('should display help with --help', () => {
const { stdout, code } = runCLI('--help');
expect(code).toBe(0);
expect(stdout).toContain('Usage:');
expect(stdout).toContain('Commands:');
expect(stdout).toContain('Options:');
});
test('should display help with -h', () => {
const { stdout, code } = runCLI('-h');
expect(code).toBe(0);
expect(stdout).toContain('Usage:');
});
});
// Error Handling
describe('error handling', () => {
test('should handle unknown command', () => {
const { stderr, code } = runCLI('unknown-command');
expect(code).toBe(1);
expect(stderr).toContain('unknown command');
});
test('should handle invalid options', () => {
const { stderr, code } = runCLI('--invalid-option');
expect(code).toBe(1);
expect(stderr).toContain('unknown option');
});
test('should validate required arguments', () => {
const { stderr, code } = runCLI('deploy');
expect(code).toBe(1);
expect(stderr).toContain('missing required argument');
});
});
// Command Execution
describe('command execution', () => {
test('should execute deploy command', () => {
const { stdout, code } = runCLI('deploy production --force');
expect(code).toBe(0);
expect(stdout).toContain('Deploying to production');
expect(stdout).toContain('Force mode enabled');
});
test('should execute with flags', () => {
const { stdout, code } = runCLI('build --verbose --output dist');
expect(code).toBe(0);
expect(stdout).toContain('Building project');
expect(stdout).toContain('Output: dist');
});
});
// Configuration Testing
describe('configuration', () => {
test('should set configuration value', () => {
const { stdout, code } = runCLI('config set key value');
expect(code).toBe(0);
expect(stdout).toContain('Configuration updated');
});
test('should get configuration value', () => {
runCLI('config set api_key your_key_here');
const { stdout, code } = runCLI('config get api_key');
expect(code).toBe(0);
expect(stdout).toContain('your_key_here');
});
test('should list all configuration', () => {
const { stdout, code } = runCLI('config list');
expect(code).toBe(0);
expect(stdout).toContain('Configuration:');
});
});
// Exit Code Validation
describe('exit codes', () => {
test('should return 0 on success', () => {
const { code } = runCLI('status');
expect(code).toBe(0);
});
test('should return 1 on general error', () => {
const { code } = runCLI('invalid-command');
expect(code).toBe(1);
});
test('should return 2 on invalid arguments', () => {
const { code } = runCLI('deploy --invalid-flag');
expect(code).toBe(2);
});
});
// Output Format Testing
describe('output formatting', () => {
test('should output JSON when requested', () => {
const { stdout, code } = runCLI('status --format json');
expect(code).toBe(0);
expect(() => JSON.parse(stdout)).not.toThrow();
});
test('should output YAML when requested', () => {
const { stdout, code } = runCLI('status --format yaml');
expect(code).toBe(0);
expect(stdout).toContain(':');
});
test('should output table by default', () => {
const { stdout, code } = runCLI('status');
expect(code).toBe(0);
expect(stdout).toMatch(/[─┼│]/); // Table characters
});
});
// Cleanup
afterAll(() => {
// Clean up any test artifacts
});
});

View File

@@ -0,0 +1,198 @@
/**
* Jest Configuration Testing Template
*
* Test CLI configuration file handling, validation, and persistence
*/
import { execSync } from 'child_process';
import fs from 'fs';
import path from 'path';
import os from 'os';
describe('CLI Configuration Tests', () => {
const CLI_PATH = path.join(__dirname, '../bin/mycli');
const TEST_CONFIG_DIR = path.join(os.tmpdir(), 'cli-test-config');
const TEST_CONFIG_FILE = path.join(TEST_CONFIG_DIR, '.myclirc');
function runCLI(args: string, env: Record<string, string> = {}): {
stdout: string;
stderr: string;
code: number;
} {
try {
const stdout = execSync(`${CLI_PATH} ${args}`, {
encoding: 'utf8',
stdio: 'pipe',
env: {
...process.env,
HOME: TEST_CONFIG_DIR,
...env,
},
});
return { stdout, stderr: '', code: 0 };
} catch (error: any) {
return {
stdout: error.stdout || '',
stderr: error.stderr || '',
code: error.status || 1,
};
}
}
beforeEach(() => {
// Create temporary config directory
if (!fs.existsSync(TEST_CONFIG_DIR)) {
fs.mkdirSync(TEST_CONFIG_DIR, { recursive: true });
}
});
afterEach(() => {
// Clean up test config directory
if (fs.existsSync(TEST_CONFIG_DIR)) {
fs.rmSync(TEST_CONFIG_DIR, { recursive: true, force: true });
}
});
describe('config initialization', () => {
test('should create config file on first run', () => {
runCLI('config init');
expect(fs.existsSync(TEST_CONFIG_FILE)).toBe(true);
});
test('should not overwrite existing config', () => {
fs.writeFileSync(TEST_CONFIG_FILE, 'existing: data\n');
const { stderr, code } = runCLI('config init');
expect(code).toBe(1);
expect(stderr).toContain('Config file already exists');
});
test('should create config with default values', () => {
runCLI('config init');
const config = fs.readFileSync(TEST_CONFIG_FILE, 'utf8');
expect(config).toContain('api_key: your_api_key_here');
expect(config).toContain('environment: development');
});
});
describe('config set operations', () => {
beforeEach(() => {
runCLI('config init');
});
test('should set string value', () => {
const { code } = runCLI('config set api_key test_key_123');
expect(code).toBe(0);
const config = fs.readFileSync(TEST_CONFIG_FILE, 'utf8');
expect(config).toContain('api_key: test_key_123');
});
test('should set boolean value', () => {
const { code } = runCLI('config set verbose true');
expect(code).toBe(0);
const config = fs.readFileSync(TEST_CONFIG_FILE, 'utf8');
expect(config).toContain('verbose: true');
});
test('should set nested value', () => {
const { code } = runCLI('config set logging.level debug');
expect(code).toBe(0);
const config = fs.readFileSync(TEST_CONFIG_FILE, 'utf8');
expect(config).toContain('level: debug');
});
test('should handle invalid key names', () => {
const { stderr, code } = runCLI('config set invalid..key value');
expect(code).toBe(1);
expect(stderr).toContain('Invalid key name');
});
});
describe('config get operations', () => {
beforeEach(() => {
runCLI('config init');
runCLI('config set api_key test_key_123');
runCLI('config set environment production');
});
test('should get existing value', () => {
const { stdout, code } = runCLI('config get api_key');
expect(code).toBe(0);
expect(stdout).toContain('test_key_123');
});
test('should handle non-existent key', () => {
const { stderr, code } = runCLI('config get nonexistent');
expect(code).toBe(1);
expect(stderr).toContain('Key not found');
});
test('should get nested value', () => {
runCLI('config set database.host localhost');
const { stdout, code } = runCLI('config get database.host');
expect(code).toBe(0);
expect(stdout).toContain('localhost');
});
});
describe('config list operations', () => {
beforeEach(() => {
runCLI('config init');
runCLI('config set api_key test_key_123');
runCLI('config set verbose true');
});
test('should list all configuration', () => {
const { stdout, code } = runCLI('config list');
expect(code).toBe(0);
expect(stdout).toContain('api_key');
expect(stdout).toContain('verbose');
});
test('should format list output', () => {
const { stdout, code } = runCLI('config list --format json');
expect(code).toBe(0);
const config = JSON.parse(stdout);
expect(config.api_key).toBe('test_key_123');
expect(config.verbose).toBe(true);
});
});
describe('config validation', () => {
test('should validate config file on load', () => {
fs.writeFileSync(TEST_CONFIG_FILE, 'invalid yaml: [}');
const { stderr, code } = runCLI('config list');
expect(code).toBe(1);
expect(stderr).toContain('Invalid configuration file');
});
test('should validate required fields', () => {
runCLI('config init');
fs.writeFileSync(TEST_CONFIG_FILE, 'optional: value\n');
const { stderr, code } = runCLI('deploy production');
expect(code).toBe(1);
expect(stderr).toContain('api_key is required');
});
});
describe('environment variable overrides', () => {
beforeEach(() => {
runCLI('config init');
runCLI('config set api_key file_key_123');
});
test('should override with environment variable', () => {
const { stdout } = runCLI('config get api_key', {
MYCLI_API_KEY: 'env_key_123',
});
expect(stdout).toContain('env_key_123');
});
test('should use file value when env var not set', () => {
const { stdout } = runCLI('config get api_key');
expect(stdout).toContain('file_key_123');
});
});
});

View File

@@ -0,0 +1,223 @@
/**
* Jest Integration Test Template
*
* Test complete CLI workflows with multiple commands and state persistence
*/
import { execSync } from 'child_process';
import fs from 'fs';
import path from 'path';
import os from 'os';
describe('CLI Integration Tests', () => {
const CLI_PATH = path.join(__dirname, '../bin/mycli');
const TEST_WORKSPACE = path.join(os.tmpdir(), 'cli-integration-test');
function runCLI(args: string, cwd: string = TEST_WORKSPACE): {
stdout: string;
stderr: string;
code: number;
} {
try {
const stdout = execSync(`${CLI_PATH} ${args}`, {
encoding: 'utf8',
stdio: 'pipe',
cwd,
});
return { stdout, stderr: '', code: 0 };
} catch (error: any) {
return {
stdout: error.stdout || '',
stderr: error.stderr || '',
code: error.status || 1,
};
}
}
beforeEach(() => {
// Create clean test workspace
if (fs.existsSync(TEST_WORKSPACE)) {
fs.rmSync(TEST_WORKSPACE, { recursive: true, force: true });
}
fs.mkdirSync(TEST_WORKSPACE, { recursive: true });
});
afterEach(() => {
// Clean up test workspace
if (fs.existsSync(TEST_WORKSPACE)) {
fs.rmSync(TEST_WORKSPACE, { recursive: true, force: true });
}
});
describe('complete deployment workflow', () => {
test('should initialize, configure, and deploy', () => {
// Step 1: Initialize project
const init = runCLI('init my-project');
expect(init.code).toBe(0);
expect(init.stdout).toContain('Project initialized');
// Step 2: Configure deployment
const config = runCLI('config set api_key test_key_123');
expect(config.code).toBe(0);
// Step 3: Build project
const build = runCLI('build --production');
expect(build.code).toBe(0);
expect(build.stdout).toContain('Build successful');
// Step 4: Deploy
const deploy = runCLI('deploy production');
expect(deploy.code).toBe(0);
expect(deploy.stdout).toContain('Deployed successfully');
// Verify deployment artifacts
const deployFile = path.join(TEST_WORKSPACE, '.deploy');
expect(fs.existsSync(deployFile)).toBe(true);
});
test('should fail deployment without configuration', () => {
runCLI('init my-project');
// Try to deploy without configuring API key
const { stderr, code } = runCLI('deploy production');
expect(code).toBe(1);
expect(stderr).toContain('API key not configured');
});
});
describe('multi-environment workflow', () => {
test('should manage multiple environments', () => {
// Initialize project
runCLI('init my-project');
// Configure development environment
runCLI('config set api_key dev_key_123 --env development');
runCLI('config set base_url https://dev.example.com --env development');
// Configure production environment
runCLI('config set api_key prod_key_123 --env production');
runCLI('config set base_url https://api.example.com --env production');
// Deploy to development
const devDeploy = runCLI('deploy development');
expect(devDeploy.code).toBe(0);
expect(devDeploy.stdout).toContain('dev.example.com');
// Deploy to production
const prodDeploy = runCLI('deploy production');
expect(prodDeploy.code).toBe(0);
expect(prodDeploy.stdout).toContain('api.example.com');
});
});
describe('state persistence workflow', () => {
test('should persist and restore state', () => {
// Create initial state
runCLI('state set counter 0');
// Increment counter multiple times
runCLI('increment');
runCLI('increment');
runCLI('increment');
// Verify final state
const { stdout } = runCLI('state get counter');
expect(stdout).toContain('3');
});
test('should handle state file corruption', () => {
runCLI('state set key value');
// Corrupt state file
const stateFile = path.join(TEST_WORKSPACE, '.state');
fs.writeFileSync(stateFile, 'invalid json {[}');
// Should recover gracefully
const { stderr, code } = runCLI('state get key');
expect(code).toBe(1);
expect(stderr).toContain('Corrupted state file');
});
});
describe('plugin workflow', () => {
test('should install and use plugins', () => {
// Initialize project
runCLI('init my-project');
// Install plugin
const install = runCLI('plugin install my-plugin');
expect(install.code).toBe(0);
// Verify plugin is listed
const list = runCLI('plugin list');
expect(list.stdout).toContain('my-plugin');
// Use plugin command
const usePlugin = runCLI('my-plugin:command');
expect(usePlugin.code).toBe(0);
// Uninstall plugin
const uninstall = runCLI('plugin uninstall my-plugin');
expect(uninstall.code).toBe(0);
// Verify plugin is removed
const listAfter = runCLI('plugin list');
expect(listAfter.stdout).not.toContain('my-plugin');
});
});
describe('error recovery workflow', () => {
test('should recover from partial failure', () => {
runCLI('init my-project');
// Simulate partial deployment failure
runCLI('deploy staging --force');
// Should be able to rollback
const rollback = runCLI('rollback');
expect(rollback.code).toBe(0);
expect(rollback.stdout).toContain('Rollback successful');
// Should be able to retry
const retry = runCLI('deploy staging --retry');
expect(retry.code).toBe(0);
});
});
describe('concurrent operations', () => {
test('should handle file locking', async () => {
runCLI('init my-project');
// Start long-running operation
const longOp = execSync(`${CLI_PATH} long-running-task &`, {
cwd: TEST_WORKSPACE,
});
// Try to run another operation that needs lock
const { stderr, code } = runCLI('another-task');
expect(code).toBe(1);
expect(stderr).toContain('Another operation in progress');
});
});
describe('data migration workflow', () => {
test('should migrate data between versions', () => {
// Create old version data
const oldData = { version: 1, data: 'legacy format' };
fs.writeFileSync(
path.join(TEST_WORKSPACE, 'data.json'),
JSON.stringify(oldData)
);
// Run migration
const migrate = runCLI('migrate --to 2.0');
expect(migrate.code).toBe(0);
// Verify new format
const newData = JSON.parse(
fs.readFileSync(path.join(TEST_WORKSPACE, 'data.json'), 'utf8')
);
expect(newData.version).toBe(2);
});
});
});

View File

@@ -0,0 +1,270 @@
"""
Pytest Click Testing Template
Complete test suite for Click-based CLI applications using CliRunner
Tests command execution, exit codes, output validation, and interactive prompts
"""
import pytest
from click.testing import CliRunner
from mycli.cli import cli
@pytest.fixture
def runner():
"""Create a CliRunner instance for testing"""
return CliRunner()
class TestVersionCommand:
"""Test version display"""
def test_version_flag(self, runner):
"""Should display version with --version"""
result = runner.invoke(cli, ['--version'])
assert result.exit_code == 0
assert '1.0.0' in result.output
def test_version_short_flag(self, runner):
"""Should display version with -v"""
result = runner.invoke(cli, ['-v'])
assert result.exit_code == 0
assert result.output.count('.') == 2 # Version format X.Y.Z
class TestHelpCommand:
"""Test help display"""
def test_help_flag(self, runner):
"""Should display help with --help"""
result = runner.invoke(cli, ['--help'])
assert result.exit_code == 0
assert 'Usage:' in result.output
assert 'Commands:' in result.output
assert 'Options:' in result.output
def test_help_short_flag(self, runner):
"""Should display help with -h"""
result = runner.invoke(cli, ['-h'])
assert result.exit_code == 0
assert 'Usage:' in result.output
def test_command_help(self, runner):
"""Should display help for specific command"""
result = runner.invoke(cli, ['deploy', '--help'])
assert result.exit_code == 0
assert 'deploy' in result.output.lower()
class TestErrorHandling:
"""Test error handling and validation"""
def test_unknown_command(self, runner):
"""Should handle unknown commands"""
result = runner.invoke(cli, ['unknown-command'])
assert result.exit_code != 0
assert 'no such command' in result.output.lower()
def test_invalid_option(self, runner):
"""Should handle invalid options"""
result = runner.invoke(cli, ['--invalid-option'])
assert result.exit_code != 0
assert 'no such option' in result.output.lower()
def test_missing_required_argument(self, runner):
"""Should validate required arguments"""
result = runner.invoke(cli, ['deploy'])
assert result.exit_code != 0
assert 'missing argument' in result.output.lower()
def test_invalid_argument_type(self, runner):
"""Should validate argument types"""
result = runner.invoke(cli, ['retry', '--count', 'invalid'])
assert result.exit_code != 0
assert 'invalid' in result.output.lower()
class TestCommandExecution:
"""Test command execution with various arguments"""
def test_deploy_command(self, runner):
"""Should execute deploy command"""
result = runner.invoke(cli, ['deploy', 'production', '--force'])
assert result.exit_code == 0
assert 'Deploying to production' in result.output
assert 'Force mode enabled' in result.output
def test_deploy_with_flags(self, runner):
"""Should handle multiple flags"""
result = runner.invoke(cli, ['deploy', 'staging', '--verbose', '--dry-run'])
assert result.exit_code == 0
assert 'staging' in result.output
assert 'dry run' in result.output.lower()
def test_build_command(self, runner):
"""Should execute build command"""
result = runner.invoke(cli, ['build', '--output', 'dist'])
assert result.exit_code == 0
assert 'Building project' in result.output
assert 'dist' in result.output
class TestConfiguration:
"""Test configuration management"""
def test_config_set(self, runner):
"""Should set configuration value"""
result = runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
assert result.exit_code == 0
assert 'Configuration updated' in result.output
def test_config_get(self, runner):
"""Should get configuration value"""
runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
result = runner.invoke(cli, ['config', 'get', 'api_key'])
assert result.exit_code == 0
assert 'your_key_here' in result.output
def test_config_list(self, runner):
"""Should list all configuration"""
result = runner.invoke(cli, ['config', 'list'])
assert result.exit_code == 0
assert 'Configuration:' in result.output
def test_config_delete(self, runner):
"""Should delete configuration value"""
runner.invoke(cli, ['config', 'set', 'temp_key', 'temp_value'])
result = runner.invoke(cli, ['config', 'delete', 'temp_key'])
assert result.exit_code == 0
assert 'deleted' in result.output.lower()
class TestExitCodes:
"""Test exit code validation"""
def test_success_exit_code(self, runner):
"""Should return 0 on success"""
result = runner.invoke(cli, ['status'])
assert result.exit_code == 0
def test_error_exit_code(self, runner):
"""Should return non-zero on error"""
result = runner.invoke(cli, ['invalid-command'])
assert result.exit_code != 0
def test_validation_error_exit_code(self, runner):
"""Should return specific code for validation errors"""
result = runner.invoke(cli, ['deploy', '--invalid-flag'])
assert result.exit_code == 2 # Click uses 2 for usage errors
class TestInteractivePrompts:
"""Test interactive prompt handling"""
def test_interactive_deploy_wizard(self, runner):
"""Should handle interactive prompts"""
result = runner.invoke(
cli,
['deploy-wizard'],
input='my-app\n1\nyes\n'
)
assert result.exit_code == 0
assert 'my-app' in result.output
def test_confirmation_prompt(self, runner):
"""Should handle confirmation prompts"""
result = runner.invoke(
cli,
['delete', 'resource-id'],
input='y\n'
)
assert result.exit_code == 0
assert 'deleted' in result.output.lower()
def test_confirmation_prompt_denied(self, runner):
"""Should handle denied confirmation"""
result = runner.invoke(
cli,
['delete', 'resource-id'],
input='n\n'
)
assert result.exit_code == 1
assert 'cancelled' in result.output.lower()
def test_multiple_prompts(self, runner):
"""Should handle multiple prompts in sequence"""
result = runner.invoke(
cli,
['init'],
input='my-project\nJohn Doe\njohn@example.com\n'
)
assert result.exit_code == 0
assert 'my-project' in result.output
assert 'John Doe' in result.output
class TestOutputFormatting:
"""Test output formatting options"""
def test_json_output(self, runner):
"""Should output JSON format"""
result = runner.invoke(cli, ['status', '--format', 'json'])
assert result.exit_code == 0
import json
try:
json.loads(result.output)
except json.JSONDecodeError:
pytest.fail("Output is not valid JSON")
def test_yaml_output(self, runner):
"""Should output YAML format"""
result = runner.invoke(cli, ['status', '--format', 'yaml'])
assert result.exit_code == 0
assert ':' in result.output
def test_table_output(self, runner):
"""Should output table format by default"""
result = runner.invoke(cli, ['list'])
assert result.exit_code == 0
assert '' in result.output or '|' in result.output
def test_quiet_mode(self, runner):
"""Should suppress output in quiet mode"""
result = runner.invoke(cli, ['deploy', 'production', '--quiet'])
assert result.exit_code == 0
assert len(result.output.strip()) == 0
class TestFileOperations:
"""Test file-based operations"""
def test_file_input(self, runner):
"""Should read from file"""
with runner.isolated_filesystem():
with open('input.txt', 'w') as f:
f.write('test data\n')
result = runner.invoke(cli, ['process', '--input', 'input.txt'])
assert result.exit_code == 0
def test_file_output(self, runner):
"""Should write to file"""
with runner.isolated_filesystem():
result = runner.invoke(cli, ['export', '--output', 'output.txt'])
assert result.exit_code == 0
with open('output.txt', 'r') as f:
content = f.read()
assert len(content) > 0
class TestIsolation:
"""Test isolated filesystem operations"""
def test_isolated_filesystem(self, runner):
"""Should work in isolated filesystem"""
with runner.isolated_filesystem():
result = runner.invoke(cli, ['init', 'test-project'])
assert result.exit_code == 0
import os
assert os.path.exists('test-project')

View File

@@ -0,0 +1,346 @@
"""
Pytest Fixtures Template
Reusable pytest fixtures for CLI testing with Click.testing.CliRunner
Provides common setup, teardown, and test utilities
"""
import pytest
import os
import tempfile
import shutil
from pathlib import Path
from click.testing import CliRunner
from mycli.cli import cli
# Basic Fixtures
@pytest.fixture
def runner():
"""Create a CliRunner instance for testing"""
return CliRunner()
@pytest.fixture
def isolated_runner():
"""Create a CliRunner with isolated filesystem"""
runner = CliRunner()
with runner.isolated_filesystem():
yield runner
# Configuration Fixtures
@pytest.fixture
def temp_config_dir(tmp_path):
"""Create a temporary configuration directory"""
config_dir = tmp_path / '.mycli'
config_dir.mkdir()
return config_dir
@pytest.fixture
def config_file(temp_config_dir):
"""Create a temporary configuration file"""
config_path = temp_config_dir / 'config.yaml'
config_content = """
api_key: your_test_key_here
environment: development
verbose: false
timeout: 30
"""
config_path.write_text(config_content)
return config_path
@pytest.fixture
def env_with_config(temp_config_dir, monkeypatch):
"""Set up environment with config directory"""
monkeypatch.setenv('MYCLI_CONFIG_DIR', str(temp_config_dir))
return temp_config_dir
# File System Fixtures
@pytest.fixture
def temp_workspace(tmp_path):
"""Create a temporary workspace directory"""
workspace = tmp_path / 'workspace'
workspace.mkdir()
return workspace
@pytest.fixture
def sample_project(temp_workspace):
"""Create a sample project structure"""
project = temp_workspace / 'sample-project'
project.mkdir()
# Create sample files
(project / 'package.json').write_text('{"name": "sample", "version": "1.0.0"}')
(project / 'README.md').write_text('# Sample Project')
src_dir = project / 'src'
src_dir.mkdir()
(src_dir / 'index.js').write_text('console.log("Hello, World!");')
return project
@pytest.fixture
def sample_files(temp_workspace):
"""Create sample files for testing"""
files = {
'input.txt': 'test input data\n',
'config.yaml': 'key: value\n',
'data.json': '{"id": 1, "name": "test"}\n'
}
created_files = {}
for filename, content in files.items():
file_path = temp_workspace / filename
file_path.write_text(content)
created_files[filename] = file_path
return created_files
# Mock Fixtures
@pytest.fixture
def mock_api_key(monkeypatch):
"""Mock API key environment variable"""
monkeypatch.setenv('MYCLI_API_KEY', 'test_api_key_123')
return 'test_api_key_123'
@pytest.fixture
def mock_home_dir(tmp_path, monkeypatch):
"""Mock home directory"""
home = tmp_path / 'home'
home.mkdir()
monkeypatch.setenv('HOME', str(home))
return home
@pytest.fixture
def mock_no_config(monkeypatch):
"""Remove all configuration environment variables"""
vars_to_remove = [
'MYCLI_CONFIG_DIR',
'MYCLI_API_KEY',
'MYCLI_ENVIRONMENT',
]
for var in vars_to_remove:
monkeypatch.delenv(var, raising=False)
# State Management Fixtures
@pytest.fixture
def cli_state(temp_workspace):
"""Create a CLI state file"""
state_file = temp_workspace / '.mycli-state'
state = {
'initialized': True,
'last_command': None,
'history': []
}
import json
state_file.write_text(json.dumps(state, indent=2))
return state_file
@pytest.fixture
def clean_state(temp_workspace):
"""Ensure no state file exists"""
state_file = temp_workspace / '.mycli-state'
if state_file.exists():
state_file.unlink()
return temp_workspace
# Helper Function Fixtures
@pytest.fixture
def run_cli_command(runner):
"""Helper function to run CLI commands and return parsed results"""
def _run(args, input_data=None, env=None):
"""
Run a CLI command and return structured results
Args:
args: List of command arguments
input_data: Optional input for interactive prompts
env: Optional environment variables dict
Returns:
dict with keys: exit_code, output, lines, success
"""
result = runner.invoke(cli, args, input=input_data, env=env)
return {
'exit_code': result.exit_code,
'output': result.output,
'lines': result.output.splitlines(),
'success': result.exit_code == 0
}
return _run
@pytest.fixture
def assert_cli_success(runner):
"""Helper to assert successful CLI execution"""
def _assert(args, expected_in_output=None):
"""
Run CLI command and assert success
Args:
args: List of command arguments
expected_in_output: Optional string expected in output
"""
result = runner.invoke(cli, args)
assert result.exit_code == 0, f"Command failed: {result.output}"
if expected_in_output:
assert expected_in_output in result.output
return result
return _assert
@pytest.fixture
def assert_cli_failure(runner):
"""Helper to assert CLI command failure"""
def _assert(args, expected_in_output=None):
"""
Run CLI command and assert failure
Args:
args: List of command arguments
expected_in_output: Optional string expected in output
"""
result = runner.invoke(cli, args)
assert result.exit_code != 0, f"Command should have failed: {result.output}"
if expected_in_output:
assert expected_in_output in result.output
return result
return _assert
# Cleanup Fixtures
@pytest.fixture(autouse=True)
def cleanup_temp_files(request):
"""Automatically clean up temporary files after tests"""
temp_files = []
def _register(filepath):
temp_files.append(filepath)
request.addfinalizer(lambda: [
os.remove(f) for f in temp_files if os.path.exists(f)
])
return _register
@pytest.fixture(scope='session')
def test_data_dir():
"""Provide path to test data directory"""
return Path(__file__).parent / 'test_data'
# Parametrized Fixtures
@pytest.fixture(params=['json', 'yaml', 'table'])
def output_format(request):
"""Parametrize tests across different output formats"""
return request.param
@pytest.fixture(params=[True, False])
def verbose_mode(request):
"""Parametrize tests with and without verbose mode"""
return request.param
@pytest.fixture(params=['development', 'staging', 'production'])
def environment(request):
"""Parametrize tests across different environments"""
return request.param
# Integration Test Fixtures
@pytest.fixture
def integration_workspace(tmp_path):
"""
Create a complete integration test workspace with all necessary files
"""
workspace = tmp_path / 'integration'
workspace.mkdir()
# Create directory structure
(workspace / 'src').mkdir()
(workspace / 'tests').mkdir()
(workspace / 'config').mkdir()
(workspace / 'data').mkdir()
# Create config files
(workspace / 'config' / 'dev.yaml').write_text('env: development\n')
(workspace / 'config' / 'prod.yaml').write_text('env: production\n')
# Initialize CLI
runner = CliRunner()
with runner.isolated_filesystem(temp_dir=workspace):
runner.invoke(cli, ['init'])
return workspace
@pytest.fixture
def mock_external_service(monkeypatch):
"""Mock external service API calls"""
class MockService:
def __init__(self):
self.calls = []
def call_api(self, endpoint, method='GET', data=None):
self.calls.append({
'endpoint': endpoint,
'method': method,
'data': data
})
return {'status': 'success', 'data': 'mock response'}
mock = MockService()
# Replace actual service with mock
monkeypatch.setattr('mycli.services.api', mock)
return mock
# Snapshot Testing Fixtures
@pytest.fixture
def snapshot_dir(tmp_path):
"""Create directory for snapshot testing"""
snapshot = tmp_path / 'snapshots'
snapshot.mkdir()
return snapshot
@pytest.fixture
def compare_output(snapshot_dir):
"""Compare CLI output with saved snapshot"""
def _compare(output, snapshot_name):
snapshot_file = snapshot_dir / f'{snapshot_name}.txt'
if not snapshot_file.exists():
# Create snapshot
snapshot_file.write_text(output)
return True
# Compare with existing snapshot
expected = snapshot_file.read_text()
return output == expected
return _compare

View File

@@ -0,0 +1,378 @@
"""
Pytest Integration Test Template
Complete workflow testing for CLI applications using Click.testing.CliRunner
Tests multi-command workflows, state persistence, and end-to-end scenarios
"""
import pytest
import os
import json
import yaml
from pathlib import Path
from click.testing import CliRunner
from mycli.cli import cli
@pytest.fixture
def integration_runner():
"""Create runner with isolated filesystem for integration tests"""
runner = CliRunner()
with runner.isolated_filesystem():
yield runner
class TestDeploymentWorkflow:
"""Test complete deployment workflow"""
def test_full_deployment_workflow(self, integration_runner):
"""Should complete init -> configure -> build -> deploy workflow"""
runner = integration_runner
# Step 1: Initialize project
result = runner.invoke(cli, ['init', 'my-project'])
assert result.exit_code == 0
assert 'Project initialized' in result.output
assert os.path.exists('my-project')
# Step 2: Configure API key
os.chdir('my-project')
result = runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
assert result.exit_code == 0
# Step 3: Build project
result = runner.invoke(cli, ['build', '--production'])
assert result.exit_code == 0
assert 'Build successful' in result.output
# Step 4: Deploy to production
result = runner.invoke(cli, ['deploy', 'production'])
assert result.exit_code == 0
assert 'Deployed successfully' in result.output
def test_deployment_without_config_fails(self, integration_runner):
"""Should fail deployment without required configuration"""
runner = integration_runner
# Initialize but don't configure
runner.invoke(cli, ['init', 'my-project'])
os.chdir('my-project')
# Try to deploy without API key
result = runner.invoke(cli, ['deploy', 'production'])
assert result.exit_code != 0
assert 'api_key' in result.output.lower()
def test_deployment_rollback(self, integration_runner):
"""Should rollback failed deployment"""
runner = integration_runner
# Setup and deploy
runner.invoke(cli, ['init', 'my-project'])
os.chdir('my-project')
runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
runner.invoke(cli, ['deploy', 'staging'])
# Rollback
result = runner.invoke(cli, ['rollback'])
assert result.exit_code == 0
assert 'Rollback successful' in result.output
class TestMultiEnvironmentWorkflow:
"""Test multi-environment configuration and deployment"""
def test_manage_multiple_environments(self, integration_runner):
"""Should manage dev, staging, and production environments"""
runner = integration_runner
runner.invoke(cli, ['init', 'multi-env-project'])
os.chdir('multi-env-project')
# Configure development
runner.invoke(cli, ['config', 'set', 'api_key', 'dev_key', '--env', 'development'])
runner.invoke(cli, ['config', 'set', 'base_url', 'https://dev.api.example.com', '--env', 'development'])
# Configure staging
runner.invoke(cli, ['config', 'set', 'api_key', 'staging_key', '--env', 'staging'])
runner.invoke(cli, ['config', 'set', 'base_url', 'https://staging.api.example.com', '--env', 'staging'])
# Configure production
runner.invoke(cli, ['config', 'set', 'api_key', 'prod_key', '--env', 'production'])
runner.invoke(cli, ['config', 'set', 'base_url', 'https://api.example.com', '--env', 'production'])
# Deploy to each environment
dev_result = runner.invoke(cli, ['deploy', 'development'])
assert dev_result.exit_code == 0
assert 'dev.api.example.com' in dev_result.output
staging_result = runner.invoke(cli, ['deploy', 'staging'])
assert staging_result.exit_code == 0
assert 'staging.api.example.com' in staging_result.output
prod_result = runner.invoke(cli, ['deploy', 'production'])
assert prod_result.exit_code == 0
assert 'api.example.com' in prod_result.output
def test_environment_isolation(self, integration_runner):
"""Should keep environment configurations isolated"""
runner = integration_runner
runner.invoke(cli, ['init', 'isolated-project'])
os.chdir('isolated-project')
# Set different values for each environment
runner.invoke(cli, ['config', 'set', 'timeout', '10', '--env', 'development'])
runner.invoke(cli, ['config', 'set', 'timeout', '30', '--env', 'production'])
# Verify values are isolated
dev_result = runner.invoke(cli, ['config', 'get', 'timeout', '--env', 'development'])
assert '10' in dev_result.output
prod_result = runner.invoke(cli, ['config', 'get', 'timeout', '--env', 'production'])
assert '30' in prod_result.output
class TestStatePersistence:
"""Test state management and persistence"""
def test_state_persistence_across_commands(self, integration_runner):
"""Should maintain state across multiple commands"""
runner = integration_runner
# Initialize state
result = runner.invoke(cli, ['state', 'init'])
assert result.exit_code == 0
# Set multiple state values
runner.invoke(cli, ['state', 'set', 'counter', '0'])
runner.invoke(cli, ['state', 'set', 'user', 'testuser'])
# Increment counter multiple times
for i in range(5):
runner.invoke(cli, ['increment'])
# Verify final state
result = runner.invoke(cli, ['state', 'get', 'counter'])
assert result.exit_code == 0
assert '5' in result.output
result = runner.invoke(cli, ['state', 'get', 'user'])
assert 'testuser' in result.output
def test_state_recovery_from_corruption(self, integration_runner):
"""Should recover from corrupted state file"""
runner = integration_runner
# Create valid state
runner.invoke(cli, ['state', 'init'])
runner.invoke(cli, ['state', 'set', 'key', 'value'])
# Corrupt the state file
with open('.mycli-state', 'w') as f:
f.write('invalid json {[}')
# Should detect corruption and recover
result = runner.invoke(cli, ['state', 'get', 'key'])
assert result.exit_code != 0
assert 'corrupt' in result.output.lower()
# Should be able to reset
result = runner.invoke(cli, ['state', 'reset'])
assert result.exit_code == 0
class TestPluginWorkflow:
"""Test plugin installation and usage"""
def test_plugin_lifecycle(self, integration_runner):
"""Should install, use, and uninstall plugins"""
runner = integration_runner
runner.invoke(cli, ['init', 'plugin-project'])
os.chdir('plugin-project')
# Install plugin
result = runner.invoke(cli, ['plugin', 'install', 'test-plugin'])
assert result.exit_code == 0
assert 'installed' in result.output.lower()
# Verify plugin is listed
result = runner.invoke(cli, ['plugin', 'list'])
assert 'test-plugin' in result.output
# Use plugin command
result = runner.invoke(cli, ['test-plugin:command', '--arg', 'value'])
assert result.exit_code == 0
# Uninstall plugin
result = runner.invoke(cli, ['plugin', 'uninstall', 'test-plugin'])
assert result.exit_code == 0
# Verify plugin is removed
result = runner.invoke(cli, ['plugin', 'list'])
assert 'test-plugin' not in result.output
def test_plugin_conflict_detection(self, integration_runner):
"""Should detect and handle plugin conflicts"""
runner = integration_runner
runner.invoke(cli, ['init', 'conflict-project'])
os.chdir('conflict-project')
# Install first plugin
runner.invoke(cli, ['plugin', 'install', 'plugin-a'])
# Try to install conflicting plugin
result = runner.invoke(cli, ['plugin', 'install', 'plugin-b'])
if 'conflict' in result.output.lower():
assert result.exit_code != 0
class TestDataMigration:
"""Test data migration workflows"""
def test_version_migration(self, integration_runner):
"""Should migrate data between versions"""
runner = integration_runner
# Create old version data
old_data = {
'version': 1,
'format': 'legacy',
'data': {'key': 'value'}
}
with open('data.json', 'w') as f:
json.dump(old_data, f)
# Run migration
result = runner.invoke(cli, ['migrate', '--to', '2.0'])
assert result.exit_code == 0
# Verify new format
with open('data.json', 'r') as f:
new_data = json.load(f)
assert new_data['version'] == 2
assert 'legacy' not in new_data.get('format', '')
def test_migration_backup(self, integration_runner):
"""Should create backup during migration"""
runner = integration_runner
# Create data
data = {'version': 1, 'data': 'important'}
with open('data.json', 'w') as f:
json.dump(data, f)
# Migrate with backup
result = runner.invoke(cli, ['migrate', '--to', '2.0', '--backup'])
assert result.exit_code == 0
# Verify backup exists
assert os.path.exists('data.json.backup')
class TestConcurrentOperations:
"""Test handling of concurrent operations"""
def test_file_locking(self, integration_runner):
"""Should prevent concurrent modifications"""
runner = integration_runner
runner.invoke(cli, ['init', 'lock-project'])
os.chdir('lock-project')
# Create lock file
with open('.mycli.lock', 'w') as f:
f.write('locked')
# Try to run command that needs lock
result = runner.invoke(cli, ['deploy', 'production'])
assert result.exit_code != 0
assert 'lock' in result.output.lower()
def test_lock_timeout(self, integration_runner):
"""Should timeout waiting for lock"""
runner = integration_runner
runner.invoke(cli, ['init', 'timeout-project'])
os.chdir('timeout-project')
# Create stale lock
with open('.mycli.lock', 'w') as f:
import time
f.write(str(time.time() - 3600)) # 1 hour old
# Should detect stale lock and continue
result = runner.invoke(cli, ['build'])
assert result.exit_code == 0
class TestErrorRecovery:
"""Test error recovery and retry logic"""
def test_retry_on_failure(self, integration_runner):
"""Should retry failed operations"""
runner = integration_runner
runner.invoke(cli, ['init', 'retry-project'])
os.chdir('retry-project')
runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
# Simulate failure and retry
result = runner.invoke(cli, ['deploy', 'staging', '--retry', '3'])
# Should attempt retry logic
def test_partial_failure_recovery(self, integration_runner):
"""Should recover from partial failures"""
runner = integration_runner
runner.invoke(cli, ['init', 'recovery-project'])
os.chdir('recovery-project')
# Create partial state
runner.invoke(cli, ['build', '--step', '1'])
runner.invoke(cli, ['build', '--step', '2'])
# Complete from last successful step
result = runner.invoke(cli, ['build', '--continue'])
assert result.exit_code == 0
class TestCompleteWorkflow:
"""Test complete end-to-end workflows"""
def test_full_project_lifecycle(self, integration_runner):
"""Should complete entire project lifecycle"""
runner = integration_runner
# Create project
result = runner.invoke(cli, ['create', 'full-project'])
assert result.exit_code == 0
os.chdir('full-project')
# Configure
runner.invoke(cli, ['config', 'set', 'api_key', 'your_key_here'])
runner.invoke(cli, ['config', 'set', 'region', 'us-west-1'])
# Add dependencies
result = runner.invoke(cli, ['add', 'dependency', 'package-name'])
assert result.exit_code == 0
# Build
result = runner.invoke(cli, ['build', '--production'])
assert result.exit_code == 0
# Test
result = runner.invoke(cli, ['test'])
assert result.exit_code == 0
# Deploy
result = runner.invoke(cli, ['deploy', 'production'])
assert result.exit_code == 0
# Verify deployment
result = runner.invoke(cli, ['status'])
assert result.exit_code == 0
assert 'deployed' in result.output.lower()

View File

@@ -0,0 +1,509 @@
"""
Python Test Helper Functions
Utility functions for CLI testing with pytest and Click.testing.CliRunner
"""
import os
import json
import tempfile
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Callable
from click.testing import CliRunner, Result
class CLITestHarness:
"""Test harness for CLI testing with helpful assertion methods"""
def __init__(self, cli_app):
"""
Initialize test harness
Args:
cli_app: Click CLI application to test
"""
self.cli = cli_app
self.runner = CliRunner()
def run(
self,
args: List[str],
input_data: Optional[str] = None,
env: Optional[Dict[str, str]] = None
) -> Result:
"""
Run CLI command
Args:
args: Command arguments
input_data: Input for interactive prompts
env: Environment variables
Returns:
Click Result object
"""
return self.runner.invoke(self.cli, args, input=input_data, env=env)
def assert_success(
self,
args: List[str],
expected_in_output: Optional[str] = None
) -> Result:
"""
Run command and assert successful execution
Args:
args: Command arguments
expected_in_output: Optional string expected in output
Returns:
Click Result object
Raises:
AssertionError: If command fails or output doesn't match
"""
result = self.run(args)
assert result.exit_code == 0, f"Command failed: {result.output}"
if expected_in_output:
assert expected_in_output in result.output, \
f"Expected '{expected_in_output}' in output: {result.output}"
return result
def assert_failure(
self,
args: List[str],
expected_in_output: Optional[str] = None
) -> Result:
"""
Run command and assert it fails
Args:
args: Command arguments
expected_in_output: Optional string expected in output
Returns:
Click Result object
Raises:
AssertionError: If command succeeds or output doesn't match
"""
result = self.run(args)
assert result.exit_code != 0, f"Command should have failed: {result.output}"
if expected_in_output:
assert expected_in_output in result.output, \
f"Expected '{expected_in_output}' in output: {result.output}"
return result
def assert_exit_code(self, args: List[str], expected_code: int) -> Result:
"""
Run command and assert specific exit code
Args:
args: Command arguments
expected_code: Expected exit code
Returns:
Click Result object
Raises:
AssertionError: If exit code doesn't match
"""
result = self.run(args)
assert result.exit_code == expected_code, \
f"Expected exit code {expected_code}, got {result.exit_code}"
return result
def run_json(self, args: List[str]) -> Dict[str, Any]:
"""
Run command and parse JSON output
Args:
args: Command arguments
Returns:
Parsed JSON object
Raises:
AssertionError: If command fails
json.JSONDecodeError: If output is not valid JSON
"""
result = self.assert_success(args)
return json.loads(result.output)
def create_temp_workspace() -> Path:
"""
Create temporary workspace directory
Returns:
Path to temporary workspace
"""
temp_dir = Path(tempfile.mkdtemp(prefix='cli-test-'))
return temp_dir
def cleanup_workspace(workspace: Path) -> None:
"""
Clean up temporary workspace
Args:
workspace: Path to workspace to remove
"""
if workspace.exists():
shutil.rmtree(workspace)
def create_temp_file(content: str, suffix: str = '.txt') -> Path:
"""
Create temporary file with content
Args:
content: File content
suffix: File extension
Returns:
Path to created file
"""
fd, path = tempfile.mkstemp(suffix=suffix)
with os.fdopen(fd, 'w') as f:
f.write(content)
return Path(path)
def assert_file_exists(filepath: Path, message: Optional[str] = None) -> None:
"""
Assert file exists
Args:
filepath: Path to file
message: Optional custom error message
"""
assert filepath.exists(), message or f"File does not exist: {filepath}"
def assert_file_contains(filepath: Path, expected: str) -> None:
"""
Assert file contains expected text
Args:
filepath: Path to file
expected: Expected text
"""
content = filepath.read_text()
assert expected in content, \
f"Expected '{expected}' in file {filepath}\nActual content: {content}"
def assert_json_output(result: Result, schema: Dict[str, type]) -> Dict[str, Any]:
"""
Assert output is valid JSON matching schema
Args:
result: Click Result object
schema: Expected schema as dict of {key: expected_type}
Returns:
Parsed JSON object
Raises:
AssertionError: If JSON is invalid or doesn't match schema
"""
try:
data = json.loads(result.output)
except json.JSONDecodeError as e:
raise AssertionError(f"Invalid JSON output: {e}\nOutput: {result.output}")
for key, expected_type in schema.items():
assert key in data, f"Missing key in JSON output: {key}"
assert isinstance(data[key], expected_type), \
f"Expected type {expected_type} for key {key}, got {type(data[key])}"
return data
def mock_env_vars(vars_dict: Dict[str, str]) -> Callable[[], None]:
"""
Mock environment variables
Args:
vars_dict: Dictionary of environment variables to set
Returns:
Function to restore original environment
Example:
restore = mock_env_vars({'API_KEY': 'test_key'})
# ... run tests ...
restore()
"""
original = {}
for key, value in vars_dict.items():
original[key] = os.environ.get(key)
os.environ[key] = value
def restore():
for key, value in original.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
return restore
def compare_output_lines(result: Result, expected_lines: List[str]) -> None:
"""
Compare output with expected lines
Args:
result: Click Result object
expected_lines: List of expected lines in output
Raises:
AssertionError: If any expected line is missing
"""
output = result.output
for expected in expected_lines:
assert expected in output, \
f"Expected line '{expected}' not found in output:\n{output}"
def parse_table_output(result: Result) -> List[Dict[str, str]]:
"""
Parse table output into list of dictionaries
Args:
result: Click Result object with table output
Returns:
List of row dictionaries
Note:
Expects table with headers and │ separators
"""
lines = result.output.strip().split('\n')
# Find header line
header_line = None
for i, line in enumerate(lines):
if '' in line and i > 0:
header_line = i
break
if header_line is None:
raise ValueError("Could not find table header")
# Parse headers
headers = [h.strip() for h in lines[header_line].split('') if h.strip()]
# Parse rows
rows = []
for line in lines[header_line + 2:]: # Skip separator
if '' in line:
values = [v.strip() for v in line.split('') if v.strip()]
if len(values) == len(headers):
rows.append(dict(zip(headers, values)))
return rows
class SnapshotTester:
"""Helper for snapshot testing CLI output"""
def __init__(self, snapshot_dir: Path):
"""
Initialize snapshot tester
Args:
snapshot_dir: Directory to store snapshots
"""
self.snapshot_dir = snapshot_dir
self.snapshot_dir.mkdir(exist_ok=True)
def assert_matches(
self,
result: Result,
snapshot_name: str,
update: bool = False
) -> None:
"""
Assert output matches snapshot
Args:
result: Click Result object
snapshot_name: Name of snapshot file
update: Whether to update snapshot
Raises:
AssertionError: If output doesn't match snapshot
"""
snapshot_file = self.snapshot_dir / f'{snapshot_name}.txt'
if update or not snapshot_file.exists():
snapshot_file.write_text(result.output)
return
expected = snapshot_file.read_text()
assert result.output == expected, \
f"Output doesn't match snapshot {snapshot_name}\n" \
f"Expected:\n{expected}\n\nActual:\n{result.output}"
class MockConfig:
"""Mock configuration file for testing"""
def __init__(self, workspace: Path, filename: str = '.myclirc'):
"""
Initialize mock config
Args:
workspace: Workspace directory
filename: Config filename
"""
self.config_path = workspace / filename
self.data = {}
def set(self, key: str, value: Any) -> None:
"""Set configuration value"""
self.data[key] = value
self.save()
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value"""
return self.data.get(key, default)
def save(self) -> None:
"""Save configuration to file"""
import yaml
with open(self.config_path, 'w') as f:
yaml.dump(self.data, f)
def load(self) -> None:
"""Load configuration from file"""
if self.config_path.exists():
import yaml
with open(self.config_path, 'r') as f:
self.data = yaml.safe_load(f) or {}
def wait_for_file(filepath: Path, timeout: float = 5.0) -> None:
"""
Wait for file to exist
Args:
filepath: Path to file
timeout: Timeout in seconds
Raises:
TimeoutError: If file doesn't exist within timeout
"""
import time
start = time.time()
while not filepath.exists():
if time.time() - start > timeout:
raise TimeoutError(f"Timeout waiting for file: {filepath}")
time.sleep(0.1)
def capture_output(func: Callable) -> Dict[str, str]:
"""
Capture stdout and stderr during function execution
Args:
func: Function to execute
Returns:
Dictionary with 'stdout' and 'stderr' keys
"""
import sys
from io import StringIO
old_stdout = sys.stdout
old_stderr = sys.stderr
stdout_capture = StringIO()
stderr_capture = StringIO()
sys.stdout = stdout_capture
sys.stderr = stderr_capture
try:
func()
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
return {
'stdout': stdout_capture.getvalue(),
'stderr': stderr_capture.getvalue()
}
class IntegrationTestHelper:
"""Helper for integration testing with state management"""
def __init__(self, cli_app, workspace: Optional[Path] = None):
"""
Initialize integration test helper
Args:
cli_app: Click CLI application
workspace: Optional workspace directory
"""
self.harness = CLITestHarness(cli_app)
self.workspace = workspace or create_temp_workspace()
self.original_cwd = Path.cwd()
def __enter__(self):
"""Enter context - change to workspace"""
os.chdir(self.workspace)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context - restore cwd and cleanup"""
os.chdir(self.original_cwd)
cleanup_workspace(self.workspace)
def run_workflow(self, commands: List[List[str]]) -> List[Result]:
"""
Run multiple commands in sequence
Args:
commands: List of command argument lists
Returns:
List of Result objects
"""
results = []
for cmd in commands:
result = self.harness.run(cmd)
results.append(result)
if result.exit_code != 0:
break
return results
def assert_workflow_success(self, commands: List[List[str]]) -> List[Result]:
"""
Run workflow and assert all commands succeed
Args:
commands: List of command argument lists
Returns:
List of Result objects
Raises:
AssertionError: If any command fails
"""
results = []
for i, cmd in enumerate(commands):
result = self.harness.assert_success(cmd)
results.append(result)
return results

View File

@@ -0,0 +1,362 @@
/**
* Node.js Test Helper Functions
*
* Utility functions for CLI testing with Jest
*/
import { execSync, spawn, SpawnOptions } from 'child_process';
import fs from 'fs';
import path from 'path';
import os from 'os';
/**
* CLI execution result interface
*/
export interface CLIResult {
stdout: string;
stderr: string;
code: number;
success: boolean;
}
/**
* Execute CLI command synchronously
* @param cliPath - Path to CLI executable
* @param args - Command arguments
* @param options - Execution options
* @returns CLI execution result
*/
export function runCLI(
cliPath: string,
args: string,
options: {
cwd?: string;
env?: Record<string, string>;
timeout?: number;
} = {}
): CLIResult {
try {
const stdout = execSync(`${cliPath} ${args}`, {
encoding: 'utf8',
stdio: 'pipe',
cwd: options.cwd,
env: { ...process.env, ...options.env },
timeout: options.timeout,
});
return {
stdout,
stderr: '',
code: 0,
success: true,
};
} catch (error: any) {
return {
stdout: error.stdout || '',
stderr: error.stderr || '',
code: error.status || 1,
success: false,
};
}
}
/**
* Execute CLI command asynchronously
* @param cliPath - Path to CLI executable
* @param args - Command arguments array
* @param options - Spawn options
* @returns Promise of CLI execution result
*/
export function runCLIAsync(
cliPath: string,
args: string[],
options: SpawnOptions = {}
): Promise<CLIResult> {
return new Promise((resolve) => {
const child = spawn(cliPath, args, {
...options,
stdio: 'pipe',
});
let stdout = '';
let stderr = '';
child.stdout?.on('data', (data) => {
stdout += data.toString();
});
child.stderr?.on('data', (data) => {
stderr += data.toString();
});
child.on('close', (code) => {
resolve({
stdout,
stderr,
code: code || 0,
success: code === 0,
});
});
child.on('error', (error) => {
resolve({
stdout,
stderr: stderr + error.message,
code: 1,
success: false,
});
});
});
}
/**
* Create temporary test directory
* @returns Path to temporary directory
*/
export function createTempDir(): string {
const tempDir = path.join(os.tmpdir(), `cli-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
fs.mkdirSync(tempDir, { recursive: true });
return tempDir;
}
/**
* Clean up temporary directory
* @param dirPath - Directory to remove
*/
export function cleanupTempDir(dirPath: string): void {
if (fs.existsSync(dirPath)) {
fs.rmSync(dirPath, { recursive: true, force: true });
}
}
/**
* Create temporary file with content
* @param content - File content
* @param extension - File extension
* @returns Path to created file
*/
export function createTempFile(content: string, extension: string = 'txt'): string {
const tempFile = path.join(os.tmpdir(), `test-${Date.now()}.${extension}`);
fs.writeFileSync(tempFile, content);
return tempFile;
}
/**
* Assert CLI command succeeds
* @param result - CLI execution result
* @param expectedOutput - Optional expected output substring
*/
export function assertSuccess(result: CLIResult, expectedOutput?: string): void {
if (!result.success) {
throw new Error(`CLI command failed with exit code ${result.code}\nStderr: ${result.stderr}`);
}
if (expectedOutput && !result.stdout.includes(expectedOutput)) {
throw new Error(`Expected output to contain "${expectedOutput}"\nActual: ${result.stdout}`);
}
}
/**
* Assert CLI command fails
* @param result - CLI execution result
* @param expectedError - Optional expected error substring
*/
export function assertFailure(result: CLIResult, expectedError?: string): void {
if (result.success) {
throw new Error(`CLI command should have failed but succeeded\nStdout: ${result.stdout}`);
}
if (expectedError && !result.stderr.includes(expectedError) && !result.stdout.includes(expectedError)) {
throw new Error(`Expected error to contain "${expectedError}"\nActual stderr: ${result.stderr}\nActual stdout: ${result.stdout}`);
}
}
/**
* Assert exit code matches expected value
* @param result - CLI execution result
* @param expectedCode - Expected exit code
*/
export function assertExitCode(result: CLIResult, expectedCode: number): void {
if (result.code !== expectedCode) {
throw new Error(`Expected exit code ${expectedCode} but got ${result.code}\nStderr: ${result.stderr}`);
}
}
/**
* Parse JSON output from CLI
* @param result - CLI execution result
* @returns Parsed JSON object
*/
export function parseJSONOutput<T = any>(result: CLIResult): T {
try {
return JSON.parse(result.stdout);
} catch (error) {
throw new Error(`Failed to parse JSON output: ${error}\nStdout: ${result.stdout}`);
}
}
/**
* Mock environment variables for test
* @param vars - Environment variables to set
* @returns Function to restore original environment
*/
export function mockEnv(vars: Record<string, string>): () => void {
const original = { ...process.env };
Object.entries(vars).forEach(([key, value]) => {
process.env[key] = value;
});
return () => {
Object.keys(process.env).forEach((key) => {
if (!(key in original)) {
delete process.env[key];
}
});
Object.entries(original).forEach(([key, value]) => {
process.env[key] = value;
});
};
}
/**
* Wait for file to exist
* @param filePath - Path to file
* @param timeout - Timeout in milliseconds
* @returns Promise that resolves when file exists
*/
export async function waitForFile(filePath: string, timeout: number = 5000): Promise<void> {
const startTime = Date.now();
while (!fs.existsSync(filePath)) {
if (Date.now() - startTime > timeout) {
throw new Error(`Timeout waiting for file: ${filePath}`);
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
/**
* Create CLI test fixture with setup and teardown
* @param setup - Setup function
* @param teardown - Teardown function
* @returns Test fixture object
*/
export function createFixture<T>(
setup: () => T | Promise<T>,
teardown: (fixture: T) => void | Promise<void>
): {
beforeEach: () => Promise<T>;
afterEach: (fixture: T) => Promise<void>;
} {
return {
beforeEach: async () => setup(),
afterEach: async (fixture: T) => teardown(fixture),
};
}
/**
* Capture stdout/stderr during function execution
* @param fn - Function to execute
* @returns Captured output
*/
export function captureOutput(fn: () => void): { stdout: string; stderr: string } {
const originalStdout = process.stdout.write;
const originalStderr = process.stderr.write;
let stdout = '';
let stderr = '';
process.stdout.write = ((chunk: any) => {
stdout += chunk.toString();
return true;
}) as any;
process.stderr.write = ((chunk: any) => {
stderr += chunk.toString();
return true;
}) as any;
try {
fn();
} finally {
process.stdout.write = originalStdout;
process.stderr.write = originalStderr;
}
return { stdout, stderr };
}
/**
* Test helper for testing CLI with different input combinations
*/
export class CLITestHarness {
constructor(private cliPath: string) {}
/**
* Run command with arguments
*/
run(args: string, options?: { cwd?: string; env?: Record<string, string> }): CLIResult {
return runCLI(this.cliPath, args, options);
}
/**
* Run command and assert success
*/
assertSuccess(args: string, expectedOutput?: string): CLIResult {
const result = this.run(args);
assertSuccess(result, expectedOutput);
return result;
}
/**
* Run command and assert failure
*/
assertFailure(args: string, expectedError?: string): CLIResult {
const result = this.run(args);
assertFailure(result, expectedError);
return result;
}
/**
* Run command and parse JSON output
*/
runJSON<T = any>(args: string): T {
const result = this.run(args);
assertSuccess(result);
return parseJSONOutput<T>(result);
}
}
/**
* Validate JSON schema in CLI output
* @param result - CLI execution result
* @param schema - Expected schema object
*/
export function validateJSONSchema(result: CLIResult, schema: Record<string, string>): void {
const output = parseJSONOutput(result);
Object.entries(schema).forEach(([key, expectedType]) => {
if (!(key in output)) {
throw new Error(`Missing expected key in JSON output: ${key}`);
}
const actualType = typeof output[key];
if (actualType !== expectedType) {
throw new Error(`Expected type ${expectedType} for key ${key}, but got ${actualType}`);
}
});
}
/**
* Compare CLI output with snapshot
* @param result - CLI execution result
* @param snapshotPath - Path to snapshot file
* @param update - Whether to update snapshot
*/
export function compareSnapshot(result: CLIResult, snapshotPath: string, update: boolean = false): void {
if (update || !fs.existsSync(snapshotPath)) {
fs.writeFileSync(snapshotPath, result.stdout);
return;
}
const snapshot = fs.readFileSync(snapshotPath, 'utf8');
if (result.stdout !== snapshot) {
throw new Error(`Output does not match snapshot\nExpected:\n${snapshot}\n\nActual:\n${result.stdout}`);
}
}