Files
ci/src/agents/orchestrator.ts
T
Jon Chery d6ba76e660 fix(P01): add SIGTERM/SIGINT signal handlers for graceful shutdown
---ci---
project: ci
phase: 1
milestone: v0.8
status: in_progress
decisions:
  - id: D-026
    decision: Graceful drain on SIGTERM/SIGINT: dispose timers then exit
    rationale: Prevents orphaned setTimeout timers from leaking when process is killed
    confidence: 0.88
requirements:
  covered: [FIX-07]
---/ci---

FIX-07: cli/index.ts registers SIGTERM/SIGINT handlers that call
escalationProtocol.dispose() before process.exit. OrchestratorAgent
registers its EscalationProtocol instance via registerEscalationProtocol().
SIGINT exits with code 130, SIGTERM with 143 (standard signal+128 convention).
2026-05-29 20:05:48 +00:00

731 lines
27 KiB
TypeScript

import { BaseAgent, AgentContext, AgentResult } from "./base.js";
import { DecisionEngine } from "../core/decision-engine.js";
import { ClarifyPhase } from "../core/clarify.js";
import { EscalationProtocol, EscalationInput } from "../core/escalation.js";
import { GitContext, ProjectState } from "../core/git-context.js";
import { GitBranch } from "../core/git-branch.js";
import { CIAgentFiles } from "../core/ciagent-files.js";
import { CommitBuilder } from "../core/commit-builder.js";
import { CIAgentConfig, AgentName } from "../types/config.js";
import {
  PipelineState,
  PipelineStage,
  PhaseResult,
  OrchestratorResult,
  createInitialPipelineState,
  STAGE_ORDER,
} from "../types/pipeline.js";
import { Specification, parseSpecification } from "../types/specification.js";
import { loadConfig, saveConfig, isCIAgentInitialized, initCIAgent } from "../core/config.js";
import { getAgent } from "./index.js";
import { IntelligenceBackend, BackendUnavailableError } from "../backends/types.js";
import { registerEscalationProtocol } from "../cli/index.js";
import { execFileSync, execSync } from "node:child_process";
/**
 * Agent context extended with the git-native helpers owned by the orchestrator.
 * The orchestrator builds one of these for every agent it delegates to.
 */
export interface GitAgentContext extends AgentContext {
  /** Git state accessor created for the project path. */
  gitContext: GitContext;
  /** Branch helper created for the project path. */
  gitBranch: GitBranch;
  /** Accessor for the CIAgent files of the project. */
  ciFiles: CIAgentFiles;
  /** Milestone label the pipeline is currently working on (e.g. "v1.0"). */
  milestone: string;
}
/**
 * Top-level pipeline controller.
 *
 * For every phase it walks STAGE_ORDER, delegating a stage to backend agents when
 * STAGE_AGENT_MAP has an entry for it and a backend is present, and otherwise running
 * the built-in handling for that stage.
 */
