diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 08936b4..f279b12 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -25,7 +25,8 @@ The system uses three distinct channels: ### 3.3 Log Streaming (TCP Port 8082) - **Protocol:** Real-time event streaming. -- **Role:** Forwards global `REPORT_ERROR` calls from the framework to the GUI client. +- **Service:** `TcpLogger` (Standalone component). +- **Role:** Forwards global `REPORT_ERROR` calls and `stdout` messages to the GUI client. ## 4. Component Diagram ```text @@ -33,9 +34,11 @@ The system uses three distinct channels: | | + <--- [ DebugBrokerWrapper ] (Registry Patch) | | -[ DebugService ] <----------+ - | - +---- (TCP 8080) ----> [ Rust GUI Client ] - +---- (UDP 8081) ----> [ (Oscilloscope) ] - +---- (TCP 8082) ----> [ (Log Terminal) ] + +---------------------+ + | | +[ DebugService ] [ TcpLogger ] + | | + +--- (TCP 8080) ------+-----> [ Rust GUI Client ] + +--- (UDP 8081) ------+-----> [ (Oscilloscope) ] + +--- (TCP 8082) ------+-----> [ (Log Terminal) ] ``` diff --git a/Headers/DebugService.h b/Headers/DebugService.h index 7d66c94..d591ead 100644 --- a/Headers/DebugService.h +++ b/Headers/DebugService.h @@ -17,23 +17,17 @@ #include "ClassRegistryDatabase.h" #include "ErrorManagement.h" #include "AdvancedErrorManagement.h" -#include "LoggerConsumerI.h" #include "Threads.h" #include "EventSem.h" namespace MARTe { -struct LogEntry { - ErrorManagement::ErrorInformation info; - char8 description[MAX_ERROR_MESSAGE_SIZE]; -}; - struct SignalAlias { StreamString name; uint32 signalIndex; }; -class DebugService : public ReferenceContainer, public MessageI, public EmbeddedServiceMethodBinderI, public LoggerConsumerI { +class DebugService : public ReferenceContainer, public MessageI, public EmbeddedServiceMethodBinderI { public: CLASS_REGISTER_DECLARATION() @@ -46,16 +40,12 @@ public: void ProcessSignal(DebugSignalInfo* signalInfo, uint32 size); virtual ErrorManagement::ErrorType Execute(ExecutionInfo & info); - virtual void ConsumeLogMessage(LoggerPage *logPage); - static void LogCallback(const ErrorManagement::ErrorInformation &errorInfo, const char8 * const errorDescription); - void InsertLogIntoQueue(LoggerPage *logPage); bool IsPaused() const { return isPaused; } void SetPaused(bool paused) { isPaused = paused; } static bool GetFullObjectName(const Object &obj, StreamString &fullPath); - // Made public for integration tests and debug access uint32 ForceSignal(const char8* name, const char8* valueStr); uint32 UnforceSignal(const char8* name); uint32 TraceSignal(const char8* name, bool enable, uint32 decimation = 1); @@ -71,11 +61,9 @@ private: ErrorManagement::ErrorType Server(ExecutionInfo & info); ErrorManagement::ErrorType Streamer(ExecutionInfo & info); - ErrorManagement::ErrorType LogStreamer(ExecutionInfo & info); uint16 controlPort; uint16 streamPort; - uint16 logPort; StreamString streamIP; bool isServer; bool suppressTimeoutLogs; @@ -83,15 +71,13 @@ private: BasicTCPSocket tcpServer; BasicUDPSocket udpSocket; - BasicTCPSocket logServer; class ServiceBinder : public EmbeddedServiceMethodBinderI { public: - enum ServiceType { ServerType, StreamerType, LogStreamerType }; + enum ServiceType { ServerType, StreamerType }; ServiceBinder(DebugService *parent, ServiceType type) : parent(parent), type(type) {} virtual ErrorManagement::ErrorType Execute(ExecutionInfo & info) { if (type == StreamerType) return parent->Streamer(info); - if (type == LogStreamerType) return parent->LogStreamer(info); return parent->Server(info); } private: @@ -101,15 +87,12 @@ private: ServiceBinder binderServer; ServiceBinder binderStreamer; - ServiceBinder binderLogStreamer; SingleThreadService threadService; SingleThreadService streamerService; - SingleThreadService logStreamerService; ThreadIdentifier serverThreadId; ThreadIdentifier streamerThreadId; - ThreadIdentifier logStreamerThreadId; static const uint32 MAX_SIGNALS = 4096; DebugSignalInfo signals[MAX_SIGNALS]; @@ -126,17 +109,7 @@ private: BasicTCPSocket* activeClients[MAX_CLIENTS]; FastPollingMutexSem clientsMutex; - BasicTCPSocket* activeLogClients[MAX_CLIENTS]; - FastPollingMutexSem logClientsMutex; - - static const uint32 LOG_QUEUE_SIZE = 1024; - LogEntry logQueue[LOG_QUEUE_SIZE]; - volatile uint32 logQueueRead; - volatile uint32 logQueueWrite; - EventSem logEvent; - static DebugService* instance; - static ErrorManagement::ErrorProcessFunctionType originalLogCallback; }; } diff --git a/Headers/TcpLogger.h b/Headers/TcpLogger.h new file mode 100644 index 0000000..dddb78f --- /dev/null +++ b/Headers/TcpLogger.h @@ -0,0 +1,66 @@ +#ifndef TCPLOGGER_H +#define TCPLOGGER_H + +#include "LoggerConsumerI.h" +#include "ReferenceContainer.h" +#include "EmbeddedServiceMethodBinderI.h" +#include "BasicTCPSocket.h" +#include "SingleThreadService.h" +#include "FastPollingMutexSem.h" +#include "EventSem.h" +#include "StreamString.h" + +namespace MARTe { + +struct TcpLogEntry { + ErrorManagement::ErrorInformation info; + char8 description[MAX_ERROR_MESSAGE_SIZE]; +}; + +/** + * @brief Logger consumer that publishes framework logs to TCP and stdout. + * @details Implements LoggerConsumerI to be used inside a LoggerService. + */ +class TcpLogger : public ReferenceContainer, public LoggerConsumerI, public EmbeddedServiceMethodBinderI { +public: + CLASS_REGISTER_DECLARATION() + + TcpLogger(); + virtual ~TcpLogger(); + + virtual bool Initialise(StructuredDataI & data); + + /** + * @brief Implementation of LoggerConsumerI. + * Called by LoggerService. + */ + virtual void ConsumeLogMessage(LoggerPage *logPage); + + /** + * @brief Worker thread method for TCP streaming. + */ + virtual ErrorManagement::ErrorType Execute(ExecutionInfo & info); + +private: + void InsertLogIntoQueue(const ErrorManagement::ErrorInformation &info, const char8 * const description); + + uint16 port; + BasicTCPSocket server; + + static const uint32 MAX_CLIENTS = 8; + BasicTCPSocket* activeClients[MAX_CLIENTS]; + FastPollingMutexSem clientsMutex; + + SingleThreadService service; + ThreadIdentifier workerThreadId; + + static const uint32 QUEUE_SIZE = 1024; + TcpLogEntry queue[QUEUE_SIZE]; + volatile uint32 readIdx; + volatile uint32 writeIdx; + EventSem eventSem; +}; + +} + +#endif diff --git a/README.md b/README.md index 9b009c3..4d02a0f 100644 --- a/README.md +++ b/README.md @@ -39,5 +39,13 @@ To enable debugging in your application, add the following to your `.cfg`: ControlPort = 8080 UdpPort = 8081 } + ++LoggerService = { + Class = LoggerService + +DebugConsumer = { + Class = TcpLogger + Port = 8082 + } +} ``` The suite automatically patches the registry to instrument your existing Brokers and Schedulers. diff --git a/SPECS.md b/SPECS.md index 25f5708..9d0426d 100644 --- a/SPECS.md +++ b/SPECS.md @@ -8,7 +8,7 @@ Implement a "Zero-Code-Change" observability layer for the MARTe2 real-time fram - **FR-01 (Discovery):** Discover the full MARTe2 object hierarchy at runtime. - **FR-02 (Telemetry):** Stream high-frequency signal data (verified up to 100Hz) to a remote client. - **FR-03 (Forcing):** Allow manual override of signal values in memory during execution. -- **FR-04 (Logs):** Stream global framework logs to a dedicated terminal. +- **FR-04 (Logs):** Stream global framework logs to a dedicated terminal via a standalone `TcpLogger` service. - **FR-05 (Execution Control):** Pause and resume the real-time execution threads via scheduler injection. - **FR-06 (UI):** Provide a native, immediate-mode GUI for visualization (Oscilloscope). diff --git a/Source/DebugService.cpp b/Source/DebugService.cpp index 5456e55..c132091 100644 --- a/Source/DebugService.cpp +++ b/Source/DebugService.cpp @@ -26,7 +26,6 @@ namespace MARTe { DebugService* DebugService::instance = NULL_PTR(DebugService*); -ErrorManagement::ErrorProcessFunctionType DebugService::originalLogCallback = NULL_PTR(ErrorManagement::ErrorProcessFunctionType); static void EscapeJson(const char8* src, StreamString &dst) { if (src == NULL_PTR(const char8*)) return; @@ -44,17 +43,14 @@ static void EscapeJson(const char8* src, StreamString &dst) { CLASS_REGISTER(DebugService, "1.0") DebugService::DebugService() : - ReferenceContainer(), EmbeddedServiceMethodBinderI(), LoggerConsumerI(), + ReferenceContainer(), EmbeddedServiceMethodBinderI(), binderServer(this, ServiceBinder::ServerType), binderStreamer(this, ServiceBinder::StreamerType), - binderLogStreamer(this, ServiceBinder::LogStreamerType), threadService(binderServer), - streamerService(binderStreamer), - logStreamerService(binderLogStreamer) + streamerService(binderStreamer) { controlPort = 0; streamPort = 8081; - logPort = 8082; streamIP = "127.0.0.1"; numberOfSignals = 0; numberOfAliases = 0; @@ -63,40 +59,27 @@ DebugService::DebugService() : isPaused = false; for (uint32 i=0; iClose(); delete activeClients[i]; } - if (activeLogClients[i] != NULL_PTR(BasicTCPSocket*)) { - activeLogClients[i]->Close(); - delete activeLogClients[i]; - } } } @@ -115,10 +98,6 @@ bool DebugService::Initialise(StructuredDataI & data) { if (!data.Read("StreamPort", streamPort)) { (void)data.Read("UdpPort", streamPort); } - - if (!data.Read("LogPort", logPort)) { - (void)data.Read("TcpLogPort", logPort); - } StreamString tempIP; if (data.Read("StreamIP", tempIP)) { @@ -133,22 +112,14 @@ bool DebugService::Initialise(StructuredDataI & data) { } if (isServer) { - if (ErrorManagement::errorMessageProcessFunction != &DebugService::LogCallback) { - originalLogCallback = ErrorManagement::errorMessageProcessFunction; - ErrorManagement::SetErrorProcessFunction(&DebugService::LogCallback); - } - if (!traceBuffer.Init(1024 * 1024)) return false; - (void)logEvent.Create(); - PatchRegistry(); ConfigurationDatabase threadData; threadData.Write("Timeout", (uint32)1000); threadService.Initialise(threadData); streamerService.Initialise(threadData); - logStreamerService.Initialise(threadData); if (!tcpServer.Open()) { REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open TCP Server Socket"); @@ -166,13 +137,6 @@ bool DebugService::Initialise(StructuredDataI & data) { } printf("[DebugService] UDP Streamer socket opened\n"); - if (!logServer.Open()) { - REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to open Log Server Socket"); - return false; - } - (void)logServer.Listen(logPort); - printf("[DebugService] Log Server listening on port %u\n", logPort); - if (threadService.Start() != ErrorManagement::NoError) { REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start Server thread"); return false; @@ -181,11 +145,7 @@ bool DebugService::Initialise(StructuredDataI & data) { REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start Streamer thread"); return false; } - if (logStreamerService.Start() != ErrorManagement::NoError) { - REPORT_ERROR(ErrorManagement::FatalError, "DebugService: Failed to start LogStreamer thread"); - return false; - } - printf("[DebugService] All worker threads started.\n"); + printf("[DebugService] Worker threads started.\n"); } return true; @@ -333,11 +293,6 @@ ErrorManagement::ErrorType DebugService::Server(ExecutionInfo & info) { } while (info.GetStage() == ExecutionInfo::MainStage) { - if (ErrorManagement::errorMessageProcessFunction != &DebugService::LogCallback) { - originalLogCallback = ErrorManagement::errorMessageProcessFunction; - ErrorManagement::SetErrorProcessFunction(&DebugService::LogCallback); - } - BasicTCPSocket *newClient = tcpServer.WaitConnection(1); if (newClient != NULL_PTR(BasicTCPSocket *)) { clientsMutex.FastLock(); @@ -388,72 +343,6 @@ ErrorManagement::ErrorType DebugService::Server(ExecutionInfo & info) { return ErrorManagement::NoError; } -ErrorManagement::ErrorType DebugService::LogStreamer(ExecutionInfo & info) { - if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError; - if (info.GetStage() == ExecutionInfo::StartupStage) { - logStreamerThreadId = Threads::Id(); - return ErrorManagement::NoError; - } - - while (info.GetStage() == ExecutionInfo::MainStage) { - BasicTCPSocket *newClient = logServer.WaitConnection(1); - if (newClient != NULL_PTR(BasicTCPSocket *)) { - logClientsMutex.FastLock(); - bool added = false; - for (uint32 i=0; iClose(); - delete newClient; - } - } - - bool hadData = false; - for (uint32 b=0; b<50; b++) { - if (logQueueRead == logQueueWrite) break; - hadData = true; - - uint32 idx = logQueueRead % LOG_QUEUE_SIZE; - LogEntry &entry = logQueue[idx]; - - StreamString level; - ErrorManagement::ErrorCodeToStream(entry.info.header.errorType, level); - - StreamString packet; - packet.Printf("LOG %s %s\n", level.Buffer(), entry.description); - uint32 size = packet.Size(); - - logClientsMutex.FastLock(); - for (uint32 j=0; jWrite(packet.Buffer(), s)) { - activeLogClients[j]->Close(); - delete activeLogClients[j]; - activeLogClients[j] = NULL_PTR(BasicTCPSocket*); - } - } - } - logClientsMutex.FastUnLock(); - logQueueRead++; - } - - if (!hadData) { - (void)logEvent.Wait(TimeoutType(100)); - logEvent.Reset(); - } else { - Sleep::MSec(1); - } - } - return ErrorManagement::NoError; -} - ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) { if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError; if (info.GetStage() == ExecutionInfo::StartupStage) { @@ -884,43 +773,4 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) { } } -void DebugService::ConsumeLogMessage(LoggerPage *logPage) { -} - -void DebugService::LogCallback(const ErrorManagement::ErrorInformation &errorInfo, const char8 * const errorDescription) { - ThreadIdentifier current = Threads::Id(); - if (instance != NULL_PTR(DebugService*)) { - StreamString levelStr; - ErrorManagement::ErrorCodeToStream(errorInfo.header.errorType, levelStr); - printf("[%s] %s\n", levelStr.Buffer(), errorDescription); - fflush(stdout); - - bool isWorkerThread = false; - if (instance->serverThreadId != InvalidThreadIdentifier && current == instance->serverThreadId) isWorkerThread = true; - if (instance->streamerThreadId != InvalidThreadIdentifier && current == instance->streamerThreadId) isWorkerThread = true; - if (instance->logStreamerThreadId != InvalidThreadIdentifier && current == instance->logStreamerThreadId) isWorkerThread = true; - - if (isWorkerThread) return; - - if (instance->suppressTimeoutLogs && StringHelper::SearchString(errorDescription, "Timeout expired in recv()") != NULL_PTR(const char8*)) return; - - LoggerPage tempPage; - tempPage.errorInfo = errorInfo; - StringHelper::Copy(tempPage.errorStrBuffer, errorDescription); - instance->InsertLogIntoQueue(&tempPage); - } - if (originalLogCallback) originalLogCallback(errorInfo, errorDescription); -} - -void DebugService::InsertLogIntoQueue(LoggerPage *logPage) { - uint32 next = (logQueueWrite + 1) % LOG_QUEUE_SIZE; - if (next != logQueueRead) { - LogEntry &entry = logQueue[logQueueWrite % LOG_QUEUE_SIZE]; - entry.info = logPage->errorInfo; - StringHelper::Copy(entry.description, logPage->errorStrBuffer); - logQueueWrite = next; - (void)logEvent.Post(); - } -} - } diff --git a/Source/TcpLogger.cpp b/Source/TcpLogger.cpp new file mode 100644 index 0000000..6d8e789 --- /dev/null +++ b/Source/TcpLogger.cpp @@ -0,0 +1,157 @@ +#include "TcpLogger.h" +#include "ObjectRegistryDatabase.h" +#include "Threads.h" +#include "StringHelper.h" +#include "ConfigurationDatabase.h" + +namespace MARTe { + +CLASS_REGISTER(TcpLogger, "1.0") + +TcpLogger::TcpLogger() : + ReferenceContainer(), LoggerConsumerI(), EmbeddedServiceMethodBinderI(), + service(*this) +{ + port = 8082; + readIdx = 0; + writeIdx = 0; + workerThreadId = InvalidThreadIdentifier; + for (uint32 i=0; iClose(); + delete activeClients[i]; + activeClients[i] = NULL_PTR(BasicTCPSocket*); + } + } + clientsMutex.FastUnLock(); +} + +bool TcpLogger::Initialise(StructuredDataI & data) { + if (!ReferenceContainer::Initialise(data)) return false; + + if (!data.Read("Port", port)) { + (void)data.Read("TcpPort", port); + } + + (void)eventSem.Create(); + + ConfigurationDatabase threadData; + threadData.Write("Timeout", (uint32)1000); + if (!service.Initialise(threadData)) return false; + + if (!server.Open()) return false; + if (!server.Listen(port)) { + return false; + } + printf("[TcpLogger] Listening on port %u\n", port); + + if (service.Start() != ErrorManagement::NoError) { + return false; + } + + return true; +} + +void TcpLogger::ConsumeLogMessage(LoggerPage *logPage) { + if (logPage == NULL_PTR(LoggerPage*)) return; + + // 1. Mirror to stdout + StreamString levelStr; + ErrorManagement::ErrorCodeToStream(logPage->errorInfo.header.errorType, levelStr); + printf("[%s] %s\n", levelStr.Buffer(), logPage->errorStrBuffer); + fflush(stdout); + + // 2. Queue for TCP + InsertLogIntoQueue(logPage->errorInfo, logPage->errorStrBuffer); +} + +void TcpLogger::InsertLogIntoQueue(const ErrorManagement::ErrorInformation &info, const char8 * const description) { + uint32 next = (writeIdx + 1) % QUEUE_SIZE; + if (next != readIdx) { + TcpLogEntry &entry = queue[writeIdx % QUEUE_SIZE]; + entry.info = info; + StringHelper::Copy(entry.description, description); + writeIdx = next; + (void)eventSem.Post(); + } +} + +ErrorManagement::ErrorType TcpLogger::Execute(ExecutionInfo & info) { + if (info.GetStage() == ExecutionInfo::TerminationStage) return ErrorManagement::NoError; + if (info.GetStage() == ExecutionInfo::StartupStage) { + workerThreadId = Threads::Id(); + return ErrorManagement::NoError; + } + + while (info.GetStage() == ExecutionInfo::MainStage) { + // 1. Check for new connections + BasicTCPSocket *newClient = server.WaitConnection(1); + if (newClient != NULL_PTR(BasicTCPSocket *)) { + clientsMutex.FastLock(); + bool added = false; + for (uint32 i=0; iClose(); + delete newClient; + } else { + (void)newClient->SetBlocking(false); + } + } + + // 2. Stream data to clients + bool hadData = false; + while (readIdx != writeIdx) { + hadData = true; + uint32 idx = readIdx % QUEUE_SIZE; + TcpLogEntry &entry = queue[idx]; + + StreamString level; + ErrorManagement::ErrorCodeToStream(entry.info.header.errorType, level); + + StreamString packet; + packet.Printf("LOG %s %s\n", level.Buffer(), entry.description); + uint32 size = packet.Size(); + + clientsMutex.FastLock(); + for (uint32 j=0; jWrite(packet.Buffer(), s)) { + activeClients[j]->Close(); + delete activeClients[j]; + activeClients[j] = NULL_PTR(BasicTCPSocket*); + } + } + } + clientsMutex.FastUnLock(); + readIdx = (readIdx + 1) % QUEUE_SIZE; + } + + if (!hadData) { + (void)eventSem.Wait(TimeoutType(100)); + eventSem.Reset(); + } else { + Sleep::MSec(1); + } + } + return ErrorManagement::NoError; +} + +} diff --git a/TUTORIAL.md b/TUTORIAL.md index 79b9076..4d48518 100644 --- a/TUTORIAL.md +++ b/TUTORIAL.md @@ -9,7 +9,7 @@ First, start your application using the provided debug runner. This script launc ```bash ./run_debug_app.sh ``` -*Note: You should see logs indicating the `DebugService` has started on port 8080.* +*Note: You should see logs indicating the `DebugService` has started on port 8080 and `TcpLogger` on port 8082.* ### Start the GUI Client In a new terminal window, navigate to the client directory and run the GUI: @@ -51,6 +51,6 @@ If you need to "freeze" the entire application to inspect a specific state: - Click **▶ Resume** to restart the execution. ### Log Terminal -The bottom panel displays every `REPORT_ERROR` event from the C++ framework. +The bottom panel displays every `REPORT_ERROR` event from the C++ framework, powered by the standalone `TcpLogger` service. - **Regex Filter:** Type a keyword like `Timer` in the filter box to isolate relevant events. - **Pause Logs:** Toggle **⏸ Pause Logs** to stop the scrolling view while data continues to be captured in the background. diff --git a/Test/Configurations/debug_test.cfg b/Test/Configurations/debug_test.cfg index 419ebe7..211db69 100644 --- a/Test/Configurations/debug_test.cfg +++ b/Test/Configurations/debug_test.cfg @@ -1,4 +1,4 @@ -$App = { ++App = { Class = RealTimeApplication +Functions = { Class = ReferenceContainer @@ -32,7 +32,7 @@ $App = { DefaultDataSource = DDB +Timer = { Class = LinuxTimer - SleepTime = 1000000 // 1 second cycle to reduce log spam + SleepTime = 1000000 Signals = { Counter = { Type = uint32 @@ -96,6 +96,7 @@ $App = { Class = LoggerService CPUs = 0x1 +DebugConsumer = { - Class = DebugService + Class = TcpLogger + Port = 8082 } } diff --git a/Test/UnitTests/main.cpp b/Test/UnitTests/main.cpp index 7f53f35..3dd9831 100644 --- a/Test/UnitTests/main.cpp +++ b/Test/UnitTests/main.cpp @@ -1,35 +1,108 @@ #include #include #include "DebugCore.h" +#include "DebugService.h" +#include "TcpLogger.h" +#include "ConfigurationDatabase.h" +#include "ObjectRegistryDatabase.h" +#include "StandardParser.h" using namespace MARTe; void TestRingBuffer() { printf("Testing TraceRingBuffer...\n"); TraceRingBuffer rb; - assert(rb.Init(1024)); + // Each entry is 4(ID) + 4(Size) + 4(Val) = 12 bytes. + // 100 entries = 1200 bytes. + assert(rb.Init(2048)); - uint32 id = 42; - uint32 val = 12345678; + // Fill buffer to test wrap-around + uint32 id = 1; + uint32 val = 0xAAAAAAAA; uint32 size = 4; - assert(rb.Push(id, &val, size)); - assert(rb.Count() > 0); + for (int i=0; i<100; i++) { + id = i; + val = 0xBBBB0000 | i; + if (!rb.Push(id, &val, size)) { + printf("Failed at iteration %d\n", i); + assert(false); + } + } - uint32 poppedId = 0; - uint32 poppedVal = 0; - uint32 poppedSize = 0; + assert(rb.Count() == 100 * (4 + 4 + 4)); - assert(rb.Pop(poppedId, &poppedVal, poppedSize, 4)); - assert(poppedId == 42); - assert(poppedVal == 12345678); - assert(poppedSize == 4); + uint32 pId, pVal, pSize; + for (int i=0; i<100; i++) { + assert(rb.Pop(pId, &pVal, pSize, 4)); + assert(pId == (uint32)i); + assert(pVal == (0xBBBB0000 | (uint32)i)); + } + assert(rb.Count() == 0); printf("TraceRingBuffer test passed.\n"); } +void TestSuffixMatch() { + printf("Testing SuffixMatch...\n"); + + DebugService service; + uint32 mock = 0; + service.RegisterSignal(&mock, UnsignedInteger32Bit, "App.Data.Timer.Counter"); + + // Should match + assert(service.TraceSignal("App.Data.Timer.Counter", true) == 1); + assert(service.TraceSignal("Timer.Counter", true) == 1); + assert(service.TraceSignal("Counter", true) == 1); + + // Should NOT match + assert(service.TraceSignal("App.Timer", true) == 0); + assert(service.TraceSignal("unt", true) == 0); + + printf("SuffixMatch test passed.\n"); +} + +void TestTcpLogger() { + printf("Testing TcpLogger...\n"); + TcpLogger logger; + ConfigurationDatabase config; + config.Write("Port", (uint16)9999); + assert(logger.Initialise(config)); + + REPORT_ERROR_STATIC(ErrorManagement::Information, "Unit Test Log Message"); + + printf("TcpLogger basic test passed.\n"); +} + +void TestDebugServiceRegistration() { + printf("Testing DebugService Signal Registration...\n"); + DebugService service; + uint32 val1 = 10; + float32 val2 = 20.0; + + DebugSignalInfo* s1 = service.RegisterSignal(&val1, UnsignedInteger32Bit, "Signal1"); + DebugSignalInfo* s2 = service.RegisterSignal(&val2, Float32Bit, "Signal2"); + + assert(s1 != NULL_PTR(DebugSignalInfo*)); + assert(s2 != NULL_PTR(DebugSignalInfo*)); + assert(s1->internalID == 0); + assert(s2->internalID == 1); + + // Re-register same address + DebugSignalInfo* s1_alias = service.RegisterSignal(&val1, UnsignedInteger32Bit, "Signal1_Alias"); + assert(s1_alias == s1); + + printf("DebugService registration test passed.\n"); +} + int main(int argc, char **argv) { - printf("Running MARTe2 Component Tests...\n"); + printf("Running MARTe2 Debug Suite Unit Tests...\n"); + TestRingBuffer(); + TestDebugServiceRegistration(); + TestSuffixMatch(); + TestTcpLogger(); + + printf("\nALL UNIT TESTS PASSED!\n"); return 0; } diff --git a/Tools/gui_client/src/main.rs b/Tools/gui_client/src/main.rs index 433eb21..9bd90ff 100644 --- a/Tools/gui_client/src/main.rs +++ b/Tools/gui_client/src/main.rs @@ -58,6 +58,15 @@ struct SignalMetadata { sig_type: String, } +#[derive(Clone)] +struct ConnectionConfig { + ip: String, + tcp_port: String, + udp_port: String, + log_port: String, + version: u64, +} + enum InternalEvent { Log(LogEntry), Discovery(Vec), @@ -65,6 +74,7 @@ enum InternalEvent { CommandResponse(String), NodeInfo(String), Connected, + Disconnected, InternalLog(String), TraceRequested(String), ClearTrace(String), @@ -91,10 +101,9 @@ struct LogFilters { } struct MarteDebugApp { - #[allow(dead_code)] connected: bool, - tcp_addr: String, - log_addr: String, + config: ConnectionConfig, + shared_config: Arc>, signals: Vec, app_tree: Option, @@ -131,34 +140,43 @@ impl MarteDebugApp { let (tx_events, rx_events) = unbounded::(); let internal_tx = tx_events.clone(); - let tcp_addr = "127.0.0.1:8080".to_string(); - let log_addr = "127.0.0.1:8082".to_string(); + let config = ConnectionConfig { + ip: "127.0.0.1".to_string(), + tcp_port: "8080".to_string(), + udp_port: "8081".to_string(), + log_port: "8082".to_string(), + version: 0, + }; + let shared_config = Arc::new(Mutex::new(config.clone())); let id_to_meta = Arc::new(Mutex::new(HashMap::new())); let traced_signals = Arc::new(Mutex::new(HashMap::new())); let id_to_meta_clone = id_to_meta.clone(); let traced_signals_clone = traced_signals.clone(); + let shared_config_cmd = shared_config.clone(); + let shared_config_log = shared_config.clone(); + let shared_config_udp = shared_config.clone(); let tx_events_c = tx_events.clone(); thread::spawn(move || { - tcp_command_worker(tcp_addr, rx_cmd_internal, tx_events_c); + tcp_command_worker(shared_config_cmd, rx_cmd_internal, tx_events_c); }); let tx_events_log = tx_events.clone(); thread::spawn(move || { - tcp_log_worker(log_addr, tx_events_log); + tcp_log_worker(shared_config_log, tx_events_log); }); let tx_events_udp = tx_events.clone(); thread::spawn(move || { - udp_worker(8081, id_to_meta_clone, traced_signals_clone, tx_events_udp); + udp_worker(shared_config_udp, id_to_meta_clone, traced_signals_clone, tx_events_udp); }); Self { connected: false, - tcp_addr: "127.0.0.1:8080".to_string(), - log_addr: "127.0.0.1:8082".to_string(), + config, + shared_config, signals: Vec::new(), app_tree: None, id_to_meta, @@ -237,20 +255,36 @@ impl MarteDebugApp { } } -fn tcp_command_worker(addr: String, rx_cmd: Receiver, tx_events: Sender) { +fn tcp_command_worker(shared_config: Arc>, rx_cmd: Receiver, tx_events: Sender) { + let mut current_version = 0; + let mut current_addr = String::new(); + loop { - if let Ok(mut stream) = TcpStream::connect(&addr) { + // Check for config updates + { + let config = shared_config.lock().unwrap(); + if config.version != current_version { + current_version = config.version; + current_addr = format!("{}:{}", config.ip, config.tcp_port); + } + } + + if let Ok(mut stream) = TcpStream::connect(¤t_addr) { let _ = stream.set_nodelay(true); let mut reader = BufReader::new(stream.try_clone().unwrap()); let _ = tx_events.send(InternalEvent::Connected); let tx_events_inner = tx_events.clone(); + let stop_flag = Arc::new(Mutex::new(false)); + let stop_flag_reader = stop_flag.clone(); + thread::spawn(move || { let mut line = String::new(); let mut json_acc = String::new(); let mut in_json = false; while reader.read_line(&mut line).is_ok() { + if *stop_flag_reader.lock().unwrap() { break; } let trimmed = line.trim(); if trimmed.is_empty() { line.clear(); continue; } @@ -291,21 +325,47 @@ fn tcp_command_worker(addr: String, rx_cmd: Receiver, tx_events: Sender< }); while let Ok(cmd) = rx_cmd.recv() { + // Check if config changed while connected + { + let config = shared_config.lock().unwrap(); + if config.version != current_version { + *stop_flag.lock().unwrap() = true; + break; // Trigger reconnect + } + } if stream.write_all(format!("{}\n", cmd).as_bytes()).is_err() { break; } } + let _ = tx_events.send(InternalEvent::Disconnected); } thread::sleep(std::time::Duration::from_secs(2)); } } -fn tcp_log_worker(addr: String, tx_events: Sender) { +fn tcp_log_worker(shared_config: Arc>, tx_events: Sender) { + let mut current_version = 0; + let mut current_addr = String::new(); + loop { - if let Ok(stream) = TcpStream::connect(&addr) { + { + let config = shared_config.lock().unwrap(); + if config.version != current_version { + current_version = config.version; + current_addr = format!("{}:{}", config.ip, config.log_port); + } + } + + if let Ok(stream) = TcpStream::connect(¤t_addr) { let mut reader = BufReader::new(stream); let mut line = String::new(); while reader.read_line(&mut line).is_ok() { + // Check for config update + { + if shared_config.lock().unwrap().version != current_version { + break; + } + } let trimmed = line.trim(); if trimmed.starts_with("LOG ") { let parts: Vec<&str> = trimmed[4..].splitn(2, ' ').collect(); @@ -324,14 +384,39 @@ fn tcp_log_worker(addr: String, tx_events: Sender) { } } -fn udp_worker(port: u16, id_to_meta: Arc>>, traced_data: Arc>>, tx_events: Sender) { - if let Ok(socket) = UdpSocket::bind(format!("0.0.0.0:{}", port)) { +fn udp_worker(shared_config: Arc>, id_to_meta: Arc>>, traced_data: Arc>>, tx_events: Sender) { + let mut current_version = 0; + let mut socket: Option = None; + + loop { + let (ver, port) = { + let config = shared_config.lock().unwrap(); + (config.version, config.udp_port.clone()) + }; + + if ver != current_version || socket.is_none() { + current_version = ver; + socket = UdpSocket::bind(format!("0.0.0.0:{}", port)).ok(); + if socket.is_none() { + let _ = tx_events.send(InternalEvent::InternalLog(format!("UDP Bind Error on port {}", port))); + thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + let _ = socket.as_ref().unwrap().set_read_timeout(Some(std::time::Duration::from_millis(500))); + } + + let s = socket.as_ref().unwrap(); let mut buf = [0u8; 4096]; let start_time = std::time::Instant::now(); let mut total_packets = 0u64; loop { - if let Ok(n) = socket.recv(&mut buf) { + // Check for config update + if shared_config.lock().unwrap().version != current_version { + break; // Re-bind + } + + if let Ok(n) = s.recv(&mut buf) { total_packets += 1; if (total_packets % 100) == 0 { let _ = tx_events.send(InternalEvent::UdpStats(total_packets)); @@ -450,9 +535,13 @@ impl eframe::App for MarteDebugApp { self.udp_packets = count; } InternalEvent::Connected => { + self.connected = true; let _ = self.tx_cmd.send("TREE".to_string()); let _ = self.tx_cmd.send("DISCOVER".to_string()); } + InternalEvent::Disconnected => { + self.connected = false; + } } } @@ -484,6 +573,38 @@ impl eframe::App for MarteDebugApp { ui.toggle_value(&mut self.show_bottom_panel, "📜 Logs"); ui.separator(); ui.heading("MARTe2 Debug Explorer"); + ui.separator(); + + ui.menu_button("🔌 Connection", |ui| { + egui::Grid::new("conn_grid") + .num_columns(2) + .spacing([40.0, 4.0]) + .show(ui, |ui| { + ui.label("Server IP:"); + ui.text_edit_singleline(&mut self.config.ip); + ui.end_row(); + ui.label("Control Port (TCP):"); + ui.text_edit_singleline(&mut self.config.tcp_port); + ui.end_row(); + ui.label("Telemetry Port (UDP):"); + ui.text_edit_singleline(&mut self.config.udp_port); + ui.end_row(); + ui.label("Log Port (TCP):"); + ui.text_edit_singleline(&mut self.config.log_port); + ui.end_row(); + }); + ui.separator(); + if ui.button("🔄 Apply & Reconnect").clicked() { + self.config.version += 1; + let mut shared = self.shared_config.lock().unwrap(); + *shared = self.config.clone(); + ui.close_menu(); + } + }); + + let status_color = if self.connected { egui::Color32::GREEN } else { egui::Color32::RED }; + ui.label(egui::RichText::new(if self.connected { "● Online" } else { "○ Offline" }).color(status_color)); + ui.separator(); if ui.button("🔄 Refresh").clicked() { let _ = self.tx_cmd.send("TREE".to_string()); diff --git a/app_output.log b/app_output.log new file mode 100644 index 0000000..d924cc4 --- /dev/null +++ b/app_output.log @@ -0,0 +1,150 @@ +MARTe2 Environment Set (MARTe2_DIR=/home/martino/Projects/marte_debug/dependency/MARTe2) +MARTe2 Components Environment Set (MARTe2_Components_DIR=/home/martino/Projects/marte_debug/dependency/MARTe2-components) +Cleaning up lingering processes... +Launching standard MARTeApp.ex with debug_test.cfg... +[Debug - Bootstrap.cpp:79]: Arguments: +-f = "Test/Configurations/debug_test.cfg" +-l = "RealTimeLoader" +-s = "State1" + +[Information - Bootstrap.cpp:207]: Loader parameters: +-f = "Test/Configurations/debug_test.cfg" +-l = "RealTimeLoader" +-s = "State1" +Loader = "RealTimeLoader" +Filename = "Test/Configurations/debug_ +[Information - Loader.cpp:67]: DefaultCPUs set to 1 +[Information - Loader.cpp:74]: SchedulerGranularity is 10000 +[Debug - Loader.cpp:189]: Purging ObjectRegistryDatabase with 0 objects +[Debug - Loader.cpp:192]: Purge ObjectRegistryDatabase. Number of objects left: 0 +[Information - LinuxTimer.cpp:117]: SleepNature was not set. Using Default. +[Information - LinuxTimer.cpp:127]: Phase was not configured, using default 0 +[Warning - LinuxTimer.cpp:164]: ExecutionMode not specified using: IndependentThread +[Warning - LinuxTimer.cpp:185]: CPUMask not specified using: 255 +[Warning - LinuxTimer.cpp:191]: StackSize not specified using: 262144 +[Information - LinuxTimer.cpp:236]: No timer provider specified. Falling back to HighResolutionTimeProvider +[Information - HighResolutionTimeProvider.cpp:163]: Sleep nature was not specified, falling back to default (Sleep::NoMore mode) +[Information - HighResolutionTimeProvider.cpp:61]: Inner initialization succeeded +[Information - LinuxTimer.cpp:267]: Backward compatibility parameters injection unnecessary +[Information - RealTimeThread.cpp:190]: No CPUs defined for the RealTimeThread Thread1 +[Information - RealTimeThread.cpp:193]: No StackSize defined for the RealTimeThread Thread1 +[DebugService] TCP Server listening on port 8080 +[DebugService] UDP Streamer socket opened +[ParametersError - StringHelper.cpp:60]: Error: invalid input arguments +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[ParametersError - StringHelper.cpp:60]: Error: invalid input arguments +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[DebugService] Worker threads started. +[TcpLogger] Listening on port 8082 +[ParametersError - StringHelper.cpp:60]: Error: invalid input arguments +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning - Threads.cpp:173]: Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning] Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning] Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Information] LoaderPostInit not set +[Information] Going to rtAppBuilder.ConfigureAfterInitialisation() +[Information] Going to InitialiseSignalsDatabase +[Information] Going to FlattenSignalsDatabases +[Information] Caching introspection signals +[Information] Flattening functions input signals +[Debug] Updating the signal database +[Debug] Finished updating the signal database +[Information] Flattening functions output signals +[Debug] Updating the signal database +[Debug] Finished updating the signal database +[Information] Flattening data sources signals +[Debug] Updating the signal database +[Debug] Finished updating the signal database +[Debug] Updating the signal database +[Debug] Finished updating the signal database +[Debug] Updating the signal database +[Debug] Finished updating the signal database +[Information] Going to VerifyDataSourcesSignals +[Debug] Updating the signal database +[Information] Verifying signals for Timer +[Debug] Finished updating the signal database +[Information] Going to ResolveStates +[Information] Resolving state State1 +[Information] Resolving thread container Threads +[Information] Resolving thread State1.Thread1 +[Information] Resolving GAM1 +[Information] Going to ResolveDataSources +[Information] Verifying signals for Logger +[Information] Resolving for function GAM1 [idx: 0] +[Information] Resolving 2 signals +[Information] Resolving 2 signals +[Information] Verifying signals for DDB +[Information] Verifying signals for DAMS +[Information] Going to VerifyConsumersAndProducers +[Information] Started application in state State1 +[Information] Application starting +[Information] Verifying consumers and producers for Timer +[Information] Verifying consumers and producers for Logger +[Information] Verifying consumers and producers for DDB +[Information] Verifying consumers and producers for DAMS +[Information] Going to CleanCaches +[Debug] Purging dataSourcesIndexesCache. Number of children:3 +[Debug] Purging functionsIndexesCache. Number of children:1 +[Debug] Purging dataSourcesSignalIndexCache. Number of children:3 +[Debug] Purging dataSourcesFunctionIndexesCache. Number of children:1 +[Debug] Purging functionsMemoryIndexesCache. Number of children:2 +[Debug] Purged functionsMemoryIndexesCache. Number of children:0 +[Debug] Purged cachedIntrospections. Number of children:0 +[Information] Going to rtAppBuilder.PostConfigureDataSources() +[Information] Going to rtAppBuilder.PostConfigureFunctions() +[Information] Going to rtAppBuilder.Copy() +[Information] Going to AllocateGAMMemory +[Information] Going to AllocateDataSourceMemory() +[Information] Going to AddBrokersToFunctions +[Information] Creating broker MemoryMapSynchronisedInputBroker for GAM1 and signal Counter(0) +[Information] Creating broker MemoryMapInputBroker for GAM1 and signal Time(1) +[Information] Getting input brokers for Timer +[Information] Getting output brokers for Timer +[Information] Getting input brokers for Logger +[Information] Getting output brokers for Logger +[Information] Getting input brokers for DDB +[Information] Getting output brokers for DDB +[Information] Getting input brokers for DAMS +[Information] Getting output brokers for DAMS +[Information] Going to FindStatefulDataSources +[Information] Going to configure scheduler +[Information] Preparing state State1 +[Information] Frequency found = 1.000000 +[Information] Frequency found = 1.000000 +[Information] The timer will be set using a frequency of 1.000000 Hz +[ParametersError] Error: invalid input arguments +[Warning] Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning] Failed to change the thread priority (likely due to insufficient permissions) +[Information] LinuxTimer::Prepared = true +[Warning] Requested a thread priority that is higher than the one supported by the selected policy - clipping to the maximum value supported by the policy. +[Warning] Failed to change the thread priority (likely due to insufficient permissions) +[Information] Time [0:0]:0 +[Information] Time [0:0]:2000000 +[Information] Time [0:0]:3000000 +[Information] Time [0:0]:4000000 +[Information] Time [0:0]:5000000 +[Information] Time [0:0]:6000000 +[Information] Time [0:0]:7000000 +[Information] Time [0:0]:8000000 +[Information] Time [0:0]:9000000 +[Information] Time [0:0]:10000000 +[Information] Time [0:0]:11000000 +[Information] Time [0:0]:12000000 +[Information] Time [0:0]:13000000 +[Information] Time [0:0]:14000000 +[Information] Time [0:0]:15000000 +[Information] Time [0:0]:16000000 +[Information] Time [0:0]:17000000 +[Information] Time [0:0]:18000000 +[Information] Time [0:0]:19000000 +[Information] Time [0:0]:20000000 +[Information] Time [0:0]:21000000 +[Information] Time [0:0]:22000000 +[Information] Time [0:0]:23000000 +[Information] Time [0:0]:24000000 +[Information] Time [0:0]:25000000 +[Information] Time [0:0]:26000000 +[Information] Time [0:0]:27000000 +[Information] Time [0:0]:28000000