Madjor Fix: New Player data overwrites old/connected players

This commit is contained in:
Afonso Clerigo Mendes de Sousa 2025-10-29 13:52:37 +00:00
parent 2d3f52a9a5
commit 15f87631e2
7 changed files with 328 additions and 81 deletions

BIN
API

Binary file not shown.

View File

@ -4,6 +4,9 @@
#include <sys/cdefs.h> #include <sys/cdefs.h>
#include <sys/types.h> #include <sys/types.h>
#define MAX_PLAYERS 64
const int MAX_TELEMETRY_CLIENTS = 128;
typedef struct handshake { typedef struct handshake {
// [not used in the current Remote Telemtry version by AC] // [not used in the current Remote Telemtry version by AC]
// In future versions it will identify the platform type of the client. // In future versions it will identify the platform type of the client.
@ -128,7 +131,6 @@ typedef struct carAtributesAPI {
float normalizedSplinePos; float normalizedSplinePos;
} __attribute__((packed)) carAtributesAPI; } __attribute__((packed)) carAtributesAPI;
typedef enum SessionType { typedef enum SessionType {
PRACTICE = 0, PRACTICE = 0,
RACE = 1, RACE = 1,
@ -184,15 +186,36 @@ typedef struct trackAtributesAPI {
} __attribute__((packed)) trackAtributesAPI; } __attribute__((packed)) trackAtributesAPI;
typedef struct api_packet { typedef struct api_packet {
u_char message_type; // ACSP_MessageType
u_int8_t tracker_id; u_int8_t tracker_id;
u_int8_t last_updated_carid; u_int8_t connected_players;
u_int8_t connected_cars;
carAtributesAPI cars[64]; carAtributesAPI cars[64];
trackAtributesAPI track_info; trackAtributesAPI track_info;
} __attribute__((packed)) api_packet; } __attribute__((packed)) api_packet;
// Telemetry packet structure (lightweight for streaming)
struct telemetry_packet {
u_int8_t server_id;
u_int8_t car_count;
struct car_telemetry {
u_int8_t carID;
char driver_name[64];
char driver_guid[64];
char car_model[64];
float normalizedSplinePos;
float speed_kmh;
u_int8_t gear;
u_int16_t rpm;
u_int32_t last_lap_time;
u_int32_t best_lap_time;
u_int16_t current_lap;
u_int8_t position;
} __attribute__((packed)) cars[64];
} __attribute__((packed));
enum ACSP_MessageType { enum ACSP_MessageType {
// ============================ // ============================
// PROTOCOL VERSION // PROTOCOL VERSION

View File

@ -46,3 +46,7 @@ bool Stack::peek(api_packet &item) {
size_t Stack::size() { size_t Stack::size() {
return top + 1; return top + 1;
} }
size_t Stack::getCapacity() {
return capacity;
}

View File

@ -20,6 +20,7 @@ class Stack {
bool pop(api_packet &item); bool pop(api_packet &item);
bool peek(api_packet &item); bool peek(api_packet &item);
size_t size(); size_t size();
size_t getCapacity();
}; };
#endif // STACK_H #endif // STACK_H

119
include/telemetry.cpp Normal file
View File

@ -0,0 +1,119 @@
// include/telemetry.cpp
// Telemetry server implementation for broadcasting car telemetry data to connected clients.
#include "telemetry.h"
std::vector<int> telemetry_clients;
pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
// Broadcast telemetry to all connected clients
void broadcast_telemetry(const api_packet& packet) {
telemetry_packet telem;
telem.server_id = packet.tracker_id;
telem.car_count = 0;
// Convert api_packet to telemetry_packet
for (int i = 0; i < MAX_PLAYERS; i++) {
if (packet.cars[i].isConnected) {
auto& car = telem.cars[telem.car_count];
car.carID = packet.cars[i].carID;
strncpy(car.driver_name, packet.cars[i].driver_name, sizeof(car.driver_name) - 1);
car.driver_name[sizeof(car.driver_name) - 1] = '\0';
strncpy(car.driver_guid, packet.cars[i].driver_GUID, sizeof(car.driver_guid) - 1);
car.driver_guid[sizeof(car.driver_guid) - 1] = '\0';
strncpy(car.car_model, packet.cars[i].car_model, sizeof(car.car_model) - 1);
car.car_model[sizeof(car.car_model) - 1] = '\0';
car.normalizedSplinePos = packet.cars[i].normalizedSplinePos;
// Calculate speed from velocity vector (m/s to km/h)
float speed_ms = std::sqrt(
std::pow(packet.cars[i].velocity.x, 2.0f) +
std::pow(packet.cars[i].velocity.y, 2.0f) +
std::pow(packet.cars[i].velocity.z, 2.0f)
);
car.speed_kmh = speed_ms * 3.6f;
car.gear = packet.cars[i].carGear;
car.rpm = packet.cars[i].carRPM;
car.last_lap_time = packet.cars[i].lap_time;
car.best_lap_time = 0; // TODO: Track best lap
car.current_lap = packet.cars[i].total_laps_completed;
car.position = telem.car_count + 1; // Temporary position
telem.car_count++;
}
}
// Broadcast to all connected clients
pthread_mutex_lock(&clients_mutex);
for (auto it = telemetry_clients.begin(); it != telemetry_clients.end();) {
ssize_t sent = send(*it, &telem, sizeof(telem), MSG_NOSIGNAL);
if (sent < 0) {
// Client disconnected, remove from list
printf("[T] Telemetry client %d disconnected\n", *it);
close(*it);
it = telemetry_clients.erase(it);
} else {
++it;
}
}
pthread_mutex_unlock(&clients_mutex);
}
// Thread to accept telemetry client connections
void* telemetry_server_thread(void* arg) {
(void)arg;
int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("[!] Telemetry socket creation failed");
return NULL;
}
unlink(TELEMETRY_SOCKET_PATH);
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, TELEMETRY_SOCKET_PATH, sizeof(addr.sun_path) - 1);
addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("[!] Telemetry socket bind failed");
close(server_fd);
return NULL;
}
if (listen(server_fd, 5) < 0) {
perror("[!] Telemetry socket listen failed");
close(server_fd);
return NULL;
}
printf("[+] Telemetry server listening on %s\n", TELEMETRY_SOCKET_PATH);
while (1) {
int client_fd = accept(server_fd, NULL, NULL);
if (client_fd < 0) {
perror("[!] Telemetry accept failed");
continue;
}
pthread_mutex_lock(&clients_mutex);
if (telemetry_clients.size() < (size_t)MAX_TELEMETRY_CLIENTS) {
telemetry_clients.push_back(client_fd);
printf("[+] Telemetry client connected (total: %zu)\n", telemetry_clients.size());
} else {
printf("[!] Max telemetry clients reached, rejecting connection\n");
close(client_fd);
}
pthread_mutex_unlock(&clients_mutex);
}
close(server_fd);
return NULL;
}

25
include/telemetry.h Normal file
View File

@ -0,0 +1,25 @@
// include/telemetry.h
// Header for telemetry server and broadcasting functionality
#ifndef TELEMETRY_H
#define TELEMETRY_H
#include "server_structs.h"
#include <pthread.h>
#include <cstdio>
#include <algorithm>
#include <cmath>
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
#define TELEMETRY_SOCKET_PATH "/tmp/ACtelemetry_socket"
void *telemetry_server_thread(void *arg);
void broadcast_telemetry(const api_packet &packet);
#endif // TELEMETRY_H

View File

@ -9,9 +9,12 @@
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h> #include <unistd.h>
// native webstocket
// SERVER STRUCTS // SERVER STRUCTS
#include "server_structs.h" #include "server_structs.h"
#include "stack.h" #include "stack.h"
#include "telemetry.h"
#define SERVER_SOCKET_PATH "/tmp/ACplayer_socket" #define SERVER_SOCKET_PATH "/tmp/ACplayer_socket"
@ -27,6 +30,18 @@ void checkConn(PGconn *conn) {
} }
} }
void sanitize_utf8(char *str) {
if (!str)
return;
for (size_t i = 0; str[i] != '\0'; i++) {
// Replace invalid UTF-8 bytes with '?'
if ((unsigned char)str[i] > 127) {
str[i] = '?';
}
}
}
void *db_write_thread(void *arg) { void *db_write_thread(void *arg) {
(void)arg; // Unused parameter (void)arg; // Unused parameter
@ -44,8 +59,19 @@ void *db_write_thread(void *arg) {
if (api_queue.pop(packet)) { if (api_queue.pop(packet)) {
// Check if message was Handshake
if (packet.tracker_id == 65) {
printf("[+] Handshake packet received for server \"%s\" (%d), skipping database write.\n", packet.track_info.server_name, packet.tracker_id);
continue; // Skip database write for handshake packets
}
broadcast_telemetry(packet);
// DEBUG
printf("[W] Writing packet for Server with tracker ID: %d to database.\tQueue: %u/%d\n", packet.tracker_id, (unsigned int)api_queue.size(), STACK_SIZE); printf("[W] Writing packet for Server with tracker ID: %d to database.\tQueue: %u/%d\n", packet.tracker_id, (unsigned int)api_queue.size(), STACK_SIZE);
// Time for servers table insert/UPDATE
const char *query = "INSERT INTO servers (" const char *query = "INSERT INTO servers ("
"server_id, server_name, session_type, session_count, server_track, " "server_id, server_name, session_type, session_count, server_track, "
"server_config, server_weather_graphics, typ, session_time, session_laps, " "server_config, server_weather_graphics, typ, session_time, session_laps, "
@ -84,7 +110,7 @@ void *db_write_thread(void *arg) {
std::string road_temp_s = std::to_string(packet.track_info.road_temp); std::string road_temp_s = std::to_string(packet.track_info.road_temp);
std::string elapsed_time_s = std::to_string((u_int16_t)packet.track_info.elapsed_ms); std::string elapsed_time_s = std::to_string((u_int16_t)packet.track_info.elapsed_ms);
std::string connected_players_s = std::to_string(packet.connected_cars); std::string connected_players_s = std::to_string(packet.connected_players);
const char *paramValues[15] = {server_id_str.c_str(), const char *paramValues[15] = {server_id_str.c_str(),
server_name, server_name,
@ -107,22 +133,31 @@ void *db_write_thread(void *arg) {
if (PQresultStatus(res) != PGRES_TUPLES_OK) { if (PQresultStatus(res) != PGRES_TUPLES_OK) {
printf("[!] Insert failed: %s", PQerrorMessage(conn)); printf("[!] Insert failed: %s", PQerrorMessage(conn));
PQclear(res); PQclear(res);
PQfinish(conn); continue;
break;
} }
PQclear(res);
// Time for users table insert/UPDATE // Time for users table insert/UPDATE
for (int i = 0; i < MAX_PLAYERS; i++) {
if (packet.connected_cars == 0) {
PQclear(res);
continue; // No connected cars to process
}
for (int i = 0; i < packet.connected_cars; i++) {
if (packet.cars[i].isConnected == 0) { if (packet.cars[i].isConnected == 0) {
if (packet.cars[i].driver_GUID[0] == '\0') {
continue; // Skip empty slots
}
printf("[D] User disconnected (%d): GUID=%s, Name=%s\n", i, packet.cars[i].driver_GUID, packet.cars[i].driver_name);
// If the car is not connected, we still need to update its status in the db
const char *disconnect_query = "UPDATE users SET is_connect = false, current_server = NULL WHERE driver_guid = $1;";
const char *disconnect_paramValues[1] = {packet.cars[i].driver_GUID};
PGresult *disconnect_res = PQexecParams(conn, disconnect_query, 1, nullptr, disconnect_paramValues, nullptr, nullptr, 0);
if (PQresultStatus(disconnect_res) != PGRES_COMMAND_OK) {
printf("[!] User disconnect update failed for GUID %s: %s", packet.cars[i].driver_GUID, PQerrorMessage(conn));
}
PQclear(disconnect_res);
continue; // Skip disconnected cars continue; // Skip disconnected cars
} }
u_int8_t car_id = packet.cars[i].carID;
const char *user_query = "INSERT INTO users (" const char *user_query = "INSERT INTO users ("
"driver_guid, driver_name, driver_team, car_model, car_skin, " "driver_guid, driver_name, driver_team, car_model, car_skin, "
@ -141,18 +176,23 @@ void *db_write_thread(void *arg) {
"is_loading = EXCLUDED.is_loading, " "is_loading = EXCLUDED.is_loading, "
"current_server = EXCLUDED.current_server;"; "current_server = EXCLUDED.current_server;";
const char *driver_guid = packet.cars[i].driver_GUID; const char *driver_guid = packet.cars[car_id].driver_GUID;
const char *driver_name = packet.cars[i].driver_name; const char *driver_name = packet.cars[car_id].driver_name;
const char *driver_team = packet.cars[i].driver_team; const char *driver_team = packet.cars[car_id].driver_team;
const char *car_model = packet.cars[i].car_model; const char *car_model = packet.cars[car_id].car_model;
const char *car_skin = packet.cars[i].car_skin; const char *car_skin = packet.cars[car_id].car_skin;
std::string cuts_alltime_s = std::to_string(packet.cars[i].total_cuts_alltime); std::string cuts_alltime_s = std::to_string(packet.cars[car_id].total_cuts_alltime);
std::string contacts_alltime_s = std::to_string(packet.cars[i].total_contacts); std::string contacts_alltime_s = std::to_string(packet.cars[car_id].total_contacts);
std::string laps_completed_s = std::to_string(packet.cars[i].total_laps_completed); std::string laps_completed_s = std::to_string(packet.cars[car_id].total_laps_completed);
std::string is_connect_s = std::to_string(packet.cars[i].isConnected); std::string is_connect_s = std::to_string(packet.cars[car_id].isConnected);
std::string is_loading_s = std::to_string(packet.cars[i].isLoading); std::string is_loading_s = std::to_string(packet.cars[car_id].isLoading);
std::string current_server_s = server_id_str; std::string current_server_s = std::to_string(packet.tracker_id);
// Debug output for user data being inserted/updated
printf("[D] Processing user \(%d\): GUID=%s, Name=%s, Team=%s Car Model=%s, Skin=%s, Cuts=%s, Contacts=%s, Laps=%s, IsConnect=%s, IsLoading=%s, CurrentServer=%s\n",
i, driver_guid, driver_name, driver_team, car_model, car_skin, cuts_alltime_s.c_str(), contacts_alltime_s.c_str(), laps_completed_s.c_str(),
is_connect_s.c_str(), is_loading_s.c_str(), current_server_s.c_str());
const char *user_paramValues[11] = {driver_guid, const char *user_paramValues[11] = {driver_guid,
driver_name, driver_name,
@ -165,31 +205,67 @@ void *db_write_thread(void *arg) {
is_connect_s.c_str(), is_connect_s.c_str(),
is_loading_s.c_str(), is_loading_s.c_str(),
current_server_s.c_str()}; current_server_s.c_str()};
PGresult *user_res = PQexecParams(conn, user_query, 11, nullptr, user_paramValues, nullptr, nullptr, 0); PGresult *user_res = PQexecParams(conn, user_query, 11, nullptr, user_paramValues, nullptr, nullptr, 0);
if (PQresultStatus(user_res) != PGRES_COMMAND_OK) {
printf("[!] User insert/update failed for GUID %s: %s", driver_guid, PQerrorMessage(conn));
PQclear(user_res);
continue; // Proceed to next user
}
PQclear(user_res); PQclear(user_res);
} }
PQclear(res);
} else { } else {
usleep(1000); // Sleep for 1ms if no packets are available usleep(1000); // Sleep for 1ms if no packets are available
} }
} }
PQfinish(conn); PQfinish(conn);
return NULL; return NULL;
} }
void *handle_client(void *arg) {
int client_fd = *(int *)arg;
free(arg);
api_packet api_packet_storage;
ssize_t n;
while ((n = read(client_fd, &api_packet_storage, sizeof(api_packet))) > 0) {
printf("[+] Received %zd bytes\n", n);
if (api_packet_storage.tracker_id == 65) {
printf("[+] Handshake received from server \"%s\" (%d).\n", api_packet_storage.track_info.server_name, api_packet_storage.tracker_id);
continue;
}
printf("[*] Info: Tracker id: %d (\"%s\")\tQueue: %u/%d\n", api_packet_storage.tracker_id, api_packet_storage.track_info.server_name, (unsigned int)api_queue.size(),
(int)api_queue.getCapacity());
if (!api_queue.push(api_packet_storage)) {
printf("[!] Api queue full, dropping packet from tracker id: %d\n", api_packet_storage.tracker_id);
}
usleep(1500);
}
if (n < 0)
perror("[!] Error on read");
else
printf("[+] Client disconnected\n");
close(client_fd);
return NULL;
}
int main(void) { int main(void) {
int server_fd, client_fd; int server_fd, client_fd;
struct sockaddr_un addr; struct sockaddr_un addr;
socklen_t addr_len;
ssize_t n;
api_packet api_packet_storage; // create socket
// Create socket
server_fd = socket(AF_UNIX, SOCK_STREAM, 0); server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd < 0) { if (server_fd < 0) {
perror("socket"); perror("socket");
@ -198,25 +274,35 @@ int main(void) {
unlink(SERVER_SOCKET_PATH); unlink(SERVER_SOCKET_PATH);
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, SERVER_SOCKET_PATH); strncpy(addr.sun_path, SERVER_SOCKET_PATH, sizeof(addr.sun_path) - 1);
addr.sun_path[sizeof(addr.sun_path) - 1] = '\0'; // null-terminate just in case
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
perror("bind"); perror("bind");
exit(1); exit(1);
} }
if (listen(server_fd, 5) < 0) { if (listen(server_fd, 5) < 0) {
perror("listen"); perror("listen");
exit(1); exit(1);
} }
// Start DB write db_write_thread // start db write db_write_thread
pthread_t db_thread; pthread_t db_thread;
if (pthread_create(&db_thread, NULL, db_write_thread, NULL) != 0) { if (pthread_create(&db_thread, NULL, db_write_thread, NULL) != 0) {
perror("pthread create"); perror("pthread create");
exit(1); exit(1);
} }
// Detach the thread so it cleans up after itself // detach the thread so it cleans up after itself
pthread_detach(db_thread); pthread_detach(db_thread);
pthread_t telemetry_thread;
if (pthread_create(&telemetry_thread, NULL, telemetry_server_thread, NULL) != 0) {
perror("[!] Failed to create telemetry server thread");
return EXIT_FAILURE;
}
pthread_detach(telemetry_thread);
printf("[+] Server listening on %s\n", SERVER_SOCKET_PATH); printf("[+] Server listening on %s\n", SERVER_SOCKET_PATH);
while (1) { while (1) {
@ -228,29 +314,18 @@ int main(void) {
printf("[+] Client connected\n"); printf("[+] Client connected\n");
while ((n = read(client_fd, &api_packet_storage, sizeof(api_packet))) > 0) { int *client_fd_ptr = (int *)malloc(sizeof(int));
printf("[+] Received %zd bytes\n", n); *client_fd_ptr = client_fd;
if (api_packet_storage.tracker_id == 65) { pthread_t client_thread;
printf("[+] Handshake Received from server \"%s\" (%d).\n", api_packet_storage.track_info.server_name, api_packet_storage.tracker_id); if (pthread_create(&client_thread, NULL, handle_client, client_fd_ptr) != 0) {
perror("pthread_create");
close(client_fd);
free(client_fd_ptr);
continue; continue;
} }
// Print info pthread_detach(client_thread); // no need to join manually
printf("[*] Info: Tracker ID: %d (\"%s\")\tQueue: %u/%d\n", api_packet_storage.tracker_id, api_packet_storage.track_info.server_name, (unsigned int)api_queue.size(),
STACK_SIZE);
if (!api_queue.push(api_packet_storage)) {
printf("[!] API Queue full, dropping packet from Tracker ID: %d\n", api_packet_storage.tracker_id);
}
usleep(1500);
}
if (n < 0)
perror("[!] Error on read");
else
printf("[+] Client disconnected\n");
close(client_fd);
} }
return 0; return 0;