Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions packages/cli/src/commands/sources.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import { loadManifest, resolveVaultRoot, VaultNotFoundError } from "@kibhq/core";
import chalk from "chalk";
import * as log from "../ui/logger.js";

interface SourcesOpts {
status?: string;
json?: boolean;
limit?: number;
}

const STATUS_COLORS: Record<string, (s: string) => string> = {
queued: chalk.gray,
extracting: chalk.blue,
ingested: chalk.yellow,
compiling: chalk.magenta,
compiled: chalk.green,
enriched: chalk.cyan,
failed: chalk.red,
};

const STATUS_ICONS: Record<string, string> = {
queued: "○",
extracting: "◐",
ingested: "◑",
compiling: "◒",
compiled: "●",
enriched: "◉",
failed: "✗",
};

export async function sources(opts: SourcesOpts) {
let root: string;
try {
root = resolveVaultRoot();
} catch (err) {
if (err instanceof VaultNotFoundError) {
log.error(err.message);
process.exit(1);
}
throw err;
}

const limit = opts.limit ?? 50;

// Try to use pipeline DB for real-time status
let usePipelineDB = false;
let pipelineDB: import("@kibhq/core/src/pipeline/db.js").PipelineDB | null = null;

try {
const { openPipelineDB, syncManifestToPipeline } = await import("@kibhq/core");
pipelineDB = openPipelineDB(root);
// Bootstrap: sync manifest entries into pipeline DB on first use
await syncManifestToPipeline(root, pipelineDB);
usePipelineDB = true;
} catch {
// Fallback to manifest-only mode
}

if (usePipelineDB && pipelineDB) {
return showPipelineSources(pipelineDB, opts, limit);
}

// Fallback: derive status from manifest
return showManifestSources(root, opts, limit);
}

async function showPipelineSources(
pipelineDB: import("@kibhq/core/src/pipeline/db.js").PipelineDB,
opts: SourcesOpts,
limit: number,
) {
const stats = pipelineDB.stats();

if (opts.json) {
const allSources = opts.status
? pipelineDB.listByStatus(
opts.status as import("@kibhq/core/src/pipeline/db.js").SourceStatus,
limit,
)
: pipelineDB.listAll(limit);
console.log(JSON.stringify({ stats, sources: allSources }, null, 2));
pipelineDB.close();
return;
}

log.header("source pipeline");

// Status summary bar
const statusParts: string[] = [];
if (stats.queued > 0) statusParts.push(chalk.gray(`${stats.queued} queued`));
if (stats.extracting > 0) statusParts.push(chalk.blue(`${stats.extracting} extracting`));
if (stats.ingested > 0) statusParts.push(chalk.yellow(`${stats.ingested} pending compile`));
if (stats.compiling > 0) statusParts.push(chalk.magenta(`${stats.compiling} compiling`));
if (stats.compiled > 0) statusParts.push(chalk.green(`${stats.compiled} compiled`));
if (stats.enriched > 0) statusParts.push(chalk.cyan(`${stats.enriched} ready`));
if (stats.failed > 0) statusParts.push(chalk.red(`${stats.failed} failed`));

log.keyValue("TOTAL", `${stats.total} sources`);
if (statusParts.length > 0) {
console.log(` ${statusParts.join(chalk.dim(" · "))}`);
}

if (stats.total_input_tokens > 0) {
const totalTokens = stats.total_input_tokens + stats.total_output_tokens;
log.dim(
`${totalTokens.toLocaleString()} total tokens (${stats.total_input_tokens.toLocaleString()} in / ${stats.total_output_tokens.toLocaleString()} out)`,
);
}
log.blank();

// Source list
const allSources = opts.status
? pipelineDB.listByStatus(
opts.status as import("@kibhq/core/src/pipeline/db.js").SourceStatus,
limit,
)
: pipelineDB.listAll(limit);

if (allSources.length === 0) {
log.dim("No sources found. Ingest something with: kib ingest <url>");
pipelineDB.close();
return;
}

for (const src of allSources) {
const statusColor = STATUS_COLORS[src.status] ?? chalk.white;
const icon = STATUS_ICONS[src.status] ?? "?";
const title = src.title ?? src.uri;
const truncTitle = title.length > 50 ? `${title.slice(0, 47)}...` : title;
const type = src.source_type ? chalk.dim(`[${src.source_type}]`) : "";
const articles =
src.articles_produced > 0 ? chalk.dim(`→ ${src.articles_produced} articles`) : "";

console.log(` ${statusColor(icon)} ${truncTitle} ${type} ${articles}`);

// Show error for failed sources
if (src.status === "failed" && src.error) {
console.log(` ${chalk.red(chalk.dim(src.error.slice(0, 80)))}`);
}
}

log.blank();
pipelineDB.close();
}