export class OrchestratorAgent extends BaseAgent {
readonly name: AgentName = "orchestrator";
readonly description = "Top-level autonomous controller that coordinates the full CIAgent pipeline";
readonly workflow = "run";
// Loaded in the constructor and reloaded from the project path at the start of execute().
private config: CIAgentConfig;
// The helpers below stay null until execute() creates them for the project being run.
private pipelineState: PipelineState | null = null;
private decisionEngine: DecisionEngine | null = null;
private escalationProtocol: EscalationProtocol | null = null;
private gitContext: GitContext | null = null;
private gitBranch: GitBranch | null = null;
private ciFiles: CIAgentFiles | null = null;
// Starts as "v1.0" and is replaced by the milestone reconstructed from git state, when present.
private currentMilestone: string;
// One entry per executed stage; feeds the aggregate counters and the completion report.
private phaseResults: PhaseResult[] = [];
// Number of phases in the roadmap; falls back to 1 when no roadmap phases are available.
private totalPhases: number = 1;
// Agents to run per stage. The first name is the primary agent; any further names are run
// afterwards as reviewers of the primary agent's output. Stages without an entry are never
// delegated.
private static readonly STAGE_AGENT_MAP: Partial<Record<PipelineStage, AgentName[]>> = {
research: ["researcher"],
plan: ["planner"],
execute: ["executor", "code-reviewer", "security-auditor"],
test: ["tester"],
verify: ["verifier"],
complete: ["doc-writer"],
};
/**
 * @param config Optional pre-loaded configuration. When omitted, the configuration is
 *   loaded from the current working directory.
 */
constructor(config?: CIAgentConfig) {
  super();
  this.currentMilestone = "v1.0";
  this.config = config ?? loadConfig(process.cwd());
}
/**
 * Runs the whole pipeline for the project at `context.project_path`.
 *
 * State is reconstructed from git, then every stage in STAGE_ORDER is executed for each
 * phase up to the number of phases in the roadmap. A failure in the "specify" or "clarify"
 * stage aborts the run; failures in later stages are recorded in the pipeline state and the
 * run continues.
 *
 * @param context Agent context; `project_path` selects the project to operate on.
 * @returns Aggregate result with counters summed over all executed stages. Never rejects:
 *   unexpected errors are converted into a failed result.
 */
async execute(context: AgentContext): Promise<AgentResult> {
  const startTime = Date.now();
  // Start from a clean slate so counters and the completion report never include
  // results from an earlier run on a reused instance.
  this.phaseResults = [];
  // Single place that derives the aggregate counters from the recorded stage results.
  const totals = () => ({
    artifacts_created: this.phaseResults.reduce(
      (acc, r) => acc + r.artifacts_created.length,
      0
    ),
    decisions: this.phaseResults.reduce((acc, r) => acc + r.decisions_made, 0),
    escalations: this.phaseResults.reduce((acc, r) => acc + r.escalations_raised, 0),
  });
  this.log("Starting CIAgent Orchestrator pipeline (git-native)");
  try {
    this.config = loadConfig(context.project_path);
    this.gitContext = new GitContext(context.project_path);
    this.gitBranch = new GitBranch(context.project_path);
    this.ciFiles = new CIAgentFiles(context.project_path);
    this.ciFiles.ensureCIDir();
    const projectState = this.gitContext.reconstructState();
    this.currentMilestone = projectState.currentMilestone || "v1.0";
    this.log(`Reconstructed state: phase=${projectState.currentPhase}, milestone=${projectState.currentMilestone}, stage=${projectState.currentStage}`);
    const state = createInitialPipelineState(context.project_path);
    this.pipelineState = state;
    if (projectState.currentPhase > 0) {
      state.current_phase = projectState.currentPhase;
      // NOTE(review): the restored stage is overwritten as soon as the stage loop starts,
      // so a resumed run re-enters the phase at the first stage — confirm this is intended.
      state.current_stage = projectState.currentStage;
    }
    this.totalPhases = this.deriveTotalPhases();
    this.log(`Total phases in milestone: ${this.totalPhases}`);
    this.decisionEngine = new DecisionEngine(this.config, context.project_path, this.currentMilestone);
    this.escalationProtocol = new EscalationProtocol(this.config, context.project_path, this.currentMilestone);
    // Hand the protocol to the CLI so its signal handlers can dispose it on shutdown.
    registerEscalationProtocol(this.escalationProtocol);
    while (state.current_phase <= this.totalPhases) {
      this.log(`Processing phase ${state.current_phase} of ${this.totalPhases}`);
      for (const stage of STAGE_ORDER) {
        this.log(`Entering stage: ${stage}`);
        state.current_stage = stage;
        state.last_updated = new Date().toISOString();
        const result = await this.executeStageWithRecovery(stage, context);
        this.phaseResults.push(result);
        this.recordPhaseResult(result);
        if (!result.success && stage !== "complete") {
          state.errors.push({
            stage,
            phase: state.current_phase,
            message: result.error || "Stage failed",
            timestamp: new Date().toISOString(),
            retry_count: 0,
            resolved: false,
          });
          // Without a specification or clarification nothing downstream can run.
          if (stage === "specify" || stage === "clarify") {
            return {
              success: false,
              output: `Pipeline failed at ${stage}: ${result.error}`,
              ...totals(),
              duration_ms: Date.now() - startTime,
              error: result.error,
            };
          }
        }
      }
      if (state.current_phase < this.totalPhases) {
        this.performPhaseBoundaryCheckpoint(context);
        state.current_phase++;
        state.current_stage = "specify";
        this.log(`Advancing to phase ${state.current_phase}`);
      } else {
        break;
      }
    }
    const totalDuration = Date.now() - startTime;
    const completionReport = this.generateCompletionReport();
    return {
      success: true,
      output: completionReport,
      ...totals(),
      duration_ms: totalDuration,
    };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    return {
      success: false,
      output: `Orchestrator failed: ${message}`,
      artifacts_created: 0,
      decisions: 0,
      escalations: 0,
      duration_ms: Date.now() - startTime,
      error: message,
    };
  } finally {
    // Always release the protocol's resources, whichever way the run ended.
    this.escalationProtocol?.dispose();
  }
}
/**
 * Extends a plain agent context with the orchestrator's git helpers and current milestone.
 * Must only be called after execute() has created the helpers.
 */
private buildGitAgentContext(context: AgentContext): GitAgentContext {
  const gitHelpers = {
    gitContext: this.gitContext!,
    gitBranch: this.gitBranch!,
    ciFiles: this.ciFiles!,
    milestone: this.currentMilestone,
  };
  return { ...context, ...gitHelpers };
}
/**
 * Logs the artifacts of a stage result and mirrors its decision / escalation counts into
 * the decision engine and the escalation protocol, when those exist.
 */
private recordPhaseResult(result: PhaseResult): void {
  const { stage, artifacts_created, decisions_made, escalations_raised } = result;

  artifacts_created.forEach((artifact) => {
    this.log(`Artifact created: ${artifact}`);
  });

  const engine = this.decisionEngine;
  if (engine && decisions_made > 0) {
    engine.makeHighConfidenceDecision(
      `Agent reported ${decisions_made} decision(s) during ${stage}`,
      `Decisions recorded from ${stage} stage execution`,
      "general",
      []
    );
  }

  const protocol = this.escalationProtocol;
  if (protocol && escalations_raised > 0) {
    const options = [
      { id: "proceed", label: "Proceed", description: "Continue pipeline execution", recommended: true },
      { id: "halt", label: "Halt", description: "Stop pipeline and await manual review", recommended: false },
    ];
    protocol.escalate({
      type: "low_confidence_decision",
      phase: String(this.pipelineState!.current_phase),
      description: `Agent reported ${escalations_raised} escalation(s) during ${stage}`,
      context: `Stage ${stage} raised escalations during execution`,
      options,
      default_option_id: "proceed",
    });
  }
}
/**
 * Number of phases listed in the roadmap, or 1 when there is no files helper,
 * no roadmap, or the roadmap lists no phases.
 */
private deriveTotalPhases(): number {
  const phaseCount = this.ciFiles?.readRoadmapMd()?.phases.length ?? 0;
  return phaseCount > 0 ? phaseCount : 1;
}
/**
 * Closes the current phase before the orchestrator advances to the next one.
 *
 * When auto-commit is enabled inside a git repository, a checkpoint commit carrying a
 * `---ci---` metadata block is created. The phase is then marked complete in the CIAgent
 * files, in-progress requirements traced to this phase are marked complete, and the state
 * is reconstructed from git once more for logging.
 *
 * A failing commit is logged as a warning and does not abort the checkpoint.
 *
 * @param context Agent context; `project_path` is used as the git working directory.
 */
private performPhaseBoundaryCheckpoint(context: AgentContext): void {
  const phase = this.pipelineState!.current_phase;
  this.log(`Phase boundary checkpoint for phase ${phase}`);
  if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
    try {
      const message = `chore(P${String(phase).padStart(2, "0")}): phase boundary checkpoint\n\n---ci---\nphase: ${phase}\nmilestone: ${this.currentMilestone}\nstatus: complete\n---/ci---`;
      const gitOptions = { cwd: context.project_path, stdio: "pipe" as const };
      // Arguments go straight to git without a shell, so `$`, backticks, quotes or
      // backslashes in the milestone cannot be expanded or break the command line.
      execFileSync("git", ["add", "-A"], gitOptions);
      execFileSync("git", ["commit", "-m", message, "--allow-empty"], gitOptions);
    } catch (err) {
      this.warn(`Phase boundary commit failed: ${err instanceof Error ? err.message : String(err)}`);
    }
  }
  if (this.ciFiles) {
    this.ciFiles.updatePhaseStatus(phase, "complete");
    const reqs = this.ciFiles.readRequirementsMd();
    if (reqs) {
      for (const t of reqs.traceability) {
        if (t.phase === phase && t.status === "in_progress") {
          this.ciFiles.updateRequirementStatus(t.requirement, "complete");
        }
      }
    }
  }
  if (this.gitContext) {
    const verifiedState = this.gitContext.reconstructState();
    this.log(`Verified state: phase=${verifiedState.currentPhase}, milestone=${verifiedState.currentMilestone}, stage=${verifiedState.currentStage}`);
  }
}
/**
 * Executes a stage with layered recovery:
 *   1. first attempt,
 *   2. one plain retry,
 *   3. when a backend is present, a plan revision by the planner agent followed by a
 *      third attempt if the revision succeeded,
 *   4. an escalation (default option: skip the stage) when everything failed.
 *
 * @param stage Stage to run.
 * @param context Agent context passed through to the stage and the planner.
 * @returns The first successful stage result, or a failed result whose `error` carries the
 *   most recent underlying failure reason and whose `duration_ms` covers all attempts.
 *   Never rejects.
 */
