diff --git a/src/router/websocket.ts b/src/router/websocket.ts index 4b0e2e3..7a0c7fb 100644 --- a/src/router/websocket.ts +++ b/src/router/websocket.ts @@ -1,5 +1,4 @@ import { Elysia, t } from 'elysia'; -import { RedisRateLimiter } from 'rolling-rate-limiter' import * as fs from 'node:fs' import { db } from '@/utils/database' @@ -15,12 +14,20 @@ import { parseSlop } from '@/utils/slop'; const app = new Elysia() const videoIds: Record = {} -const limiter = new RedisRateLimiter({ - client: redis, - namespace: 'save:', - interval: 24 * 60 * 60000, // 24h - maxInInterval: 50 -}) +const MB_LIMIT = 500 + +const checkMbLimit = async (hash: string, mb?: number): Promise => { + const key = `save-mb:${hash}` + const current = parseInt(await redis.get(key) || '0') + if (!mb) return current >= MB_LIMIT + if (current + mb > MB_LIMIT) return true + + const pipeline = redis.pipeline() + pipeline.incrby(key, mb) + pipeline.expire(key, 24 * 60 * 60) + await pipeline.exec() + return false +} const sendError = (ws: any, message: string, close: boolean = true) => { ws.send(`ERROR - ${message}`); @@ -101,7 +108,7 @@ app.ws('/save', { ws.close() } else { const hash = Bun.hash(getRateLimitKey(ws.data.headers['cf-connecting-ip'] || '0.0.0.0')) - const isLimited = await limiter.limit(hash.toString()) + const isLimited = await checkMbLimit(hash.toString()) if (isLimited) { return sendError(ws, 'You have been ratelimited.
Is this an urgent archive? Please email me: admin@preservetube.com'); } @@ -150,6 +157,16 @@ app.ws('/save', { return sendError(ws, downloadResult.message); } + const mbsUsed = Math.ceil(downloadResult.size / (1024 * 1024)) + const hash = Bun.hash(getRateLimitKey(ws.data.headers['cf-connecting-ip'] || '0.0.0.0')) + const isMbLimited = await checkMbLimit(hash.toString(), mbsUsed) + if (isMbLimited) { + const file = fs.readdirSync('./videos/').find(f => f.includes(`${videoId}.`)) + if (file) fs.unlinkSync('./videos/' + file) + await cleanup(ws, videoId); + return sendError(ws, 'Daily storage limit reached. Is this an urgent archive? Please email me: admin@preservetube.com'); + } + const uploadSuccess = await handleUpload(ws, videoId); if (!uploadSuccess) await redis.del(videoId); @@ -200,6 +217,7 @@ app.ws('/savechannel', { videoIds[ws.id] = `downloading-${channelId}`; const videos = await getChannelVideos(channelId); + const hash = Bun.hash(getRateLimitKey(ws.data.headers['cf-connecting-ip'] || '0.0.0.0')) for (const video of videos.slice(0, 5)) { if (!video || (await redis.get(video.video_id)) || (await redis.get(`blacklist:${video.video_id}`))) continue; @@ -211,7 +229,7 @@ app.ws('/savechannel', { if (already) continue const hash = Bun.hash(getRateLimitKey(ws.data.headers['cf-connecting-ip'] || '0.0.0.0')) - const isLimited = await limiter.limit(hash.toString()) + const isLimited = await checkMbLimit(hash.toString()) if (isLimited) { sendError(ws, 'You have been ratelimited.
Is this an urgent archive? Please email me: admin@preservetube.com', false); break; @@ -231,7 +249,17 @@ app.ws('/savechannel', { await redis.set(video.video_id, 'downloading', 'EX', 300); const downloadResult = await downloadVideo(ws, video.video_id); - if (!downloadResult.fail) await handleUpload(ws, video.video_id, true); + if (!downloadResult.fail) { + const mbsUsed = Math.ceil(downloadResult.size / (1024 * 1024)) + const isMbLimited = await checkMbLimit(hash.toString(), mbsUsed) + if (isMbLimited) { + const file = fs.readdirSync('./videos/').find(f => f.includes(`${video.video_id}.`)) + if (file) fs.unlinkSync('./videos/' + file) + sendError(ws, 'Daily storage limit reached. Is this an urgent archive? Please email me: admin@preservetube.com', false); + break; + } + await handleUpload(ws, video.video_id, true); + } await redis.del(video.video_id); ws.send(`DATA - Created video page for ${video.title.text}`) diff --git a/src/utils/download.ts b/src/utils/download.ts index 951d112..e3f0f62 100644 --- a/src/utils/download.ts +++ b/src/utils/download.ts @@ -1,17 +1,21 @@ import WebSocket from 'ws'; -async function downloadVideo(ws: any, id: string): Promise<{ fail: boolean, message: string }> { +async function downloadVideo(ws: any, id: string): Promise<{ fail: boolean, message: string, size: number }> { return new Promise(async (resolve, reject) => { let isDownloading = true const downloader = new WebSocket(`ws://${(process.env.METADATA!).replace('http://', '')}/download/${id}`) + let size: number = 0 downloader.on('message', async function message(data: any) { const text = data.toString() - if (text == 'done') { + if (text.startsWith('VIDEOSIZE-')) { + size = parseInt(text.replace('VIDEOSIZE-', '')) + } else if (text == 'done') { isDownloading = false return resolve({ fail: false, - message: '' + message: '', + size }) } else { ws.send(`DATA - ${text}`) @@ -23,7 +27,8 @@ async function downloadVideo(ws: any, id: string): Promise<{ fail: boolean, mess return resolve({ fail: true, - message: 'The metadata server unexpectedly closed the websocket. Please try again.' + message: 'The metadata server unexpectedly closed the websocket. Please try again.', + size }) }) })