From 7793fc3887adb95700d5fecaa54912ee8b2dfd7f Mon Sep 17 00:00:00 2001 From: sirily Date: Wed, 11 Mar 2026 12:29:42 +0300 Subject: [PATCH] fix: harden invoice reconciliation --- apps/worker/src/main.ts | 15 ++ docs/ops/payment-system.md | 4 + packages/db/src/billing-store.test.ts | 88 +++++++++- packages/db/src/billing-store.ts | 225 ++++++++++++++------------ 4 files changed, 226 insertions(+), 106 deletions(-) diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index 59c293c..9269d69 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -105,6 +105,21 @@ async function runTick(): Promise { for (const invoice of pendingInvoices) { try { const providerInvoice = await paymentProviderAdapter.getInvoiceStatus(invoice.providerInvoiceId); + + if (providerInvoice.providerInvoiceId !== invoice.providerInvoiceId) { + reconciliationSummary.failedCount += 1; + console.error( + JSON.stringify({ + service: "worker", + event: "invoice_reconciliation_provider_mismatch", + invoiceId: invoice.id, + requestedProviderInvoiceId: invoice.providerInvoiceId, + returnedProviderInvoiceId: providerInvoice.providerInvoiceId, + }), + ); + continue; + } + const result = await billingStore.reconcilePendingInvoice({ invoiceId: invoice.id, providerStatus: providerInvoice.status, diff --git a/docs/ops/payment-system.md b/docs/ops/payment-system.md index 588a55a..18db3bd 100644 --- a/docs/ops/payment-system.md +++ b/docs/ops/payment-system.md @@ -117,6 +117,7 @@ Current rule: - The system must treat an invoice as `paid` only after the payment provider reports a final successful status, meaning the funds are accepted strongly enough for access activation. - Worker reconciliation now polls provider status for stored `pending` invoices. - If the provider reports final `paid`, the worker activates access from provider `paidAt` when that timestamp is available. +- If the provider reports final `paid` after a local fallback expiry, provider `paid` still wins and access is activated. - If the provider reports final `expired` or `canceled`, the worker finalizes the local invoice to the same terminal status. ## Worker reconciliation flow @@ -128,6 +129,9 @@ Current rule: 6. After provider polling, the worker also expires any still-pending invoices whose local `expiresAt` has elapsed. 7. If a manual admin action or another worker already finalized the invoice, reconciliation degrades to replay/no-op behavior instead of duplicating side effects. +Implementation note: +- invoice creation paths take a per-subscription database lock so manual invoice creation and worker renewal creation do not create duplicate invoices for the same subscription at the same time. + ## Invoice listing flow - `GET /api/billing/invoices` returns the user's invoices ordered by newest first. - This is a read-only view over persisted `PaymentInvoice` rows. diff --git a/packages/db/src/billing-store.test.ts b/packages/db/src/billing-store.test.ts index 3d4e8e8..40d494b 100644 --- a/packages/db/src/billing-store.test.ts +++ b/packages/db/src/billing-store.test.ts @@ -89,6 +89,43 @@ test("createUpcomingRenewalInvoices does not auto-create another invoice after o assert.equal(database.calls.paymentInvoiceCreate.length, 0); }); +test("createUpcomingRenewalInvoices acquires a subscription billing lock before creating an invoice", async () => { + const database = createRenewalBillingDatabase({ + subscriptions: [ + createRenewalSubscriptionFixture({ + id: "subscription_1", + currentPeriodStart: new Date("2026-03-01T12:00:00.000Z"), + currentPeriodEnd: new Date("2026-03-12T11:00:00.000Z"), + }), + ], + }); + const store = createPrismaBillingStore(database.client); + + const notifications = await store.createUpcomingRenewalInvoices({ + paymentProvider: "nowpayments", + paymentProviderAdapter: { + createInvoice: async () => ({ + providerInvoiceId: "provider_invoice_renewal_1", + paymentAddress: "wallet_renewal_1", + amountCrypto: 29, + amountUsd: 29, + currency: "USDT", + expiresAt: new Date("2026-03-10T13:00:00.000Z"), + }), + getInvoiceStatus: async (providerInvoiceId) => ({ + providerInvoiceId, + status: "pending", + }), + }, + renewalLeadTimeHours: 72, + now: new Date("2026-03-09T12:00:00.000Z"), + }); + + assert.equal(notifications.length, 1); + assert.equal(database.calls.paymentInvoiceCreate.length, 1); + assert.deepEqual(database.calls.subscriptionBillingLocks, ["subscription_1"]); +}); + test("expireElapsedPendingInvoices marks pending invoices expired", async () => { const database = createRenewalBillingDatabase({ subscriptions: [], @@ -333,6 +370,33 @@ test("reconcilePendingInvoice does not override an already paid invoice with exp assert.equal(database.calls.paymentInvoiceTerminalUpdateMany.length, 0); }); +test("reconcilePendingInvoice accepts provider paid for a locally expired invoice", async () => { + const providerPaidAt = new Date("2026-03-10T12:34:56.000Z"); + const database = createBillingDatabase({ + invoice: createInvoiceFixture({ + status: "expired", + paidAt: null, + subscription: createSubscriptionFixture(), + }), + }); + const store = createPrismaBillingStore(database.client); + + const result = await store.reconcilePendingInvoice({ + invoiceId: "invoice_1", + providerStatus: "paid", + paidAt: providerPaidAt, + actor: { + type: "system", + ref: "invoice_reconciliation", + }, + }); + + assert.equal(result.outcome, "marked_paid"); + assert.equal(result.invoice.status, "paid"); + assert.equal(result.invoice.paidAt?.toISOString(), providerPaidAt.toISOString()); + assert.equal(database.calls.paymentInvoiceUpdateMany.length, 1); +}); + function createBillingDatabase(input: { invoice: ReturnType; updateManyCount?: number; @@ -368,14 +432,18 @@ function createBillingDatabase(input: { where, data, }: { - where: { id: string; status: "pending" }; + where: { id: string; status: "pending" | { in: Array<"pending" | "expired" | "canceled"> } }; data: { status: "paid"; paidAt: Date }; }) => { calls.paymentInvoiceUpdateMany.push({ where, data }); + const allowedStatuses = + (typeof where.status === "string" ? [where.status] : where.status.in) as Array< + "pending" | "expired" | "canceled" | "paid" + >; const count = input.updateManyCount ?? - (currentInvoice.id === where.id && currentInvoice.status === where.status ? 1 : 0); + (currentInvoice.id === where.id && allowedStatuses.includes(currentInvoice.status) ? 1 : 0); if (count > 0) { currentInvoice = { @@ -511,9 +579,17 @@ function createRenewalBillingDatabase(input: { paymentInvoiceCreate: [] as Array>, paymentInvoiceFindFirst: [] as Array>, paymentInvoiceExpireUpdateMany: [] as Array>, + subscriptionBillingLocks: [] as string[], }; - const client = { + const transaction = { + $queryRaw: async (strings: TemplateStringsArray, subscriptionId: string) => { + if (strings[0]?.includes('FROM "Subscription"')) { + calls.subscriptionBillingLocks.push(subscriptionId); + } + + return []; + }, subscription: { findMany: async () => input.subscriptions, }, @@ -563,6 +639,12 @@ function createRenewalBillingDatabase(input: { return { count: input.expirePendingCount ?? 0 }; }, }, + }; + + const client = { + subscription: transaction.subscription, + paymentInvoice: transaction.paymentInvoice, + $transaction: async (callback: (tx: typeof transaction) => Promise) => callback(transaction), } as unknown as Parameters[0]; return { diff --git a/packages/db/src/billing-store.ts b/packages/db/src/billing-store.ts index 6711171..e52ca97 100644 --- a/packages/db/src/billing-store.ts +++ b/packages/db/src/billing-store.ts @@ -124,59 +124,63 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma) paymentProvider: string; paymentProviderAdapter: PaymentProviderAdapter; }): Promise { - const subscription = await database.subscription.findFirst({ - where: { userId: input.userId }, - include: { plan: true }, - orderBy: [{ currentPeriodEnd: "desc" }, { createdAt: "desc" }], - }); + return database.$transaction(async (transaction) => { + const subscription = await transaction.subscription.findFirst({ + where: { userId: input.userId }, + include: { plan: true }, + orderBy: [{ currentPeriodEnd: "desc" }, { createdAt: "desc" }], + }); - if (!subscription) { - throw new Error("Subscription not found."); - } + if (!subscription) { + throw new Error("Subscription not found."); + } - const existingPending = await database.paymentInvoice.findFirst({ - where: { - userId: input.userId, - subscriptionId: subscription.id, - status: "pending", - expiresAt: { - gt: new Date(), + await lockSubscriptionForBilling(transaction, subscription.id); + + const existingPending = await transaction.paymentInvoice.findFirst({ + where: { + userId: input.userId, + subscriptionId: subscription.id, + status: "pending", + expiresAt: { + gt: new Date(), + }, }, - }, - orderBy: { createdAt: "desc" }, - }); + orderBy: { createdAt: "desc" }, + }); - if (existingPending) { - return mapInvoice(existingPending); - } + if (existingPending) { + return mapInvoice(existingPending); + } - const amountUsd = subscription.plan.monthlyPriceUsd.toNumber(); - const currency = subscription.plan.billingCurrency; - const amountCrypto = amountUsd; - const providerInvoice = await input.paymentProviderAdapter.createInvoice({ - userId: input.userId, - planCode: subscription.plan.code, - amountUsd, - amountCrypto, - currency, - }); - - const invoice = await database.paymentInvoice.create({ - data: { + const amountUsd = subscription.plan.monthlyPriceUsd.toNumber(); + const currency = subscription.plan.billingCurrency; + const amountCrypto = amountUsd; + const providerInvoice = await input.paymentProviderAdapter.createInvoice({ userId: input.userId, - subscriptionId: subscription.id, - provider: input.paymentProvider, - providerInvoiceId: providerInvoice.providerInvoiceId, - status: "pending", - currency: providerInvoice.currency, - amountCrypto: new Prisma.Decimal(providerInvoice.amountCrypto), - amountUsd: new Prisma.Decimal(providerInvoice.amountUsd), - paymentAddress: providerInvoice.paymentAddress, - expiresAt: providerInvoice.expiresAt, - }, - }); + planCode: subscription.plan.code, + amountUsd, + amountCrypto, + currency, + }); - return mapInvoice(invoice); + const invoice = await transaction.paymentInvoice.create({ + data: { + userId: input.userId, + subscriptionId: subscription.id, + provider: input.paymentProvider, + providerInvoiceId: providerInvoice.providerInvoiceId, + status: "pending", + currency: providerInvoice.currency, + amountCrypto: new Prisma.Decimal(providerInvoice.amountCrypto), + amountUsd: new Prisma.Decimal(providerInvoice.amountUsd), + paymentAddress: providerInvoice.paymentAddress, + expiresAt: providerInvoice.expiresAt, + }, + }); + + return mapInvoice(invoice); + }); }, async expireElapsedPendingInvoices( @@ -234,55 +238,64 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma) const cycleStart = subscription.currentPeriodStart ?? subscription.activatedAt ?? subscription.createdAt; - const existingCycleInvoice = await database.paymentInvoice.findFirst({ - where: { - subscriptionId: subscription.id, - createdAt: { - gte: cycleStart, + const subscriptionCurrentPeriodEnd = subscription.currentPeriodEnd; + const notification = await database.$transaction(async (transaction) => { + await lockSubscriptionForBilling(transaction, subscription.id); + + const existingCycleInvoice = await transaction.paymentInvoice.findFirst({ + where: { + subscriptionId: subscription.id, + createdAt: { + gte: cycleStart, + }, }, - }, - orderBy: { - createdAt: "desc", - }, - }); + orderBy: { + createdAt: "desc", + }, + }); - if (existingCycleInvoice) { - continue; - } + if (existingCycleInvoice) { + return null; + } - const amountUsd = subscription.plan.monthlyPriceUsd.toNumber(); - const currency = subscription.plan.billingCurrency; - const amountCrypto = amountUsd; - const providerInvoice = await input.paymentProviderAdapter.createInvoice({ - userId: subscription.userId, - planCode: subscription.plan.code, - amountUsd, - amountCrypto, - currency, - }); - - const invoice = await database.paymentInvoice.create({ - data: { + const amountUsd = subscription.plan.monthlyPriceUsd.toNumber(); + const currency = subscription.plan.billingCurrency; + const amountCrypto = amountUsd; + const providerInvoice = await input.paymentProviderAdapter.createInvoice({ userId: subscription.userId, + planCode: subscription.plan.code, + amountUsd, + amountCrypto, + currency, + }); + + const invoice = await transaction.paymentInvoice.create({ + data: { + userId: subscription.userId, + subscriptionId: subscription.id, + provider: input.paymentProvider, + providerInvoiceId: providerInvoice.providerInvoiceId, + status: "pending", + currency: providerInvoice.currency, + amountCrypto: new Prisma.Decimal(providerInvoice.amountCrypto), + amountUsd: new Prisma.Decimal(providerInvoice.amountUsd), + paymentAddress: providerInvoice.paymentAddress, + expiresAt: providerInvoice.expiresAt, + }, + }); + + return { + userId: subscription.userId, + email: subscription.user.email, subscriptionId: subscription.id, - provider: input.paymentProvider, - providerInvoiceId: providerInvoice.providerInvoiceId, - status: "pending", - currency: providerInvoice.currency, - amountCrypto: new Prisma.Decimal(providerInvoice.amountCrypto), - amountUsd: new Prisma.Decimal(providerInvoice.amountUsd), - paymentAddress: providerInvoice.paymentAddress, - expiresAt: providerInvoice.expiresAt, - }, + subscriptionCurrentPeriodEnd, + invoice: mapInvoice(invoice), + } satisfies RenewalInvoiceNotification; }); - notifications.push({ - userId: subscription.userId, - email: subscription.user.email, - subscriptionId: subscription.id, - subscriptionCurrentPeriodEnd: subscription.currentPeriodEnd, - invoice: mapInvoice(invoice), - }); + if (notification) { + notifications.push(notification); + } } return notifications; @@ -347,18 +360,12 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma) } if (input.providerStatus === "paid") { - if (currentInvoice.status === "expired" || currentInvoice.status === "canceled") { - return { - outcome: "ignored_terminal_state", - invoice: mapInvoice(currentInvoice), - }; - } - try { const result = await markInvoicePaidInternal(database, { invoiceId: input.invoiceId, ...(input.actor ? { actor: input.actor } : {}), ...(input.paidAt ? { paidAt: input.paidAt } : {}), + allowedSourceStatuses: ["pending", "expired", "canceled"], }); return { @@ -457,6 +464,7 @@ async function markInvoicePaidInternal( invoiceId: string; actor?: BillingActorMetadata; paidAt?: Date; + allowedSourceStatuses?: PaymentInvoiceStatus[]; }, ): Promise<{ invoice: BillingInvoiceRecord; replayed: boolean }> { return database.$transaction(async (transaction) => { @@ -475,13 +483,6 @@ async function markInvoicePaidInternal( throw new BillingError("invoice_not_found", "Invoice not found."); } - if (invoice.status === "canceled" || invoice.status === "expired") { - throw new BillingError( - "invoice_transition_not_allowed", - `Invoice in status "${invoice.status}" cannot be marked paid.`, - ); - } - if (invoice.status === "paid") { await writeInvoicePaidAuditLog(transaction, invoice, input.actor, true); return { @@ -490,11 +491,22 @@ async function markInvoicePaidInternal( }; } + const allowedSourceStatuses = input.allowedSourceStatuses ?? ["pending"]; + + if (!allowedSourceStatuses.includes(invoice.status)) { + throw new BillingError( + "invoice_transition_not_allowed", + `Invoice in status "${invoice.status}" cannot be marked paid.`, + ); + } + const paidAt = invoice.paidAt ?? input.paidAt ?? new Date(); const transitionResult = await transaction.paymentInvoice.updateMany({ where: { id: invoice.id, - status: "pending", + status: { + in: allowedSourceStatuses, + }, }, data: { status: "paid", @@ -695,3 +707,10 @@ function addDays(value: Date, days: number): Date { function addHours(value: Date, hours: number): Date { return new Date(value.getTime() + hours * 60 * 60 * 1000); } + +async function lockSubscriptionForBilling( + database: Pick, + subscriptionId: string, +): Promise { + await database.$queryRaw`SELECT 1 FROM "Subscription" WHERE id = ${subscriptionId} FOR UPDATE`; +}