private async executeStageWithRecovery(
  stage: PipelineStage,
  context: AgentContext
): Promise<PhaseResult> {
  const recoveryStart = Date.now();
  // Reasons collected from failed attempts; the latest one is surfaced to the caller
  // instead of being silently dropped.
  const failureReasons: string[] = [];

  // Runs the stage once; returns the result on success and null on any kind of failure.
  const attempt = async (failureLabel: string): Promise<PhaseResult | null> => {
    try {
      const result = await this.executeStage(stage, context);
      if (result.success) return result;
      if (result.error) failureReasons.push(result.error);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      failureReasons.push(reason);
      this.warn(`${failureLabel} for ${stage}: ${reason}`);
    }
    return null;
  };

  const firstResult = await attempt("First attempt failed");
  if (firstResult) return firstResult;

  this.log(`Retrying stage ${stage}...`);
  const retryResult = await attempt("Retry failed");
  if (retryResult) return retryResult;

  if (context.backend) {
    this.log(`Attempting plan revision for failed stage ${stage}...`);
    try {
      const planner = getAgent("planner");
      const gitContext = this.buildGitAgentContext(context);
      const planResult = await planner.execute({
        ...gitContext,
        specification: `Plan revision needed: stage ${stage} failed twice. Original error context: phase ${this.pipelineState!.current_phase}`,
      });
      if (planResult.success) {
        this.log(`Plan revision succeeded, retrying ${stage} with revised plan...`);
        const revisedResult = await attempt("Post-revision retry failed");
        if (revisedResult) return revisedResult;
      }
    } catch (err) {
      this.warn(`Plan revision failed: ${err instanceof Error ? err.message : String(err)}`);
    }
  }

  let escalated = false;
  if (this.escalationProtocol) {
    this.escalationProtocol.escalate({
      type: "verification_failure",
      phase: String(this.pipelineState!.current_phase),
      description: `Stage ${stage} failed after retry and plan revision attempts`,
      context: `All recovery attempts exhausted for stage ${stage} in phase ${this.pipelineState!.current_phase}`,
      options: [
        { id: "skip", label: "Skip stage", description: "Continue pipeline skipping this stage", recommended: true },
        { id: "abort", label: "Abort pipeline", description: "Stop the entire pipeline", recommended: false },
      ],
      default_option_id: "skip",
    });
    escalated = true;
  }

  const summary = `Stage ${stage} failed after recovery attempts`;
  const lastReason = failureReasons[failureReasons.length - 1];
  return {
    phase: this.pipelineState!.current_phase,
    stage,
    success: false,
    artifacts_created: [],
    decisions_made: 0,
    // Only count an escalation that was actually raised.
    escalations_raised: escalated ? 1 : 0,
    duration_ms: Date.now() - recoveryStart,
    error: lastReason ? `${summary}: ${lastReason}` : summary,
  };
}
/**
 * Runs a single pipeline stage once.
 *
 * When the stage has agents in STAGE_AGENT_MAP and a backend is present, the stage is
 * delegated: the first agent is the primary one (its failure fails the stage), the
 * remaining agents review the primary output and their failures are only recorded.
 * If delegation throws, or the stage is not delegated, the built-in handling for the
 * stage runs instead.
 *
 * All git invocations pass their arguments directly to git (no shell), so commit messages
 * and tag names containing quotes, `$`, backticks or backslashes are handled verbatim.
 *
 * @param stage Stage to run.
 * @param context Agent context; `project_path` is the working directory for git / npm.
 * @returns The stage result. May reject if a built-in step throws outside its own
 *   try/catch (e.g. specification parsing or the verification pipeline).
 */
