import { loadConfig } from "@nproxy/config";
import { createPrismaBillingStore, createPrismaWorkerStore, prisma } from "@nproxy/db";
import {
  createEmailTransport,
  createNanoBananaSimulatedAdapter,
  createPaymentProviderAdapter,
} from "@nproxy/providers";

const config = loadConfig();

// The tick period is derived from the key-pool balance poll setting (seconds -> ms).
const intervalMs = config.keyPool.balancePollSeconds * 1000;
// Passed to billingStore.createUpcomingRenewalInvoices on every tick.
const renewalLeadTimeHours = 72;
// Passed to billingStore.listPendingInvoicesForReconciliation on every tick.
const invoiceReconciliationBatchSize = 100;

const workerStore = createPrismaWorkerStore(prisma, {
  cooldownMinutes: config.keyPool.cooldownMinutes,
  failuresBeforeManualReview: config.keyPool.failuresBeforeManualReview,
});
const billingStore = createPrismaBillingStore(prisma);
const nanoBananaAdapter = createNanoBananaSimulatedAdapter();
const paymentProviderAdapter = createPaymentProviderAdapter(config.payment);
const emailTransport = createEmailTransport(config.email);

// Re-entrancy guard: setInterval keeps firing even while a slow tick is still awaiting.
let isTickRunning = false;
// Ensures the disconnect/exit sequence runs once even if both signals arrive.
let isShuttingDown = false;

console.log(
  JSON.stringify({
    service: "worker",
    balancePollSeconds: config.keyPool.balancePollSeconds,
    renewalLeadTimeHours,
    providerModel: config.provider.nanoBananaDefaultModel,
  }),
);

const tickTimer = setInterval(() => {
  void runTick();
}, intervalMs);

// Run one tick immediately instead of waiting for the first interval to elapse.
void runTick();

process.once("SIGTERM", () => {
  void shutdown();
});

process.once("SIGINT", () => {
  void shutdown();
});

/**
 * Stops scheduling new ticks, disconnects Prisma and exits the process.
 *
 * Exits with code 0 after a clean disconnect and with code 1 when the
 * disconnect rejects, so the failure is logged instead of surfacing as an
 * unhandled promise rejection inside a signal handler.
 */
async function shutdown(): Promise<void> {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;
  clearInterval(tickTimer);

  let exitCode = 0;
  try {
    await prisma.$disconnect();
  } catch (error) {
    exitCode = 1;
    console.error("worker shutdown failed to disconnect prisma", error);
  }
  process.exit(exitCode);
}

/** Renders a caught value as a log-friendly message. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Runs one worker tick: renewal invoices, pending-invoice reconciliation,
 * expiry of elapsed invoices, cooldown key recovery, then at most one queued
 * generation job.
 *
 * A tick that starts while the previous one is still running is skipped.
 * Any error escaping a phase is logged and ends the tick; it never rejects.
 */
async function runTick(): Promise<void> {
  if (isTickRunning) {
    console.log("worker tick skipped because previous tick is still running");
    return;
  }

  isTickRunning = true;

  try {
    await issueRenewalInvoices();
    await reconcilePendingInvoices();
    await expireElapsedInvoices();
    await recoverCooldownKeys();
    await processNextGenerationJob();
  } catch (error) {
    console.error("worker tick failed", error);
  } finally {
    isTickRunning = false;
  }
}

/**
 * Asks the billing store to create upcoming renewal invoices and emails each
 * returned notification to its recipient.
 *
 * A failed email is logged and skipped so that it neither blocks the
 * remaining notifications nor aborts the rest of the tick.
 */