async function showManifestSources(root: string, opts: SourcesOpts, limit: number) {
const manifest = await loadManifest(root);
const entries = Object.entries(manifest.sources);

if (opts.json) {
const sources = entries.slice(0, limit).map(([id, src]) => ({
id,
title: src.metadata.title ?? id,
sourceType: src.sourceType,
status: src.lastCompiled ? "compiled" : "ingested",
wordCount: src.metadata.wordCount,
ingestedAt: src.ingestedAt,
lastCompiled: src.lastCompiled,
articles: src.producedArticles.length,
}));
console.log(JSON.stringify({ total: entries.length, sources }, null, 2));
return;
}

log.header("sources");

const compiled = entries.filter(([, s]) => s.lastCompiled).length;
const pending = entries.length - compiled;

log.keyValue("TOTAL", `${entries.length} sources`);
if (compiled > 0) log.keyValue("COMPILED", `${compiled}`);
if (pending > 0) log.keyValue("PENDING", chalk.yellow(`${pending}`));
log.blank();

// Sort by ingestedAt descending
const sorted = entries.sort(
([, a], [, b]) => new Date(b.ingestedAt).getTime() - new Date(a.ingestedAt).getTime(),
);

for (const [, src] of sorted.slice(0, limit)) {
const status = src.lastCompiled ? "compiled" : "ingested";
const statusColor = STATUS_COLORS[status] ?? chalk.white;
const icon = STATUS_ICONS[status] ?? "?";
const title = src.metadata.title ?? "Untitled";
const truncTitle = title.length > 50 ? `${title.slice(0, 47)}...` : title;
const type = chalk.dim(`[${src.sourceType}]`);
const articles =
src.producedArticles.length > 0 ? chalk.dim(`→ ${src.producedArticles.length} articles`) : "";

console.log(` ${statusColor(icon)} ${truncTitle} ${type} ${articles}`);
}

if (entries.length > limit) {
log.blank();
log.dim(`Showing ${limit} of ${entries.length} sources. Use --limit to see more.`);
}

log.blank();
}
26 changes: 26 additions & 0 deletions packages/cli/src/commands/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@ export async function status(opts: StatusOpts) {
log.warn(`${pendingCount} sources pending — run kib compile`);
}

// Show pipeline stats if available
try {
const { openPipelineDB, syncManifestToPipeline } = await import("@kibhq/core");
const pipelineDB = openPipelineDB(root);
await syncManifestToPipeline(root, pipelineDB);
const pStats = pipelineDB.stats();
if (pStats.total > 0) {
log.blank();
log.keyValue("PIPELINE", "compile-on-ingest enabled");
const parts: string[] = [];
if (pStats.compiling > 0) parts.push(`${pStats.compiling} compiling`);
if (pStats.ingested > 0) parts.push(`${pStats.ingested} awaiting compile`);
if (pStats.failed > 0) parts.push(`${pStats.failed} failed`);
if (parts.length > 0) {
log.keyValue("ACTIVE", parts.join(", "));
}
if (pStats.total_input_tokens > 0) {
const total = pStats.total_input_tokens + pStats.total_output_tokens;
log.keyValue("TOKENS", `${total.toLocaleString()} lifetime`);
}
}
pipelineDB.close();
} catch {
// Pipeline DB not available — that's fine
}

log.blank();
}

Expand Down
88 changes: 64 additions & 24 deletions packages/cli/src/commands/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ async function startWatch(
startFolderWatchers,
startClipboardWatcher,
startScreenshotWatcher,
compileVault,
createProvider,
isLocked,
openPipelineDB,
ingestAndCompile,
batchEnrich,
} = await import("@kibhq/core");

const inboxPath = resolve(root, config.watch.inbox_path);
Expand All @@ -191,28 +192,38 @@ async function startWatch(
}
};

// ── Auto-compile scheduler ───────────────────────────────────
// ── Pipeline DB for real-time status tracking ───────────────

const pipelineDB = openPipelineDB(root);