private async executeStage(
  stage: PipelineStage,
  context: AgentContext
): Promise<PhaseResult> {
  const stageStart = Date.now();
  const agentNames = OrchestratorAgent.STAGE_AGENT_MAP[stage];
  if (agentNames && agentNames.length > 0 && context.backend) {
    this.log(`Delegating ${stage} to ${agentNames.join(", ")} agent(s) via backend...`);
    try {
      let primaryResult: AgentResult | null = null;
      const allArtifacts: string[] = [];
      let totalDecisions = 0;
      let totalEscalations = 0;
      let lastError: string | undefined;
      for (let i = 0; i < agentNames.length; i++) {
        const agentName = agentNames[i];
        const agent = getAgent(agentName);
        const gitContext = this.buildGitAgentContext(context);
        if (i === 0) {
          // Primary agent: its failure fails the whole stage immediately.
          const result = await agent.execute(gitContext);
          primaryResult = result;
          if (Array.isArray(result.artifacts_created)) {
            allArtifacts.push(...result.artifacts_created);
          }
          totalDecisions += result.decisions;
          totalEscalations += result.escalations;
          if (!result.success) {
            this.warn(`Primary agent ${agentName} failed for ${stage}`);
            return {
              phase: this.pipelineState!.current_phase,
              stage,
              success: false,
              artifacts_created: allArtifacts,
              decisions_made: totalDecisions,
              escalations_raised: totalEscalations,
              duration_ms: Date.now() - stageStart,
              error: result.error || `Primary agent ${agentName} failed`,
            };
          }
        } else {
          // Review agent: best effort, never fails the stage on its own.
          try {
            const reviewContext: AgentContext = {
              ...gitContext,
              specification: `${context.specification}\n\nPrimary agent (${agentNames[0]}) completed. Review context:\n- Success: ${primaryResult!.success}\n- Output: ${primaryResult!.output}\n- Artifacts: ${Array.isArray(primaryResult!.artifacts_created) ? primaryResult!.artifacts_created.join(", ") : String(primaryResult!.artifacts_created)}`,
            };
            const result = await agent.execute(reviewContext);
            if (Array.isArray(result.artifacts_created)) {
              allArtifacts.push(...result.artifacts_created);
            }
            totalDecisions += result.decisions;
            totalEscalations += result.escalations;
            if (!result.success) {
              this.warn(`Review agent ${agentName} reported issues for ${stage}: ${result.error || "unspecified"}`);
              lastError = result.error;
            }
          } catch (err) {
            this.warn(`Review agent ${agentName} failed for ${stage}: ${err instanceof Error ? err.message : String(err)}`);
          }
        }
      }
      return {
        phase: this.pipelineState!.current_phase,
        stage,
        success: primaryResult?.success ?? false,
        artifacts_created: allArtifacts,
        decisions_made: totalDecisions,
        escalations_raised: totalEscalations,
        duration_ms: Date.now() - stageStart,
        error: lastError,
      };
    } catch (err) {
      // Fall through to the built-in handling below.
      if (err instanceof BackendUnavailableError) {
        this.warn(`Backend unavailable for ${stage}, falling back to mechanical execution`);
      } else {
        this.warn(`Agents failed for ${stage}: ${err instanceof Error ? err.message : String(err)}`);
      }
    }
  }
  // The built-in handling below records no decisions or escalations of its own.
  const decisionsMade = 0;
  const escalationsRaised = 0;
  const artifactsCreated: string[] = [];
  switch (stage) {
    case "specify": {
      this.log("Loading specification from git context...");
      if (context.specification) {
        const spec = parseSpecification(context.specification);
        const initCommit = CommitBuilder.buildInitCommit({
          projectName: spec.objective.slice(0, 30),
          phaseCount: 0,
          milestone: this.currentMilestone,
          specification: spec.raw_content,
          requirements: spec.requirements,
          constraints: spec.constraints,
          outOfScope: spec.out_of_scope,
        });
        this.log("Init commit prepared with specification in ---ci--- block");
        artifactsCreated.push(".ciagent/config.json");
        if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
          try {
            this.ciFiles!.writeProjectMd({
              name: spec.objective.slice(0, 30),
              coreValue: spec.objective,
              requirements: { validated: [], active: spec.requirements, outOfScope: spec.out_of_scope },
              constraints: [...spec.constraints],
              context: "",
              keyDecisions: [],
            }, "initial creation");
            // The init commit embeds the user-supplied specification, so it must never
            // be passed through a shell.
            this.commitAll(context.project_path, initCommit, false);
          } catch (err) {
            this.warn(`Specify commit failed: ${err instanceof Error ? err.message : String(err)}`);
          }
        }
      } else {
        const projectMd = this.ciFiles!.readProjectMd();
        if (!projectMd) {
          return {
            phase: 0,
            stage: "specify",
            success: false,
            artifacts_created: [],
            decisions_made: 0,
            escalations_raised: 0,
            duration_ms: Date.now() - stageStart,
            error: "No specification provided and no PROJECT.md found",
          };
        }
      }
      this.pipelineState!.specification_loaded = true;
      break;
    }
    case "clarify": {
      this.log("Running Clarify phase...");
      const projectMd = this.ciFiles!.readProjectMd();
      if (!projectMd) {
        return {
          phase: 0,
          stage: "clarify",
          success: false,
          artifacts_created: [],
          decisions_made: 0,
          escalations_raised: 0,
          duration_ms: Date.now() - stageStart,
          error: "No PROJECT.md to clarify",
        };
      }
      if (this.config.autonomy.level === "full") {
        this.log("Full autonomy: accepting defaults for all clarification questions");
      }
      this.pipelineState!.clarify_completed = true;
      break;
    }
    case "research": {
      this.log("Researching project domain...");
      this.decisionEngine!.setPhase(this.pipelineState!.current_phase);
      const archMd = this.ciFiles!.readArchitectureMd();
      if (!archMd) {
        this.log("No ARCHITECTURE.md found — mechanical research cannot proceed without backend");
        return {
          phase: this.pipelineState!.current_phase,
          stage: "research",
          success: false,
          artifacts_created: artifactsCreated,
          decisions_made: decisionsMade,
          escalations_raised: escalationsRaised,
          duration_ms: Date.now() - stageStart,
          error: "Research stage requires intelligence backend or existing ARCHITECTURE.md",
        };
      }
      if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
        const researchCommit = CommitBuilder.buildResearchCommit(
          this.pipelineState!.current_phase,
          this.currentMilestone,
          "initial domain research",
          ["Research completed. Key findings in .ciagent/ARCHITECTURE.md and .ciagent/PROJECT.md updates."]
        );
        try {
          this.commitAll(context.project_path, researchCommit, true);
        } catch (err) {
          this.warn(`Research commit failed: ${err instanceof Error ? err.message : String(err)}`);
        }
      }
      this.pipelineState!.research_completed = true;
      artifactsCreated.push(".ciagent/ARCHITECTURE.md");
      break;
    }
    case "plan":
      this.log("Planning phase execution...");
      if (this.config.git.branching_strategy === "phase" && this.gitBranch && this.gitContext!.isGitRepo()) {
        this.gitBranch.createPhaseBranch(this.pipelineState!.current_phase, "initial-phase");
      }
      this.pipelineState!.plan_completed = true;
      break;
    case "execute":
      this.log("Executing implementation...");
      if (!context.backend) {
        this.log("No backend available — mechanical execution cannot implement code changes");
        return {
          phase: this.pipelineState!.current_phase,
          stage: "execute",
          success: false,
          artifacts_created: artifactsCreated,
          decisions_made: decisionsMade,
          escalations_raised: escalationsRaised,
          duration_ms: Date.now() - stageStart,
          error: "Execute stage requires intelligence backend for code implementation",
        };
      }
      // NOTE(review): this point is only reached with a backend when agent delegation
      // threw above; the stage is then reported as successful although no agent
      // completed — confirm this is intended.
      this.pipelineState!.execute_completed = true;
      break;
    case "test": {
      this.log("Running tests...");
      // NOTE(review): with a backend present (i.e. after agent delegation threw) no tests
      // are run here and the stage still reports success — confirm this is intended.
      if (!context.backend) {
        this.log("No backend available — running mechanical test fallback via npm test");
        try {
          execSync("npm test", {
            cwd: context.project_path,
            encoding: "utf-8",
            stdio: ["pipe", "pipe", "pipe"],
            timeout: 120000,
          });
          this.log("npm test passed");
          this.pipelineState!.test_completed = true;
          artifactsCreated.push("test-results");
        } catch (err) {
          const errMsg = err instanceof Error ? err.message : String(err);
          this.warn(`npm test failed: ${errMsg}`);
          return {
            phase: this.pipelineState!.current_phase,
            stage: "test",
            success: false,
            artifacts_created: artifactsCreated,
            decisions_made: decisionsMade,
            escalations_raised: escalationsRaised,
            duration_ms: Date.now() - stageStart,
            error: `Test stage failed: ${errMsg}`,
          };
        }
      }
      break;
    }
    case "verify": {
      this.log("Running verification...");
      const { VerificationPipeline } = await import("../verification/index.js");
      const verification = new VerificationPipeline(context.project_path);
      const verifyResult = await verification.run(this.pipelineState!.current_phase || 1);
      if (!verifyResult.all_passed) {
        return {
          phase: this.pipelineState!.current_phase,
          stage: "verify",
          success: false,
          artifacts_created: artifactsCreated,
          decisions_made: decisionsMade,
          escalations_raised: escalationsRaised,
          duration_ms: Date.now() - stageStart,
          error: `Verification failed: ${verifyResult.escalations_needed.join("; ")}`,
        };
      }
      this.pipelineState!.verify_completed = true;
      if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
        const verifyCommit = CommitBuilder.buildVerifyCommit({
          phase: this.pipelineState!.current_phase,
          milestone: this.currentMilestone,
          subject: "automated verification passed",
          requirements: { covered: [], partial: [] },
        });
        try {
          this.commitAll(context.project_path, verifyCommit, true);
        } catch (err) {
          this.warn(`Verify commit failed: ${err instanceof Error ? err.message : String(err)}`);
        }
      }
      break;
    }
    case "complete": {
      this.log("Pipeline complete");
      if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
        const completionCommit = CommitBuilder.buildPhaseCompletionCommit({
          phase: this.pipelineState!.current_phase,
          milestone: this.currentMilestone,
          phaseName: "initial-phase",
          tasksCompleted: 0,
          tasksTotal: 0,
          taskNames: [],
        });
        try {
          this.commitAll(context.project_path, completionCommit, true);
        } catch (err) {
          this.warn(`Completion commit failed: ${err instanceof Error ? err.message : String(err)}`);
        }
      }
      const versionTag = `${this.currentMilestone}-P${String(this.pipelineState!.current_phase).padStart(2, "0")}`;
      try {
        execFileSync("git", ["tag", versionTag], {
          cwd: context.project_path,
          stdio: "pipe",
        });
        this.log(`Created version tag: ${versionTag}`);
        artifactsCreated.push(`tag:${versionTag}`);
      } catch (err) {
        this.warn(`Version tag creation failed: ${err instanceof Error ? err.message : String(err)}`);
      }
      if (this.config.git.auto_push && this.gitContext!.isGitRepo()) {
        try {
          execFileSync("git", ["push", "origin", versionTag], {
            cwd: context.project_path,
            stdio: "pipe",
          });
          this.log(`Pushed version tag: ${versionTag}`);
        } catch (err) {
          this.warn(`Version tag push failed: ${err instanceof Error ? err.message : String(err)}`);
        }
      }
      break;
    }
  }
  return {
    phase: this.pipelineState!.current_phase,
    stage,
    success: true,
    artifacts_created: artifactsCreated,
    decisions_made: decisionsMade,
    escalations_raised: escalationsRaised,
    duration_ms: Date.now() - stageStart,
  };
}
/**
 * Stages every change in `projectPath` and commits it with `message`.
 * Arguments are handed to git directly (no shell), so the message is used verbatim.
 * Throws when either git command fails; callers decide how to report that.
 */
