67 lines
1.8 KiB
TypeScript
67 lines
1.8 KiB
TypeScript
import { createRedisClient, closeRedis } from "../utils/redis";
|
|
import { getDb, closeDb } from "../utils/mongodb";
|
|
|
|
/**
 * Nitro plugin that runs a background worker moving request-log entries from
 * the Redis list `request-logs` into the MongoDB collection `requests`.
 *
 * Each cycle the worker:
 *  1. blocks on the list for up to 5 seconds waiting for one entry,
 *  2. drains further entries without blocking until the batch holds 50,
 *  3. inserts the batch with a single `insertMany`,
 *  4. pauses for the rest of a 2-second window when the batch was small.
 *
 * On Nitro's `close` hook the worker is asked to stop and is given a bounded
 * amount of time to flush the batch it is working on before the Redis and
 * MongoDB connections are closed.
 *
 * NOTE(review): `brpop`/`rpop`/`quit` match the ioredis command API — confirm
 * that `createRedisClient` returns an ioredis client.
 */
export default defineNitroPlugin((nitro) => {
  const QUEUE_KEY = "request-logs";
  const COLLECTION_NAME = "requests";
  const MAX_BATCH_SIZE = 50;
  const SMALL_BATCH_THRESHOLD = 10;
  const BLOCK_TIMEOUT_SECONDS = 5;
  const CYCLE_MS = 2000;
  // Longest the close hook waits for the worker: one full blocking pop plus
  // one cycle window. After that the connections are closed regardless.
  const SHUTDOWN_GRACE_MS = BLOCK_TIMEOUT_SECONDS * 1000 + CYCLE_MS;

  let running = true;
  let workerRedis: ReturnType<typeof createRedisClient> | null = null;
  // Set while the worker is pausing between cycles; calling it ends the pause.
  let cancelPause: (() => void) | null = null;

  /**
   * Parses one raw queue entry.
   *
   * @returns the parsed document, or `null` when the entry is not valid JSON
   *          or is not a plain JSON object (null, primitives and arrays are
   *          rejected so one bad entry cannot fail the whole batch insert).
   */
  const parseEntry = (raw: string): Record<string, unknown> | null => {
    try {
      const parsed: unknown = JSON.parse(raw);
      if (
        typeof parsed === "object" &&
        parsed !== null &&
        !Array.isArray(parsed)
      ) {
        return parsed as Record<string, unknown>;
      }
      console.warn("[logWorker] Skipping non-object log entry");
    } catch (err) {
      console.warn("[logWorker] Skipping malformed log entry:", err);
    }
    return null;
  };

  /** Resolves after `ms` milliseconds, or earlier if `cancelPause` is called. */
  const pause = (ms: number): Promise<void> =>
    new Promise((resolve) => {
      const timer = setTimeout(finish, ms);
      function finish(): void {
        clearTimeout(timer);
        cancelPause = null;
        resolve();
      }
      cancelPause = finish;
    });

  /**
   * Worker loop. Never rejects: any unexpected error is logged and ends the
   * loop (it is not restarted).
   */
  const worker = async (): Promise<void> => {
    try {
      // Keep a local handle so the loop never dereferences the shared
      // variable, which the close hook resets to null.
      const redis = createRedisClient();
      workerRedis = redis;
      const db = await getDb();
      const collection = db.collection(COLLECTION_NAME);

      while (running) {
        const batch: Record<string, unknown>[] = [];
        // Fixed before the blocking pop, so a pop that waits longer than the
        // window leaves no time to pause at the end of the cycle.
        const deadline = Date.now() + CYCLE_MS;

        // Blocking pop — waits up to BLOCK_TIMEOUT_SECONDS for an item
        const item = await redis.brpop(QUEUE_KEY, BLOCK_TIMEOUT_SECONDS);
        if (item) {
          const entry = parseEntry(item[1]);
          if (entry) batch.push(entry);
        }

        // Drain more items non-blocking until the batch is full. Stop popping
        // once shutdown was requested so unprocessed entries stay in Redis.
        while (running && batch.length < MAX_BATCH_SIZE) {
          const next = await redis.rpop(QUEUE_KEY);
          if (!next) break;
          const entry = parseEntry(next);
          if (entry) batch.push(entry);
        }

        if (batch.length > 0) {
          try {
            await collection.insertMany(batch);
          } catch (err) {
            // Best-effort: the entries were already popped from Redis and are
            // not re-queued, so a failed insert drops this batch.
            console.error("[logWorker] MongoDB insert failed:", err);
          }
        }

        // If we haven't hit the deadline and batch was small, wait a bit
        const remaining = deadline - Date.now();
        if (running && remaining > 0 && batch.length < SMALL_BATCH_THRESHOLD) {
          await pause(remaining);
        }
      }
    } catch (err) {
      console.error("[logWorker] Worker error:", err);
    }
  };

  // Start worker in background; keep the promise so shutdown can wait for it.
  const workerDone = worker();

  /** Waits for the worker loop to finish, but no longer than `limitMs`. */
  const waitForWorker = async (limitMs: number): Promise<void> => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<void>((resolve) => {
      timer = setTimeout(resolve, limitMs);
    });
    try {
      await Promise.race([workerDone, timeout]);
    } finally {
      clearTimeout(timer);
    }
  };

  nitro.hooks.hook("close", async () => {
    running = false;
    if (cancelPause) cancelPause();

    // Let the in-flight batch reach MongoDB before any connection is closed.
    await waitForWorker(SHUTDOWN_GRACE_MS);

    if (workerRedis) {
      await workerRedis.quit();
      workerRedis = null;
    }
    await closeRedis();
    await closeDb();
  });
});
|