#include "TcpLogger.h" #include "Threads.h" #include "StringHelper.h" #include "ConfigurationDatabase.h" namespace MARTe { CLASS_REGISTER(TcpLogger, "1.0") TcpLogger::TcpLogger() : ReferenceContainer(), LoggerConsumerI(), EmbeddedServiceMethodBinderI(), service(*this) { port = 8082; readIdx = 0; writeIdx = 0; workerThreadId = InvalidThreadIdentifier; for (uint32 i=0; iClose(); delete activeClients[i]; activeClients[i] = NULL_PTR(BasicTCPSocket*); } } clientsMutex.FastUnLock(); } bool TcpLogger::Initialise(StructuredDataI & data) { if (!ReferenceContainer::Initialise(data)) return false; if (!data.Read("Port", port)) { (void)data.Read("TcpPort", port); } (void)eventSem.Create(); ConfigurationDatabase threadData; threadData.Write("Timeout", (uint32)1000); if (!service.Initialise(threadData)) return false; if (!server.Open()) return false; if (!server.Listen(port)) { return false; } printf("[TcpLogger] Listening on port %u\n", port); if (service.Start() != ErrorManagement::NoError) { return false; } return true; } void TcpLogger::ConsumeLogMessage(LoggerPage *logPage) { if (logPage == NULL_PTR(LoggerPage*)) return; // 1. Mirror to stdout StreamString levelStr; ErrorManagement::ErrorCodeToStream(logPage->errorInfo.header.errorType, levelStr); printf("[%s] %s\n", levelStr.Buffer(), logPage->errorStrBuffer); fflush(stdout); // 2. Queue for TCP InsertLogIntoQueue(logPage->errorInfo, logPage->errorStrBuffer); } void TcpLogger::InsertLogIntoQueue(const ErrorManagement::ErrorInformation &info, const char8 * const description) { uint32 next = (writeIdx + 1) % QUEUE_SIZE; if (next != readIdx) { TcpLogEntry &entry = queue[writeIdx % QUEUE_SIZE]; entry.info = info; StringHelper::Copy(entry.description, description); writeIdx = next; (void)eventSem.Post(); } } ErrorManagement::ErrorType TcpLogger::Execute(ExecutionInfo & info) { if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError; if (info.GetStage() == ExecutionInfo::StartupStage) { workerThreadId = Threads::Id(); return ErrorManagement::NoError; } while (info.GetStage() == ExecutionInfo::MainStage) { // 1. Check for new connections BasicTCPSocket *newClient = server.WaitConnection(1); if (newClient != NULL_PTR(BasicTCPSocket *)) { clientsMutex.FastLock(); bool added = false; for (uint32 i=0; iClose(); delete newClient; } else { (void)newClient->SetBlocking(false); } } // 2. Stream data to clients bool hadData = false; while (readIdx != writeIdx) { hadData = true; uint32 idx = readIdx % QUEUE_SIZE; TcpLogEntry &entry = queue[idx]; StreamString level; ErrorManagement::ErrorCodeToStream(entry.info.header.errorType, level); StreamString packet; packet.Printf("LOG %s %s\n", level.Buffer(), entry.description); uint32 size = packet.Size(); clientsMutex.FastLock(); for (uint32 j=0; jWrite(packet.Buffer(), s)) { activeClients[j]->Close(); delete activeClients[j]; activeClients[j] = NULL_PTR(BasicTCPSocket*); } } } clientsMutex.FastUnLock(); readIdx = (readIdx + 1) % QUEUE_SIZE; } if (!hadData) { (void)eventSem.Wait(TimeoutType(100)); eventSem.Reset(); } else { Sleep::MSec(1); } } return ErrorManagement::NoError; } }