private commitAll(projectPath: string, message: string, allowEmpty: boolean): void {
  const gitOptions = { cwd: projectPath, stdio: "pipe" as const };
  execFileSync("git", ["add", "-A"], gitOptions);
  const commitArgs = ["commit", "-m", message];
  if (allowEmpty) commitArgs.push("--allow-empty");
  execFileSync("git", commitArgs, gitOptions);
}
/**
 * Renders the markdown-style completion report: a header with the summed duration,
 * decision and escalation counts, one line per recorded stage result, and a pointer
 * to the git audit trail.
 */
private generateCompletionReport(): string {
  let totalDurationMs = 0;
  let totalDecisions = 0;
  let totalEscalations = 0;
  for (const entry of this.phaseResults) {
    totalDurationMs += entry.duration_ms;
    totalDecisions += entry.decisions_made;
    totalEscalations += entry.escalations_raised;
  }

  const header: string[] = [
    "# CIAgent Completion Report",
    "",
    `✓ Pipeline completed successfully (git-native)`,
    "",
    `Duration: ${(totalDurationMs / 1000).toFixed(1)}s`,
    `Decisions made: ${totalDecisions}`,
    `Escalations raised: ${totalEscalations}`,
    "",
  ];

  const stageLines = this.phaseResults.map((entry) => {
    const marker = entry.success ? "✓" : "✗";
    return `${marker} ${entry.stage} (phase ${entry.phase}): ${entry.duration_ms}ms`;
  });

  const footer = ["", "Audit trail available via: git log --grep='decisions:'"];
  return [...header, ...stageLines, ...footer].join("\n");
}
}