Implemented all basic features

This commit is contained in:
Martino Ferrari
2026-02-23 13:17:16 +01:00
parent 6b1fc59fc0
commit 56bb3536fc
5 changed files with 334 additions and 592 deletions

View File

@@ -9,8 +9,25 @@ Implement a "Zero-Code-Change" observability layer for the MARTe2 real-time fram
- **FR-02 (Telemetry):** Stream high-frequency signal data (verified up to 100Hz) to a remote client. - **FR-02 (Telemetry):** Stream high-frequency signal data (verified up to 100Hz) to a remote client.
- **FR-03 (Forcing):** Allow manual override of signal values in memory during execution. - **FR-03 (Forcing):** Allow manual override of signal values in memory during execution.
- **FR-04 (Logs):** Stream global framework logs to a dedicated terminal via a standalone `TcpLogger` service. - **FR-04 (Logs):** Stream global framework logs to a dedicated terminal via a standalone `TcpLogger` service.
- **FR-05 (Execution Control):** Pause and resume the real-time execution threads via scheduler injection. - **FR-05 (Log Filtering):** The client must support filtering logs by type (Debug, Information, Warning, FatalError) and by content using regular expressions.
- **FR-06 (UI):** Provide a native, immediate-mode GUI for visualization (Oscilloscope). - **FR-06 (Execution & UI):**
- Provide a native GUI for visualization.
- Support Pause/Resume of real-time execution threads via scheduler injection.
- **FR-07 (Session Management):**
- The top panel must provide a "Disconnect" button to close active network streams.
- Support runtime re-configuration and "Apply & Reconnect" logic.
- **FR-08 (Decoupled Tracing):**
Clicking `trace` activates telemetry; data is buffered and shown as a "Last Value" in the sidebar, but not plotted until manually assigned.
- **FR-08 (Advanced Plotting):**
- Support multiple plot panels with perfectly synchronized time (X) axes.
- Drag-and-drop signals from the traced list into specific plots.
- Automatic distinct color assignment for each signal added to a plot.
- Plot modes: Standard (Time Series) and Logic Analyzer (Stacked rows).
- Signal transformations: Gain, offset, units, and custom labels.
- Visual styling: Deep customization of colors, line styles (Solid, Dashed, etc.), and marker shapes (Circle, Square, etc.).
- **FR-09 (Navigation):**
- Context menus for resetting zoom (X, Y, or both).
- "Fit to View" functionality that automatically scales both axes to encompass all available buffered data points.
### 2.2 Technical Constraints (TC) ### 2.2 Technical Constraints (TC)
- **TC-01:** No modifications allowed to the MARTe2 core library or component source code. - **TC-01:** No modifications allowed to the MARTe2 core library or component source code.

View File

