// app/api/live/telemetry/route.ts // Server-Sent Events endpoint streaming from C++ Unix socket import { getTelemetryBridge } from '@/lib/telemetryBridge'; import { query } from '@/lib/db'; export const dynamic = 'force-dynamic'; export const runtime = 'nodejs'; // Cache for driver ranks (refreshed periodically) let rankCache: Map = new Map(); let lastRankUpdate = 0; const RANK_CACHE_TTL = 60000; // 1 minute async function updateRankCache() { const now = Date.now(); if (now - lastRankUpdate < RANK_CACHE_TTL) { return; // Cache still valid } try { // Get all users sorted by rank (higher is better) const users = await query( `SELECT driver_guid, user_rank, ROW_NUMBER() OVER (ORDER BY user_rank DESC) as rank_position FROM users WHERE user_rank IS NOT NULL ORDER BY user_rank DESC` ); rankCache.clear(); users.forEach((row: any) => { rankCache.set(row.driver_guid.toString(), { rank: row.rank_position, rating: row.user_rank }); }); lastRankUpdate = now; console.log('[Rank Cache] Updated with', rankCache.size, 'drivers'); } catch (error) { console.error('[Rank Cache] Failed to update:', error); } } export async function GET(request: Request) { const { searchParams } = new URL(request.url); const serverId = searchParams.get('serverId'); const filterServerId = serverId ? parseInt(serverId) : null; const encoder = new TextEncoder(); const bridge = getTelemetryBridge(); const stream = new ReadableStream({ async start(controller) { console.log('[SSE] Client connected', filterServerId ? `(server ${filterServerId})` : '(all servers)'); // Update rank cache on new connection await updateRankCache(); // Send initial connection message controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'connected', bridge_status: bridge.isConnected() ? 'connected' : 'connecting', timestamp: Date.now() })}\n\n`)); // Subscribe to telemetry updates const unsubscribe = bridge.subscribe((packet) => { // Filter by server if specified if (filterServerId && packet.server_id !== filterServerId) { return; } // Transform to client format const telemetry = { type: 'update', timestamp: Date.now(), server_id: packet.server_id, cars: packet.cars.map((car) => { const driverRank = rankCache.get(car.driver_guid); return { carID: car.carID, driver_guid: car.driver_guid, driver_name: car.driver_name, car_model: car.car_model, position: car.position_rank, current_lap: car.current_lap, normalizedSplinePos: car.normalizedSplinePos, speed: Math.round(car.speed_kmh), gear: car.gear, rpm: car.rpm, last_lap_time: car.last_lap_time || null, best_lap_time: car.best_lap_time || null, world_position: car.position, user_rank: driverRank ? driverRank.rank : null, user_rating: driverRank ? driverRank.rating : null, }; }), }; try { controller.enqueue(encoder.encode(`data: ${JSON.stringify(telemetry)}\n\n`)); } catch (error) { console.error('[SSE] Error sending data:', error); } }); // Heartbeat every 30 seconds to keep connection alive const heartbeat = setInterval(() => { try { controller.enqueue(encoder.encode(`: heartbeat\n\n`)); } catch (error) { clearInterval(heartbeat); } }, 30000); // Cleanup on connection close request.signal.addEventListener('abort', () => { console.log('[SSE] Client disconnected'); unsubscribe(); clearInterval(heartbeat); try { controller.close(); } catch (error) { // Controller already closed } }); }, }); return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', // Disable nginx buffering }, }); }