async function issueRenewalInvoices(): Promise<void> {
  const renewalNotifications = await billingStore.createUpcomingRenewalInvoices({
    paymentProvider: config.payment.provider,
    paymentProviderAdapter,
    renewalLeadTimeHours,
  });

  if (renewalNotifications.length === 0) {
    return;
  }

  // Built only when there is something to send, so an invalid base URL cannot
  // fail a tick that has no notifications.
  const billingUrl = new URL("/billing", config.urls.appBaseUrl).toString();

  for (const notification of renewalNotifications) {
    const { invoice } = notification;
    const text = [
      "Your current subscription period is ending soon.",
      `Current access ends at ${notification.subscriptionCurrentPeriodEnd.toISOString()}.`,
      `Invoice amount: ${invoice.amountCrypto} ${invoice.currency}.`,
      ...(invoice.paymentAddress ? [`Payment address: ${invoice.paymentAddress}.`] : []),
      ...(invoice.expiresAt ? [`Invoice expires at ${invoice.expiresAt.toISOString()}.`] : []),
      `Open billing: ${billingUrl}`,
    ].join("\n");

    try {
      await emailTransport.send({
        to: notification.email,
        subject: "Your nproxy subscription renewal invoice",
        text,
      });
    } catch (error) {
      // NOTE(review): the recipient address is the only identifier visible
      // here; drop it if log policy forbids email addresses.
      console.error(
        JSON.stringify({
          service: "worker",
          event: "renewal_notification_email_failed",
          recipient: notification.email,
          error: describeError(error),
        }),
      );
    }
  }

  console.log(
    JSON.stringify({
      service: "worker",
      event: "renewal_invoices_created",
      createdCount: renewalNotifications.length,
    }),
  );
}

/**
 * Polls the payment provider for each pending invoice in the current batch
 * and hands the reported status to the billing store.
 *
 * Failures are counted per invoice and never abort the batch. A single
 * summary line is logged when at least one invoice was polled.
 */
async function reconcilePendingInvoices(): Promise<void> {
  const pendingInvoices = await billingStore.listPendingInvoicesForReconciliation(
    invoiceReconciliationBatchSize,
  );

  // Every counter below is only touched inside the loop, so an empty batch
  // has nothing to report.
  if (pendingInvoices.length === 0) {
    return;
  }

  const reconciliationSummary = {
    polledCount: pendingInvoices.length,
    markedPaidCount: 0,
    markedExpiredCount: 0,
    markedCanceledCount: 0,
    alreadyTerminalCount: 0,
    ignoredCount: 0,
    failedCount: 0,
  };

  for (const invoice of pendingInvoices) {
    try {
      const providerInvoice = await paymentProviderAdapter.getInvoiceStatus(
        invoice.providerInvoiceId,
      );

      // Never apply a status that the provider reported for a different invoice.
      if (providerInvoice.providerInvoiceId !== invoice.providerInvoiceId) {
        reconciliationSummary.failedCount += 1;
        console.error(
          JSON.stringify({
            service: "worker",
            event: "invoice_reconciliation_provider_mismatch",
            invoiceId: invoice.id,
            requestedProviderInvoiceId: invoice.providerInvoiceId,
            returnedProviderInvoiceId: providerInvoice.providerInvoiceId,
          }),
        );
        continue;
      }

      const result = await billingStore.reconcilePendingInvoice({
        invoiceId: invoice.id,
        providerStatus: providerInvoice.status,
        actor: {
          type: "system",
          ref: "invoice_reconciliation",
        },
        // Only forward paidAt when the provider supplied one.
        ...(providerInvoice.paidAt ? { paidAt: providerInvoice.paidAt } : {}),
      });

      switch (result.outcome) {
        case "marked_paid":
          reconciliationSummary.markedPaidCount += 1;
          break;
        case "marked_expired":
          reconciliationSummary.markedExpiredCount += 1;
          break;
        case "marked_canceled":
          reconciliationSummary.markedCanceledCount += 1;
          break;
        case "already_paid":
        case "already_expired":
        case "already_canceled":
          reconciliationSummary.alreadyTerminalCount += 1;
          break;
        case "ignored_terminal_state":
          reconciliationSummary.ignoredCount += 1;
          break;
        case "noop_pending":
          // Still pending: counted in polledCount only.
          break;
      }
    } catch (error) {
      reconciliationSummary.failedCount += 1;
      console.error(
        JSON.stringify({
          service: "worker",
          event: "invoice_reconciliation_failed",
          invoiceId: invoice.id,
          providerInvoiceId: invoice.providerInvoiceId,
          error: describeError(error),
        }),
      );
    }
  }

  console.log(
    JSON.stringify({
      service: "worker",
      event: "pending_invoice_reconciliation",
      ...reconciliationSummary,
    }),
  );
}