@@ -1,6 +1,7 @@
#include "DebugService.h" #include "DebugService.h"
#include "AdvancedErrorManagement.h" #include "StandardParser.h"
#include "StreamString.h" #include "StreamString.h"
#include "BasicSocket.h"
#include "DebugBrokerWrapper.h" #include "DebugBrokerWrapper.h"
#include "ObjectRegistryDatabase.h" #include "ObjectRegistryDatabase.h"
#include "ClassRegistryItem.h" #include "ClassRegistryItem.h"
@@ -11,6 +12,15 @@
#include "GAM.h" #include "GAM.h"
// Explicitly include target brokers for templating // Explicitly include target brokers for templating
#include "MemoryMapInputBroker.h"
#include "MemoryMapOutputBroker.h"
#include "MemoryMapSynchronisedInputBroker.h"
#include "MemoryMapSynchronisedOutputBroker.h"
#include "MemoryMapInterpolatedInputBroker.h"
#include "MemoryMapMultiBufferInputBroker.h"
#include "MemoryMapMultiBufferOutputBroker.h"
#include "MemoryMapSynchronisedMultiBufferInputBroker.h"
#include "MemoryMapSynchronisedMultiBufferOutputBroker.h"
namespace MARTe { namespace MARTe {
@@ -101,6 +111,7 @@ bool DebugService::Initialise(StructuredDataI & data) {
} }
if (isServer) { if (isServer) {
// 8MB Buffer for lossless tracing at high frequency
if (!traceBuffer.Init(8 * 1024 * 1024)) return false; if (!traceBuffer.Init(8 * 1024 * 1024)) return false;
PatchRegistry(); PatchRegistry();
@@ -167,7 +178,6 @@ void DebugService::PatchRegistry() {
PatchItemInternal("MemoryMapSynchronisedMultiBufferInputBroker", &b8); PatchItemInternal("MemoryMapSynchronisedMultiBufferInputBroker", &b8);
static DebugMemoryMapSynchronisedMultiBufferOutputBrokerBuilder b9; static DebugMemoryMapSynchronisedMultiBufferOutputBrokerBuilder b9;
PatchItemInternal("MemoryMapSynchronisedMultiBufferOutputBroker", &b9); PatchItemInternal("MemoryMapSynchronisedMultiBufferOutputBroker", &b9);
} }
void DebugService::ProcessSignal(DebugSignalInfo* s, uint32 size) { void DebugService::ProcessSignal(DebugSignalInfo* s, uint32 size) {
@@ -337,7 +347,7 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
} }
InternetHost dest(streamPort, streamIP.Buffer()); InternetHost dest(streamPort, streamIP.Buffer());
udpSocket.SetDestination(dest); (void)udpSocket.SetDestination(dest);
uint8 packetBuffer[4096]; uint8 packetBuffer[4096];
uint32 packetOffset = 0; uint32 packetOffset = 0;
@@ -349,6 +359,7 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
uint8 sampleData[1024]; uint8 sampleData[1024];
bool hasData = false; bool hasData = false;
// TIGHT LOOP: Drain the buffer as fast as possible without sleeping
while ((info.GetStage() == ExecutionInfo::MainStage) && traceBuffer.Pop(id, sampleData, size, 1024)) { while ((info.GetStage() == ExecutionInfo::MainStage) && traceBuffer.Pop(id, sampleData, size, 1024)) {
hasData = true; hasData = true;
if (packetOffset == 0) { if (packetOffset == 0) {
@@ -357,36 +368,44 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
header.seq = sequenceNumber++; header.seq = sequenceNumber++;
header.timestamp = HighResolutionTimer::Counter(); header.timestamp = HighResolutionTimer::Counter();
header.count = 0; header.count = 0;
MemoryOperationsHelper::Copy(packetBuffer, &header, sizeof(TraceHeader)); std::memcpy(packetBuffer, &header, sizeof(TraceHeader));
packetOffset = sizeof(TraceHeader); packetOffset = sizeof(TraceHeader);
} }
// Packet Packing: Header + [ID:4][Size:4][Data:N]
// If this sample doesn't fit, flush the current packet first
if (packetOffset + 8 + size > 1400) { if (packetOffset + 8 + size > 1400) {
uint32 toWrite = packetOffset; uint32 toWrite = packetOffset;
udpSocket.Write((char8*)packetBuffer, toWrite); (void)udpSocket.Write((char8*)packetBuffer, toWrite);
packetOffset = 0;
// Re-init header for the next packet
TraceHeader header; TraceHeader header;
header.magic = 0xDA7A57AD; header.magic = 0xDA7A57AD;
header.seq = sequenceNumber++; header.seq = sequenceNumber++;
header.timestamp = HighResolutionTimer::Counter(); header.timestamp = HighResolutionTimer::Counter();
header.count = 0; header.count = 0;
MemoryOperationsHelper::Copy(packetBuffer, &header, sizeof(TraceHeader)); std::memcpy(packetBuffer, &header, sizeof(TraceHeader));
packetOffset = sizeof(TraceHeader); packetOffset = sizeof(TraceHeader);
} }
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset], &id, 4); std::memcpy(&packetBuffer[packetOffset], &id, 4);
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset + 4], &size, 4); std::memcpy(&packetBuffer[packetOffset + 4], &size, 4);
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset + 8], sampleData, size); std::memcpy(&packetBuffer[packetOffset + 8], sampleData, size);
packetOffset += (8 + size); packetOffset += (8 + size);
// Update sample count in the current packet header
TraceHeader *h = (TraceHeader*)packetBuffer; TraceHeader *h = (TraceHeader*)packetBuffer;
h->count++; h->count++;
} }
// Flush any remaining data
if (packetOffset > 0) { if (packetOffset > 0) {
uint32 toWrite = packetOffset; uint32 toWrite = packetOffset;
udpSocket.Write((char8*)packetBuffer, toWrite); (void)udpSocket.Write((char8*)packetBuffer, toWrite);
packetOffset = 0; packetOffset = 0;
} }
// Only sleep if the buffer was completely empty
if (!hasData) Sleep::MSec(1); if (!hasData) Sleep::MSec(1);
} }
return ErrorManagement::NoError; return ErrorManagement::NoError;
@@ -415,7 +434,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
uint32 count = ForceSignal(name.Buffer(), val.Buffer()); uint32 count = ForceSignal(name.Buffer(), val.Buffer());
if (client) { if (client) {
StreamString resp; resp.Printf("OK FORCE %u\n", count); StreamString resp; resp.Printf("OK FORCE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
@@ -425,7 +444,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
uint32 count = UnforceSignal(name.Buffer()); uint32 count = UnforceSignal(name.Buffer());
if (client) { if (client) {
StreamString resp; resp.Printf("OK UNFORCE %u\n", count); StreamString resp; resp.Printf("OK UNFORCE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
@@ -437,31 +456,31 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
if (cmd.GetToken(decim, delims, term)) { if (cmd.GetToken(decim, delims, term)) {
AnyType decimVal(UnsignedInteger32Bit, 0u, &d); AnyType decimVal(UnsignedInteger32Bit, 0u, &d);
AnyType decimStr(CharString, 0u, decim.Buffer()); AnyType decimStr(CharString, 0u, decim.Buffer());
TypeConvert(decimVal, decimStr); (void)TypeConvert(decimVal, decimStr);
} }
uint32 count = TraceSignal(name.Buffer(), enable, d); uint32 count = TraceSignal(name.Buffer(), enable, d);
if (client) { if (client) {
StreamString resp; resp.Printf("OK TRACE %u\n", count); StreamString resp; resp.Printf("OK TRACE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
else if (token == "DISCOVER") Discover(client); else if (token == "DISCOVER") Discover(client);
else if (token == "PAUSE") { else if (token == "PAUSE") {
SetPaused(true); SetPaused(true);
if (client) { uint32 s = 3; client->Write("OK\n", s); } if (client) { uint32 s = 3; (void)client->Write("OK\n", s); }
} }
else if (token == "RESUME") { else if (token == "RESUME") {
SetPaused(false); SetPaused(false);
if (client) { uint32 s = 3; client->Write("OK\n", s); } if (client) { uint32 s = 3; (void)client->Write("OK\n", s); }
} }
else if (token == "TREE") { else if (token == "TREE") {
StreamString json; StreamString json;
json = "{\"Name\": \"Root\", \"Class\": \"ObjectRegistryDatabase\", \"Children\": [\n"; json = "{\"Name\": \"Root\", \"Class\": \"ObjectRegistryDatabase\", \"Children\": [\n";
ExportTree(ObjectRegistryDatabase::Instance(), json); (void)ExportTree(ObjectRegistryDatabase::Instance(), json);
json += "\n]}\nOK TREE\n"; json += "\n]}\nOK TREE\n";
uint32 s = json.Size(); uint32 s = json.Size();
client->Write(json.Buffer(), s); (void)client->Write(json.Buffer(), s);
} }
else if (token == "INFO") { else if (token == "INFO") {
StreamString path; StreamString path;
@@ -475,7 +494,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
else if (client) { else if (client) {
const char* msg = "ERROR: Unknown command\n"; const char* msg = "ERROR: Unknown command\n";
uint32 s = StringHelper::Length(msg); uint32 s = StringHelper::Length(msg);
client->Write(msg, s); (void)client->Write(msg, s);
} }
} }
} }
@@ -527,7 +546,7 @@ void DebugService::InfoNode(const char8* path, BasicTCPSocket *client) {
json += "}\nOK INFO\n"; json += "}\nOK INFO\n";
uint32 s = json.Size(); uint32 s = json.Size();
client->Write(json.Buffer(), s); (void)client->Write(json.Buffer(), s);
} }
uint32 DebugService::ExportTree(ReferenceContainer *container, StreamString &json) { uint32 DebugService::ExportTree(ReferenceContainer *container, StreamString &json) {
@@ -661,7 +680,7 @@ void DebugService::Discover(BasicTCPSocket *client) {
if (client) { if (client) {
StreamString header = "{\n \"Signals\": [\n"; StreamString header = "{\n \"Signals\": [\n";
uint32 s = header.Size(); uint32 s = header.Size();
client->Write(header.Buffer(), s); (void)client->Write(header.Buffer(), s);
mutex.FastLock(); mutex.FastLock();
for (uint32 i = 0; i < numberOfAliases; i++) { for (uint32 i = 0; i < numberOfAliases; i++) {
StreamString line; StreamString line;
@@ -672,12 +691,12 @@ void DebugService::Discover(BasicTCPSocket *client) {
if (i < numberOfAliases - 1) line += ","; if (i < numberOfAliases - 1) line += ",";
line += "\n"; line += "\n";
s = line.Size(); s = line.Size();
client->Write(line.Buffer(), s); (void)client->Write(line.Buffer(), s);
} }
mutex.FastUnLock(); mutex.FastUnLock();
StreamString footer = " ]\n}\nOK DISCOVER\n"; StreamString footer = " ]\n}\nOK DISCOVER\n";
s = footer.Size(); s = footer.Size();
client->Write(footer.Buffer(), s); (void)client->Write(footer.Buffer(), s);
} }
} }
@@ -694,7 +713,7 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
StreamString header; StreamString header;
header.Printf("Nodes under %s:\n", path ? path : "/"); header.Printf("Nodes under %s:\n", path ? path : "/");
uint32 s = header.Size(); uint32 s = header.Size();
client->Write(header.Buffer(), s); (void)client->Write(header.Buffer(), s);
ReferenceContainer *container = dynamic_cast<ReferenceContainer*>(ref.operator->()); ReferenceContainer *container = dynamic_cast<ReferenceContainer*>(ref.operator->());
if (container) { if (container) {
@@ -705,7 +724,7 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
StreamString line; StreamString line;
line.Printf(" %s [%s]\n", child->GetName(), child->GetClassProperties()->GetName()); line.Printf(" %s [%s]\n", child->GetName(), child->GetClassProperties()->GetName());
s = line.Size(); s = line.Size();
client->Write(line.Buffer(), s); (void)client->Write(line.Buffer(), s);
} }
} }
} }
@@ -713,15 +732,15 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
DataSourceI *ds = dynamic_cast<DataSourceI*>(ref.operator->()); DataSourceI *ds = dynamic_cast<DataSourceI*>(ref.operator->());
if (ds) { if (ds) {
StreamString dsHeader = " Signals:\n"; StreamString dsHeader = " Signals:\n";
s = dsHeader.Size(); client->Write(dsHeader.Buffer(), s); s = dsHeader.Size(); (void)client->Write(dsHeader.Buffer(), s);
uint32 nSignals = ds->GetNumberOfSignals(); uint32 nSignals = ds->GetNumberOfSignals();
for (uint32 i=0; i<nSignals; i++) { for (uint32 i=0; i<nSignals; i++) {
StreamString sname, line; StreamString sname, line;
ds->GetSignalName(i, sname); (void)ds->GetSignalName(i, sname);
TypeDescriptor stype = ds->GetSignalType(i); TypeDescriptor stype = ds->GetSignalType(i);
const char8* stypeName = TypeDescriptor::GetTypeNameFromTypeDescriptor(stype); const char8* stypeName = TypeDescriptor::GetTypeNameFromTypeDescriptor(stype);
line.Printf(" %s [%s]\n", sname.Buffer(), stypeName ? stypeName : "Unknown"); line.Printf(" %s [%s]\n", sname.Buffer(), stypeName ? stypeName : "Unknown");
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
} }
@@ -731,31 +750,31 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
uint32 nOut = gam->GetNumberOfOutputSignals(); uint32 nOut = gam->GetNumberOfOutputSignals();
StreamString gamHeader; StreamString gamHeader;
gamHeader.Printf(" Input Signals (%d):\n", nIn); gamHeader.Printf(" Input Signals (%d):\n", nIn);
s = gamHeader.Size(); client->Write(gamHeader.Buffer(), s); s = gamHeader.Size(); (void)client->Write(gamHeader.Buffer(), s);
for (uint32 i=0; i<nIn; i++) { for (uint32 i=0; i<nIn; i++) {
StreamString sname, line; StreamString sname, line;
gam->GetSignalName(InputSignals, i, sname); (void)gam->GetSignalName(InputSignals, i, sname);
line.Printf(" %s\n", sname.Buffer()); line.Printf(" %s\n", sname.Buffer());
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
gamHeader.SetSize(0); gamHeader.SetSize(0);
gamHeader.Printf(" Output Signals (%d):\n", nOut); gamHeader.Printf(" Output Signals (%d):\n", nOut);
s = gamHeader.Size(); client->Write(gamHeader.Buffer(), s); s = gamHeader.Size(); (void)client->Write(gamHeader.Buffer(), s);
for (uint32 i=0; i<nOut; i++) { for (uint32 i=0; i<nOut; i++) {
StreamString sname, line; StreamString sname, line;
gam->GetSignalName(OutputSignals, i, sname); (void)gam->GetSignalName(OutputSignals, i, sname);
line.Printf(" %s\n", sname.Buffer()); line.Printf(" %s\n", sname.Buffer());
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
} }
const char* okMsg = "OK LS\n"; const char* okMsg = "OK LS\n";
s = StringHelper::Length(okMsg); s = StringHelper::Length(okMsg);
client->Write(okMsg, s); (void)client->Write(okMsg, s);
} else { } else {
const char* msg = "ERROR: Path not found\n"; const char* msg = "ERROR: Path not found\n";
uint32 s = StringHelper::Length(msg); uint32 s = StringHelper::Length(msg);
client->Write(msg, s); (void)client->Write(msg, s);
} }
} }

View File

@@ -1796,6 +1796,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"eframe", "eframe",
"egui_plot", "egui_plot",
"once_cell",
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -12,3 +12,4 @@ chrono = "0.4"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
regex = "1.10" regex = "1.10"
socket2 = { version = "0.5", features = ["all"] } socket2 = { version = "0.5", features = ["all"] }
once_cell = "1.21.3"

File diff suppressed because it is too large Load Diff