feat(P02): orchestrator enrichment — GitAgentContext, multi-phase, error recovery, timer cleanup, TEST stage
---ci---
phase: 2
milestone: v0.6
status: execute
decisions:
- id: D-001
decision: Pass GitAgentContext to agents instead of bare AgentContext
rationale: Agents need git-native context (gitContext, gitBranch, ciFiles, milestone) to operate autonomously
confidence: 0.95
- id: D-002
decision: Implement multi-phase iteration with totalPhases derived from ROADMAP.md
rationale: Milestones can span multiple phases; orchestrator must advance through all of them
confidence: 0.90
- id: D-003
decision: Add executeStageWithRecovery with retry + plan revision + escalation
rationale: Robust error recovery requires multiple fallback levels before giving up
confidence: 0.85
- id: D-004
decision: Add timer-to-escalation mapping in EscalationProtocol for proper cleanup
rationale: resolveEscalation must clearTimeout for the corresponding timer to prevent resource leaks
confidence: 0.90
- id: D-005
decision: Add dispose() to EscalationProtocol called in orchestrator finally block
rationale: Ensures all timers are cleaned up on orchestrator exit regardless of outcome
confidence: 0.95
- id: D-006
decision: Add mechanical TEST stage fallback running npm test via execSync
rationale: When no backend is available, tests can still be run mechanically
confidence: 0.85
---/ci---
This commit is contained in:
+238
-38
@@ -19,6 +19,7 @@ import { Specification, parseSpecification } from "../types/specification.js";
|
||||
import { loadConfig, saveConfig, isCIAgentInitialized, initCIAgent } from "../core/config.js";
|
||||
import { getAgent } from "./index.js";
|
||||
import { IntelligenceBackend, BackendUnavailableError } from "../backends/types.js";
|
||||
import { execSync } from "node:child_process";
|
||||
|
||||
export interface GitAgentContext extends AgentContext {
|
||||
gitContext: GitContext;
|
||||
@@ -41,6 +42,7 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
private ciFiles: CIAgentFiles | null = null;
|
||||
private currentMilestone: string;
|
||||
private phaseResults: PhaseResult[] = [];
|
||||
private totalPhases: number = 1;
|
||||
|
||||
private static readonly STAGE_AGENT_MAP: Partial<Record<PipelineStage, AgentName>> = {
|
||||
research: "researcher",
|
||||
@@ -79,47 +81,66 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
this.pipelineState.current_stage = projectState.currentStage;
|
||||
}
|
||||
|
||||
this.totalPhases = this.deriveTotalPhases();
|
||||
this.log(`Total phases in milestone: ${this.totalPhases}`);
|
||||
|
||||
this.decisionEngine = new DecisionEngine(this.config, context.project_path, this.currentMilestone);
|
||||
this.escalationProtocol = new EscalationProtocol(this.config, context.project_path, this.currentMilestone);
|
||||
|
||||
for (const stage of STAGE_ORDER) {
|
||||
this.log(`Entering stage: ${stage}`);
|
||||
this.pipelineState.current_stage = stage;
|
||||
this.pipelineState.last_updated = new Date().toISOString();
|
||||
while (this.pipelineState.current_phase <= this.totalPhases) {
|
||||
this.log(`Processing phase ${this.pipelineState.current_phase} of ${this.totalPhases}`);
|
||||
|
||||
const result = await this.executeStage(stage, context);
|
||||
for (const stage of STAGE_ORDER) {
|
||||
this.log(`Entering stage: ${stage}`);
|
||||
this.pipelineState.current_stage = stage;
|
||||
this.pipelineState.last_updated = new Date().toISOString();
|
||||
|
||||
if (!result.success && stage !== "complete") {
|
||||
this.pipelineState.errors.push({
|
||||
stage,
|
||||
phase: this.pipelineState.current_phase,
|
||||
message: result.error || "Stage failed",
|
||||
timestamp: new Date().toISOString(),
|
||||
retry_count: 0,
|
||||
resolved: false,
|
||||
});
|
||||
const result = await this.executeStageWithRecovery(stage, context);
|
||||
|
||||
if (stage === "specify" || stage === "clarify") {
|
||||
return {
|
||||
success: false,
|
||||
output: `Pipeline failed at ${stage}: ${result.error}`,
|
||||
artifacts_created: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.artifacts_created.length,
|
||||
0
|
||||
),
|
||||
decisions: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.decisions_made,
|
||||
0
|
||||
),
|
||||
escalations: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.escalations_raised,
|
||||
0
|
||||
),
|
||||
duration_ms: Date.now() - startTime,
|
||||
error: result.error,
|
||||
};
|
||||
this.phaseResults.push(result);
|
||||
this.recordPhaseResult(result);
|
||||
|
||||
if (!result.success && stage !== "complete") {
|
||||
this.pipelineState.errors.push({
|
||||
stage,
|
||||
phase: this.pipelineState.current_phase,
|
||||
message: result.error || "Stage failed",
|
||||
timestamp: new Date().toISOString(),
|
||||
retry_count: 0,
|
||||
resolved: false,
|
||||
});
|
||||
|
||||
if (stage === "specify" || stage === "clarify") {
|
||||
return {
|
||||
success: false,
|
||||
output: `Pipeline failed at ${stage}: ${result.error}`,
|
||||
artifacts_created: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.artifacts_created.length,
|
||||
0
|
||||
),
|
||||
decisions: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.decisions_made,
|
||||
0
|
||||
),
|
||||
escalations: this.phaseResults.reduce(
|
||||
(acc, r) => acc + r.escalations_raised,
|
||||
0
|
||||
),
|
||||
duration_ms: Date.now() - startTime,
|
||||
error: result.error,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.pipelineState.current_phase < this.totalPhases) {
|
||||
this.performPhaseBoundaryCheckpoint(context);
|
||||
this.pipelineState.current_phase++;
|
||||
this.pipelineState.current_stage = "specify";
|
||||
this.log(`Advancing to phase ${this.pipelineState.current_phase}`);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const totalDuration = Date.now() - startTime;
|
||||
@@ -152,9 +173,159 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
duration_ms: Date.now() - startTime,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
} finally {
|
||||
this.escalationProtocol?.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
private buildGitAgentContext(context: AgentContext): GitAgentContext {
|
||||
return {
|
||||
...context,
|
||||
gitContext: this.gitContext!,
|
||||
gitBranch: this.gitBranch!,
|
||||
ciFiles: this.ciFiles!,
|
||||
milestone: this.currentMilestone,
|
||||
};
|
||||
}
|
||||
|
||||
private recordPhaseResult(result: PhaseResult): void {
|
||||
for (const artifact of result.artifacts_created) {
|
||||
this.log(`Artifact created: ${artifact}`);
|
||||
}
|
||||
|
||||
if (result.decisions_made > 0 && this.decisionEngine) {
|
||||
this.decisionEngine.makeHighConfidenceDecision(
|
||||
`Agent reported ${result.decisions_made} decision(s) during ${result.stage}`,
|
||||
`Decisions recorded from ${result.stage} stage execution`,
|
||||
"general",
|
||||
[]
|
||||
);
|
||||
}
|
||||
|
||||
if (result.escalations_raised > 0 && this.escalationProtocol) {
|
||||
this.escalationProtocol.escalate({
|
||||
type: "low_confidence_decision",
|
||||
phase: String(this.pipelineState!.current_phase),
|
||||
description: `Agent reported ${result.escalations_raised} escalation(s) during ${result.stage}`,
|
||||
context: `Stage ${result.stage} raised escalations during execution`,
|
||||
options: [
|
||||
{ id: "proceed", label: "Proceed", description: "Continue pipeline execution", recommended: true },
|
||||
{ id: "halt", label: "Halt", description: "Stop pipeline and await manual review", recommended: false },
|
||||
],
|
||||
default_option_id: "proceed",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private deriveTotalPhases(): number {
|
||||
if (!this.ciFiles) return 1;
|
||||
const roadmap = this.ciFiles.readRoadmapMd();
|
||||
if (!roadmap || roadmap.phases.length === 0) return 1;
|
||||
return roadmap.phases.length;
|
||||
}
|
||||
|
||||
private performPhaseBoundaryCheckpoint(context: AgentContext): void {
|
||||
this.log(`Phase boundary checkpoint for phase ${this.pipelineState!.current_phase}`);
|
||||
|
||||
if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
|
||||
try {
|
||||
const message = `chore(P${String(this.pipelineState!.current_phase).padStart(2, "0")}): phase boundary checkpoint\n\n---ci---\nphase: ${this.pipelineState!.current_phase}\nmilestone: ${this.currentMilestone}\nstatus: complete\n---/ci---`;
|
||||
execSync(`git add -A && git commit -m "${message.replace(/"/g, '\\"')}" --allow-empty`, {
|
||||
cwd: context.project_path,
|
||||
stdio: "pipe",
|
||||
});
|
||||
} catch (err) {
|
||||
this.warn(`Phase boundary commit failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.ciFiles) {
|
||||
this.ciFiles.updatePhaseStatus(this.pipelineState!.current_phase, "complete");
|
||||
|
||||
const reqs = this.ciFiles.readRequirementsMd();
|
||||
if (reqs) {
|
||||
for (const t of reqs.traceability) {
|
||||
if (t.phase === this.pipelineState!.current_phase && t.status === "in_progress") {
|
||||
this.ciFiles.updateRequirementStatus(t.requirement, "complete");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.gitContext) {
|
||||
const verifiedState = this.gitContext.reconstructState();
|
||||
this.log(`Verified state: phase=${verifiedState.currentPhase}, milestone=${verifiedState.currentMilestone}, stage=${verifiedState.currentStage}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async executeStageWithRecovery(
|
||||
stage: PipelineStage,
|
||||
context: AgentContext
|
||||
): Promise<PhaseResult> {
|
||||
try {
|
||||
const result = await this.executeStage(stage, context);
|
||||
if (result.success) return result;
|
||||
} catch (err) {
|
||||
this.warn(`First attempt failed for ${stage}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
|
||||
this.log(`Retrying stage ${stage}...`);
|
||||
try {
|
||||
const result = await this.executeStage(stage, context);
|
||||
if (result.success) return result;
|
||||
} catch (err) {
|
||||
this.warn(`Retry failed for ${stage}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
|
||||
if (context.backend) {
|
||||
this.log(`Attempting plan revision for failed stage ${stage}...`);
|
||||
try {
|
||||
const planner = getAgent("planner");
|
||||
const gitContext = this.buildGitAgentContext(context);
|
||||
const planResult = await planner.execute({
|
||||
...gitContext,
|
||||
specification: `Plan revision needed: stage ${stage} failed twice. Original error context: phase ${this.pipelineState!.current_phase}`,
|
||||
});
|
||||
if (planResult.success) {
|
||||
this.log(`Plan revision succeeded, retrying ${stage} with revised plan...`);
|
||||
try {
|
||||
const result = await this.executeStage(stage, context);
|
||||
if (result.success) return result;
|
||||
} catch (err) {
|
||||
this.warn(`Post-revision retry failed for ${stage}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
this.warn(`Plan revision failed: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.escalationProtocol) {
|
||||
this.escalationProtocol.escalate({
|
||||
type: "verification_failure",
|
||||
phase: String(this.pipelineState!.current_phase),
|
||||
description: `Stage ${stage} failed after retry and plan revision attempts`,
|
||||
context: `All recovery attempts exhausted for stage ${stage} in phase ${this.pipelineState!.current_phase}`,
|
||||
options: [
|
||||
{ id: "skip", label: "Skip stage", description: "Continue pipeline skipping this stage", recommended: true },
|
||||
{ id: "abort", label: "Abort pipeline", description: "Stop the entire pipeline", recommended: false },
|
||||
],
|
||||
default_option_id: "skip",
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
phase: this.pipelineState!.current_phase,
|
||||
stage,
|
||||
success: false,
|
||||
artifacts_created: [],
|
||||
decisions_made: 0,
|
||||
escalations_raised: 1,
|
||||
duration_ms: 0,
|
||||
error: `Stage ${stage} failed after recovery attempts`,
|
||||
};
|
||||
}
|
||||
|
||||
private async executeStage(
|
||||
stage: PipelineStage,
|
||||
context: AgentContext
|
||||
@@ -166,7 +337,8 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
this.log(`Delegating ${stage} to ${agentName} agent via backend...`);
|
||||
try {
|
||||
const agent = getAgent(agentName);
|
||||
const result = await agent.execute(context);
|
||||
const gitContext = this.buildGitAgentContext(context);
|
||||
const result = await agent.execute(gitContext);
|
||||
return {
|
||||
phase: this.pipelineState!.current_phase,
|
||||
stage,
|
||||
@@ -212,7 +384,6 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
|
||||
if (this.config.git.auto_commit && this.gitContext!.isGitRepo()) {
|
||||
try {
|
||||
const { execSync } = await import("node:child_process");
|
||||
this.ciFiles!.writeProjectMd({
|
||||
name: spec.objective.slice(0, 30),
|
||||
coreValue: spec.objective,
|
||||
@@ -300,7 +471,6 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
["Research completed. Key findings in .ciagent/ARCHITECTURE.md and .ciagent/PROJECT.md updates."]
|
||||
);
|
||||
try {
|
||||
const { execSync } = await import("node:child_process");
|
||||
execSync(`git add -A && git commit -m "${researchCommit.replace(/"/g, '\\"')}" --allow-empty`, {
|
||||
cwd: context.project_path,
|
||||
stdio: "pipe",
|
||||
@@ -343,6 +513,38 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
this.pipelineState!.execute_completed = true;
|
||||
break;
|
||||
|
||||
case "test": {
|
||||
this.log("Running tests...");
|
||||
if (!context.backend) {
|
||||
this.log("No backend available — running mechanical test fallback via npm test");
|
||||
try {
|
||||
const testOutput = execSync("npm test", {
|
||||
cwd: context.project_path,
|
||||
encoding: "utf-8",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
timeout: 120000,
|
||||
});
|
||||
this.log("npm test passed");
|
||||
this.pipelineState!.test_completed = true;
|
||||
artifactsCreated.push("test-results");
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
this.warn(`npm test failed: ${errMsg}`);
|
||||
return {
|
||||
phase: this.pipelineState!.current_phase,
|
||||
stage: "test",
|
||||
success: false,
|
||||
artifacts_created: artifactsCreated,
|
||||
decisions_made: decisionsMade,
|
||||
escalations_raised: escalationsRaised,
|
||||
duration_ms: Date.now() - stageStart,
|
||||
error: `Test stage failed: ${errMsg}`,
|
||||
};
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "verify": {
|
||||
this.log("Running verification...");
|
||||
|
||||
@@ -373,7 +575,6 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
requirements: { covered: [], partial: [] },
|
||||
});
|
||||
try {
|
||||
const { execSync } = await import("node:child_process");
|
||||
execSync(`git add -A && git commit -m "${verifyCommit.replace(/"/g, '\\"')}" --allow-empty`, {
|
||||
cwd: context.project_path,
|
||||
stdio: "pipe",
|
||||
@@ -399,7 +600,6 @@ export class OrchestratorAgent extends BaseAgent {
|
||||
taskNames: [],
|
||||
});
|
||||
try {
|
||||
const { execSync } = await import("node:child_process");
|
||||
execSync(`git add -A && git commit -m "${completionCommit.replace(/"/g, '\\"')}" --allow-empty`, {
|
||||
cwd: context.project_path,
|
||||
stdio: "pipe",
|
||||
|
||||
@@ -29,6 +29,7 @@ export class EscalationProtocol {
|
||||
private pendingEscalations: Map<string, Escalation>;
|
||||
private timeoutCallback: (escalation: Escalation, chosenOption: string) => void;
|
||||
private timers: NodeJS.Timeout[];
|
||||
private timerEscalationMap: Map<NodeJS.Timeout, string>;
|
||||
|
||||
constructor(
|
||||
config: CIAgentConfig,
|
||||
@@ -43,6 +44,7 @@ export class EscalationProtocol {
|
||||
this.pendingEscalations = new Map();
|
||||
this.timeoutCallback = timeoutCallback;
|
||||
this.timers = [];
|
||||
this.timerEscalationMap = new Map();
|
||||
}
|
||||
|
||||
setMilestone(milestone: string): void {
|
||||
@@ -102,6 +104,16 @@ export class EscalationProtocol {
|
||||
const escalation = this.pendingEscalations.get(escalationId);
|
||||
if (!escalation) return null;
|
||||
|
||||
for (let i = this.timers.length - 1; i >= 0; i--) {
|
||||
const timer = this.timers[i];
|
||||
const mappedId = this.timerEscalationMap.get(timer);
|
||||
if (mappedId === escalationId) {
|
||||
clearTimeout(timer);
|
||||
this.timerEscalationMap.delete(timer);
|
||||
this.timers.splice(i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
escalation.resolution = resolution;
|
||||
escalation.resolved_at = new Date().toISOString();
|
||||
escalation.resolution_detail = `Chose option: ${chosenOptionId}`;
|
||||
@@ -139,11 +151,16 @@ export class EscalationProtocol {
|
||||
clearAllTimers(): void {
|
||||
for (const timer of this.timers) {
|
||||
clearTimeout(timer);
|
||||
this.timerEscalationMap.delete(timer);
|
||||
}
|
||||
this.timers = [];
|
||||
this.pendingEscalations.clear();
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.clearAllTimers();
|
||||
}
|
||||
|
||||
formatEscalation(escalation: Escalation): string {
|
||||
const lines: string[] = [
|
||||
`⚠️ ESCALATION [${escalation.id}]`,
|
||||
@@ -200,9 +217,13 @@ export class EscalationProtocol {
|
||||
escalation.resolved_at = new Date().toISOString();
|
||||
escalation.resolution_detail = `Auto-proceeded with default: ${escalation.default_option_id}`;
|
||||
this.pendingEscalations.delete(escalation.id);
|
||||
this.timerEscalationMap.delete(timer);
|
||||
const idx = this.timers.indexOf(timer);
|
||||
if (idx >= 0) this.timers.splice(idx, 1);
|
||||
this.timeoutCallback(escalation, escalation.default_option_id);
|
||||
}
|
||||
}, timeout);
|
||||
this.timers.push(timer);
|
||||
this.timerEscalationMap.set(timer, escalation.id);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user