Implemented separate TCP logger service

This commit is contained in:
Martino Ferrari
2026-02-21 22:30:16 +01:00
parent 817d7276b7
commit 87b9ccebfd
12 changed files with 626 additions and 224 deletions

View File

@@ -26,7 +26,6 @@
namespace MARTe {
DebugService* DebugService::instance = NULL_PTR(DebugService*);
ErrorManagement::ErrorProcessFunctionType DebugService::originalLogCallback = NULL_PTR(ErrorManagement::ErrorProcessFunctionType);
static void EscapeJson(const char8* src, StreamString &dst) {
if (src == NULL_PTR(const char8*)) return;
@@ -44,17 +43,14 @@ static void EscapeJson(const char8* src, StreamString &dst) {
CLASS_REGISTER(DebugService, "1.0")
DebugService::DebugService() :
ReferenceContainer(), EmbeddedServiceMethodBinderI(), LoggerConsumerI(),
ReferenceContainer(), EmbeddedServiceMethodBinderI(),
binderServer(this, ServiceBinder::ServerType),
binderStreamer(this, ServiceBinder::StreamerType),
binderLogStreamer(this, ServiceBinder::LogStreamerType),
threadService(binderServer),
streamerService(binderStreamer),
logStreamerService(binderLogStreamer)
streamerService(binderStreamer)
{
controlPort = 0;
streamPort = 8081;
logPort = 8082;
streamIP = "127.0.0.1";
numberOfSignals = 0;
numberOfAliases = 0;
@@ -63,40 +59,27 @@ DebugService::DebugService() :
isPaused = false;
for (uint32 i=0; i<MAX_CLIENTS; i++) {
activeClients[i] = NULL_PTR(BasicTCPSocket*);
activeLogClients[i] = NULL_PTR(BasicTCPSocket*);
}
logQueueRead = 0;
logQueueWrite = 0;
serverThreadId = InvalidThreadIdentifier;
streamerThreadId = InvalidThreadIdentifier;
logStreamerThreadId = InvalidThreadIdentifier;
}
DebugService::~DebugService() {
if (instance == this) {
if (ErrorManagement::errorMessageProcessFunction == &DebugService::LogCallback) {
ErrorManagement::SetErrorProcessFunction(originalLogCallback);
}
instance = NULL_PTR(DebugService*);
}
threadService.Stop();
streamerService.Stop();
logStreamerService.Stop();
tcpServer.Close();
udpSocket.Close();
logServer.Close();
for (uint32 i=0; i<MAX_CLIENTS; i++) {
if (activeClients[i] != NULL_PTR(BasicTCPSocket*)) {
activeClients[i]->Close();
delete activeClients[i];
}
if (activeLogClients[i] != NULL_PTR(BasicTCPSocket*)) {
activeLogClients[i]->Close();
delete activeLogClients[i];
}
}
}
@@ -115,10 +98,6 @@ bool DebugService::Initialise(StructuredDataI & data) {
if (!data.Read("StreamPort", streamPort)) {
(void)data.Read("UdpPort", streamPort);
}
if (!data.Read("LogPort", logPort)) {
(void)data.Read("TcpLogPort", logPort);
}
StreamString tempIP;
if (data.Read("StreamIP", tempIP)) {
@@ -133,22 +112,14 @@ bool DebugService::Initialise(StructuredDataI & data) {
}
if (isServer) {
if (ErrorManagement::errorMessageProcessFunction != &DebugService::LogCallback) {
originalLogCallback = ErrorManagement::errorMessageProcessFunction;
ErrorManagement::SetErrorProcessFunction(&DebugService::LogCallback);
}
if (!traceBuffer.Init(1024 * 1024)) return false;
(void)logEvent.Create();
PatchRegistry();
ConfigurationDatabase threadData;
threadData.Write("Timeout", (uint32)1000);
threadService.Initialise(threadData);
streamerService.Initialise(threadData);
logStreamerService.Initialise(threadData);
if (!tcpServer.Open()) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open TCP Server Socket");
@@ -166,13 +137,6 @@ bool DebugService::Initialise(StructuredDataI & data) {
}
printf("[DebugService] UDP Streamer socket opened\n");
if (!logServer.Open()) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open Log Server Socket");
return false;
}
(void)logServer.Listen(logPort);
printf("[DebugService] Log Server listening on port %u\n", logPort);
if (threadService.Start() != ErrorManagement::NoError) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start Server thread");
return false;
@@ -181,11 +145,7 @@ bool DebugService::Initialise(StructuredDataI & data) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start Streamer thread");
return false;
}
if (logStreamerService.Start() != ErrorManagement::NoError) {
REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start LogStreamer thread");
return false;
}
printf("[DebugService] All worker threads started.\n");
printf("[DebugService] Worker threads started.\n");
}
return true;
@@ -333,11 +293,6 @@ ErrorManagement::ErrorType DebugService::Server(ExecutionInfo & info) {
}
while (info.GetStage() == ExecutionInfo::MainStage) {
if (ErrorManagement::errorMessageProcessFunction != &DebugService::LogCallback) {
originalLogCallback = ErrorManagement::errorMessageProcessFunction;
ErrorManagement::SetErrorProcessFunction(&DebugService::LogCallback);
}
BasicTCPSocket *newClient = tcpServer.WaitConnection(1);
if (newClient != NULL_PTR(BasicTCPSocket *)) {
clientsMutex.FastLock();
@@ -388,72 +343,6 @@ ErrorManagement::ErrorType DebugService::Server(ExecutionInfo & info) {
return ErrorManagement::NoError;
}
ErrorManagement::ErrorType DebugService::LogStreamer(ExecutionInfo & info) {
if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError;
if (info.GetStage() == ExecutionInfo::StartupStage) {
logStreamerThreadId = Threads::Id();
return ErrorManagement::NoError;
}
while (info.GetStage() == ExecutionInfo::MainStage) {
BasicTCPSocket *newClient = logServer.WaitConnection(1);
if (newClient != NULL_PTR(BasicTCPSocket *)) {
logClientsMutex.FastLock();
bool added = false;
for (uint32 i=0; i<MAX_CLIENTS; i++) {
if (activeLogClients[i] == NULL_PTR(BasicTCPSocket*)) {
activeLogClients[i] = newClient;
added = true;
break;
}
}
logClientsMutex.FastUnLock();
if (!added) {
newClient->Close();
delete newClient;
}
}
bool hadData = false;
for (uint32 b=0; b<50; b++) {
if (logQueueRead == logQueueWrite) break;
hadData = true;
uint32 idx = logQueueRead % LOG_QUEUE_SIZE;
LogEntry &entry = logQueue[idx];
StreamString level;
ErrorManagement::ErrorCodeToStream(entry.info.header.errorType, level);
StreamString packet;
packet.Printf("LOG %s %s\n", level.Buffer(), entry.description);
uint32 size = packet.Size();
logClientsMutex.FastLock();
for (uint32 j=0; j<MAX_CLIENTS; j++) {
if (activeLogClients[j] != NULL_PTR(BasicTCPSocket*)) {
uint32 s = size;
if (!activeLogClients[j]->Write(packet.Buffer(), s)) {
activeLogClients[j]->Close();
delete activeLogClients[j];
activeLogClients[j] = NULL_PTR(BasicTCPSocket*);
}
}
}
logClientsMutex.FastUnLock();
logQueueRead++;
}
if (!hadData) {
(void)logEvent.Wait(TimeoutType(100));
logEvent.Reset();
} else {
Sleep::MSec(1);
}
}
return ErrorManagement::NoError;
}
ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError;
if (info.GetStage() == ExecutionInfo::StartupStage) {
@@ -884,43 +773,4 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
}
}
void DebugService::ConsumeLogMessage(LoggerPage *logPage) {
}
void DebugService::LogCallback(const ErrorManagement::ErrorInformation &errorInfo, const char8 * const errorDescription) {
ThreadIdentifier current = Threads::Id();
if (instance != NULL_PTR(DebugService*)) {
StreamString levelStr;
ErrorManagement::ErrorCodeToStream(errorInfo.header.errorType, levelStr);
printf("[%s] %s\n", levelStr.Buffer(), errorDescription);
fflush(stdout);
bool isWorkerThread = false;
if (instance->serverThreadId != InvalidThreadIdentifier && current == instance->serverThreadId) isWorkerThread = true;
if (instance->streamerThreadId != InvalidThreadIdentifier && current == instance->streamerThreadId) isWorkerThread = true;
if (instance->logStreamerThreadId != InvalidThreadIdentifier && current == instance->logStreamerThreadId) isWorkerThread = true;
if (isWorkerThread) return;
if (instance->suppressTimeoutLogs && StringHelper::SearchString(errorDescription, "Timeout expired in recv()") != NULL_PTR(const char8*)) return;
LoggerPage tempPage;
tempPage.errorInfo = errorInfo;
StringHelper::Copy(tempPage.errorStrBuffer, errorDescription);
instance->InsertLogIntoQueue(&tempPage);
}
if (originalLogCallback) originalLogCallback(errorInfo, errorDescription);
}
void DebugService::InsertLogIntoQueue(LoggerPage *logPage) {
uint32 next = (logQueueWrite + 1) % LOG_QUEUE_SIZE;
if (next != logQueueRead) {
LogEntry &entry = logQueue[logQueueWrite % LOG_QUEUE_SIZE];
entry.info = logPage->errorInfo;
StringHelper::Copy(entry.description, logPage->errorStrBuffer);
logQueueWrite = next;
(void)logEvent.Post();
}
}
}

157
Source/TcpLogger.cpp Normal file
View File

@@ -0,0 +1,157 @@
#include "TcpLogger.h"
#include "ObjectRegistryDatabase.h"
#include "Threads.h"
#include "StringHelper.h"
#include "ConfigurationDatabase.h"
namespace MARTe {
CLASS_REGISTER(TcpLogger, "1.0")
TcpLogger::TcpLogger() :
ReferenceContainer(), LoggerConsumerI(), EmbeddedServiceMethodBinderI(),
service(*this)
{
port = 8082;
readIdx = 0;
writeIdx = 0;
workerThreadId = InvalidThreadIdentifier;
for (uint32 i=0; i<MAX_CLIENTS; i++) {
activeClients[i] = NULL_PTR(BasicTCPSocket*);
}
}
TcpLogger::~TcpLogger() {
service.Stop();
server.Close();
clientsMutex.FastLock();
for (uint32 i=0; i<MAX_CLIENTS; i++) {
if (activeClients[i] != NULL_PTR(BasicTCPSocket*)) {
activeClients[i]->Close();
delete activeClients[i];
activeClients[i] = NULL_PTR(BasicTCPSocket*);
}
}
clientsMutex.FastUnLock();
}
bool TcpLogger::Initialise(StructuredDataI & data) {
if (!ReferenceContainer::Initialise(data)) return false;
if (!data.Read("Port", port)) {
(void)data.Read("TcpPort", port);
}
(void)eventSem.Create();
ConfigurationDatabase threadData;
threadData.Write("Timeout", (uint32)1000);
if (!service.Initialise(threadData)) return false;
if (!server.Open()) return false;
if (!server.Listen(port)) {
return false;
}
printf("[TcpLogger] Listening on port %u\n", port);
if (service.Start() != ErrorManagement::NoError) {
return false;
}
return true;
}
void TcpLogger::ConsumeLogMessage(LoggerPage *logPage) {
if (logPage == NULL_PTR(LoggerPage*)) return;
// 1. Mirror to stdout
StreamString levelStr;
ErrorManagement::ErrorCodeToStream(logPage->errorInfo.header.errorType, levelStr);
printf("[%s] %s\n", levelStr.Buffer(), logPage->errorStrBuffer);
fflush(stdout);
// 2. Queue for TCP
InsertLogIntoQueue(logPage->errorInfo, logPage->errorStrBuffer);
}
void TcpLogger::InsertLogIntoQueue(const ErrorManagement::ErrorInformation &info, const char8 * const description) {
uint32 next = (writeIdx + 1) % QUEUE_SIZE;
if (next != readIdx) {
TcpLogEntry &entry = queue[writeIdx % QUEUE_SIZE];
entry.info = info;
StringHelper::Copy(entry.description, description);
writeIdx = next;
(void)eventSem.Post();
}
}
ErrorManagement::ErrorType TcpLogger::Execute(ExecutionInfo & info) {
if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError;
if (info.GetStage() == ExecutionInfo::StartupStage) {
workerThreadId = Threads::Id();
return ErrorManagement::NoError;
}
while (info.GetStage() == ExecutionInfo::MainStage) {
// 1. Check for new connections
BasicTCPSocket *newClient = server.WaitConnection(1);
if (newClient != NULL_PTR(BasicTCPSocket *)) {
clientsMutex.FastLock();
bool added = false;
for (uint32 i=0; i<MAX_CLIENTS; i++) {
if (activeClients[i] == NULL_PTR(BasicTCPSocket*)) {
activeClients[i] = newClient;
added = true;
break;
}
}
clientsMutex.FastUnLock();
if (!added) {
newClient->Close();
delete newClient;
} else {
(void)newClient->SetBlocking(false);
}
}
// 2. Stream data to clients
bool hadData = false;
while (readIdx != writeIdx) {
hadData = true;
uint32 idx = readIdx % QUEUE_SIZE;
TcpLogEntry &entry = queue[idx];
StreamString level;
ErrorManagement::ErrorCodeToStream(entry.info.header.errorType, level);
StreamString packet;
packet.Printf("LOG %s %s\n", level.Buffer(), entry.description);
uint32 size = packet.Size();
clientsMutex.FastLock();
for (uint32 j=0; j<MAX_CLIENTS; j++) {
if (activeClients[j] != NULL_PTR(BasicTCPSocket*)) {
uint32 s = size;
if (!activeClients[j]->Write(packet.Buffer(), s)) {
activeClients[j]->Close();
delete activeClients[j];
activeClients[j] = NULL_PTR(BasicTCPSocket*);
}
}
}
clientsMutex.FastUnLock();
readIdx = (readIdx + 1) % QUEUE_SIZE;
}
if (!hadData) {
(void)eventSem.Wait(TimeoutType(100));
eventSem.Reset();
} else {
Sleep::MSec(1);
}
}
return ErrorManagement::NoError;
}
}