a8b50f5109
---ci---
project: ci
phase: 6
milestone: v0.9
status: complete
artifacts:
tags: [v0.9.0]
decisions:
- id: D-047
decision: v0.9 theme = Distribution & Expansion
rationale: npm publish + OpenAI/Anthropic backends + agent flesh + parallel execution
confidence: 0.92
- id: D-049
decision: Feature milestone — patch tags v0.8.1-v0.8.6 then v0.9.0
rationale: OpenAI backend, agent flesh, npm publish all feat
confidence: 0.95
- id: D-059
decision: Rename OllamaBaseBackend to LLMBaseBackend + thin OllamaBaseBackend subclass
rationale: 15 of 17 methods backend-agnostic
confidence: 0.92
- id: D-060
decision: OpenAI/Anthropic backends use native fetch() not SDK packages
rationale: No dependency bloat; fetch native in Node 18+
confidence: 0.85
- id: D-066
decision: Concurrency limiter internal (no p-limit dependency)
rationale: 15 lines; avoids dependency for trivial feature
confidence: 0.90
- id: D-067
decision: Promise.allSettled for review agents at orchestrator lines 373-400
rationale: Current sequential loop replaced with parallel execution
confidence: 0.88
requirements:
covered: [PUBLISH-01, PUBLISH-02, PUBLISH-03, PUBLISH-04, OPENAI-01, OPENAI-02, OPENAI-03, OPENAI-04, OPENAI-05, FLESH-01, FLESH-02, FLESH-03, FLESH-04, FLESH-05, ANTHROPIC-01, ANTHROPIC-02, FLESH-06, FLESH-07, NPM-01, NPM-02, PARALLEL-01, PARALLEL-02, PARALLEL-03, INTEG-01, INTEG-02, INTEG-03, INTEG-04, INTEG-05]
---/ci---
6 phases, 28 tasks, 4077 net lines added, 57 test suites, 527 tests, zero stub agents
217 lines
6.8 KiB
TypeScript
217 lines
6.8 KiB
TypeScript
async function limitConcurrency<T>(
|
|
factories: Array<() => Promise<T>>,
|
|
maxConcurrency: number
|
|
): Promise<PromiseSettledResult<T>[]> {
|
|
if (factories.length === 0) {
|
|
return [];
|
|
}
|
|
|
|
if (maxConcurrency <= 0 || maxConcurrency >= factories.length) {
|
|
return Promise.allSettled(factories.map((f) => f()));
|
|
}
|
|
|
|
const results: Array<PromiseSettledResult<T> | undefined> = new Array(factories.length).fill(undefined);
|
|
let nextIndex = 0;
|
|
|
|
const worker = async () => {
|
|
while (nextIndex < factories.length) {
|
|
const index = nextIndex++;
|
|
try {
|
|
const value = await factories[index]();
|
|
results[index] = { status: "fulfilled", value };
|
|
} catch (reason) {
|
|
results[index] = { status: "rejected", reason };
|
|
}
|
|
}
|
|
};
|
|
|
|
const workers = Array(Math.min(maxConcurrency, factories.length)).fill(null).map(() => worker());
|
|
await Promise.all(workers);
|
|
return results as PromiseSettledResult<T>[];
|
|
}
|
|
|
|
function delay(ms: number): Promise<void> {
|
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
|
}
|
|
|
|
describe("Parallel Execution", () => {
|
|
describe("limitConcurrency", () => {
|
|
it("returns empty array for zero factories", async () => {
|
|
const results = await limitConcurrency([], 5);
|
|
expect(results).toEqual([]);
|
|
});
|
|
|
|
it("returns single-element result for one factory", async () => {
|
|
const results = await limitConcurrency([() => Promise.resolve(42)], 5);
|
|
expect(results).toHaveLength(1);
|
|
expect(results[0].status).toBe("fulfilled");
|
|
if (results[0].status === "fulfilled") {
|
|
expect(results[0].value).toBe(42);
|
|
}
|
|
});
|
|
|
|
it("behaves sequentially when maxConcurrency=1", async () => {
|
|
const order: number[] = [];
|
|
|
|
const factories = [1, 2, 3].map((n) => () =>
|
|
delay(30).then(() => { order.push(n); return n; })
|
|
);
|
|
|
|
const start = Date.now();
|
|
const results = await limitConcurrency(factories, 1);
|
|
const elapsed = Date.now() - start;
|
|
|
|
expect(results).toHaveLength(3);
|
|
for (const r of results) {
|
|
expect(r.status).toBe("fulfilled");
|
|
}
|
|
expect(order).toEqual([1, 2, 3]);
|
|
expect(elapsed).toBeGreaterThanOrEqual(80);
|
|
});
|
|
|
|
it("runs concurrently when maxConcurrency exceeds factory count", async () => {
|
|
const factories = ["a", "b"].map((v) => () =>
|
|
delay(50).then(() => v)
|
|
);
|
|
|
|
const start = Date.now();
|
|
const results = await limitConcurrency(factories, 10);
|
|
const elapsed = Date.now() - start;
|
|
|
|
expect(results).toHaveLength(2);
|
|
expect(elapsed).toBeLessThan(120);
|
|
});
|
|
|
|
it("limits to maxConcurrency=2 with 4 factories", async () => {
|
|
const timestamps: number[] = [];
|
|
|
|
const factories = [0, 1, 2, 3].map((i) => () =>
|
|
delay(80).then(() => { timestamps.push(i); return i; })
|
|
);
|
|
|
|
const start = Date.now();
|
|
const results = await limitConcurrency(factories, 2);
|
|
const elapsed = Date.now() - start;
|
|
|
|
expect(results).toHaveLength(4);
|
|
for (const r of results) {
|
|
expect(r.status).toBe("fulfilled");
|
|
if (r.status === "fulfilled") {
|
|
expect([0, 1, 2, 3]).toContain(r.value);
|
|
}
|
|
}
|
|
|
|
expect(elapsed).toBeGreaterThanOrEqual(150);
|
|
expect(elapsed).toBeLessThan(350);
|
|
});
|
|
|
|
it("isolates rejected promises from fulfilled ones", async () => {
|
|
const factories = [
|
|
() => Promise.resolve("success"),
|
|
() => Promise.reject(new Error("boom")),
|
|
() => Promise.resolve("also-success"),
|
|
];
|
|
|
|
const results = await limitConcurrency(factories, 5);
|
|
|
|
expect(results).toHaveLength(3);
|
|
const fulfilled = results.filter((r) => r.status === "fulfilled");
|
|
const rejected = results.filter((r) => r.status === "rejected");
|
|
expect(fulfilled).toHaveLength(2);
|
|
expect(rejected).toHaveLength(1);
|
|
|
|
if (fulfilled[0].status === "fulfilled") {
|
|
expect(fulfilled[0].value).toBe("success");
|
|
}
|
|
if (fulfilled[1].status === "fulfilled") {
|
|
expect(fulfilled[1].value).toBe("also-success");
|
|
}
|
|
if (rejected[0].status === "rejected") {
|
|
expect((rejected[0].reason as Error).message).toBe("boom");
|
|
}
|
|
});
|
|
|
|
it("handles maxConcurrency=0 as no limit", async () => {
|
|
const factories = [1, 2, 3].map((v) => () => Promise.resolve(v));
|
|
const results = await limitConcurrency(factories, 0);
|
|
expect(results).toHaveLength(3);
|
|
for (const r of results) {
|
|
expect(r.status).toBe("fulfilled");
|
|
}
|
|
});
|
|
});
|
|
|
|
describe("parallel vs sequential timing", () => {
|
|
it("parallel execution is faster than sequential", async () => {
|
|
const DELAY_MS = 50;
|
|
const parallelFactories = [DELAY_MS, DELAY_MS].map((ms) => () => delay(ms));
|
|
|
|
const parallelStart = Date.now();
|
|
const parallelResults = await limitConcurrency(parallelFactories, 5);
|
|
const parallelElapsed = Date.now() - parallelStart;
|
|
|
|
const sequentialStart = Date.now();
|
|
for (const ms of [DELAY_MS, DELAY_MS]) {
|
|
await delay(ms);
|
|
}
|
|
const sequentialElapsed = Date.now() - sequentialStart;
|
|
|
|
expect(parallelResults).toHaveLength(2);
|
|
expect(parallelElapsed).toBeLessThan(sequentialElapsed);
|
|
expect(parallelElapsed).toBeLessThan(DELAY_MS * 1.8);
|
|
});
|
|
});
|
|
|
|
describe("concurrency limit verification", () => {
|
|
it("at most maxConcurrency agents run simultaneously", async () => {
|
|
let concurrentCount = 0;
|
|
let maxConcurrent = 0;
|
|
const MAX = 2;
|
|
|
|
const factories = [0, 1, 2, 3].map(
|
|
(i) => () =>
|
|
new Promise<number>((resolve) => {
|
|
concurrentCount++;
|
|
if (concurrentCount > maxConcurrent) maxConcurrent = concurrentCount;
|
|
delay(60).then(() => {
|
|
concurrentCount--;
|
|
resolve(i);
|
|
});
|
|
})
|
|
);
|
|
|
|
const results = await limitConcurrency(factories, MAX);
|
|
|
|
expect(results).toHaveLength(4);
|
|
expect(maxConcurrent).toBeLessThanOrEqual(MAX);
|
|
});
|
|
});
|
|
|
|
describe("sequential fallback behavior", () => {
|
|
it("runs agents in order when parallelization disabled", async () => {
|
|
const executionOrder: string[] = [];
|
|
|
|
await (async () => {
|
|
for (const name of ["code-reviewer", "security-auditor"]) {
|
|
executionOrder.push(`start:${name}`);
|
|
await delay(10);
|
|
executionOrder.push(`end:${name}`);
|
|
}
|
|
})();
|
|
|
|
expect(executionOrder).toEqual([
|
|
"start:code-reviewer",
|
|
"end:code-reviewer",
|
|
"start:security-auditor",
|
|
"end:security-auditor",
|
|
]);
|
|
});
|
|
});
|
|
|
|
describe("single agent edge case", () => {
|
|
it("no review agents means no parallel code path triggered", async () => {
|
|
const results = await limitConcurrency([], 5);
|
|
expect(results).toHaveLength(0);
|
|
});
|
|
});
|
|
}); |