feat (Worker): add a worker to check and adjust the TikTok graph
This commit is contained in:
22
worker/Dockerfile
Normal file
22
worker/Dockerfile
Normal file
@@ -0,0 +1,22 @@
|
||||
FROM node:20-alpine
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copie les fichiers nécessaires
|
||||
COPY package*.json ./
|
||||
COPY prisma.config.ts ./
|
||||
COPY prisma ./prisma/
|
||||
COPY tsconfig.json ./
|
||||
|
||||
# Install deps
|
||||
RUN npm ci
|
||||
|
||||
# Génère le client Prisma dans app/generated/prisma (output défini dans schema.prisma)
|
||||
RUN npx prisma generate
|
||||
|
||||
# Copie le code du worker ET le client Prisma généré
|
||||
COPY worker ./worker/
|
||||
COPY app/generated ./app/generated/
|
||||
|
||||
# Lance le worker avec tsx
|
||||
CMD ["npx", "tsx", "worker/snapshot-worker.ts"]
|
||||
172
worker/snapshot-worker.ts
Normal file
172
worker/snapshot-worker.ts
Normal file
@@ -0,0 +1,172 @@
|
||||
/**
|
||||
* Snapshot Worker
|
||||
* Tourne toutes les heures, indépendamment de Next.js.
|
||||
* Pour chaque user avec un token TikTok valide :
|
||||
* 1. Rafraîchit le token si nécessaire
|
||||
* 2. Récupère les stats TikTok
|
||||
* 3. Sauvegarde un snapshot en base (max 1 par heure par compte)
|
||||
*/
|
||||
|
||||
import { PrismaClient } from "../app/generated/prisma/client";
|
||||
import { PrismaPg } from "@prisma/adapter-pg";
|
||||
import { Pool } from "pg";
|
||||
|
||||
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
|
||||
const adapter = new PrismaPg(pool);
|
||||
const prisma = new PrismaClient({ adapter });
|
||||
|
||||
const TIKTOK_TOKEN_URL = "https://open.tiktokapis.com/v2/oauth/token/";
|
||||
const TIKTOK_USER_INFO_URL = "https://open.tiktokapis.com/v2/user/info/";
|
||||
const CLIENT_KEY = process.env.TIKTOK_CLIENT_KEY!;
|
||||
const CLIENT_SECRET = process.env.TIKTOK_CLIENT_SECRET!;
|
||||
const INTERVAL_MS = 60 * 60 * 1000; // 1 heure
|
||||
const DEDUP_WINDOW_MS = 55 * 60 * 1000; // 55 min — évite les doublons si restart
|
||||
|
||||
// ── Graceful shutdown ─────────────────────────────────────────
|
||||
|
||||
async function shutdown() {
|
||||
console.log("[worker] arrêt propre...");
|
||||
await prisma.$disconnect();
|
||||
await pool.end();
|
||||
process.exit(0);
|
||||
}
|
||||
process.on("SIGTERM", shutdown);
|
||||
process.on("SIGINT", shutdown);
|
||||
|
||||
// ── TikTok helpers ────────────────────────────────────────────
|
||||
|
||||
async function refreshTikTokToken(refreshTokenStr: string) {
|
||||
const body = new URLSearchParams({
|
||||
client_key: CLIENT_KEY,
|
||||
client_secret: CLIENT_SECRET,
|
||||
grant_type: "refresh_token",
|
||||
refresh_token: refreshTokenStr,
|
||||
});
|
||||
|
||||
const res = await fetch(TIKTOK_TOKEN_URL, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/x-www-form-urlencoded" },
|
||||
body: body.toString(),
|
||||
});
|
||||
const data = await res.json();
|
||||
|
||||
if (!res.ok || data.error) {
|
||||
throw new Error(data.error_description ?? data.error ?? "Refresh failed");
|
||||
}
|
||||
return data as { access_token: string; refresh_token: string; expires_in: number };
|
||||
}
|
||||
|
||||
async function fetchTikTokStats(accessToken: string) {
|
||||
const fields = "follower_count,likes_count,video_count,display_name";
|
||||
const res = await fetch(`${TIKTOK_USER_INFO_URL}?fields=${fields}`, {
|
||||
headers: { Authorization: `Bearer ${accessToken}` },
|
||||
});
|
||||
const data = await res.json();
|
||||
|
||||
if (!res.ok || data.error?.code !== "ok") {
|
||||
throw new Error(data.error?.message ?? "Stats fetch failed");
|
||||
}
|
||||
|
||||
const user = data.data?.user ?? {};
|
||||
return {
|
||||
followers: (user.follower_count ?? 0) as number,
|
||||
likes: (user.likes_count ?? 0) as number,
|
||||
videoCount: (user.video_count ?? 0) as number,
|
||||
displayName: (user.display_name ?? "") as string,
|
||||
};
|
||||
}
|
||||
|
||||
// ── Core job ──────────────────────────────────────────────────
|
||||
|
||||
async function runSnapshots() {
|
||||
console.log(`[worker] ${new Date().toISOString()} — début du run`);
|
||||
|
||||
const tokens = await prisma.tikTokToken.findMany({
|
||||
include: { user: { include: { accounts: { where: { platform: "tiktok" } } } } },
|
||||
});
|
||||
|
||||
console.log(`[worker] ${tokens.length} compte(s) TikTok à traiter`);
|
||||
|
||||
for (const token of tokens) {
|
||||
const { userId, openId } = token;
|
||||
let { accessToken, refreshToken: rt, expiresAt } = token;
|
||||
|
||||
try {
|
||||
// 1. Refresh si nécessaire
|
||||
if (expiresAt.getTime() - Date.now() < 60_000) {
|
||||
console.log(`[worker] refresh token userId=${userId}`);
|
||||
const refreshed = await refreshTikTokToken(rt);
|
||||
await prisma.tikTokToken.update({
|
||||
where: { userId },
|
||||
data: {
|
||||
accessToken: refreshed.access_token,
|
||||
refreshToken: refreshed.refresh_token,
|
||||
expiresAt: new Date(Date.now() + refreshed.expires_in * 1000),
|
||||
},
|
||||
});
|
||||
accessToken = refreshed.access_token;
|
||||
}
|
||||
|
||||
// 2. Upsert TrackedAccount
|
||||
let account = token.user.accounts[0] ?? null;
|
||||
if (!account) {
|
||||
const stats0 = await fetchTikTokStats(accessToken);
|
||||
account = await prisma.trackedAccount.create({
|
||||
data: { userId, platform: "tiktok", username: stats0.displayName || openId, accountId: openId },
|
||||
});
|
||||
}
|
||||
|
||||
// 3. Déduplication — skip si snapshot < 55 min
|
||||
const lastSnapshot = await prisma.snapshot.findFirst({
|
||||
where: { accountId: account.id },
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
if (lastSnapshot && Date.now() - lastSnapshot.createdAt.getTime() < DEDUP_WINDOW_MS) {
|
||||
console.log(`[worker] skip userId=${userId} — snapshot trop récent (${Math.round((Date.now() - lastSnapshot.createdAt.getTime()) / 60_000)}min)`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 4. Fetch stats
|
||||
const stats = await fetchTikTokStats(accessToken);
|
||||
|
||||
// 5. Sauvegarde snapshot
|
||||
await prisma.snapshot.create({
|
||||
data: {
|
||||
accountId: account.id,
|
||||
followers: stats.followers,
|
||||
likes: stats.likes,
|
||||
videoCount: stats.videoCount,
|
||||
views: 0,
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[worker] ✓ userId=${userId} — followers=${stats.followers} likes=${stats.likes} videos=${stats.videoCount}`);
|
||||
|
||||
} catch (err) {
|
||||
console.error(`[worker] ✗ erreur userId=${userId}:`, err);
|
||||
// Non bloquant — on continue avec le user suivant
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[worker] ${new Date().toISOString()} — run terminé`);
|
||||
}
|
||||
|
||||
// ── Loop principale ───────────────────────────────────────────
|
||||
|
||||
async function main() {
|
||||
console.log(`[worker] démarrage — intervalle ${INTERVAL_MS / 60_000}min`);
|
||||
|
||||
// Run immédiat au démarrage
|
||||
await runSnapshots().catch(err => console.error("[worker] erreur:", err));
|
||||
|
||||
// Puis toutes les heures
|
||||
setInterval(() => {
|
||||
runSnapshots().catch(err => console.error("[worker] erreur:", err));
|
||||
}, INTERVAL_MS);
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
console.error("[worker] crash fatal:", err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user