📄 backfill.ts • 3792 bytes
/**
* CmdCode 向量记忆系统 - 增量回填定时任务
*/
import { t } from '../i18n'
import { getDb } from './database'
import { getPendingEmbeddings, recordEmbeddingFailure } from './sessionStore'
import { storeMessageEmbedding } from './vectorSearch'
import { getEmbedding } from './embedding'
/** Default period (ms) of the scheduled backfill: 5 minutes. */
const DEFAULT_BACKFILL_INTERVAL_MS = 5 * 60 * 1000
/** Minimum gap (ms) between the start times of two consecutive backfill runs. */
const MIN_BACKFILL_INTERVAL = 1000
/** Delay (ms) before the follow-up run requested while another run was in progress. */
const FOLLOW_UP_DELAY_MS = 100
/** Maximum number of pending messages fetched per run. */
const BATCH_SIZE = 50
/** Pause (ms) before each message, to rate-limit embedding requests. */
const PER_MESSAGE_DELAY_MS = 100

/** Periodic timer created by startBackfill; null while the scheduler is stopped. */
let backfillInterval: ReturnType<typeof setInterval> | null = null
/** Period passed to startBackfill, reported by getBackfillStatus; null while stopped. */
let backfillIntervalMs: number | null = null
/** The single queued deferred run (from throttling or a follow-up request); null when none is queued. */
let deferredBackfill: ReturnType<typeof setTimeout> | null = null
/** True while a runBackfill pass is executing. */
let isRunning = false
/** Set when runBackfill is called mid-run, so that one follow-up pass is queued afterwards. */
let hasPendingBackfill = false
/** Date.now() value recorded when the most recent run started. */
let lastBackfillTime = 0

/**
 * Queue one deferred runBackfill after `delayMs`, unless a deferred run is already queued.
 * Coalescing here keeps bursts of calls from piling up independent timers.
 */
function scheduleDeferredBackfill(delayMs: number): void {
  if (deferredBackfill) {
    return
  }
  deferredBackfill = setTimeout(() => {
    deferredBackfill = null
    runBackfill().catch(console.error)
  }, delayMs)
}

/**
 * Turn an arbitrary thrown value into a short description for logs and failure records.
 * Only an object's string `message` is used (never String(e)), so message content is not
 * echoed by accident. Any other value, including null/undefined, yields 'unknown'.
 */
function describeError(e: unknown): string {
  if (typeof e === 'object' && e !== null) {
    const { message } = e as { message?: unknown }
    if (typeof message === 'string' && message !== '') {
      return message
    }
  }
  return 'unknown'
}

/**
 * Start the periodic backfill (default: every 5 minutes).
 * Runs one pass immediately, then every `intervalMs` milliseconds.
 * Calling it while the scheduler is already active only logs a notice.
 *
 * @param intervalMs period between scheduled passes, in milliseconds
 */
export function startBackfill(intervalMs: number = DEFAULT_BACKFILL_INTERVAL_MS): void {
  if (backfillInterval) {
    console.log(t('memory.backfill_running'))
    return
  }
  console.log(' ⏰ ' + t('memory.backfill_start', { interval: String(intervalMs / 1000 / 60) }))
  backfillIntervalMs = intervalMs
  runBackfill().catch(console.error)
  backfillInterval = setInterval(() => {
    runBackfill().catch(console.error)
  }, intervalMs)
}

/**
 * Stop the periodic backfill and cancel any queued deferred or follow-up run.
 * A pass that is already executing is not interrupted. It finishes normally but
 * does not queue a follow-up.
 */
export function stopBackfill(): void {
  if (deferredBackfill) {
    clearTimeout(deferredBackfill)
    deferredBackfill = null
  }
  hasPendingBackfill = false
  if (backfillInterval) {
    clearInterval(backfillInterval)
    backfillInterval = null
    backfillIntervalMs = null
    console.log(' ⏹️ ' + t('memory.backfill_stop'))
  }
}

