diff --git a/API b/API index 6f7c4ab..c84c188 100755 Binary files a/API and b/API differ diff --git a/include/server_structs.h b/include/server_structs.h index 968da3f..e3d13b8 100644 --- a/include/server_structs.h +++ b/include/server_structs.h @@ -4,6 +4,9 @@ #include #include +#define MAX_PLAYERS 64 +const int MAX_TELEMETRY_CLIENTS = 128; + typedef struct handshake { // [not used in the current Remote Telemtry version by AC] // In future versions it will identify the platform type of the client. @@ -59,17 +62,17 @@ typedef struct postion { } __attribute__((packed)) postion; typedef enum flag { - NO_FLAG = 0, - YELLOW_FLAG = 1, - BLUE_FLAG = 2, - BLACK_FLAG = 3, - CHECKERED_FLAG = 4, + NO_FLAG = 0, + YELLOW_FLAG = 1, + BLUE_FLAG = 2, + BLACK_FLAG = 3, + CHECKERED_FLAG = 4, } __attribute__((packed)) flag; typedef struct carAtributes { // Related to ACSP_CAR_INFO u_char isConnected; // 1 = connected, 0 = disconnected - u_char isLoading; // 1 = loading, 0 = not isLoading + u_char isLoading; // 1 = loading, 0 = not isLoading char *car_model; char *car_skin; @@ -83,16 +86,16 @@ typedef struct carAtributes { postion velocity; u_int8_t carGear; u_int16_t carRPM; - u_int32_t lap_time; - u_int32_t cuts; - u_int32_t total_cuts; - u_int32_t total_cuts_alltime; - u_int16_t total_laps_completed; - u_int16_t contacts; - u_int16_t total_contacts; + u_int32_t lap_time; + u_int32_t cuts; + u_int32_t total_cuts; + u_int32_t total_cuts_alltime; + u_int16_t total_laps_completed; + u_int16_t contacts; + u_int16_t total_contacts; - flag current_flag; // TODO: implement flag status updates - // TAG:3 + flag current_flag; // TODO: implement flag status updates + // TAG:3 float normalizedSplinePos; } __attribute__((packed)) carAtributes; @@ -100,7 +103,7 @@ typedef struct carAtributes { typedef struct carAtributesAPI { // Related to ACSP_CAR_INFO u_char isConnected; // 1 = connected, 0 = disconnected - u_char isLoading; // 1 = loading, 0 = not isLoading + u_char isLoading; // 1 = loading, 0 = not isLoading char car_model[64]; char car_skin[64]; @@ -114,21 +117,20 @@ typedef struct carAtributesAPI { postion velocity; u_int8_t carGear; u_int16_t carRPM; - u_int32_t lap_time; - u_int32_t cuts; - u_int32_t total_cuts; - u_int32_t total_cuts_alltime; - u_int16_t total_laps_completed; - u_int16_t contacts; - u_int16_t total_contacts; + u_int32_t lap_time; + u_int32_t cuts; + u_int32_t total_cuts; + u_int32_t total_cuts_alltime; + u_int16_t total_laps_completed; + u_int16_t contacts; + u_int16_t total_contacts; - flag current_flag; // TODO: implement flag status updates - // TAG:3 + flag current_flag; // TODO: implement flag status updates + // TAG:3 float normalizedSplinePos; } __attribute__((packed)) carAtributesAPI; - typedef enum SessionType { PRACTICE = 0, RACE = 1, @@ -184,15 +186,36 @@ typedef struct trackAtributesAPI { } __attribute__((packed)) trackAtributesAPI; typedef struct api_packet { - u_int8_t tracker_id; - u_int8_t last_updated_carid; - u_int8_t connected_cars; + u_char message_type; // ACSP_MessageType + u_int8_t tracker_id; + u_int8_t connected_players; - carAtributesAPI cars[64]; - trackAtributesAPI track_info; + carAtributesAPI cars[64]; + trackAtributesAPI track_info; } __attribute__((packed)) api_packet; +// Telemetry packet structure (lightweight for streaming) +struct telemetry_packet { + u_int8_t server_id; + u_int8_t car_count; + + struct car_telemetry { + u_int8_t carID; + char driver_name[64]; + char driver_guid[64]; + char car_model[64]; + float normalizedSplinePos; + float speed_kmh; + u_int8_t gear; + u_int16_t rpm; + u_int32_t last_lap_time; + u_int32_t best_lap_time; + u_int16_t current_lap; + u_int8_t position; + } __attribute__((packed)) cars[64]; +} __attribute__((packed)); + enum ACSP_MessageType { // ============================ // PROTOCOL VERSION diff --git a/include/stack.cpp b/include/stack.cpp index 6f417c1..367193f 100644 --- a/include/stack.cpp +++ b/include/stack.cpp @@ -46,3 +46,7 @@ bool Stack::peek(api_packet &item) { size_t Stack::size() { return top + 1; } + +size_t Stack::getCapacity() { + return capacity; +} diff --git a/include/stack.h b/include/stack.h index d9236c9..0aaa2bd 100644 --- a/include/stack.h +++ b/include/stack.h @@ -20,6 +20,7 @@ class Stack { bool pop(api_packet &item); bool peek(api_packet &item); size_t size(); + size_t getCapacity(); }; #endif // STACK_H diff --git a/include/telemetry.cpp b/include/telemetry.cpp new file mode 100644 index 0000000..bc0e59a --- /dev/null +++ b/include/telemetry.cpp @@ -0,0 +1,119 @@ +// include/telemetry.cpp +// Telemetry server implementation for broadcasting car telemetry data to connected clients. +#include "telemetry.h" + +std::vector telemetry_clients; +pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; + +// Broadcast telemetry to all connected clients +void broadcast_telemetry(const api_packet& packet) { + telemetry_packet telem; + telem.server_id = packet.tracker_id; + telem.car_count = 0; + + // Convert api_packet to telemetry_packet + for (int i = 0; i < MAX_PLAYERS; i++) { + if (packet.cars[i].isConnected) { + auto& car = telem.cars[telem.car_count]; + + car.carID = packet.cars[i].carID; + strncpy(car.driver_name, packet.cars[i].driver_name, sizeof(car.driver_name) - 1); + car.driver_name[sizeof(car.driver_name) - 1] = '\0'; + strncpy(car.driver_guid, packet.cars[i].driver_GUID, sizeof(car.driver_guid) - 1); + car.driver_guid[sizeof(car.driver_guid) - 1] = '\0'; + strncpy(car.car_model, packet.cars[i].car_model, sizeof(car.car_model) - 1); + car.car_model[sizeof(car.car_model) - 1] = '\0'; + car.normalizedSplinePos = packet.cars[i].normalizedSplinePos; + + // Calculate speed from velocity vector (m/s to km/h) + float speed_ms = std::sqrt( + std::pow(packet.cars[i].velocity.x, 2.0f) + + std::pow(packet.cars[i].velocity.y, 2.0f) + + std::pow(packet.cars[i].velocity.z, 2.0f) + ); + car.speed_kmh = speed_ms * 3.6f; + + car.gear = packet.cars[i].carGear; + car.rpm = packet.cars[i].carRPM; + car.last_lap_time = packet.cars[i].lap_time; + car.best_lap_time = 0; // TODO: Track best lap + car.current_lap = packet.cars[i].total_laps_completed; + car.position = telem.car_count + 1; // Temporary position + + telem.car_count++; + } + } + + // Broadcast to all connected clients + pthread_mutex_lock(&clients_mutex); + + for (auto it = telemetry_clients.begin(); it != telemetry_clients.end();) { + ssize_t sent = send(*it, &telem, sizeof(telem), MSG_NOSIGNAL); + + if (sent < 0) { + // Client disconnected, remove from list + printf("[T] Telemetry client %d disconnected\n", *it); + close(*it); + it = telemetry_clients.erase(it); + } else { + ++it; + } + } + + pthread_mutex_unlock(&clients_mutex); +} + +// Thread to accept telemetry client connections +void* telemetry_server_thread(void* arg) { + (void)arg; + + int server_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (server_fd < 0) { + perror("[!] Telemetry socket creation failed"); + return NULL; + } + + unlink(TELEMETRY_SOCKET_PATH); + + struct sockaddr_un addr; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, TELEMETRY_SOCKET_PATH, sizeof(addr.sun_path) - 1); + addr.sun_path[sizeof(addr.sun_path) - 1] = '\0'; + + if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + perror("[!] Telemetry socket bind failed"); + close(server_fd); + return NULL; + } + + if (listen(server_fd, 5) < 0) { + perror("[!] Telemetry socket listen failed"); + close(server_fd); + return NULL; + } + + printf("[+] Telemetry server listening on %s\n", TELEMETRY_SOCKET_PATH); + + while (1) { + int client_fd = accept(server_fd, NULL, NULL); + if (client_fd < 0) { + perror("[!] Telemetry accept failed"); + continue; + } + + pthread_mutex_lock(&clients_mutex); + + if (telemetry_clients.size() < (size_t)MAX_TELEMETRY_CLIENTS) { + telemetry_clients.push_back(client_fd); + printf("[+] Telemetry client connected (total: %zu)\n", telemetry_clients.size()); + } else { + printf("[!] Max telemetry clients reached, rejecting connection\n"); + close(client_fd); + } + + pthread_mutex_unlock(&clients_mutex); + } + + close(server_fd); + return NULL; +} diff --git a/include/telemetry.h b/include/telemetry.h new file mode 100644 index 0000000..bc8610c --- /dev/null +++ b/include/telemetry.h @@ -0,0 +1,25 @@ +// include/telemetry.h +// Header for telemetry server and broadcasting functionality +#ifndef TELEMETRY_H +#define TELEMETRY_H + +#include "server_structs.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define TELEMETRY_SOCKET_PATH "/tmp/ACtelemetry_socket" + +void *telemetry_server_thread(void *arg); + +void broadcast_telemetry(const api_packet &packet); + +#endif // TELEMETRY_H diff --git a/source/main.cpp b/source/main.cpp index 7dde4bf..2b67246 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -9,9 +9,12 @@ #include #include +// native webstocket + // SERVER STRUCTS #include "server_structs.h" #include "stack.h" +#include "telemetry.h" #define SERVER_SOCKET_PATH "/tmp/ACplayer_socket" @@ -27,6 +30,18 @@ void checkConn(PGconn *conn) { } } +void sanitize_utf8(char *str) { + if (!str) + return; + + for (size_t i = 0; str[i] != '\0'; i++) { + // Replace invalid UTF-8 bytes with '?' + if ((unsigned char)str[i] > 127) { + str[i] = '?'; + } + } +} + void *db_write_thread(void *arg) { (void)arg; // Unused parameter @@ -44,8 +59,19 @@ void *db_write_thread(void *arg) { if (api_queue.pop(packet)) { + // Check if message was Handshake + if (packet.tracker_id == 65) { + printf("[+] Handshake packet received for server \"%s\" (%d), skipping database write.\n", packet.track_info.server_name, packet.tracker_id); + continue; // Skip database write for handshake packets + } + + broadcast_telemetry(packet); + + // DEBUG printf("[W] Writing packet for Server with tracker ID: %d to database.\tQueue: %u/%d\n", packet.tracker_id, (unsigned int)api_queue.size(), STACK_SIZE); + // Time for servers table insert/UPDATE + const char *query = "INSERT INTO servers (" "server_id, server_name, session_type, session_count, server_track, " "server_config, server_weather_graphics, typ, session_time, session_laps, " @@ -84,7 +110,7 @@ void *db_write_thread(void *arg) { std::string road_temp_s = std::to_string(packet.track_info.road_temp); std::string elapsed_time_s = std::to_string((u_int16_t)packet.track_info.elapsed_ms); - std::string connected_players_s = std::to_string(packet.connected_cars); + std::string connected_players_s = std::to_string(packet.connected_players); const char *paramValues[15] = {server_id_str.c_str(), server_name, @@ -107,22 +133,31 @@ void *db_write_thread(void *arg) { if (PQresultStatus(res) != PGRES_TUPLES_OK) { printf("[!] Insert failed: %s", PQerrorMessage(conn)); PQclear(res); - PQfinish(conn); - break; + continue; } + PQclear(res); + // Time for users table insert/UPDATE - - if (packet.connected_cars == 0) { - PQclear(res); - continue; // No connected cars to process - } - - for (int i = 0; i < packet.connected_cars; i++) { + for (int i = 0; i < MAX_PLAYERS; i++) { if (packet.cars[i].isConnected == 0) { + if (packet.cars[i].driver_GUID[0] == '\0') { + continue; // Skip empty slots + } + printf("[D] User disconnected (%d): GUID=%s, Name=%s\n", i, packet.cars[i].driver_GUID, packet.cars[i].driver_name); + // If the car is not connected, we still need to update its status in the db + const char *disconnect_query = "UPDATE users SET is_connect = false, current_server = NULL WHERE driver_guid = $1;"; + const char *disconnect_paramValues[1] = {packet.cars[i].driver_GUID}; + + PGresult *disconnect_res = PQexecParams(conn, disconnect_query, 1, nullptr, disconnect_paramValues, nullptr, nullptr, 0); + if (PQresultStatus(disconnect_res) != PGRES_COMMAND_OK) { + printf("[!] User disconnect update failed for GUID %s: %s", packet.cars[i].driver_GUID, PQerrorMessage(conn)); + } + PQclear(disconnect_res); continue; // Skip disconnected cars } + u_int8_t car_id = packet.cars[i].carID; const char *user_query = "INSERT INTO users (" "driver_guid, driver_name, driver_team, car_model, car_skin, " @@ -141,18 +176,23 @@ void *db_write_thread(void *arg) { "is_loading = EXCLUDED.is_loading, " "current_server = EXCLUDED.current_server;"; - const char *driver_guid = packet.cars[i].driver_GUID; - const char *driver_name = packet.cars[i].driver_name; - const char *driver_team = packet.cars[i].driver_team; - const char *car_model = packet.cars[i].car_model; - const char *car_skin = packet.cars[i].car_skin; + const char *driver_guid = packet.cars[car_id].driver_GUID; + const char *driver_name = packet.cars[car_id].driver_name; + const char *driver_team = packet.cars[car_id].driver_team; + const char *car_model = packet.cars[car_id].car_model; + const char *car_skin = packet.cars[car_id].car_skin; - std::string cuts_alltime_s = std::to_string(packet.cars[i].total_cuts_alltime); - std::string contacts_alltime_s = std::to_string(packet.cars[i].total_contacts); - std::string laps_completed_s = std::to_string(packet.cars[i].total_laps_completed); - std::string is_connect_s = std::to_string(packet.cars[i].isConnected); - std::string is_loading_s = std::to_string(packet.cars[i].isLoading); - std::string current_server_s = server_id_str; + std::string cuts_alltime_s = std::to_string(packet.cars[car_id].total_cuts_alltime); + std::string contacts_alltime_s = std::to_string(packet.cars[car_id].total_contacts); + std::string laps_completed_s = std::to_string(packet.cars[car_id].total_laps_completed); + std::string is_connect_s = std::to_string(packet.cars[car_id].isConnected); + std::string is_loading_s = std::to_string(packet.cars[car_id].isLoading); + std::string current_server_s = std::to_string(packet.tracker_id); + + // Debug output for user data being inserted/updated + printf("[D] Processing user \(%d\): GUID=%s, Name=%s, Team=%s Car Model=%s, Skin=%s, Cuts=%s, Contacts=%s, Laps=%s, IsConnect=%s, IsLoading=%s, CurrentServer=%s\n", + i, driver_guid, driver_name, driver_team, car_model, car_skin, cuts_alltime_s.c_str(), contacts_alltime_s.c_str(), laps_completed_s.c_str(), + is_connect_s.c_str(), is_loading_s.c_str(), current_server_s.c_str()); const char *user_paramValues[11] = {driver_guid, driver_name, @@ -165,31 +205,67 @@ void *db_write_thread(void *arg) { is_connect_s.c_str(), is_loading_s.c_str(), current_server_s.c_str()}; + PGresult *user_res = PQexecParams(conn, user_query, 11, nullptr, user_paramValues, nullptr, nullptr, 0); + if (PQresultStatus(user_res) != PGRES_COMMAND_OK) { + printf("[!] User insert/update failed for GUID %s: %s", driver_guid, PQerrorMessage(conn)); + PQclear(user_res); + continue; // Proceed to next user + } PQclear(user_res); } - PQclear(res); } else { usleep(1000); // Sleep for 1ms if no packets are available } + } PQfinish(conn); return NULL; } +void *handle_client(void *arg) { + int client_fd = *(int *)arg; + free(arg); + + api_packet api_packet_storage; + ssize_t n; + + while ((n = read(client_fd, &api_packet_storage, sizeof(api_packet))) > 0) { + printf("[+] Received %zd bytes\n", n); + + if (api_packet_storage.tracker_id == 65) { + printf("[+] Handshake received from server \"%s\" (%d).\n", api_packet_storage.track_info.server_name, api_packet_storage.tracker_id); + continue; + } + + printf("[*] Info: Tracker id: %d (\"%s\")\tQueue: %u/%d\n", api_packet_storage.tracker_id, api_packet_storage.track_info.server_name, (unsigned int)api_queue.size(), + (int)api_queue.getCapacity()); + + if (!api_queue.push(api_packet_storage)) { + printf("[!] Api queue full, dropping packet from tracker id: %d\n", api_packet_storage.tracker_id); + } + + usleep(1500); + } + + if (n < 0) + perror("[!] Error on read"); + else + printf("[+] Client disconnected\n"); + + close(client_fd); + return NULL; +} + int main(void) { int server_fd, client_fd; struct sockaddr_un addr; - socklen_t addr_len; - ssize_t n; - api_packet api_packet_storage; - - // Create socket + // create socket server_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (server_fd < 0) { perror("socket"); @@ -198,25 +274,35 @@ int main(void) { unlink(SERVER_SOCKET_PATH); addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, SERVER_SOCKET_PATH); + strncpy(addr.sun_path, SERVER_SOCKET_PATH, sizeof(addr.sun_path) - 1); + addr.sun_path[sizeof(addr.sun_path) - 1] = '\0'; // null-terminate just in case + if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("bind"); exit(1); } + if (listen(server_fd, 5) < 0) { perror("listen"); exit(1); } - // Start DB write db_write_thread + // start db write db_write_thread pthread_t db_thread; if (pthread_create(&db_thread, NULL, db_write_thread, NULL) != 0) { perror("pthread create"); exit(1); } - // Detach the thread so it cleans up after itself + // detach the thread so it cleans up after itself pthread_detach(db_thread); + pthread_t telemetry_thread; + if (pthread_create(&telemetry_thread, NULL, telemetry_server_thread, NULL) != 0) { + perror("[!] Failed to create telemetry server thread"); + return EXIT_FAILURE; + } + pthread_detach(telemetry_thread); + printf("[+] Server listening on %s\n", SERVER_SOCKET_PATH); while (1) { @@ -228,29 +314,18 @@ int main(void) { printf("[+] Client connected\n"); - while ((n = read(client_fd, &api_packet_storage, sizeof(api_packet))) > 0) { - printf("[+] Received %zd bytes\n", n); + int *client_fd_ptr = (int *)malloc(sizeof(int)); + *client_fd_ptr = client_fd; - if (api_packet_storage.tracker_id == 65) { - printf("[+] Handshake Received from server \"%s\" (%d).\n", api_packet_storage.track_info.server_name, api_packet_storage.tracker_id); - continue; - } - - // Print info - printf("[*] Info: Tracker ID: %d (\"%s\")\tQueue: %u/%d\n", api_packet_storage.tracker_id, api_packet_storage.track_info.server_name, (unsigned int)api_queue.size(), - STACK_SIZE); - if (!api_queue.push(api_packet_storage)) { - printf("[!] API Queue full, dropping packet from Tracker ID: %d\n", api_packet_storage.tracker_id); - } - usleep(1500); + pthread_t client_thread; + if (pthread_create(&client_thread, NULL, handle_client, client_fd_ptr) != 0) { + perror("pthread_create"); + close(client_fd); + free(client_fd_ptr); + continue; } - if (n < 0) - perror("[!] Error on read"); - else - printf("[+] Client disconnected\n"); - - close(client_fd); + pthread_detach(client_thread); // no need to join manually } return 0;