Initial import
This commit is contained in:
146
packages/db/src/account-store.ts
Normal file
146
packages/db/src/account-store.ts
Normal file
@@ -0,0 +1,146 @@
|
||||
import { getApproximateQuotaBucket, type QuotaBucket } from "@nproxy/domain";
|
||||
import type { PrismaClient, SubscriptionStatus } from "@prisma/client";
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface UserAccountOverview {
|
||||
user: {
|
||||
id: string;
|
||||
email: string;
|
||||
isAdmin: boolean;
|
||||
createdAt: Date;
|
||||
};
|
||||
subscription: {
|
||||
id: string;
|
||||
status: SubscriptionStatus;
|
||||
renewsManually: boolean;
|
||||
activatedAt?: Date;
|
||||
currentPeriodStart?: Date;
|
||||
currentPeriodEnd?: Date;
|
||||
canceledAt?: Date;
|
||||
plan: {
|
||||
id: string;
|
||||
code: string;
|
||||
displayName: string;
|
||||
monthlyRequestLimit: number;
|
||||
monthlyPriceUsd: number;
|
||||
billingCurrency: string;
|
||||
isActive: boolean;
|
||||
};
|
||||
} | null;
|
||||
quota: {
|
||||
approximateBucket: QuotaBucket;
|
||||
usedSuccessfulRequests: number;
|
||||
monthlyRequestLimit: number;
|
||||
} | null;
|
||||
}
|
||||
|
||||
export function createPrismaAccountStore(database: PrismaClient = defaultPrisma) {
|
||||
return {
|
||||
async getUserAccountOverview(userId: string): Promise<UserAccountOverview | null> {
|
||||
const user = await database.user.findUnique({
|
||||
where: {
|
||||
id: userId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!user) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const subscription = await database.subscription.findFirst({
|
||||
where: {
|
||||
userId,
|
||||
},
|
||||
include: {
|
||||
plan: true,
|
||||
},
|
||||
orderBy: [
|
||||
{ currentPeriodEnd: "desc" },
|
||||
{ createdAt: "desc" },
|
||||
],
|
||||
});
|
||||
|
||||
const quota = subscription
|
||||
? await buildQuotaSnapshot(database, userId, {
|
||||
monthlyRequestLimit: subscription.plan.monthlyRequestLimit,
|
||||
cycleStart:
|
||||
subscription.currentPeriodStart ??
|
||||
subscription.activatedAt ??
|
||||
subscription.createdAt,
|
||||
})
|
||||
: null;
|
||||
|
||||
return {
|
||||
user: {
|
||||
id: user.id,
|
||||
email: user.email,
|
||||
isAdmin: user.isAdmin,
|
||||
createdAt: user.createdAt,
|
||||
},
|
||||
subscription: subscription
|
||||
? {
|
||||
id: subscription.id,
|
||||
status: subscription.status,
|
||||
renewsManually: subscription.renewsManually,
|
||||
...(subscription.activatedAt ? { activatedAt: subscription.activatedAt } : {}),
|
||||
...(subscription.currentPeriodStart
|
||||
? { currentPeriodStart: subscription.currentPeriodStart }
|
||||
: {}),
|
||||
...(subscription.currentPeriodEnd
|
||||
? { currentPeriodEnd: subscription.currentPeriodEnd }
|
||||
: {}),
|
||||
...(subscription.canceledAt ? { canceledAt: subscription.canceledAt } : {}),
|
||||
plan: {
|
||||
id: subscription.plan.id,
|
||||
code: subscription.plan.code,
|
||||
displayName: subscription.plan.displayName,
|
||||
monthlyRequestLimit: subscription.plan.monthlyRequestLimit,
|
||||
monthlyPriceUsd: decimalToNumber(subscription.plan.monthlyPriceUsd),
|
||||
billingCurrency: subscription.plan.billingCurrency,
|
||||
isActive: subscription.plan.isActive,
|
||||
},
|
||||
}
|
||||
: null,
|
||||
quota,
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function decimalToNumber(value: Prisma.Decimal | { toNumber(): number }): number {
|
||||
return value.toNumber();
|
||||
}
|
||||
|
||||
async function buildQuotaSnapshot(
|
||||
database: PrismaClient,
|
||||
userId: string,
|
||||
input: {
|
||||
monthlyRequestLimit: number;
|
||||
cycleStart: Date;
|
||||
},
|
||||
): Promise<UserAccountOverview["quota"]> {
|
||||
const usageAggregation = await database.usageLedgerEntry.aggregate({
|
||||
where: {
|
||||
userId,
|
||||
entryType: "generation_success",
|
||||
createdAt: {
|
||||
gte: input.cycleStart,
|
||||
},
|
||||
},
|
||||
_sum: {
|
||||
deltaRequests: true,
|
||||
},
|
||||
});
|
||||
|
||||
const usedSuccessfulRequests = usageAggregation._sum.deltaRequests ?? 0;
|
||||
|
||||
return {
|
||||
approximateBucket: getApproximateQuotaBucket({
|
||||
used: usedSuccessfulRequests,
|
||||
limit: input.monthlyRequestLimit,
|
||||
}),
|
||||
usedSuccessfulRequests,
|
||||
monthlyRequestLimit: input.monthlyRequestLimit,
|
||||
};
|
||||
}
|
||||
399
packages/db/src/auth-store.ts
Normal file
399
packages/db/src/auth-store.ts
Normal file
@@ -0,0 +1,399 @@
|
||||
import {
|
||||
AuthError,
|
||||
createPasswordResetToken,
|
||||
createSessionToken,
|
||||
hashPasswordResetToken,
|
||||
hashPassword,
|
||||
hashSessionToken,
|
||||
normalizeEmail,
|
||||
validateEmail,
|
||||
validatePassword,
|
||||
verifyPassword,
|
||||
} from "@nproxy/domain";
|
||||
import type { PrismaClient } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface AuthenticatedUserRecord {
|
||||
id: string;
|
||||
email: string;
|
||||
isAdmin: boolean;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export interface SessionRecord {
|
||||
token: string;
|
||||
user: AuthenticatedUserRecord;
|
||||
expiresAt: Date;
|
||||
}
|
||||
|
||||
export interface UserSessionRecord {
|
||||
id: string;
|
||||
expiresAt: Date;
|
||||
revokedAt?: Date;
|
||||
lastSeenAt?: Date;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export interface AuthenticatedSessionRecord {
|
||||
session: UserSessionRecord;
|
||||
user: AuthenticatedUserRecord;
|
||||
}
|
||||
|
||||
export interface PasswordResetChallengeRecord {
|
||||
email: string;
|
||||
token: string;
|
||||
expiresAt: Date;
|
||||
}
|
||||
|
||||
export function createPrismaAuthStore(database: PrismaClient = defaultPrisma) {
|
||||
return {
|
||||
async registerUser(input: {
|
||||
email: string;
|
||||
password: string;
|
||||
passwordPepper: string;
|
||||
sessionTtlDays?: number;
|
||||
}): Promise<SessionRecord> {
|
||||
const email = validateEmail(input.email);
|
||||
const password = validatePassword(input.password);
|
||||
const existing = await database.user.findUnique({
|
||||
where: {
|
||||
email,
|
||||
},
|
||||
});
|
||||
|
||||
if (existing) {
|
||||
throw new AuthError("email_already_exists", `User ${email} already exists.`);
|
||||
}
|
||||
|
||||
const passwordHash = hashPassword(password, input.passwordPepper);
|
||||
const token = createSessionToken();
|
||||
const tokenHash = hashSessionToken(token);
|
||||
const expiresAt = addDays(new Date(), input.sessionTtlDays ?? 30);
|
||||
|
||||
return database.$transaction(async (transaction) => {
|
||||
const defaultPlan = await transaction.subscriptionPlan.findFirst({
|
||||
where: {
|
||||
code: "mvp_monthly",
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
|
||||
const user = await transaction.user.create({
|
||||
data: {
|
||||
email,
|
||||
passwordHash,
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.userSession.create({
|
||||
data: {
|
||||
userId: user.id,
|
||||
tokenHash,
|
||||
expiresAt,
|
||||
},
|
||||
});
|
||||
|
||||
if (defaultPlan) {
|
||||
await transaction.subscription.create({
|
||||
data: {
|
||||
userId: user.id,
|
||||
planId: defaultPlan.id,
|
||||
status: "pending_activation",
|
||||
renewsManually: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
token,
|
||||
expiresAt,
|
||||
user: mapAuthenticatedUser(user),
|
||||
};
|
||||
});
|
||||
},
|
||||
|
||||
async loginUser(input: {
|
||||
email: string;
|
||||
password: string;
|
||||
passwordPepper: string;
|
||||
sessionTtlDays?: number;
|
||||
}): Promise<SessionRecord> {
|
||||
const email = normalizeEmail(input.email);
|
||||
const user = await database.user.findUnique({
|
||||
where: {
|
||||
email,
|
||||
},
|
||||
});
|
||||
|
||||
if (!user || !verifyPassword(input.password, user.passwordHash, input.passwordPepper)) {
|
||||
throw new AuthError("invalid_credentials", "Invalid email or password.");
|
||||
}
|
||||
|
||||
const token = createSessionToken();
|
||||
const tokenHash = hashSessionToken(token);
|
||||
const expiresAt = addDays(new Date(), input.sessionTtlDays ?? 30);
|
||||
|
||||
await database.userSession.create({
|
||||
data: {
|
||||
userId: user.id,
|
||||
tokenHash,
|
||||
expiresAt,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
token,
|
||||
expiresAt,
|
||||
user: mapAuthenticatedUser(user),
|
||||
};
|
||||
},
|
||||
|
||||
async getUserBySessionToken(
|
||||
sessionToken: string,
|
||||
): Promise<AuthenticatedSessionRecord | null> {
|
||||
const tokenHash = hashSessionToken(sessionToken);
|
||||
const now = new Date();
|
||||
const session = await database.userSession.findUnique({
|
||||
where: {
|
||||
tokenHash,
|
||||
},
|
||||
include: {
|
||||
user: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!session || session.revokedAt || session.expiresAt <= now) {
|
||||
return null;
|
||||
}
|
||||
|
||||
await database.userSession.update({
|
||||
where: {
|
||||
id: session.id,
|
||||
},
|
||||
data: {
|
||||
lastSeenAt: now,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
session: mapUserSession(session),
|
||||
user: mapAuthenticatedUser(session.user),
|
||||
};
|
||||
},
|
||||
|
||||
async revokeSession(sessionToken: string): Promise<void> {
|
||||
const tokenHash = hashSessionToken(sessionToken);
|
||||
await database.userSession.updateMany({
|
||||
where: {
|
||||
tokenHash,
|
||||
revokedAt: null,
|
||||
},
|
||||
data: {
|
||||
revokedAt: new Date(),
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
async listUserSessions(userId: string): Promise<UserSessionRecord[]> {
|
||||
const sessions = await database.userSession.findMany({
|
||||
where: {
|
||||
userId,
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
});
|
||||
|
||||
return sessions.map(mapUserSession);
|
||||
},
|
||||
|
||||
async revokeUserSession(input: {
|
||||
userId: string;
|
||||
sessionId: string;
|
||||
}): Promise<boolean> {
|
||||
const result = await database.userSession.updateMany({
|
||||
where: {
|
||||
id: input.sessionId,
|
||||
userId: input.userId,
|
||||
revokedAt: null,
|
||||
},
|
||||
data: {
|
||||
revokedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return result.count > 0;
|
||||
},
|
||||
|
||||
async revokeAllUserSessions(input: {
|
||||
userId: string;
|
||||
exceptSessionId?: string;
|
||||
}): Promise<number> {
|
||||
const result = await database.userSession.updateMany({
|
||||
where: {
|
||||
userId: input.userId,
|
||||
revokedAt: null,
|
||||
...(input.exceptSessionId
|
||||
? {
|
||||
id: {
|
||||
not: input.exceptSessionId,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
data: {
|
||||
revokedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return result.count;
|
||||
},
|
||||
|
||||
async createPasswordResetChallenge(input: {
|
||||
email: string;
|
||||
ttlMinutes?: number;
|
||||
}): Promise<PasswordResetChallengeRecord | null> {
|
||||
const email = normalizeEmail(input.email);
|
||||
const user = await database.user.findUnique({
|
||||
where: {
|
||||
email,
|
||||
},
|
||||
});
|
||||
|
||||
if (!user) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const token = createPasswordResetToken();
|
||||
const tokenHash = hashPasswordResetToken(token);
|
||||
const expiresAt = addMinutes(new Date(), input.ttlMinutes ?? 30);
|
||||
|
||||
await database.$transaction([
|
||||
database.passwordResetToken.updateMany({
|
||||
where: {
|
||||
userId: user.id,
|
||||
consumedAt: null,
|
||||
},
|
||||
data: {
|
||||
consumedAt: new Date(),
|
||||
},
|
||||
}),
|
||||
database.passwordResetToken.create({
|
||||
data: {
|
||||
userId: user.id,
|
||||
tokenHash,
|
||||
expiresAt,
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
email: user.email,
|
||||
token,
|
||||
expiresAt,
|
||||
};
|
||||
},
|
||||
|
||||
async resetPassword(input: {
|
||||
token: string;
|
||||
newPassword: string;
|
||||
passwordPepper: string;
|
||||
}): Promise<void> {
|
||||
const tokenHash = hashPasswordResetToken(input.token);
|
||||
const newPassword = validatePassword(input.newPassword);
|
||||
const passwordHash = hashPassword(newPassword, input.passwordPepper);
|
||||
const now = new Date();
|
||||
|
||||
await database.$transaction(async (transaction) => {
|
||||
const resetToken = await transaction.passwordResetToken.findUnique({
|
||||
where: {
|
||||
tokenHash,
|
||||
},
|
||||
include: {
|
||||
user: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (
|
||||
!resetToken ||
|
||||
resetToken.consumedAt ||
|
||||
resetToken.expiresAt <= now
|
||||
) {
|
||||
throw new AuthError(
|
||||
"reset_token_invalid",
|
||||
"Password reset token is invalid or expired.",
|
||||
);
|
||||
}
|
||||
|
||||
await transaction.user.update({
|
||||
where: {
|
||||
id: resetToken.userId,
|
||||
},
|
||||
data: {
|
||||
passwordHash,
|
||||
passwordResetVersion: {
|
||||
increment: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.passwordResetToken.update({
|
||||
where: {
|
||||
id: resetToken.id,
|
||||
},
|
||||
data: {
|
||||
consumedAt: now,
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.userSession.updateMany({
|
||||
where: {
|
||||
userId: resetToken.userId,
|
||||
revokedAt: null,
|
||||
},
|
||||
data: {
|
||||
revokedAt: now,
|
||||
},
|
||||
});
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function mapAuthenticatedUser(user: {
|
||||
id: string;
|
||||
email: string;
|
||||
isAdmin: boolean;
|
||||
createdAt: Date;
|
||||
}): AuthenticatedUserRecord {
|
||||
return {
|
||||
id: user.id,
|
||||
email: user.email,
|
||||
isAdmin: user.isAdmin,
|
||||
createdAt: user.createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
function mapUserSession(session: {
|
||||
id: string;
|
||||
expiresAt: Date;
|
||||
revokedAt: Date | null;
|
||||
lastSeenAt: Date | null;
|
||||
createdAt: Date;
|
||||
}): UserSessionRecord {
|
||||
return {
|
||||
id: session.id,
|
||||
expiresAt: session.expiresAt,
|
||||
createdAt: session.createdAt,
|
||||
...(session.revokedAt ? { revokedAt: session.revokedAt } : {}),
|
||||
...(session.lastSeenAt ? { lastSeenAt: session.lastSeenAt } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function addDays(value: Date, days: number): Date {
|
||||
return new Date(value.getTime() + days * 24 * 60 * 60 * 1000);
|
||||
}
|
||||
|
||||
function addMinutes(value: Date, minutes: number): Date {
|
||||
return new Date(value.getTime() + minutes * 60 * 1000);
|
||||
}
|
||||
254
packages/db/src/billing-store.ts
Normal file
254
packages/db/src/billing-store.ts
Normal file
@@ -0,0 +1,254 @@
|
||||
import type { PaymentProviderAdapter } from "@nproxy/providers";
|
||||
import { Prisma, type PaymentInvoiceStatus, type PrismaClient, type SubscriptionStatus } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface BillingInvoiceRecord {
|
||||
id: string;
|
||||
subscriptionId?: string;
|
||||
provider: string;
|
||||
providerInvoiceId?: string;
|
||||
status: PaymentInvoiceStatus;
|
||||
currency: string;
|
||||
amountCrypto: number;
|
||||
amountUsd?: number;
|
||||
paymentAddress?: string;
|
||||
expiresAt?: Date;
|
||||
paidAt?: Date;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}
|
||||
|
||||
export interface SubscriptionBillingRecord {
|
||||
id: string;
|
||||
status: SubscriptionStatus;
|
||||
renewsManually: boolean;
|
||||
activatedAt?: Date;
|
||||
currentPeriodStart?: Date;
|
||||
currentPeriodEnd?: Date;
|
||||
canceledAt?: Date;
|
||||
plan: {
|
||||
id: string;
|
||||
code: string;
|
||||
displayName: string;
|
||||
monthlyRequestLimit: number;
|
||||
monthlyPriceUsd: number;
|
||||
billingCurrency: string;
|
||||
};
|
||||
}
|
||||
|
||||
export function createPrismaBillingStore(database: PrismaClient = defaultPrisma) {
|
||||
return {
|
||||
async listUserInvoices(userId: string): Promise<BillingInvoiceRecord[]> {
|
||||
const invoices = await database.paymentInvoice.findMany({
|
||||
where: { userId },
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
return invoices.map(mapInvoice);
|
||||
},
|
||||
|
||||
async getCurrentSubscription(userId: string): Promise<SubscriptionBillingRecord | null> {
|
||||
const subscription = await database.subscription.findFirst({
|
||||
where: { userId },
|
||||
include: { plan: true },
|
||||
orderBy: [{ currentPeriodEnd: "desc" }, { createdAt: "desc" }],
|
||||
});
|
||||
|
||||
return subscription ? mapSubscription(subscription) : null;
|
||||
},
|
||||
|
||||
async createSubscriptionInvoice(input: {
|
||||
userId: string;
|
||||
paymentProvider: string;
|
||||
paymentProviderAdapter: PaymentProviderAdapter;
|
||||
}): Promise<BillingInvoiceRecord> {
|
||||
const subscription = await database.subscription.findFirst({
|
||||
where: { userId: input.userId },
|
||||
include: { plan: true },
|
||||
orderBy: [{ currentPeriodEnd: "desc" }, { createdAt: "desc" }],
|
||||
});
|
||||
|
||||
if (!subscription) {
|
||||
throw new Error("Subscription not found.");
|
||||
}
|
||||
|
||||
const existingPending = await database.paymentInvoice.findFirst({
|
||||
where: {
|
||||
userId: input.userId,
|
||||
subscriptionId: subscription.id,
|
||||
status: "pending",
|
||||
expiresAt: {
|
||||
gt: new Date(),
|
||||
},
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
if (existingPending) {
|
||||
return mapInvoice(existingPending);
|
||||
}
|
||||
|
||||
const amountUsd = subscription.plan.monthlyPriceUsd.toNumber();
|
||||
const currency = subscription.plan.billingCurrency;
|
||||
const amountCrypto = amountUsd;
|
||||
const providerInvoice = await input.paymentProviderAdapter.createInvoice({
|
||||
userId: input.userId,
|
||||
planCode: subscription.plan.code,
|
||||
amountUsd,
|
||||
amountCrypto,
|
||||
currency,
|
||||
});
|
||||
|
||||
const invoice = await database.paymentInvoice.create({
|
||||
data: {
|
||||
userId: input.userId,
|
||||
subscriptionId: subscription.id,
|
||||
provider: input.paymentProvider,
|
||||
providerInvoiceId: providerInvoice.providerInvoiceId,
|
||||
status: "pending",
|
||||
currency: providerInvoice.currency,
|
||||
amountCrypto: new Prisma.Decimal(providerInvoice.amountCrypto),
|
||||
amountUsd: new Prisma.Decimal(providerInvoice.amountUsd),
|
||||
paymentAddress: providerInvoice.paymentAddress,
|
||||
expiresAt: providerInvoice.expiresAt,
|
||||
},
|
||||
});
|
||||
|
||||
return mapInvoice(invoice);
|
||||
},
|
||||
|
||||
async markInvoicePaid(input: {
|
||||
invoiceId: string;
|
||||
}): Promise<BillingInvoiceRecord> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
const invoice = await transaction.paymentInvoice.findUnique({
|
||||
where: { id: input.invoiceId },
|
||||
include: {
|
||||
subscription: {
|
||||
include: {
|
||||
plan: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (!invoice) {
|
||||
throw new Error("Invoice not found.");
|
||||
}
|
||||
|
||||
const paidAt = invoice.paidAt ?? new Date();
|
||||
const updatedInvoice =
|
||||
invoice.status === "paid"
|
||||
? invoice
|
||||
: await transaction.paymentInvoice.update({
|
||||
where: { id: invoice.id },
|
||||
data: {
|
||||
status: "paid",
|
||||
paidAt,
|
||||
},
|
||||
});
|
||||
|
||||
if (invoice.subscription) {
|
||||
const periodStart = paidAt;
|
||||
const periodEnd = addDays(periodStart, 30);
|
||||
|
||||
await transaction.subscription.update({
|
||||
where: { id: invoice.subscription.id },
|
||||
data: {
|
||||
status: "active",
|
||||
activatedAt: invoice.subscription.activatedAt ?? paidAt,
|
||||
currentPeriodStart: periodStart,
|
||||
currentPeriodEnd: periodEnd,
|
||||
canceledAt: null,
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.usageLedgerEntry.create({
|
||||
data: {
|
||||
userId: invoice.userId,
|
||||
entryType: "cycle_reset",
|
||||
deltaRequests: 0,
|
||||
cycleStartedAt: periodStart,
|
||||
cycleEndsAt: periodEnd,
|
||||
note: `Cycle activated from invoice ${invoice.id}.`,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return mapInvoice(updatedInvoice);
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function mapInvoice(invoice: {
|
||||
id: string;
|
||||
subscriptionId: string | null;
|
||||
provider: string;
|
||||
providerInvoiceId: string | null;
|
||||
status: PaymentInvoiceStatus;
|
||||
currency: string;
|
||||
amountCrypto: Prisma.Decimal;
|
||||
amountUsd: Prisma.Decimal | null;
|
||||
paymentAddress: string | null;
|
||||
expiresAt: Date | null;
|
||||
paidAt: Date | null;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}): BillingInvoiceRecord {
|
||||
return {
|
||||
id: invoice.id,
|
||||
provider: invoice.provider,
|
||||
status: invoice.status,
|
||||
currency: invoice.currency,
|
||||
amountCrypto: invoice.amountCrypto.toNumber(),
|
||||
createdAt: invoice.createdAt,
|
||||
updatedAt: invoice.updatedAt,
|
||||
...(invoice.subscriptionId ? { subscriptionId: invoice.subscriptionId } : {}),
|
||||
...(invoice.providerInvoiceId ? { providerInvoiceId: invoice.providerInvoiceId } : {}),
|
||||
...(invoice.amountUsd !== null ? { amountUsd: invoice.amountUsd.toNumber() } : {}),
|
||||
...(invoice.paymentAddress ? { paymentAddress: invoice.paymentAddress } : {}),
|
||||
...(invoice.expiresAt ? { expiresAt: invoice.expiresAt } : {}),
|
||||
...(invoice.paidAt ? { paidAt: invoice.paidAt } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function mapSubscription(subscription: {
|
||||
id: string;
|
||||
status: SubscriptionStatus;
|
||||
renewsManually: boolean;
|
||||
activatedAt: Date | null;
|
||||
currentPeriodStart: Date | null;
|
||||
currentPeriodEnd: Date | null;
|
||||
canceledAt: Date | null;
|
||||
plan: {
|
||||
id: string;
|
||||
code: string;
|
||||
displayName: string;
|
||||
monthlyRequestLimit: number;
|
||||
monthlyPriceUsd: Prisma.Decimal;
|
||||
billingCurrency: string;
|
||||
};
|
||||
}): SubscriptionBillingRecord {
|
||||
return {
|
||||
id: subscription.id,
|
||||
status: subscription.status,
|
||||
renewsManually: subscription.renewsManually,
|
||||
plan: {
|
||||
id: subscription.plan.id,
|
||||
code: subscription.plan.code,
|
||||
displayName: subscription.plan.displayName,
|
||||
monthlyRequestLimit: subscription.plan.monthlyRequestLimit,
|
||||
monthlyPriceUsd: subscription.plan.monthlyPriceUsd.toNumber(),
|
||||
billingCurrency: subscription.plan.billingCurrency,
|
||||
},
|
||||
...(subscription.activatedAt ? { activatedAt: subscription.activatedAt } : {}),
|
||||
...(subscription.currentPeriodStart ? { currentPeriodStart: subscription.currentPeriodStart } : {}),
|
||||
...(subscription.currentPeriodEnd ? { currentPeriodEnd: subscription.currentPeriodEnd } : {}),
|
||||
...(subscription.canceledAt ? { canceledAt: subscription.canceledAt } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function addDays(value: Date, days: number): Date {
|
||||
return new Date(value.getTime() + days * 24 * 60 * 60 * 1000);
|
||||
}
|
||||
16
packages/db/src/bootstrap-main.ts
Normal file
16
packages/db/src/bootstrap-main.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
import { ensureDefaultSubscriptionPlan } from "./bootstrap.js";
|
||||
import { prisma } from "./prisma-client.js";
|
||||
|
||||
async function main(): Promise<void> {
|
||||
await ensureDefaultSubscriptionPlan(prisma);
|
||||
console.log("default subscription plan ensured");
|
||||
}
|
||||
|
||||
main()
|
||||
.catch((error) => {
|
||||
console.error("failed to ensure default subscription plan", error);
|
||||
process.exitCode = 1;
|
||||
})
|
||||
.finally(async () => {
|
||||
await prisma.$disconnect();
|
||||
});
|
||||
50
packages/db/src/bootstrap.ts
Normal file
50
packages/db/src/bootstrap.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import { Prisma, type PrismaClient } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface SubscriptionPlanSeedInput {
|
||||
code: string;
|
||||
displayName: string;
|
||||
monthlyRequestLimit: number;
|
||||
monthlyPriceUsd: number;
|
||||
billingCurrency: string;
|
||||
}
|
||||
|
||||
export const defaultSubscriptionPlanSeed: SubscriptionPlanSeedInput = {
|
||||
code: "mvp_monthly",
|
||||
displayName: "MVP Monthly",
|
||||
monthlyRequestLimit: 100,
|
||||
monthlyPriceUsd: 9.99,
|
||||
billingCurrency: "USDT",
|
||||
};
|
||||
|
||||
export async function ensureSubscriptionPlan(
|
||||
input: SubscriptionPlanSeedInput,
|
||||
database: PrismaClient = defaultPrisma,
|
||||
): Promise<void> {
|
||||
await database.subscriptionPlan.upsert({
|
||||
where: {
|
||||
code: input.code,
|
||||
},
|
||||
update: {
|
||||
displayName: input.displayName,
|
||||
monthlyRequestLimit: input.monthlyRequestLimit,
|
||||
monthlyPriceUsd: new Prisma.Decimal(input.monthlyPriceUsd),
|
||||
billingCurrency: input.billingCurrency,
|
||||
isActive: true,
|
||||
},
|
||||
create: {
|
||||
code: input.code,
|
||||
displayName: input.displayName,
|
||||
monthlyRequestLimit: input.monthlyRequestLimit,
|
||||
monthlyPriceUsd: new Prisma.Decimal(input.monthlyPriceUsd),
|
||||
billingCurrency: input.billingCurrency,
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function ensureDefaultSubscriptionPlan(
|
||||
database: PrismaClient = defaultPrisma,
|
||||
): Promise<void> {
|
||||
await ensureSubscriptionPlan(defaultSubscriptionPlanSeed, database);
|
||||
}
|
||||
211
packages/db/src/generation-store.ts
Normal file
211
packages/db/src/generation-store.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
import {
|
||||
type ActiveSubscriptionContext,
|
||||
type CreateGenerationRequestInput,
|
||||
type CreateGenerationRequestDeps,
|
||||
type GenerationRequestRecord,
|
||||
type MarkGenerationSucceededDeps,
|
||||
type SuccessfulGenerationRecord,
|
||||
} from "@nproxy/domain";
|
||||
import { Prisma, type PrismaClient } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface GenerationStore
|
||||
extends CreateGenerationRequestDeps,
|
||||
MarkGenerationSucceededDeps {}
|
||||
|
||||
export function createPrismaGenerationStore(
|
||||
database: PrismaClient = defaultPrisma,
|
||||
): GenerationStore {
|
||||
return {
|
||||
async findReusableRequest(userId: string, idempotencyKey: string) {
|
||||
const request = await database.generationRequest.findFirst({
|
||||
where: {
|
||||
userId,
|
||||
idempotencyKey,
|
||||
},
|
||||
});
|
||||
|
||||
return request ? mapGenerationRequest(request) : null;
|
||||
},
|
||||
|
||||
async findActiveSubscriptionContext(
|
||||
userId: string,
|
||||
): Promise<ActiveSubscriptionContext | null> {
|
||||
const subscription = await database.subscription.findFirst({
|
||||
where: {
|
||||
userId,
|
||||
status: "active",
|
||||
},
|
||||
include: {
|
||||
plan: true,
|
||||
},
|
||||
orderBy: [
|
||||
{ currentPeriodEnd: "desc" },
|
||||
{ createdAt: "desc" },
|
||||
],
|
||||
});
|
||||
|
||||
if (!subscription) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const cycleStart =
|
||||
subscription.currentPeriodStart ?? subscription.activatedAt ?? subscription.createdAt;
|
||||
|
||||
const usageAggregation = await database.usageLedgerEntry.aggregate({
|
||||
where: {
|
||||
userId,
|
||||
entryType: "generation_success",
|
||||
createdAt: { gte: cycleStart },
|
||||
},
|
||||
_sum: {
|
||||
deltaRequests: true,
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
subscriptionId: subscription.id,
|
||||
planId: subscription.planId,
|
||||
monthlyRequestLimit: subscription.plan.monthlyRequestLimit,
|
||||
usedSuccessfulRequests: usageAggregation._sum.deltaRequests ?? 0,
|
||||
};
|
||||
},
|
||||
|
||||
async createGenerationRequest(
|
||||
input: CreateGenerationRequestInput,
|
||||
): Promise<GenerationRequestRecord> {
|
||||
const request = await database.generationRequest.create({
|
||||
data: {
|
||||
userId: input.userId,
|
||||
mode: input.mode,
|
||||
providerModel: input.providerModel,
|
||||
prompt: input.prompt.trim(),
|
||||
resolutionPreset: input.resolutionPreset,
|
||||
batchSize: input.batchSize,
|
||||
...(input.sourceImageKey !== undefined
|
||||
? { sourceImageKey: input.sourceImageKey }
|
||||
: {}),
|
||||
...(input.imageStrength !== undefined
|
||||
? { imageStrength: new Prisma.Decimal(input.imageStrength) }
|
||||
: {}),
|
||||
...(input.idempotencyKey !== undefined
|
||||
? { idempotencyKey: input.idempotencyKey }
|
||||
: {}),
|
||||
},
|
||||
});
|
||||
|
||||
return mapGenerationRequest(request);
|
||||
},
|
||||
|
||||
async getGenerationRequest(requestId: string): Promise<GenerationRequestRecord | null> {
|
||||
const request = await database.generationRequest.findUnique({
|
||||
where: {
|
||||
id: requestId,
|
||||
},
|
||||
});
|
||||
|
||||
return request ? mapGenerationRequest(request) : null;
|
||||
},
|
||||
|
||||
async markGenerationSucceeded(requestId: string): Promise<SuccessfulGenerationRecord> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
const request = await transaction.generationRequest.findUnique({
|
||||
where: {
|
||||
id: requestId,
|
||||
},
|
||||
include: {
|
||||
usageLedgerEntry: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!request) {
|
||||
throw new Error(`Generation request ${requestId} was not found.`);
|
||||
}
|
||||
|
||||
const completedAt = request.completedAt ?? new Date();
|
||||
const nextStatus =
|
||||
request.status === "succeeded" ? request.status : "succeeded";
|
||||
|
||||
const updatedRequest =
|
||||
request.status === "succeeded" && request.completedAt
|
||||
? request
|
||||
: await transaction.generationRequest.update({
|
||||
where: {
|
||||
id: requestId,
|
||||
},
|
||||
data: {
|
||||
status: nextStatus,
|
||||
completedAt,
|
||||
},
|
||||
});
|
||||
|
||||
if (!request.usageLedgerEntry) {
|
||||
await transaction.usageLedgerEntry.create({
|
||||
data: {
|
||||
userId: request.userId,
|
||||
generationRequestId: request.id,
|
||||
entryType: "generation_success",
|
||||
deltaRequests: 1,
|
||||
note: "Consumed after first successful generation result.",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
request: mapGenerationRequest(updatedRequest),
|
||||
quotaConsumed: !request.usageLedgerEntry,
|
||||
};
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function mapGenerationRequest(
|
||||
request: {
|
||||
id: string;
|
||||
userId: string;
|
||||
mode: string;
|
||||
status: string;
|
||||
providerModel: string;
|
||||
prompt: string;
|
||||
sourceImageKey: string | null;
|
||||
resolutionPreset: string;
|
||||
batchSize: number;
|
||||
imageStrength: Prisma.Decimal | null;
|
||||
idempotencyKey: string | null;
|
||||
terminalErrorCode: string | null;
|
||||
terminalErrorText: string | null;
|
||||
requestedAt: Date;
|
||||
startedAt: Date | null;
|
||||
completedAt: Date | null;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
},
|
||||
): GenerationRequestRecord {
|
||||
return {
|
||||
id: request.id,
|
||||
userId: request.userId,
|
||||
mode: request.mode as GenerationRequestRecord["mode"],
|
||||
status: request.status as GenerationRequestRecord["status"],
|
||||
providerModel: request.providerModel,
|
||||
prompt: request.prompt,
|
||||
resolutionPreset: request.resolutionPreset,
|
||||
batchSize: request.batchSize,
|
||||
requestedAt: request.requestedAt,
|
||||
createdAt: request.createdAt,
|
||||
updatedAt: request.updatedAt,
|
||||
...(request.sourceImageKey !== null ? { sourceImageKey: request.sourceImageKey } : {}),
|
||||
...(request.imageStrength !== null
|
||||
? { imageStrength: request.imageStrength.toNumber() }
|
||||
: {}),
|
||||
...(request.idempotencyKey !== null ? { idempotencyKey: request.idempotencyKey } : {}),
|
||||
...(request.terminalErrorCode !== null
|
||||
? { terminalErrorCode: request.terminalErrorCode }
|
||||
: {}),
|
||||
...(request.terminalErrorText !== null
|
||||
? { terminalErrorText: request.terminalErrorText }
|
||||
: {}),
|
||||
...(request.startedAt !== null ? { startedAt: request.startedAt } : {}),
|
||||
...(request.completedAt !== null ? { completedAt: request.completedAt } : {}),
|
||||
};
|
||||
}
|
||||
10
packages/db/src/index.ts
Normal file
10
packages/db/src/index.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
export { prisma } from "./prisma-client.js";
|
||||
export { prismaSchemaPath } from "./schema-path.js";
|
||||
export * from "./account-store.js";
|
||||
export * from "./auth-store.js";
|
||||
export * from "./billing-store.js";
|
||||
export * from "./bootstrap.js";
|
||||
export * from "./generation-store.js";
|
||||
export * from "./telegram-bot-store.js";
|
||||
export * from "./telegram-pairing-store.js";
|
||||
export * from "./worker-store.js";
|
||||
11
packages/db/src/prisma-client.ts
Normal file
11
packages/db/src/prisma-client.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
|
||||
const globalForPrisma = globalThis as {
|
||||
prisma?: PrismaClient;
|
||||
};
|
||||
|
||||
export const prisma = globalForPrisma.prisma ?? new PrismaClient();
|
||||
|
||||
if (process.env.NODE_ENV !== "production") {
|
||||
globalForPrisma.prisma = prisma;
|
||||
}
|
||||
1
packages/db/src/schema-path.ts
Normal file
1
packages/db/src/schema-path.ts
Normal file
@@ -0,0 +1 @@
|
||||
export const prismaSchemaPath = new URL("../prisma/schema.prisma", import.meta.url);
|
||||
106
packages/db/src/telegram-bot-store.ts
Normal file
106
packages/db/src/telegram-bot-store.ts
Normal file
@@ -0,0 +1,106 @@
|
||||
import { randomBytes } from "node:crypto";
|
||||
import { hashPairingCode, isPairingExpired } from "@nproxy/domain";
|
||||
import type { PrismaClient } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface TelegramUserSnapshot {
|
||||
telegramUserId: string;
|
||||
telegramUsername?: string;
|
||||
displayNameSnapshot: string;
|
||||
}
|
||||
|
||||
export interface PendingPairingChallenge {
|
||||
pairingId: string;
|
||||
code: string;
|
||||
expiresAt: Date;
|
||||
}
|
||||
|
||||
export function createPrismaTelegramBotStore(database: PrismaClient = defaultPrisma) {
|
||||
return {
|
||||
async isTelegramAdminAllowed(telegramUserId: string): Promise<boolean> {
|
||||
const entry = await database.telegramAdminAllowlistEntry.findUnique({
|
||||
where: {
|
||||
telegramUserId,
|
||||
},
|
||||
});
|
||||
|
||||
return Boolean(entry?.isActive);
|
||||
},
|
||||
|
||||
async getOrCreatePendingPairingChallenge(
|
||||
user: TelegramUserSnapshot,
|
||||
expiresInMinutes: number,
|
||||
): Promise<PendingPairingChallenge> {
|
||||
const now = new Date();
|
||||
const existing = await database.telegramPairing.findFirst({
|
||||
where: {
|
||||
telegramUserId: user.telegramUserId,
|
||||
status: "pending",
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
});
|
||||
|
||||
if (existing && !isPairingExpired(existing.expiresAt, now)) {
|
||||
await database.telegramPairing.update({
|
||||
where: {
|
||||
id: existing.id,
|
||||
},
|
||||
data: {
|
||||
status: "revoked",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (existing && isPairingExpired(existing.expiresAt, now)) {
|
||||
await database.telegramPairing.update({
|
||||
where: {
|
||||
id: existing.id,
|
||||
},
|
||||
data: {
|
||||
status: "expired",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const code = generatePairingCode();
|
||||
const expiresAt = new Date(now.getTime() + expiresInMinutes * 60 * 1000);
|
||||
const pairing = await database.telegramPairing.create({
|
||||
data: {
|
||||
telegramUserId: user.telegramUserId,
|
||||
...(user.telegramUsername ? { telegramUsername: user.telegramUsername } : {}),
|
||||
displayNameSnapshot: user.displayNameSnapshot,
|
||||
codeHash: hashPairingCode(code),
|
||||
expiresAt,
|
||||
status: "pending",
|
||||
},
|
||||
});
|
||||
|
||||
await database.adminAuditLog.create({
|
||||
data: {
|
||||
actorType: "system",
|
||||
action: "telegram_pair_pending_created",
|
||||
targetType: "telegram_pairing",
|
||||
targetId: pairing.id,
|
||||
metadata: {
|
||||
telegramUserId: user.telegramUserId,
|
||||
telegramUsername: user.telegramUsername ?? null,
|
||||
displayNameSnapshot: user.displayNameSnapshot,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
pairingId: pairing.id,
|
||||
code,
|
||||
expiresAt,
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function generatePairingCode(): string {
|
||||
return randomBytes(4).toString("hex").toUpperCase();
|
||||
}
|
||||
291
packages/db/src/telegram-pairing-store.ts
Normal file
291
packages/db/src/telegram-pairing-store.ts
Normal file
@@ -0,0 +1,291 @@
|
||||
import { isPairingExpired } from "@nproxy/domain";
|
||||
import type { PrismaClient, TelegramPairingStatus } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
|
||||
export interface PendingTelegramPairingRecord {
|
||||
id: string;
|
||||
telegramUserId: string;
|
||||
telegramUsername?: string;
|
||||
displayNameSnapshot: string;
|
||||
codeHash: string;
|
||||
expiresAt: Date;
|
||||
status: TelegramPairingStatus;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export interface ActiveTelegramAdminRecord {
|
||||
telegramUserId: string;
|
||||
telegramUsername?: string;
|
||||
displayNameSnapshot: string;
|
||||
pairedAt: Date;
|
||||
}
|
||||
|
||||
export function createPrismaTelegramPairingStore(database: PrismaClient = defaultPrisma) {
|
||||
return {
|
||||
async findPendingPairingByCodeHash(
|
||||
codeHash: string,
|
||||
): Promise<PendingTelegramPairingRecord | null> {
|
||||
const record = await database.telegramPairing.findFirst({
|
||||
where: {
|
||||
codeHash,
|
||||
status: "pending",
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
});
|
||||
|
||||
if (!record) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mapPendingPairingRecord(record);
|
||||
},
|
||||
|
||||
async listTelegramPairings(): Promise<{
|
||||
pending: PendingTelegramPairingRecord[];
|
||||
activeAdmins: ActiveTelegramAdminRecord[];
|
||||
}> {
|
||||
const [pending, activeAdmins] = await Promise.all([
|
||||
database.telegramPairing.findMany({
|
||||
where: {
|
||||
status: "pending",
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
}),
|
||||
database.telegramAdminAllowlistEntry.findMany({
|
||||
where: {
|
||||
isActive: true,
|
||||
},
|
||||
orderBy: {
|
||||
pairedAt: "desc",
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
pending: pending.map(mapPendingPairingRecord),
|
||||
activeAdmins: activeAdmins.map((entry) => ({
|
||||
telegramUserId: entry.telegramUserId,
|
||||
...(entry.telegramUsername ? { telegramUsername: entry.telegramUsername } : {}),
|
||||
displayNameSnapshot: entry.displayNameSnapshot,
|
||||
pairedAt: entry.pairedAt,
|
||||
})),
|
||||
};
|
||||
},
|
||||
|
||||
async completePendingPairing(input: {
|
||||
pairingId: string;
|
||||
actorRef?: string;
|
||||
}): Promise<ActiveTelegramAdminRecord> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
const pairing = await transaction.telegramPairing.findUnique({
|
||||
where: {
|
||||
id: input.pairingId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!pairing || pairing.status !== "pending") {
|
||||
throw new Error("Pending pairing not found.");
|
||||
}
|
||||
|
||||
if (isPairingExpired(pairing.expiresAt)) {
|
||||
await transaction.telegramPairing.update({
|
||||
where: {
|
||||
id: pairing.id,
|
||||
},
|
||||
data: {
|
||||
status: "expired",
|
||||
},
|
||||
});
|
||||
|
||||
throw new Error("Pairing code has expired.");
|
||||
}
|
||||
|
||||
const allowlistEntry = await transaction.telegramAdminAllowlistEntry.upsert({
|
||||
where: {
|
||||
telegramUserId: pairing.telegramUserId,
|
||||
},
|
||||
update: {
|
||||
telegramUsername: pairing.telegramUsername,
|
||||
displayNameSnapshot: pairing.displayNameSnapshot,
|
||||
pairedAt: new Date(),
|
||||
revokedAt: null,
|
||||
isActive: true,
|
||||
},
|
||||
create: {
|
||||
telegramUserId: pairing.telegramUserId,
|
||||
telegramUsername: pairing.telegramUsername,
|
||||
displayNameSnapshot: pairing.displayNameSnapshot,
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.telegramPairing.update({
|
||||
where: {
|
||||
id: pairing.id,
|
||||
},
|
||||
data: {
|
||||
status: "completed",
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.adminAuditLog.create({
|
||||
data: {
|
||||
actorType: "cli_operator",
|
||||
...(input.actorRef ? { actorRef: input.actorRef } : {}),
|
||||
action: "telegram_pair_complete",
|
||||
targetType: "telegram_admin_allowlist_entry",
|
||||
targetId: allowlistEntry.telegramUserId,
|
||||
metadata: {
|
||||
pairingId: pairing.id,
|
||||
telegramUsername: pairing.telegramUsername,
|
||||
displayNameSnapshot: pairing.displayNameSnapshot,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
telegramUserId: allowlistEntry.telegramUserId,
|
||||
...(allowlistEntry.telegramUsername
|
||||
? { telegramUsername: allowlistEntry.telegramUsername }
|
||||
: {}),
|
||||
displayNameSnapshot: allowlistEntry.displayNameSnapshot,
|
||||
pairedAt: allowlistEntry.pairedAt,
|
||||
};
|
||||
});
|
||||
},
|
||||
|
||||
async revokeTelegramAdmin(input: {
|
||||
telegramUserId: string;
|
||||
actorRef?: string;
|
||||
}): Promise<ActiveTelegramAdminRecord | null> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
const entry = await transaction.telegramAdminAllowlistEntry.findUnique({
|
||||
where: {
|
||||
telegramUserId: input.telegramUserId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!entry || !entry.isActive) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const revokedAt = new Date();
|
||||
const updated = await transaction.telegramAdminAllowlistEntry.update({
|
||||
where: {
|
||||
telegramUserId: input.telegramUserId,
|
||||
},
|
||||
data: {
|
||||
isActive: false,
|
||||
revokedAt,
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.telegramPairing.updateMany({
|
||||
where: {
|
||||
telegramUserId: input.telegramUserId,
|
||||
status: "pending",
|
||||
},
|
||||
data: {
|
||||
status: "revoked",
|
||||
},
|
||||
});
|
||||
|
||||
await transaction.adminAuditLog.create({
|
||||
data: {
|
||||
actorType: "cli_operator",
|
||||
...(input.actorRef ? { actorRef: input.actorRef } : {}),
|
||||
action: "telegram_pair_revoke",
|
||||
targetType: "telegram_admin_allowlist_entry",
|
||||
targetId: updated.telegramUserId,
|
||||
metadata: {
|
||||
telegramUsername: updated.telegramUsername,
|
||||
displayNameSnapshot: updated.displayNameSnapshot,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
telegramUserId: updated.telegramUserId,
|
||||
...(updated.telegramUsername ? { telegramUsername: updated.telegramUsername } : {}),
|
||||
displayNameSnapshot: updated.displayNameSnapshot,
|
||||
pairedAt: updated.pairedAt,
|
||||
};
|
||||
});
|
||||
},
|
||||
|
||||
async cleanupExpiredPendingPairings(input?: {
|
||||
actorRef?: string;
|
||||
now?: Date;
|
||||
}): Promise<number> {
|
||||
const now = input?.now ?? new Date();
|
||||
const expired = await database.telegramPairing.findMany({
|
||||
where: {
|
||||
status: "pending",
|
||||
expiresAt: {
|
||||
lte: now,
|
||||
},
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (expired.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
await database.$transaction([
|
||||
database.telegramPairing.updateMany({
|
||||
where: {
|
||||
id: {
|
||||
in: expired.map((item) => item.id),
|
||||
},
|
||||
},
|
||||
data: {
|
||||
status: "expired",
|
||||
},
|
||||
}),
|
||||
database.adminAuditLog.create({
|
||||
data: {
|
||||
actorType: "cli_operator",
|
||||
...(input?.actorRef ? { actorRef: input.actorRef } : {}),
|
||||
action: "telegram_pair_cleanup",
|
||||
targetType: "telegram_pairing",
|
||||
metadata: {
|
||||
expiredCount: expired.length,
|
||||
},
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return expired.length;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function mapPendingPairingRecord(record: {
|
||||
id: string;
|
||||
telegramUserId: string;
|
||||
telegramUsername: string | null;
|
||||
displayNameSnapshot: string;
|
||||
codeHash: string;
|
||||
expiresAt: Date;
|
||||
status: TelegramPairingStatus;
|
||||
createdAt: Date;
|
||||
}): PendingTelegramPairingRecord {
|
||||
return {
|
||||
id: record.id,
|
||||
telegramUserId: record.telegramUserId,
|
||||
...(record.telegramUsername ? { telegramUsername: record.telegramUsername } : {}),
|
||||
displayNameSnapshot: record.displayNameSnapshot,
|
||||
codeHash: record.codeHash,
|
||||
expiresAt: record.expiresAt,
|
||||
status: record.status,
|
||||
createdAt: record.createdAt,
|
||||
};
|
||||
}
|
||||
502
packages/db/src/worker-store.ts
Normal file
502
packages/db/src/worker-store.ts
Normal file
@@ -0,0 +1,502 @@
|
||||
import {
|
||||
buildAttemptPlan,
|
||||
evaluateAttempt,
|
||||
markGenerationRequestSucceeded,
|
||||
type GenerationRequestRecord,
|
||||
type ProviderFailureKind,
|
||||
type ProviderKeySnapshot,
|
||||
} from "@nproxy/domain";
|
||||
import type { AdminActorType, PrismaClient, ProviderKeyState } from "@prisma/client";
|
||||
import { prisma as defaultPrisma } from "./prisma-client.js";
|
||||
import type { GeneratedAssetPayload, ProviderExecutionResult } from "@nproxy/providers";
|
||||
import { createPrismaGenerationStore } from "./generation-store.js";
|
||||
|
||||
export interface WorkerGenerationRequest extends GenerationRequestRecord {}
|
||||
|
||||
export interface WorkerProviderKey extends ProviderKeySnapshot {
|
||||
providerCode: string;
|
||||
label: string;
|
||||
apiKeyLastFour: string;
|
||||
roundRobinOrder: number;
|
||||
proxyBaseUrl?: string;
|
||||
proxyLabel?: string;
|
||||
}
|
||||
|
||||
export interface ClaimedGenerationJob {
|
||||
request: WorkerGenerationRequest;
|
||||
providerKeys: WorkerProviderKey[];
|
||||
lastUsedKeyId?: string;
|
||||
}
|
||||
|
||||
export interface ProcessGenerationJobResult {
|
||||
requestId: string;
|
||||
finalStatus: "succeeded" | "failed";
|
||||
attemptsCreated: number;
|
||||
consumedQuota: boolean;
|
||||
}
|
||||
|
||||
export interface RecoverCooldownKeysResult {
|
||||
recoveredCount: number;
|
||||
}
|
||||
|
||||
export type WorkerKeyExecutionResult = ProviderExecutionResult & {
|
||||
usedProxy: boolean;
|
||||
directFallbackUsed: boolean;
|
||||
};
|
||||
|
||||
export interface WorkerExecutionPolicy {
|
||||
cooldownMinutes: number;
|
||||
failuresBeforeManualReview: number;
|
||||
}
|
||||
|
||||
const defaultWorkerExecutionPolicy: WorkerExecutionPolicy = {
|
||||
cooldownMinutes: 5,
|
||||
failuresBeforeManualReview: 10,
|
||||
};
|
||||
|
||||
export function createPrismaWorkerStore(
|
||||
database: PrismaClient = defaultPrisma,
|
||||
policy: WorkerExecutionPolicy = defaultWorkerExecutionPolicy,
|
||||
) {
|
||||
const generationStore = createPrismaGenerationStore(database);
|
||||
|
||||
return {
|
||||
async recoverCooldownProviderKeys(now: Date = new Date()): Promise<RecoverCooldownKeysResult> {
|
||||
const eligibleKeys = await database.providerKey.findMany({
|
||||
where: {
|
||||
state: "cooldown",
|
||||
cooldownUntil: {
|
||||
lte: now,
|
||||
},
|
||||
},
|
||||
include: {
|
||||
proxy: true,
|
||||
},
|
||||
orderBy: {
|
||||
cooldownUntil: "asc",
|
||||
},
|
||||
});
|
||||
|
||||
for (const providerKey of eligibleKeys) {
|
||||
await updateProviderKeyState(database, {
|
||||
providerKey: mapWorkerProviderKey(providerKey),
|
||||
toState: "active",
|
||||
reason: "recovered",
|
||||
nextConsecutiveRetryableFailures: providerKey.consecutiveRetryableFailures,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
recoveredCount: eligibleKeys.length,
|
||||
};
|
||||
},
|
||||
|
||||
async claimNextQueuedGenerationJob(): Promise<ClaimedGenerationJob | null> {
|
||||
return database.$transaction(async (transaction) => {
|
||||
const queuedRequest = await transaction.generationRequest.findFirst({
|
||||
where: {
|
||||
status: "queued",
|
||||
},
|
||||
orderBy: {
|
||||
requestedAt: "asc",
|
||||
},
|
||||
});
|
||||
|
||||
if (!queuedRequest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const claimResult = await transaction.generationRequest.updateMany({
|
||||
where: {
|
||||
id: queuedRequest.id,
|
||||
status: "queued",
|
||||
},
|
||||
data: {
|
||||
status: "running",
|
||||
startedAt: queuedRequest.startedAt ?? new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
if (claimResult.count === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const request = await transaction.generationRequest.findUnique({
|
||||
where: {
|
||||
id: queuedRequest.id,
|
||||
},
|
||||
});
|
||||
|
||||
if (!request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const providerKeys = await transaction.providerKey.findMany({
|
||||
where: {
|
||||
providerCode: request.providerModel,
|
||||
},
|
||||
include: {
|
||||
proxy: true,
|
||||
},
|
||||
orderBy: {
|
||||
roundRobinOrder: "asc",
|
||||
},
|
||||
});
|
||||
|
||||
const lastAttempt = await transaction.generationAttempt.findFirst({
|
||||
where: {
|
||||
providerKey: {
|
||||
providerCode: request.providerModel,
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
startedAt: "desc",
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
request: mapGenerationRequest(request),
|
||||
providerKeys: providerKeys.map(mapWorkerProviderKey),
|
||||
...(lastAttempt ? { lastUsedKeyId: lastAttempt.providerKeyId } : {}),
|
||||
};
|
||||
});
|
||||
},
|
||||
|
||||
async processClaimedGenerationJob(
|
||||
job: ClaimedGenerationJob,
|
||||
executeWithKey: (
|
||||
request: WorkerGenerationRequest,
|
||||
providerKey: WorkerProviderKey,
|
||||
) => Promise<WorkerKeyExecutionResult>,
|
||||
): Promise<ProcessGenerationJobResult> {
|
||||
const attemptPlan = buildAttemptPlan({
|
||||
keys: job.providerKeys,
|
||||
...(job.lastUsedKeyId ? { lastUsedKeyId: job.lastUsedKeyId } : {}),
|
||||
});
|
||||
|
||||
if (attemptPlan.keyIdsInAttemptOrder.length === 0) {
|
||||
await markRequestFailed(
|
||||
database,
|
||||
job.request.id,
|
||||
"no_provider_keys",
|
||||
"No active provider keys are available for the configured model.",
|
||||
);
|
||||
|
||||
return {
|
||||
requestId: job.request.id,
|
||||
finalStatus: "failed",
|
||||
attemptsCreated: 0,
|
||||
consumedQuota: false,
|
||||
};
|
||||
}
|
||||
|
||||
let attemptsCreated = 0;
|
||||
|
||||
for (const providerKeyId of attemptPlan.keyIdsInAttemptOrder) {
|
||||
const providerKey = job.providerKeys.find((key) => key.id === providerKeyId);
|
||||
|
||||
if (!providerKey) {
|
||||
continue;
|
||||
}
|
||||
|
||||
attemptsCreated += 1;
|
||||
|
||||
const executionResult = await executeWithKey(job.request, providerKey);
|
||||
|
||||
const attempt = await database.generationAttempt.create({
|
||||
data: {
|
||||
generationRequestId: job.request.id,
|
||||
providerKeyId: providerKey.id,
|
||||
attemptIndex: attemptsCreated,
|
||||
status: executionResult.ok ? "succeeded" : "failed",
|
||||
usedProxy: executionResult.usedProxy,
|
||||
directFallbackUsed: executionResult.directFallbackUsed,
|
||||
...(executionResult.ok
|
||||
? {}
|
||||
: {
|
||||
failureCategory: mapFailureCategory(executionResult.failureKind),
|
||||
providerHttpStatus: executionResult.providerHttpStatus ?? null,
|
||||
providerErrorCode: executionResult.providerErrorCode ?? null,
|
||||
providerErrorText: executionResult.providerErrorText ?? null,
|
||||
}),
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
if (executionResult.ok) {
|
||||
if (providerKey.state === "cooldown") {
|
||||
await updateProviderKeyState(database, {
|
||||
providerKey,
|
||||
toState: "active",
|
||||
reason: "recovered",
|
||||
nextConsecutiveRetryableFailures: 0,
|
||||
});
|
||||
} else if (providerKey.consecutiveRetryableFailures !== 0) {
|
||||
await database.providerKey.update({
|
||||
where: {
|
||||
id: providerKey.id,
|
||||
},
|
||||
data: {
|
||||
consecutiveRetryableFailures: 0,
|
||||
lastErrorCategory: null,
|
||||
lastErrorCode: null,
|
||||
lastErrorAt: null,
|
||||
cooldownUntil: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await persistGeneratedAssets(database, job.request.id, executionResult.assets);
|
||||
const successRecord = await markGenerationRequestSucceeded(
|
||||
{
|
||||
getGenerationRequest: generationStore.getGenerationRequest,
|
||||
markGenerationSucceeded: generationStore.markGenerationSucceeded,
|
||||
},
|
||||
attempt.generationRequestId,
|
||||
);
|
||||
|
||||
return {
|
||||
requestId: job.request.id,
|
||||
finalStatus: "succeeded",
|
||||
attemptsCreated,
|
||||
consumedQuota: successRecord.quotaConsumed,
|
||||
};
|
||||
}
|
||||
|
||||
const evaluation = evaluateAttempt(providerKey, executionResult, {
|
||||
failuresBeforeManualReview: policy.failuresBeforeManualReview,
|
||||
});
|
||||
await updateProviderKeyState(database, {
|
||||
providerKey,
|
||||
toState: evaluation.transition.to,
|
||||
reason: evaluation.transition.reason,
|
||||
nextConsecutiveRetryableFailures: evaluation.nextConsecutiveRetryableFailures,
|
||||
failureKind: executionResult.failureKind,
|
||||
...(executionResult.providerErrorCode !== undefined
|
||||
? { errorCode: executionResult.providerErrorCode }
|
||||
: {}),
|
||||
cooldownMinutes: policy.cooldownMinutes,
|
||||
});
|
||||
|
||||
if (evaluation.retryDisposition === "stop_request") {
|
||||
await markRequestFailed(
|
||||
database,
|
||||
job.request.id,
|
||||
executionResult.providerErrorCode ?? "request_failed",
|
||||
executionResult.providerErrorText ?? "Generation failed.",
|
||||
);
|
||||
|
||||
return {
|
||||
requestId: job.request.id,
|
||||
finalStatus: "failed",
|
||||
attemptsCreated,
|
||||
consumedQuota: false,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
await markRequestFailed(
|
||||
database,
|
||||
job.request.id,
|
||||
"eligible_keys_exhausted",
|
||||
"All eligible provider keys were exhausted by retryable failures.",
|
||||
);
|
||||
|
||||
return {
|
||||
requestId: job.request.id,
|
||||
finalStatus: "failed",
|
||||
attemptsCreated,
|
||||
consumedQuota: false,
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function persistGeneratedAssets(
|
||||
database: PrismaClient,
|
||||
generationRequestId: string,
|
||||
assets: GeneratedAssetPayload[],
|
||||
): Promise<void> {
|
||||
for (const asset of assets) {
|
||||
await database.generatedAsset.create({
|
||||
data: {
|
||||
generationRequestId,
|
||||
objectKey: asset.objectKey,
|
||||
mimeType: asset.mimeType,
|
||||
...(asset.width !== undefined ? { width: asset.width } : {}),
|
||||
...(asset.height !== undefined ? { height: asset.height } : {}),
|
||||
...(asset.bytes !== undefined ? { bytes: asset.bytes } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function updateProviderKeyState(
|
||||
database: PrismaClient,
|
||||
input: {
|
||||
providerKey: WorkerProviderKey;
|
||||
toState: ProviderKeyState;
|
||||
reason: string;
|
||||
nextConsecutiveRetryableFailures: number;
|
||||
cooldownMinutes?: number;
|
||||
failureKind?: ProviderFailureKind;
|
||||
errorCode?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
const now = new Date();
|
||||
const fromState = input.providerKey.state;
|
||||
const lastErrorCategory = input.failureKind
|
||||
? mapFailureCategory(input.failureKind)
|
||||
: null;
|
||||
|
||||
await database.providerKey.update({
|
||||
where: {
|
||||
id: input.providerKey.id,
|
||||
},
|
||||
data: {
|
||||
state: input.toState,
|
||||
consecutiveRetryableFailures: input.nextConsecutiveRetryableFailures,
|
||||
cooldownUntil:
|
||||
input.toState === "cooldown"
|
||||
? addMinutes(now, input.cooldownMinutes ?? 5)
|
||||
: null,
|
||||
lastErrorCategory,
|
||||
lastErrorCode: input.errorCode ?? null,
|
||||
lastErrorAt: input.failureKind ? now : null,
|
||||
disabledAt: input.toState === "disabled" ? now : null,
|
||||
},
|
||||
});
|
||||
|
||||
if (fromState !== input.toState || input.reason !== "none") {
|
||||
await database.providerKeyStatusEvent.create({
|
||||
data: {
|
||||
providerKeyId: input.providerKey.id,
|
||||
fromState,
|
||||
toState: input.toState,
|
||||
reason: input.reason,
|
||||
errorCategory: lastErrorCategory,
|
||||
errorCode: input.errorCode ?? null,
|
||||
actorType: "system" satisfies AdminActorType,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function markRequestFailed(
|
||||
database: PrismaClient,
|
||||
requestId: string,
|
||||
terminalErrorCode: string,
|
||||
terminalErrorText: string,
|
||||
): Promise<void> {
|
||||
await database.generationRequest.update({
|
||||
where: {
|
||||
id: requestId,
|
||||
},
|
||||
data: {
|
||||
status: "failed",
|
||||
terminalErrorCode,
|
||||
terminalErrorText,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function mapFailureCategory(failureKind: ProviderFailureKind) {
|
||||
switch (failureKind) {
|
||||
case "transport":
|
||||
return "transport";
|
||||
case "timeout":
|
||||
return "timeout";
|
||||
case "provider_5xx":
|
||||
return "provider_5xx";
|
||||
case "provider_4xx_user":
|
||||
return "provider_4xx_user";
|
||||
case "insufficient_funds":
|
||||
return "insufficient_funds";
|
||||
case "unknown":
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
function addMinutes(value: Date, minutes: number): Date {
|
||||
return new Date(value.getTime() + minutes * 60 * 1000);
|
||||
}
|
||||
|
||||
function mapGenerationRequest(request: {
|
||||
id: string;
|
||||
userId: string;
|
||||
mode: string;
|
||||
status: string;
|
||||
providerModel: string;
|
||||
prompt: string;
|
||||
sourceImageKey: string | null;
|
||||
resolutionPreset: string;
|
||||
batchSize: number;
|
||||
imageStrength: { toNumber(): number } | null;
|
||||
idempotencyKey: string | null;
|
||||
terminalErrorCode: string | null;
|
||||
terminalErrorText: string | null;
|
||||
requestedAt: Date;
|
||||
startedAt: Date | null;
|
||||
completedAt: Date | null;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}): WorkerGenerationRequest {
|
||||
return {
|
||||
id: request.id,
|
||||
userId: request.userId,
|
||||
mode: request.mode as WorkerGenerationRequest["mode"],
|
||||
status: request.status as WorkerGenerationRequest["status"],
|
||||
providerModel: request.providerModel,
|
||||
prompt: request.prompt,
|
||||
resolutionPreset: request.resolutionPreset,
|
||||
batchSize: request.batchSize,
|
||||
requestedAt: request.requestedAt,
|
||||
createdAt: request.createdAt,
|
||||
updatedAt: request.updatedAt,
|
||||
...(request.sourceImageKey !== null ? { sourceImageKey: request.sourceImageKey } : {}),
|
||||
...(request.imageStrength !== null
|
||||
? { imageStrength: request.imageStrength.toNumber() }
|
||||
: {}),
|
||||
...(request.idempotencyKey !== null ? { idempotencyKey: request.idempotencyKey } : {}),
|
||||
...(request.terminalErrorCode !== null
|
||||
? { terminalErrorCode: request.terminalErrorCode }
|
||||
: {}),
|
||||
...(request.terminalErrorText !== null
|
||||
? { terminalErrorText: request.terminalErrorText }
|
||||
: {}),
|
||||
...(request.startedAt !== null ? { startedAt: request.startedAt } : {}),
|
||||
...(request.completedAt !== null ? { completedAt: request.completedAt } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function mapWorkerProviderKey(providerKey: {
|
||||
id: string;
|
||||
providerCode: string;
|
||||
label: string;
|
||||
apiKeyLastFour: string;
|
||||
state: string;
|
||||
roundRobinOrder: number;
|
||||
consecutiveRetryableFailures: number;
|
||||
proxy: {
|
||||
label: string;
|
||||
baseUrl: string;
|
||||
isActive: boolean;
|
||||
} | null;
|
||||
}): WorkerProviderKey {
|
||||
return {
|
||||
id: providerKey.id,
|
||||
providerCode: providerKey.providerCode,
|
||||
label: providerKey.label,
|
||||
apiKeyLastFour: providerKey.apiKeyLastFour,
|
||||
state: providerKey.state as WorkerProviderKey["state"],
|
||||
roundRobinOrder: providerKey.roundRobinOrder,
|
||||
consecutiveRetryableFailures: providerKey.consecutiveRetryableFailures,
|
||||
...(providerKey.proxy && providerKey.proxy.isActive
|
||||
? {
|
||||
proxyBaseUrl: providerKey.proxy.baseUrl,
|
||||
proxyLabel: providerKey.proxy.label,
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user