/**
 * Run one backfill pass, embedding up to BATCH_SIZE pending messages one at a time.
 *
 * Calls are rate-limited and coalesced (P4 #36):
 * - While a pass is running, further calls only flag that one follow-up pass is needed.
 * - Calls made less than MIN_BACKFILL_INTERVAL after the previous pass started queue a
 *   single deferred pass for when the interval has elapsed.
 * In both cases the call resolves immediately with zero counts.
 *
 * @returns how many messages were embedded successfully and how many failed in this pass
 */
export async function runBackfill(): Promise<{ processed: number; failed: number }> {
  if (isRunning) {
    // The current pass will queue one more run when it finishes.
    hasPendingBackfill = true
    return { processed: 0, failed: 0 }
  }
  const now = Date.now()
  const elapsed = now - lastBackfillTime
  // A negative `elapsed` means the wall clock moved backwards. Don't throttle then,
  // otherwise backfill would stall until the clock catches up.
  if (elapsed >= 0 && elapsed < MIN_BACKFILL_INTERVAL) {
    scheduleDeferredBackfill(MIN_BACKFILL_INTERVAL - elapsed)
    return { processed: 0, failed: 0 }
  }
  isRunning = true
  lastBackfillTime = now
  hasPendingBackfill = false
  let processed = 0
  let failed = 0
  try {
    const pending = getPendingEmbeddings(BATCH_SIZE)
    if (pending.length === 0) {
      return { processed: 0, failed: 0 }
    }
    console.log(' 📝 ' + t('memory.backfill_process', { count: pending.length }))
    for (const msg of pending) {
      let failureReason: string
      try {
        // Rate limit: pause before every message.
        await new Promise(resolve => setTimeout(resolve, PER_MESSAGE_DELAY_MS))
        const success = await storeMessageEmbedding(msg.id, msg.content)
        if (success) {
          processed++
          continue
        }
        failureReason = '存储失败'
        // Log only the message ID, never its content (privacy).
        console.log(` ⚠️ 消息 ${msg.id} 存储失败`)
      } catch (e: unknown) {
        // `e` may be any thrown value (even null), so it is never dereferenced directly.
        const message = describeError(e)
        // Log only the message ID and error text, never the content (privacy).
        console.log(` ❌ 消息 ${msg.id} 处理异常: ${message.slice(0, 50)}`)
        failureReason = message.slice(0, 100)
      }
      failed++
      // Recording is best-effort. A failing write must neither abort the batch
      // nor count this message as failed twice.
      try {
        recordEmbeddingFailure(msg.id, failureReason)
      } catch (e: unknown) {
        console.error(` ❌ failed to record embedding failure for message ${msg.id}: ${describeError(e).slice(0, 50)}`)
      }
    }
    console.log(' ✅ ' + t('memory.backfill_done', { success: processed, failed: failed }))
  } finally {
    isRunning = false
    // Calls that arrived during this pass were coalesced into one follow-up run.
    // It is queued asynchronously to avoid unbounded recursion.
    if (hasPendingBackfill) {
      hasPendingBackfill = false
      scheduleDeferredBackfill(FOLLOW_UP_DELAY_MS)
    }
  }
  return { processed, failed }
}

/**
 * Report the backfill state.
 *
 * @returns `running`: whether a pass is executing right now. `interval`: the period (ms)
 *   passed to startBackfill, or null when the periodic scheduler is not active.
 */
export function getBackfillStatus(): { running: boolean; interval: number | null } {
  return {
    running: isRunning,
    interval: backfillInterval ? backfillIntervalMs : null
  }
}
/**
 * Manually trigger a backfill pass now, without waiting for the next scheduled tick.
 *
 * This delegates to `runBackfill`, so that function's throttling and coalescing
 * apply. The result may carry zero counts when the request was deferred or folded
 * into a pass that is already running.
 *
 * @returns the processed/failed counts reported by `runBackfill`
 */
export async function triggerBackfill(): Promise<{ processed: number; failed: number }> {
  const outcome = await runBackfill()
  return outcome
}