// ── LLM provider for compile-on-ingest ──────────────────────

let compileProvider: import("@kibhq/core").LLMProvider | null = null;
try {
const compileModel = config.compile.model ?? config.provider.model;
compileProvider = await createProvider(config.provider.default, compileModel);
} catch {
emit("warn", "No LLM provider available — compile-on-ingest disabled, ingest-only mode.");
}

// ── Enrichment scheduler (batched, runs after N compilations or idle) ─

const scheduler = new CompileScheduler({
threshold: config.watch.auto_compile_threshold,
delayMs: config.watch.auto_compile_delay_ms,
onCompile: async () => {
const lockStatus = await isLocked(root);
if (lockStatus.locked) {
emit("warn", "Skipping auto-compile: vault is locked.");
return;
}
emit("info", "Auto-compiling...");
// In the new paradigm, this runs batch enrichment (not full compilation)
if (!compileProvider) return;
emit("info", "Running batch enrichment...");
try {
const compileModel = config.compile.model ?? config.provider.model;
const provider = await createProvider(config.provider.default, compileModel);
const result = await compileVault(root, provider, config);
emit(
"info",
`Compiled ${result.sourcesCompiled} sources → ${result.articlesCreated} created, ${result.articlesUpdated} updated.`,
);
const enriched = await batchEnrich(root, compileProvider, pipelineDB, {
onProgress: (msg) => emit("info", msg),
});
if (enriched > 0) {
emit("info", `Enriched ${enriched} articles with cross-references.`);
}
} catch (err) {
emit("error", `Auto-compile failed: ${(err as Error).message}`);
emit("error", `Batch enrichment failed: ${(err as Error).message}`);
}
},
onLog: (msg) => emit("info", msg),
Expand All @@ -229,15 +240,43 @@ async function startWatch(
const items = await listPending(root, 20);
for (const item of items) {
try {
const result = await ingestSource(root, item.uri, item.options);
await dequeue(root, item.id);
if (result.skipped) {
emit("info", `Skipped: ${result.skipReason}`);
} else {
emit("info", `Ingested: ${result.title} → ${result.path}`);
if (config.watch.auto_compile) {
// Compile-on-ingest: each source goes through the full pipeline inline
if (compileProvider && config.watch.auto_compile) {
const result = await ingestAndCompile(root, item.uri, compileProvider, pipelineDB, {
config,
callbacks: {
onStatus: (id, status, detail) => {
emit("info", `[${status}] ${detail ?? id}`);
},
onProgress: (msg) => emit("info", msg),
},
...(item.options ?? {}),
});
await dequeue(root, item.id);

if (result.error) {
emit("warn", `Pipeline failed for ${item.uri}: ${result.error}`);
} else {
const elapsed = Math.round(result.elapsed);
const articles = result.compile
? `→ ${result.compile.articlesCreated + result.compile.articlesUpdated} articles`
: "";
emit(
"info",
`Pipeline complete: ${result.ingest?.title ?? item.uri} ${articles} (${elapsed}ms)`,
);
// Schedule batch enrichment
scheduler.recordIngest();
}
} else {
// Fallback: ingest-only (no LLM provider available)
const result = await ingestSource(root, item.uri, item.options);
await dequeue(root, item.id);
if (result.skipped) {
emit("info", `Skipped: ${result.skipReason}`);
} else {
emit("info", `Ingested (no compile): ${result.title} → ${result.path}`);
}
}
} catch (err) {
const retry = await markFailed(root, item.id, (err as Error).message);
Expand Down Expand Up @@ -400,6 +439,7 @@ async function startWatch(
screenshotCleanup?.stop();
scheduler.stop();
clearInterval(queuePollInterval);
pipelineDB.close();
emit("info", "Daemon stopped.");
};
}
Expand Down
14 changes: 14 additions & 0 deletions packages/cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ program
await ingest(sources, opts);
});

program
.command("sources")
.description("List sources with real-time pipeline status")
.option(
"--status <status>",
"filter by status (queued, ingested, compiling, compiled, enriched, failed)",
)
.option("--limit <n>", "max sources to show", Number.parseInt)
.option("--json", "JSON output")
.action(async (opts) => {
const { sources } = await import("./commands/sources.js");
await sources(opts);
});

program
.command("compile")
.description("Compile raw sources into wiki articles")
Expand Down
Loading
Loading