fix: harden invoice reconciliation
This commit is contained in:
@@ -105,6 +105,21 @@ async function runTick(): Promise<void> {
|
||||
for (const invoice of pendingInvoices) {
|
||||
try {
|
||||
const providerInvoice = await paymentProviderAdapter.getInvoiceStatus(invoice.providerInvoiceId);
|
||||
|
||||
if (providerInvoice.providerInvoiceId !== invoice.providerInvoiceId) {
|
||||
reconciliationSummary.failedCount += 1;
|
||||
console.error(
|
||||
JSON.stringify({
|
||||
service: "worker",
|
||||
event: "invoice_reconciliation_provider_mismatch",
|
||||
invoiceId: invoice.id,
|
||||
requestedProviderInvoiceId: invoice.providerInvoiceId,
|
||||
returnedProviderInvoiceId: providerInvoice.providerInvoiceId,
|
||||
}),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const result = await billingStore.reconcilePendingInvoice({
|
||||
invoiceId: invoice.id,
|
||||
providerStatus: providerInvoice.status,
|
||||
|
||||
@@ -117,6 +117,7 @@ Current rule:
|
||||
- The system must treat an invoice as `paid` only after the payment provider reports a final successful status, meaning the funds are accepted strongly enough for access activation.
|
||||
- Worker reconciliation now polls provider status for stored `pending` invoices.
|
||||
- If the provider reports final `paid`, the worker activates access from provider `paidAt` when that timestamp is available.
|
||||
- If the provider reports final `paid` after a local fallback expiry, provider `paid` still wins and access is activated.
|
||||
- If the provider reports final `expired` or `canceled`, the worker finalizes the local invoice to the same terminal status.
|
||||
|
||||
## Worker reconciliation flow
|
||||
@@ -128,6 +129,9 @@ Current rule:
|
||||
6. After provider polling, the worker also expires any still-pending invoices whose local `expiresAt` has elapsed.
|
||||
7. If a manual admin action or another worker already finalized the invoice, reconciliation degrades to replay/no-op behavior instead of duplicating side effects.
|
||||
|
||||
Implementation note:
|
||||
- invoice creation paths take a per-subscription database lock so manual invoice creation and worker renewal creation do not create duplicate invoices for the same subscription at the same time.
|
||||
|
||||
## Invoice listing flow
|
||||
- `GET /api/billing/invoices` returns the user's invoices ordered by newest first.
|
||||
- This is a read-only view over persisted `PaymentInvoice` rows.
|
||||
|
||||
@@ -89,6 +89,43 @@ test("createUpcomingRenewalInvoices does not auto-create another invoice after o
|
||||
assert.equal(database.calls.paymentInvoiceCreate.length, 0);
|
||||
});
|
||||
|
||||
test("createUpcomingRenewalInvoices acquires a subscription billing lock before creating an invoice", async () => {
|
||||
const database = createRenewalBillingDatabase({
|
||||
subscriptions: [
|
||||
createRenewalSubscriptionFixture({
|
||||
id: "subscription_1",
|
||||
currentPeriodStart: new Date("2026-03-01T12:00:00.000Z"),
|
||||
currentPeriodEnd: new Date("2026-03-12T11:00:00.000Z"),
|
||||
}),
|
||||
],
|
||||
});
|
||||
const store = createPrismaBillingStore(database.client);
|
||||
|
||||
const notifications = await store.createUpcomingRenewalInvoices({
|
||||
paymentProvider: "nowpayments",
|
||||
paymentProviderAdapter: {
|
||||
createInvoice: async () => ({
|
||||
providerInvoiceId: "provider_invoice_renewal_1",
|
||||
paymentAddress: "wallet_renewal_1",
|
||||
amountCrypto: 29,
|
||||
amountUsd: 29,
|
||||
currency: "USDT",
|
||||
expiresAt: new Date("2026-03-10T13:00:00.000Z"),
|
||||
}),
|
||||
getInvoiceStatus: async (providerInvoiceId) => ({
|
||||
providerInvoiceId,
|
||||
status: "pending",
|
||||
}),
|
||||
},
|
||||
renewalLeadTimeHours: 72,
|
||||
now: new Date("2026-03-09T12:00:00.000Z"),
|
||||
});
|
||||
|
||||
assert.equal(notifications.length, 1);
|
||||
assert.equal(database.calls.paymentInvoiceCreate.length, 1);
|
||||
assert.deepEqual(database.calls.subscriptionBillingLocks, ["subscription_1"]);
|
||||
});
|
||||
|
||||
test("expireElapsedPendingInvoices marks pending invoices expired", async () => {
|
||||
const database = createRenewalBillingDatabase({
|
||||
subscriptions: [],
|
||||
@@ -333,6 +370,33 @@ test("reconcilePendingInvoice does not override an already paid invoice with exp
|
||||
assert.equal(database.calls.paymentInvoiceTerminalUpdateMany.length, 0);
|
||||
});
|
||||
|
||||
test("reconcilePendingInvoice accepts provider paid for a locally expired invoice", async () => {
|
||||
const providerPaidAt = new Date("2026-03-10T12:34:56.000Z");
|
||||
const database = createBillingDatabase({
|
||||
invoice: createInvoiceFixture({
|
||||
status: "expired",
|
||||
paidAt: null,
|
||||
subscription: createSubscriptionFixture(),
|
||||
}),
|
||||
});
|
||||
const store = createPrismaBillingStore(database.client);
|
||||
|
||||
const result = await store.reconcilePendingInvoice({
|
||||
invoiceId: "invoice_1",
|
||||
providerStatus: "paid",
|
||||
paidAt: providerPaidAt,
|
||||
actor: {
|
||||
type: "system",
|
||||
ref: "invoice_reconciliation",
|
||||
},
|
||||
});
|
||||
|
||||
assert.equal(result.outcome, "marked_paid");
|
||||
assert.equal(result.invoice.status, "paid");
|
||||
assert.equal(result.invoice.paidAt?.toISOString(), providerPaidAt.toISOString());
|
||||
assert.equal(database.calls.paymentInvoiceUpdateMany.length, 1);
|
||||
});
|
||||
|
||||
function createBillingDatabase(input: {
|
||||
invoice: ReturnType<typeof createInvoiceFixture>;
|
||||
updateManyCount?: number;
|
||||
@@ -368,14 +432,18 @@ function createBillingDatabase(input: {
|
||||
where,
|
||||
data,
|
||||
}: {
|
||||
where: { id: string; status: "pending" };
|
||||
where: { id: string; status: "pending" | { in: Array<"pending" | "expired" | "canceled"> } };
|
||||
data: { status: "paid"; paidAt: Date };
|
||||
}) => {
|
||||
calls.paymentInvoiceUpdateMany.push({ where, data });
|
||||
|
||||
const allowedStatuses =
|
||||
(typeof where.status === "string" ? [where.status] : where.status.in) as Array<
|
||||
"pending" | "expired" | "canceled" | "paid"
|
||||
>;
|
||||
const count =
|
||||
input.updateManyCount ??
|
||||
(currentInvoice.id === where.id && currentInvoice.status === where.status ? 1 : 0);
|
||||
(currentInvoice.id === where.id && allowedStatuses.includes(currentInvoice.status) ? 1 : 0);
|
||||
|
||||
if (count > 0) {
|
||||
currentInvoice = {
|
||||
@@ -511,9 +579,17 @@ function createRenewalBillingDatabase(input: {
|
||||
paymentInvoiceCreate: [] as Array<Record<string, unknown>>,
|
||||
paymentInvoiceFindFirst: [] as Array<Record<string, unknown>>,
|
||||
paymentInvoiceExpireUpdateMany: [] as Array<Record<string, unknown>>,
|
||||
subscriptionBillingLocks: [] as string[],
|
||||
};
|
||||
|
||||
const client = {
|
||||
const transaction = {
|
||||
$queryRaw: async (strings: TemplateStringsArray, subscriptionId: string) => {
|
||||
if (strings[0]?.includes('FROM "Subscription"')) {
|
||||
calls.subscriptionBillingLocks.push(subscriptionId);
|
||||
}
|
||||
|
||||
return [];
|
||||
},
|
||||
subscription: {
|
||||
findMany: async () => input.subscriptions,
|
||||
},
|
||||
@@ -563,6 +639,12 @@ function createRenewalBillingDatabase(input: {
|
||||
return { count: input.expirePendingCount ?? 0 };
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const client = {
|
||||
subscription: transaction.subscription,
|
||||
paymentInvoice: transaction.paymentInvoice,
|
||||
$transaction: async <T>(callback: (tx: typeof transaction) => Promise<T>) => callback(transaction),
|
||||
} as unknown as Parameters<typeof createPrismaBillingStore>[0];
|
||||
|
||||
return {
|
||||
|
||||
@@ -124,7 +124,8 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
paymentProvider: string;
|
||||
paymentProviderAdapter: PaymentProviderAdapter;
|
||||
}): Promise<BillingInvoiceRecord> {
|
||||
const subscription = await database.subscription.findFirst({
|
||||
return database.$transaction(async (transaction) => {
|
||||
const subscription = await transaction.subscription.findFirst({
|
||||
where: { userId: input.userId },
|
||||
include: { plan: true },
|
||||
orderBy: [{ currentPeriodEnd: "desc" }, { createdAt: "desc" }],
|
||||
@@ -134,7 +135,9 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
throw new Error("Subscription not found.");
|
||||
}
|
||||
|
||||
const existingPending = await database.paymentInvoice.findFirst({
|
||||
await lockSubscriptionForBilling(transaction, subscription.id);
|
||||
|
||||
const existingPending = await transaction.paymentInvoice.findFirst({
|
||||
where: {
|
||||
userId: input.userId,
|
||||
subscriptionId: subscription.id,
|
||||
@@ -161,7 +164,7 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
currency,
|
||||
});
|
||||
|
||||
const invoice = await database.paymentInvoice.create({
|
||||
const invoice = await transaction.paymentInvoice.create({
|
||||
data: {
|
||||
userId: input.userId,
|
||||
subscriptionId: subscription.id,
|
||||
@@ -177,6 +180,7 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
});
|
||||
|
||||
return mapInvoice(invoice);
|
||||
});
|
||||
},
|
||||
|
||||
async expireElapsedPendingInvoices(
|
||||
@@ -234,7 +238,11 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
|
||||
const cycleStart =
|
||||
subscription.currentPeriodStart ?? subscription.activatedAt ?? subscription.createdAt;
|
||||
const existingCycleInvoice = await database.paymentInvoice.findFirst({
|
||||
const subscriptionCurrentPeriodEnd = subscription.currentPeriodEnd;
|
||||
const notification = await database.$transaction(async (transaction) => {
|
||||
await lockSubscriptionForBilling(transaction, subscription.id);
|
||||
|
||||
const existingCycleInvoice = await transaction.paymentInvoice.findFirst({
|
||||
where: {
|
||||
subscriptionId: subscription.id,
|
||||
createdAt: {
|
||||
@@ -247,7 +255,7 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
});
|
||||
|
||||
if (existingCycleInvoice) {
|
||||
continue;
|
||||
return null;
|
||||
}
|
||||
|
||||
const amountUsd = subscription.plan.monthlyPriceUsd.toNumber();
|
||||
@@ -261,7 +269,7 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
currency,
|
||||
});
|
||||
|
||||
const invoice = await database.paymentInvoice.create({
|
||||
const invoice = await transaction.paymentInvoice.create({
|
||||
data: {
|
||||
userId: subscription.userId,
|
||||
subscriptionId: subscription.id,
|
||||
@@ -276,13 +284,18 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
},
|
||||
});
|
||||
|
||||
notifications.push({
|
||||
return {
|
||||
userId: subscription.userId,
|
||||
email: subscription.user.email,
|
||||
subscriptionId: subscription.id,
|
||||
subscriptionCurrentPeriodEnd: subscription.currentPeriodEnd,
|
||||
subscriptionCurrentPeriodEnd,
|
||||
invoice: mapInvoice(invoice),
|
||||
} satisfies RenewalInvoiceNotification;
|
||||
});
|
||||
|
||||
if (notification) {
|
||||
notifications.push(notification);
|
||||
}
|
||||
}
|
||||
|
||||
return notifications;
|
||||
@@ -347,18 +360,12 @@ export function createPrismaBillingStore(database: PrismaClient = defaultPrisma)
|
||||
}
|
||||
|
||||
if (input.providerStatus === "paid") {
|
||||
if (currentInvoice.status === "expired" || currentInvoice.status === "canceled") {
|
||||
return {
|
||||
outcome: "ignored_terminal_state",
|
||||
invoice: mapInvoice(currentInvoice),
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await markInvoicePaidInternal(database, {
|
||||
invoiceId: input.invoiceId,
|
||||
...(input.actor ? { actor: input.actor } : {}),
|
||||
...(input.paidAt ? { paidAt: input.paidAt } : {}),
|
||||
allowedSourceStatuses: ["pending", "expired", "canceled"],
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -457,6 +464,7 @@ async function markInvoicePaidInternal(
|
||||
invoiceId: string;
|
||||
actor?: BillingActorMetadata;
|
||||
paidAt?: Date;
|
||||
allowedSourceStatuses?: PaymentInvoiceStatus[];
|
||||
},
|
||||
): Promise<{ invoice: BillingInvoiceRecord; replayed: boolean }> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
@@ -475,13 +483,6 @@ async function markInvoicePaidInternal(
|
||||
throw new BillingError("invoice_not_found", "Invoice not found.");
|
||||
}
|
||||
|
||||
if (invoice.status === "canceled" || invoice.status === "expired") {
|
||||
throw new BillingError(
|
||||
"invoice_transition_not_allowed",
|
||||
`Invoice in status "${invoice.status}" cannot be marked paid.`,
|
||||
);
|
||||
}
|
||||
|
||||
if (invoice.status === "paid") {
|
||||
await writeInvoicePaidAuditLog(transaction, invoice, input.actor, true);
|
||||
return {
|
||||
@@ -490,11 +491,22 @@ async function markInvoicePaidInternal(
|
||||
};
|
||||
}
|
||||
|
||||
const allowedSourceStatuses = input.allowedSourceStatuses ?? ["pending"];
|
||||
|
||||
if (!allowedSourceStatuses.includes(invoice.status)) {
|
||||
throw new BillingError(
|
||||
"invoice_transition_not_allowed",
|
||||
`Invoice in status "${invoice.status}" cannot be marked paid.`,
|
||||
);
|
||||
}
|
||||
|
||||
const paidAt = invoice.paidAt ?? input.paidAt ?? new Date();
|
||||
const transitionResult = await transaction.paymentInvoice.updateMany({
|
||||
where: {
|
||||
id: invoice.id,
|
||||
status: "pending",
|
||||
status: {
|
||||
in: allowedSourceStatuses,
|
||||
},
|
||||
},
|
||||
data: {
|
||||
status: "paid",
|
||||
@@ -695,3 +707,10 @@ function addDays(value: Date, days: number): Date {
|
||||
function addHours(value: Date, hours: number): Date {
|
||||
return new Date(value.getTime() + hours * 60 * 60 * 1000);
|
||||
}
|
||||
|
||||
async function lockSubscriptionForBilling(
|
||||
database: Pick<Prisma.TransactionClient, "$queryRaw">,
|
||||
subscriptionId: string,
|
||||
): Promise<void> {
|
||||
await database.$queryRaw`SELECT 1 FROM "Subscription" WHERE id = ${subscriptionId} FOR UPDATE`;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user