/** Runs the billing store's elapsed-invoice expiry and logs when any invoice was expired. */
async function expireElapsedInvoices(): Promise<void> {
  const expiredInvoices = await billingStore.expireElapsedPendingInvoices();

  if (expiredInvoices.expiredCount > 0) {
    console.log(
      JSON.stringify({
        service: "worker",
        event: "pending_invoices_expired",
        expiredCount: expiredInvoices.expiredCount,
      }),
    );
  }
}

/** Runs the worker store's cooldown key recovery and logs when any key was recovered. */
async function recoverCooldownKeys(): Promise<void> {
  const recovery = await workerStore.recoverCooldownProviderKeys();

  if (recovery.recoveredCount > 0) {
    console.log(
      JSON.stringify({
        service: "worker",
        event: "cooldown_keys_recovered",
        recoveredCount: recovery.recoveredCount,
      }),
    );
  }
}

/**
 * Claims the next queued generation job, if any, and processes it.
 *
 * The executor passed to the worker store rejects provider keys whose
 * providerCode differs from the configured default model. Keys with a proxy
 * base URL are tried through the proxy first and retried directly only when
 * the proxy attempt fails with failureKind "transport"; keys without one go
 * direct.
 */
async function processNextGenerationJob(): Promise<void> {
  const job = await workerStore.claimNextQueuedGenerationJob();

  if (!job) {
    console.log(`worker heartbeat interval=${intervalMs} no_queued_jobs=true`);
    return;
  }

  const result = await workerStore.processClaimedGenerationJob(
    job,
    async (request, providerKey) => {
      if (providerKey.providerCode !== config.provider.nanoBananaDefaultModel) {
        return {
          ok: false as const,
          usedProxy: false,
          directFallbackUsed: false,
          failureKind: "unknown" as const,
          providerErrorCode: "unsupported_provider_model",
          providerErrorText: `Unsupported provider model: ${providerKey.providerCode}`,
        };
      }

      // Only these four fields are handed to the adapter.
      const providerKeyRef = {
        id: providerKey.id,
        providerCode: providerKey.providerCode,
        label: providerKey.label,
        apiKeyLastFour: providerKey.apiKeyLastFour,
      };

      const executeDirect = () =>
        nanoBananaAdapter.executeGeneration({
          request,
          providerKey: providerKeyRef,
          route: {
            kind: "direct",
          },
        });

      if (!providerKey.proxyBaseUrl) {
        const directResult = await executeDirect();
        return {
          ...directResult,
          usedProxy: false,
          directFallbackUsed: false,
        };
      }

      const proxyResult = await nanoBananaAdapter.executeGeneration({
        request,
        providerKey: providerKeyRef,
        route: {
          kind: "proxy",
          proxyBaseUrl: providerKey.proxyBaseUrl,
        },
      });

      if (!proxyResult.ok && proxyResult.failureKind === "transport") {
        const directResult = await executeDirect();
        // usedProxy stays true: the proxy was attempted before falling back.
        return {
          ...directResult,
          usedProxy: true,
          directFallbackUsed: true,
        };
      }

      return {
        ...proxyResult,
        usedProxy: true,
        directFallbackUsed: false,
      };
    },
  );

  console.log(JSON.stringify({ service: "worker", event: "job_processed", ...result }));
}