Matou-Garou / convex /crons.ts
Jofthomas's picture
Jofthomas HF staff
bulk
ce8b18b
import { cronJobs } from 'convex/server';
import { DELETE_BATCH_SIZE, IDLE_WORLD_TIMEOUT, VACUUM_MAX_AGE } from './constants';
import { internal } from './_generated/api';
import { internalMutation } from './_generated/server';
import { TableNames } from './_generated/dataModel';
import { v } from 'convex/values';
const crons = cronJobs();
crons.interval(
'stop inactive worlds',
{ seconds: IDLE_WORLD_TIMEOUT / 1000 },
internal.world.stopInactiveWorlds,
);
crons.interval('restart dead worlds', { seconds: 60 }, internal.world.restartDeadWorlds);
crons.daily('vacuum old entries', { hourUTC: 4, minuteUTC: 20 }, internal.crons.vacuumOldEntries);
export default crons;
const TablesToVacuum: TableNames[] = [
// Un-comment this to also clean out old conversations.
// 'conversationMembers', 'conversations', 'messages',
// Inputs aren't useful unless you're trying to replay history.
// If you want to support that, you should add a snapshot table, so you can
// replay from a certain time period. Or stop vacuuming inputs and replay from
// the beginning of time
'inputs',
// We can keep memories without their embeddings for inspection, but we won't
// retrieve them when searching memories via vector search.
'memories',
// We can vacuum fewer tables without serious consequences, but the only
// one that will cause issues over time is having >>100k vectors.
'memoryEmbeddings',
];
export const vacuumOldEntries = internalMutation({
args: {},
handler: async (ctx, args) => {
const before = Date.now() - VACUUM_MAX_AGE;
for (const tableName of TablesToVacuum) {
console.log(`Checking ${tableName}...`);
const exists = await ctx.db
.query(tableName)
.withIndex('by_creation_time', (q) => q.lt('_creationTime', before))
.first();
if (exists) {
console.log(`Vacuuming ${tableName}...`);
await ctx.scheduler.runAfter(0, internal.crons.vacuumTable, {
tableName,
before,
cursor: null,
soFar: 0,
});
}
}
},
});
export const vacuumTable = internalMutation({
args: {
tableName: v.string(),
before: v.number(),
cursor: v.union(v.string(), v.null()),
soFar: v.number(),
},
handler: async (ctx, { tableName, before, cursor, soFar }) => {
const results = await ctx.db
.query(tableName as TableNames)
.withIndex('by_creation_time', (q) => q.lt('_creationTime', before))
.paginate({ cursor, numItems: DELETE_BATCH_SIZE });
for (const row of results.page) {
await ctx.db.delete(row._id);
}
if (!results.isDone) {
await ctx.scheduler.runAfter(0, internal.crons.vacuumTable, {
tableName,
before,
soFar: results.page.length + soFar,
cursor: results.continueCursor,
});
} else {
console.log(`Vacuumed ${soFar + results.page.length} entries from ${tableName}`);
}
},
});