5 Commits

Author SHA1 Message Date
Martino Ferrari
56bb3536fc Implemented all basic features 2026-02-23 13:17:16 +01:00
Martino Ferrari
6b1fc59fc0 Implemetned better buffering and high frequency tracing 2026-02-23 12:00:14 +01:00
Martino Ferrari
253a4989f9 Testing HF data 2026-02-23 11:24:32 +01:00
Martino Ferrari
04fb98bc74 Fixed ui for high frequency data 2026-02-23 11:17:11 +01:00
Martino Ferrari
3ad581d13b Removede scheduler 2026-02-23 10:25:58 +01:00
10 changed files with 515 additions and 942 deletions

View File

@@ -4,6 +4,7 @@
#include "CompilerTypes.h" #include "CompilerTypes.h"
#include "TypeDescriptor.h" #include "TypeDescriptor.h"
#include "StreamString.h" #include "StreamString.h"
#include <cstring> // For memcpy
namespace MARTe { namespace MARTe {
@@ -58,7 +59,14 @@ public:
uint32 packetSize = 4 + 4 + size; uint32 packetSize = 4 + 4 + size;
uint32 read = readIndex; uint32 read = readIndex;
uint32 write = writeIndex; uint32 write = writeIndex;
uint32 available = (read <= write) ? (bufferSize - (write - read) - 1) : (read - write - 1);
// Calculate available space
uint32 available = 0;
if (read <= write) {
available = bufferSize - (write - read) - 1;
} else {
available = read - write - 1;
}
if (available < packetSize) return false; if (available < packetSize) return false;
@@ -68,6 +76,9 @@ public:
WriteToBuffer(&tempWrite, &size, 4); WriteToBuffer(&tempWrite, &size, 4);
WriteToBuffer(&tempWrite, data, size); WriteToBuffer(&tempWrite, data, size);
// Memory Barrier to ensure data is visible before index update
// __sync_synchronize();
// Final atomic update // Final atomic update
writeIndex = tempWrite; writeIndex = tempWrite;
return true; return true;
@@ -79,20 +90,27 @@ public:
if (read == write) return false; if (read == write) return false;
uint32 tempRead = read; uint32 tempRead = read;
uint32 tempId, tempSize; uint32 tempId = 0;
uint32 tempSize = 0;
// Peek header
ReadFromBuffer(&tempRead, &tempId, 4); ReadFromBuffer(&tempRead, &tempId, 4);
ReadFromBuffer(&tempRead, &tempSize, 4); ReadFromBuffer(&tempRead, &tempSize, 4);
if (tempSize > maxSize) { if (tempSize > maxSize) {
// Error case: drop data up to writeIndex // Error case: drop data up to writeIndex (resync)
readIndex = write; readIndex = write;
return false; return false;
} }
ReadFromBuffer(&tempRead, dataBuffer, tempSize); ReadFromBuffer(&tempRead, dataBuffer, tempSize);
signalID = tempId; signalID = tempId;
size = tempSize; size = tempSize;
// Memory Barrier
// __sync_synchronize();
readIndex = tempRead; readIndex = tempRead;
return true; return true;
} }
@@ -106,18 +124,32 @@ public:
private: private:
void WriteToBuffer(uint32 *idx, void* src, uint32 count) { void WriteToBuffer(uint32 *idx, void* src, uint32 count) {
uint8* s = (uint8*)src; uint32 current = *idx;
for (uint32 i=0; i<count; i++) { uint32 spaceToEnd = bufferSize - current;
buffer[*idx] = s[i];
*idx = (*idx + 1) % bufferSize; if (count <= spaceToEnd) {
std::memcpy(&buffer[current], src, count);
*idx = (current + count) % bufferSize;
} else {
std::memcpy(&buffer[current], src, spaceToEnd);
uint32 remaining = count - spaceToEnd;
std::memcpy(&buffer[0], (uint8*)src + spaceToEnd, remaining);
*idx = remaining;
} }
} }
void ReadFromBuffer(uint32 *idx, void* dst, uint32 count) { void ReadFromBuffer(uint32 *idx, void* dst, uint32 count) {
uint8* d = (uint8*)dst; uint32 current = *idx;
for (uint32 i=0; i<count; i++) { uint32 spaceToEnd = bufferSize - current;
d[i] = buffer[*idx];
*idx = (*idx + 1) % bufferSize; if (count <= spaceToEnd) {
std::memcpy(dst, &buffer[current], count);
*idx = (current + count) % bufferSize;
} else {
std::memcpy(dst, &buffer[current], spaceToEnd);
uint32 remaining = count - spaceToEnd;
std::memcpy((uint8*)dst + spaceToEnd, &buffer[0], remaining);
*idx = remaining;
} }
} }

View File

@@ -1,32 +0,0 @@
#ifndef DEBUGFASTSCHEDULER_H
#define DEBUGFASTSCHEDULER_H
#include "FastScheduler.h"
#include "DebugService.h"
namespace MARTe {
class DebugFastScheduler : public FastScheduler {
public:
CLASS_REGISTER_DECLARATION()
DebugFastScheduler();
virtual ~DebugFastScheduler();
virtual bool Initialise(StructuredDataI & data);
ErrorManagement::ErrorType Execute(ExecutionInfo &information);
protected:
virtual void CustomPrepareNextState();
private:
ErrorManagement::ErrorType DebugSetupThreadMap();
EmbeddedServiceMethodBinderT<DebugFastScheduler> debugBinder;
DebugService *debugService;
};
}
#endif

View File

@@ -9,8 +9,25 @@ Implement a "Zero-Code-Change" observability layer for the MARTe2 real-time fram
- **FR-02 (Telemetry):** Stream high-frequency signal data (verified up to 100Hz) to a remote client. - **FR-02 (Telemetry):** Stream high-frequency signal data (verified up to 100Hz) to a remote client.
- **FR-03 (Forcing):** Allow manual override of signal values in memory during execution. - **FR-03 (Forcing):** Allow manual override of signal values in memory during execution.
- **FR-04 (Logs):** Stream global framework logs to a dedicated terminal via a standalone `TcpLogger` service. - **FR-04 (Logs):** Stream global framework logs to a dedicated terminal via a standalone `TcpLogger` service.
- **FR-05 (Execution Control):** Pause and resume the real-time execution threads via scheduler injection. - **FR-05 (Log Filtering):** The client must support filtering logs by type (Debug, Information, Warning, FatalError) and by content using regular expressions.
- **FR-06 (UI):** Provide a native, immediate-mode GUI for visualization (Oscilloscope). - **FR-06 (Execution & UI):**
- Provide a native GUI for visualization.
- Support Pause/Resume of real-time execution threads via scheduler injection.
- **FR-07 (Session Management):**
- The top panel must provide a "Disconnect" button to close active network streams.
- Support runtime re-configuration and "Apply & Reconnect" logic.
- **FR-08 (Decoupled Tracing):**
Clicking `trace` activates telemetry; data is buffered and shown as a "Last Value" in the sidebar, but not plotted until manually assigned.
- **FR-08 (Advanced Plotting):**
- Support multiple plot panels with perfectly synchronized time (X) axes.
- Drag-and-drop signals from the traced list into specific plots.
- Automatic distinct color assignment for each signal added to a plot.
- Plot modes: Standard (Time Series) and Logic Analyzer (Stacked rows).
- Signal transformations: Gain, offset, units, and custom labels.
- Visual styling: Deep customization of colors, line styles (Solid, Dashed, etc.), and marker shapes (Circle, Square, etc.).
- **FR-09 (Navigation):**
- Context menus for resetting zoom (X, Y, or both).
- "Fit to View" functionality that automatically scales both axes to encompass all available buffered data points.
### 2.2 Technical Constraints (TC) ### 2.2 Technical Constraints (TC)
- **TC-01:** No modifications allowed to the MARTe2 core library or component source code. - **TC-01:** No modifications allowed to the MARTe2 core library or component source code.

View File

@@ -1,151 +0,0 @@
#include "DebugFastScheduler.h"
#include "AdvancedErrorManagement.h"
#include "ExecutionInfo.h"
#include "MemoryOperationsHelper.h"
#include "ObjectRegistryDatabase.h"
namespace MARTe {
const uint64 ALL_CPUS = 0xFFFFFFFFFFFFFFFFull;
DebugFastScheduler::DebugFastScheduler() :
FastScheduler(),
debugBinder(*this, &DebugFastScheduler::Execute)
{
debugService = NULL_PTR(DebugService*);
}
DebugFastScheduler::~DebugFastScheduler() {
}
bool DebugFastScheduler::Initialise(StructuredDataI & data) {
bool ret = FastScheduler::Initialise(data);
if (ret) {
ReferenceContainer *root = ObjectRegistryDatabase::Instance();
Reference serviceRef = root->Find("DebugService");
if (serviceRef.IsValid()) {
debugService = dynamic_cast<DebugService*>(serviceRef.operator->());
}
}
return ret;
}
ErrorManagement::ErrorType DebugFastScheduler::DebugSetupThreadMap() {
ErrorManagement::ErrorType err;
ComputeMaxNThreads();
REPORT_ERROR(ErrorManagement::Information, "DebugFastScheduler: Max Threads=%!", maxNThreads);
multiThreadService = new (NULL) MultiThreadService(debugBinder);
multiThreadService->SetNumberOfPoolThreads(maxNThreads);
err = multiThreadService->CreateThreads();
if (err.ErrorsCleared()) {
rtThreadInfo[0] = new RTThreadParam[maxNThreads];
rtThreadInfo[1] = new RTThreadParam[maxNThreads];
for (uint32 i = 0u; i < numberOfStates; i++) {
cpuMap[i] = new uint64[maxNThreads];
for (uint32 j = 0u; j < maxNThreads; j++) {
cpuMap[i][j] = ALL_CPUS;
}
}
if (countingSem.Create(maxNThreads)) {
for (uint32 i = 0u; i < numberOfStates; i++) {
uint32 nThreads = states[i].numberOfThreads;
cpuThreadMap[i] = new uint32[nThreads];
for (uint32 j = 0u; j < nThreads; j++) {
uint64 cpu = static_cast<uint64>(states[i].threads[j].cpu.GetProcessorMask());
CreateThreadMap(cpu, i, j);
}
}
}
}
return err;
}
void DebugFastScheduler::CustomPrepareNextState() {
ErrorManagement::ErrorType err;
err = !realTimeApplicationT.IsValid();
if (err.ErrorsCleared()) {
uint8 nextBuffer = static_cast<uint8>(realTimeApplicationT->GetIndex());
nextBuffer++;
nextBuffer &= 0x1u;
if (!initialised) {
cpuMap = new uint64*[numberOfStates];
cpuThreadMap = new uint32*[numberOfStates];
err = DebugSetupThreadMap();
}
if (err.ErrorsCleared()) {
for (uint32 j = 0u; j < maxNThreads; j++) {
rtThreadInfo[nextBuffer][j].executables = NULL_PTR(ExecutableI **);
rtThreadInfo[nextBuffer][j].numberOfExecutables = 0u;
rtThreadInfo[nextBuffer][j].cycleTime = NULL_PTR(uint32 *);
rtThreadInfo[nextBuffer][j].lastCycleTimeStamp = 0u;
}
ScheduledState *nextState = GetSchedulableStates()[nextBuffer];
uint32 numberOfThreads = nextState->numberOfThreads;
for (uint32 i = 0u; i < numberOfThreads; i++) {
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].executables = nextState->threads[i].executables;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].numberOfExecutables = nextState->threads[i].numberOfExecutables;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].cycleTime = nextState->threads[i].cycleTime;
rtThreadInfo[nextBuffer][cpuThreadMap[nextStateIdentifier][i]].lastCycleTimeStamp = 0u;
}
}
}
}
ErrorManagement::ErrorType DebugFastScheduler::Execute(ExecutionInfo & information) {
ErrorManagement::ErrorType ret;
if (information.GetStage() == MARTe::ExecutionInfo::StartupStage) {
}
else if (information.GetStage() == MARTe::ExecutionInfo::MainStage) {
uint32 threadNumber = information.GetThreadNumber();
(void) eventSem.Wait(TTInfiniteWait);
if (superFast == 0u) {
(void) countingSem.WaitForAll(TTInfiniteWait);
}
uint32 idx = static_cast<uint32>(realTimeApplicationT->GetIndex());
if (rtThreadInfo[idx] != NULL_PTR(RTThreadParam *)) {
if (rtThreadInfo[idx][threadNumber].numberOfExecutables > 0u) {
// EXECUTION CONTROL HOOK
if (debugService != NULL_PTR(DebugService*)) {
while (debugService->IsPaused()) {
Sleep::MSec(1);
}
}
bool ok = ExecuteSingleCycle(rtThreadInfo[idx][threadNumber].executables, rtThreadInfo[idx][threadNumber].numberOfExecutables);
if (!ok) {
if (errorMessage.IsValid()) {
(void)MessageI::SendMessage(errorMessage, this);
}
}
uint32 absTime = 0u;
if (rtThreadInfo[idx][threadNumber].lastCycleTimeStamp != 0u) {
uint64 tmp = (HighResolutionTimer::Counter() - rtThreadInfo[idx][threadNumber].lastCycleTimeStamp);
float64 ticksToTime = (static_cast<float64>(tmp) * clockPeriod) * 1e6;
absTime = static_cast<uint32>(ticksToTime);
}
uint32 sizeToCopy = static_cast<uint32>(sizeof(uint32));
(void)MemoryOperationsHelper::Copy(rtThreadInfo[idx][threadNumber].cycleTime, &absTime, sizeToCopy);
rtThreadInfo[idx][threadNumber].lastCycleTimeStamp = HighResolutionTimer::Counter();
}
else {
(void) unusedThreadsSem.Wait(TTInfiniteWait);
}
}
}
return ret;
}
CLASS_REGISTER(DebugFastScheduler, "1.0")
}

View File

@@ -1,8 +1,8 @@
#include "DebugService.h" #include "DebugService.h"
#include "AdvancedErrorManagement.h" #include "StandardParser.h"
#include "StreamString.h" #include "StreamString.h"
#include "BasicSocket.h"
#include "DebugBrokerWrapper.h" #include "DebugBrokerWrapper.h"
#include "DebugFastScheduler.h"
#include "ObjectRegistryDatabase.h" #include "ObjectRegistryDatabase.h"
#include "ClassRegistryItem.h" #include "ClassRegistryItem.h"
#include "ObjectBuilder.h" #include "ObjectBuilder.h"
@@ -12,6 +12,15 @@
#include "GAM.h" #include "GAM.h"
// Explicitly include target brokers for templating // Explicitly include target brokers for templating
#include "MemoryMapInputBroker.h"
#include "MemoryMapOutputBroker.h"
#include "MemoryMapSynchronisedInputBroker.h"
#include "MemoryMapSynchronisedOutputBroker.h"
#include "MemoryMapInterpolatedInputBroker.h"
#include "MemoryMapMultiBufferInputBroker.h"
#include "MemoryMapMultiBufferOutputBroker.h"
#include "MemoryMapSynchronisedMultiBufferInputBroker.h"
#include "MemoryMapSynchronisedMultiBufferOutputBroker.h"
namespace MARTe { namespace MARTe {
@@ -102,7 +111,8 @@ bool DebugService::Initialise(StructuredDataI & data) {
} }
if (isServer) { if (isServer) {
if (!traceBuffer.Init(1024 * 1024)) return false; // 8MB Buffer for lossless tracing at high frequency
if (!traceBuffer.Init(8 * 1024 * 1024)) return false;
PatchRegistry(); PatchRegistry();
@@ -168,10 +178,6 @@ void DebugService::PatchRegistry() {
PatchItemInternal("MemoryMapSynchronisedMultiBufferInputBroker", &b8); PatchItemInternal("MemoryMapSynchronisedMultiBufferInputBroker", &b8);
static DebugMemoryMapSynchronisedMultiBufferOutputBrokerBuilder b9; static DebugMemoryMapSynchronisedMultiBufferOutputBrokerBuilder b9;
PatchItemInternal("MemoryMapSynchronisedMultiBufferOutputBroker", &b9); PatchItemInternal("MemoryMapSynchronisedMultiBufferOutputBroker", &b9);
// Patch Scheduler
static ObjectBuilderT<DebugFastScheduler> schedBuilder;
PatchItemInternal("FastScheduler", &schedBuilder);
} }
void DebugService::ProcessSignal(DebugSignalInfo* s, uint32 size) { void DebugService::ProcessSignal(DebugSignalInfo* s, uint32 size) {
@@ -341,7 +347,7 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
} }
InternetHost dest(streamPort, streamIP.Buffer()); InternetHost dest(streamPort, streamIP.Buffer());
udpSocket.SetDestination(dest); (void)udpSocket.SetDestination(dest);
uint8 packetBuffer[4096]; uint8 packetBuffer[4096];
uint32 packetOffset = 0; uint32 packetOffset = 0;
@@ -353,6 +359,7 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
uint8 sampleData[1024]; uint8 sampleData[1024];
bool hasData = false; bool hasData = false;
// TIGHT LOOP: Drain the buffer as fast as possible without sleeping
while ((info.GetStage() == ExecutionInfo::MainStage) && traceBuffer.Pop(id, sampleData, size, 1024)) { while ((info.GetStage() == ExecutionInfo::MainStage) && traceBuffer.Pop(id, sampleData, size, 1024)) {
hasData = true; hasData = true;
if (packetOffset == 0) { if (packetOffset == 0) {
@@ -361,36 +368,44 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo & info) {
header.seq = sequenceNumber++; header.seq = sequenceNumber++;
header.timestamp = HighResolutionTimer::Counter(); header.timestamp = HighResolutionTimer::Counter();
header.count = 0; header.count = 0;
MemoryOperationsHelper::Copy(packetBuffer, &header, sizeof(TraceHeader)); std::memcpy(packetBuffer, &header, sizeof(TraceHeader));
packetOffset = sizeof(TraceHeader); packetOffset = sizeof(TraceHeader);
} }
// Packet Packing: Header + [ID:4][Size:4][Data:N]
// If this sample doesn't fit, flush the current packet first
if (packetOffset + 8 + size > 1400) { if (packetOffset + 8 + size > 1400) {
uint32 toWrite = packetOffset; uint32 toWrite = packetOffset;
udpSocket.Write((char8*)packetBuffer, toWrite); (void)udpSocket.Write((char8*)packetBuffer, toWrite);
packetOffset = 0;
// Re-init header for the next packet
TraceHeader header; TraceHeader header;
header.magic = 0xDA7A57AD; header.magic = 0xDA7A57AD;
header.seq = sequenceNumber++; header.seq = sequenceNumber++;
header.timestamp = HighResolutionTimer::Counter(); header.timestamp = HighResolutionTimer::Counter();
header.count = 0; header.count = 0;
MemoryOperationsHelper::Copy(packetBuffer, &header, sizeof(TraceHeader)); std::memcpy(packetBuffer, &header, sizeof(TraceHeader));
packetOffset = sizeof(TraceHeader); packetOffset = sizeof(TraceHeader);
} }
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset], &id, 4); std::memcpy(&packetBuffer[packetOffset], &id, 4);
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset + 4], &size, 4); std::memcpy(&packetBuffer[packetOffset + 4], &size, 4);
MemoryOperationsHelper::Copy(&packetBuffer[packetOffset + 8], sampleData, size); std::memcpy(&packetBuffer[packetOffset + 8], sampleData, size);
packetOffset += (8 + size); packetOffset += (8 + size);
// Update sample count in the current packet header
TraceHeader *h = (TraceHeader*)packetBuffer; TraceHeader *h = (TraceHeader*)packetBuffer;
h->count++; h->count++;
} }
// Flush any remaining data
if (packetOffset > 0) { if (packetOffset > 0) {
uint32 toWrite = packetOffset; uint32 toWrite = packetOffset;
udpSocket.Write((char8*)packetBuffer, toWrite); (void)udpSocket.Write((char8*)packetBuffer, toWrite);
packetOffset = 0; packetOffset = 0;
} }
// Only sleep if the buffer was completely empty
if (!hasData) Sleep::MSec(1); if (!hasData) Sleep::MSec(1);
} }
return ErrorManagement::NoError; return ErrorManagement::NoError;
@@ -419,7 +434,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
uint32 count = ForceSignal(name.Buffer(), val.Buffer()); uint32 count = ForceSignal(name.Buffer(), val.Buffer());
if (client) { if (client) {
StreamString resp; resp.Printf("OK FORCE %u\n", count); StreamString resp; resp.Printf("OK FORCE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
@@ -429,7 +444,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
uint32 count = UnforceSignal(name.Buffer()); uint32 count = UnforceSignal(name.Buffer());
if (client) { if (client) {
StreamString resp; resp.Printf("OK UNFORCE %u\n", count); StreamString resp; resp.Printf("OK UNFORCE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
@@ -441,31 +456,31 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
if (cmd.GetToken(decim, delims, term)) { if (cmd.GetToken(decim, delims, term)) {
AnyType decimVal(UnsignedInteger32Bit, 0u, &d); AnyType decimVal(UnsignedInteger32Bit, 0u, &d);
AnyType decimStr(CharString, 0u, decim.Buffer()); AnyType decimStr(CharString, 0u, decim.Buffer());
TypeConvert(decimVal, decimStr); (void)TypeConvert(decimVal, decimStr);
} }
uint32 count = TraceSignal(name.Buffer(), enable, d); uint32 count = TraceSignal(name.Buffer(), enable, d);
if (client) { if (client) {
StreamString resp; resp.Printf("OK TRACE %u\n", count); StreamString resp; resp.Printf("OK TRACE %u\n", count);
uint32 s = resp.Size(); client->Write(resp.Buffer(), s); uint32 s = resp.Size(); (void)client->Write(resp.Buffer(), s);
} }
} }
} }
else if (token == "DISCOVER") Discover(client); else if (token == "DISCOVER") Discover(client);
else if (token == "PAUSE") { else if (token == "PAUSE") {
SetPaused(true); SetPaused(true);
if (client) { uint32 s = 3; client->Write("OK\n", s); } if (client) { uint32 s = 3; (void)client->Write("OK\n", s); }
} }
else if (token == "RESUME") { else if (token == "RESUME") {
SetPaused(false); SetPaused(false);
if (client) { uint32 s = 3; client->Write("OK\n", s); } if (client) { uint32 s = 3; (void)client->Write("OK\n", s); }
} }
else if (token == "TREE") { else if (token == "TREE") {
StreamString json; StreamString json;
json = "{\"Name\": \"Root\", \"Class\": \"ObjectRegistryDatabase\", \"Children\": [\n"; json = "{\"Name\": \"Root\", \"Class\": \"ObjectRegistryDatabase\", \"Children\": [\n";
ExportTree(ObjectRegistryDatabase::Instance(), json); (void)ExportTree(ObjectRegistryDatabase::Instance(), json);
json += "\n]}\nOK TREE\n"; json += "\n]}\nOK TREE\n";
uint32 s = json.Size(); uint32 s = json.Size();
client->Write(json.Buffer(), s); (void)client->Write(json.Buffer(), s);
} }
else if (token == "INFO") { else if (token == "INFO") {
StreamString path; StreamString path;
@@ -479,7 +494,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
else if (client) { else if (client) {
const char* msg = "ERROR: Unknown command\n"; const char* msg = "ERROR: Unknown command\n";
uint32 s = StringHelper::Length(msg); uint32 s = StringHelper::Length(msg);
client->Write(msg, s); (void)client->Write(msg, s);
} }
} }
} }
@@ -531,7 +546,7 @@ void DebugService::InfoNode(const char8* path, BasicTCPSocket *client) {
json += "}\nOK INFO\n"; json += "}\nOK INFO\n";
uint32 s = json.Size(); uint32 s = json.Size();
client->Write(json.Buffer(), s); (void)client->Write(json.Buffer(), s);
} }
uint32 DebugService::ExportTree(ReferenceContainer *container, StreamString &json) { uint32 DebugService::ExportTree(ReferenceContainer *container, StreamString &json) {
@@ -665,7 +680,7 @@ void DebugService::Discover(BasicTCPSocket *client) {
if (client) { if (client) {
StreamString header = "{\n \"Signals\": [\n"; StreamString header = "{\n \"Signals\": [\n";
uint32 s = header.Size(); uint32 s = header.Size();
client->Write(header.Buffer(), s); (void)client->Write(header.Buffer(), s);
mutex.FastLock(); mutex.FastLock();
for (uint32 i = 0; i < numberOfAliases; i++) { for (uint32 i = 0; i < numberOfAliases; i++) {
StreamString line; StreamString line;
@@ -676,12 +691,12 @@ void DebugService::Discover(BasicTCPSocket *client) {
if (i < numberOfAliases - 1) line += ","; if (i < numberOfAliases - 1) line += ",";
line += "\n"; line += "\n";
s = line.Size(); s = line.Size();
client->Write(line.Buffer(), s); (void)client->Write(line.Buffer(), s);
} }
mutex.FastUnLock(); mutex.FastUnLock();
StreamString footer = " ]\n}\nOK DISCOVER\n"; StreamString footer = " ]\n}\nOK DISCOVER\n";
s = footer.Size(); s = footer.Size();
client->Write(footer.Buffer(), s); (void)client->Write(footer.Buffer(), s);
} }
} }
@@ -698,7 +713,7 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
StreamString header; StreamString header;
header.Printf("Nodes under %s:\n", path ? path : "/"); header.Printf("Nodes under %s:\n", path ? path : "/");
uint32 s = header.Size(); uint32 s = header.Size();
client->Write(header.Buffer(), s); (void)client->Write(header.Buffer(), s);
ReferenceContainer *container = dynamic_cast<ReferenceContainer*>(ref.operator->()); ReferenceContainer *container = dynamic_cast<ReferenceContainer*>(ref.operator->());
if (container) { if (container) {
@@ -709,7 +724,7 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
StreamString line; StreamString line;
line.Printf(" %s [%s]\n", child->GetName(), child->GetClassProperties()->GetName()); line.Printf(" %s [%s]\n", child->GetName(), child->GetClassProperties()->GetName());
s = line.Size(); s = line.Size();
client->Write(line.Buffer(), s); (void)client->Write(line.Buffer(), s);
} }
} }
} }
@@ -717,15 +732,15 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
DataSourceI *ds = dynamic_cast<DataSourceI*>(ref.operator->()); DataSourceI *ds = dynamic_cast<DataSourceI*>(ref.operator->());
if (ds) { if (ds) {
StreamString dsHeader = " Signals:\n"; StreamString dsHeader = " Signals:\n";
s = dsHeader.Size(); client->Write(dsHeader.Buffer(), s); s = dsHeader.Size(); (void)client->Write(dsHeader.Buffer(), s);
uint32 nSignals = ds->GetNumberOfSignals(); uint32 nSignals = ds->GetNumberOfSignals();
for (uint32 i=0; i<nSignals; i++) { for (uint32 i=0; i<nSignals; i++) {
StreamString sname, line; StreamString sname, line;
ds->GetSignalName(i, sname); (void)ds->GetSignalName(i, sname);
TypeDescriptor stype = ds->GetSignalType(i); TypeDescriptor stype = ds->GetSignalType(i);
const char8* stypeName = TypeDescriptor::GetTypeNameFromTypeDescriptor(stype); const char8* stypeName = TypeDescriptor::GetTypeNameFromTypeDescriptor(stype);
line.Printf(" %s [%s]\n", sname.Buffer(), stypeName ? stypeName : "Unknown"); line.Printf(" %s [%s]\n", sname.Buffer(), stypeName ? stypeName : "Unknown");
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
} }
@@ -735,31 +750,31 @@ void DebugService::ListNodes(const char8* path, BasicTCPSocket *client) {
uint32 nOut = gam->GetNumberOfOutputSignals(); uint32 nOut = gam->GetNumberOfOutputSignals();
StreamString gamHeader; StreamString gamHeader;
gamHeader.Printf(" Input Signals (%d):\n", nIn); gamHeader.Printf(" Input Signals (%d):\n", nIn);
s = gamHeader.Size(); client->Write(gamHeader.Buffer(), s); s = gamHeader.Size(); (void)client->Write(gamHeader.Buffer(), s);
for (uint32 i=0; i<nIn; i++) { for (uint32 i=0; i<nIn; i++) {
StreamString sname, line; StreamString sname, line;
gam->GetSignalName(InputSignals, i, sname); (void)gam->GetSignalName(InputSignals, i, sname);
line.Printf(" %s\n", sname.Buffer()); line.Printf(" %s\n", sname.Buffer());
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
gamHeader.SetSize(0); gamHeader.SetSize(0);
gamHeader.Printf(" Output Signals (%d):\n", nOut); gamHeader.Printf(" Output Signals (%d):\n", nOut);
s = gamHeader.Size(); client->Write(gamHeader.Buffer(), s); s = gamHeader.Size(); (void)client->Write(gamHeader.Buffer(), s);
for (uint32 i=0; i<nOut; i++) { for (uint32 i=0; i<nOut; i++) {
StreamString sname, line; StreamString sname, line;
gam->GetSignalName(OutputSignals, i, sname); (void)gam->GetSignalName(OutputSignals, i, sname);
line.Printf(" %s\n", sname.Buffer()); line.Printf(" %s\n", sname.Buffer());
s = line.Size(); client->Write(line.Buffer(), s); s = line.Size(); (void)client->Write(line.Buffer(), s);
} }
} }
const char* okMsg = "OK LS\n"; const char* okMsg = "OK LS\n";
s = StringHelper::Length(okMsg); s = StringHelper::Length(okMsg);
client->Write(okMsg, s); (void)client->Write(okMsg, s);
} else { } else {
const char* msg = "ERROR: Path not found\n"; const char* msg = "ERROR: Path not found\n";
uint32 s = StringHelper::Length(msg); uint32 s = StringHelper::Length(msg);
client->Write(msg, s); (void)client->Write(msg, s);
} }
} }

View File

@@ -8,7 +8,7 @@
Counter = { Counter = {
DataSource = Timer DataSource = Timer
Type = uint32 Type = uint32
Frequency = 1 Frequency = 1000
} }
Time = { Time = {
DataSource = Timer DataSource = Timer
@@ -32,7 +32,6 @@
DefaultDataSource = DDB DefaultDataSource = DDB
+Timer = { +Timer = {
Class = LinuxTimer Class = LinuxTimer
SleepTime = 1000000
Signals = { Signals = {
Counter = { Counter = {
Type = uint32 Type = uint32

View File

@@ -7,201 +7,166 @@
#include "BasicTCPSocket.h" #include "BasicTCPSocket.h"
#include "RealTimeApplication.h" #include "RealTimeApplication.h"
#include "GlobalObjectsDatabase.h" #include "GlobalObjectsDatabase.h"
#include "RealTimeLoader.h"
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
using namespace MARTe; using namespace MARTe;
const char8 * const config_text = // Removed '+' prefix from names for simpler lookup
"+DebugService = {" const char8 * const simple_config =
"DebugService = {"
" Class = DebugService " " Class = DebugService "
" ControlPort = 8080 " " ControlPort = 8080 "
" UdpPort = 8081 " " UdpPort = 8081 "
" StreamIP = \"127.0.0.1\" " " StreamIP = \"127.0.0.1\" "
"}" "}"
"+App = {" "App = {"
" Class = RealTimeApplication " " Class = RealTimeApplication "
" +Functions = {" " +Functions = {"
" Class = ReferenceContainer " " Class = ReferenceContainer "
" +GAM1 = {" " +GAM1 = {"
" Class = IOGAM " " Class = IOGAM "
" InputSignals = {" " InputSignals = {"
" Counter = {" " Counter = { DataSource = Timer Type = uint32 Frequency = 1000 }"
" DataSource = Timer " " Time = { DataSource = Timer Type = uint32 }"
" Type = uint32 "
" Frequency = 100 "
" }"
" }" " }"
" OutputSignals = {" " OutputSignals = {"
" Counter = {" " Counter = { DataSource = DDB Type = uint32 }"
" DataSource = DDB " " Time = { DataSource = DDB Type = uint32 }"
" Type = uint32 "
" }"
" }" " }"
" }" " }"
" }" " }"
" +Data = {" " +Data = {"
" Class = ReferenceContainer " " Class = ReferenceContainer "
" DefaultDataSource = DDB " " DefaultDataSource = DDB "
" +Timer = {" " +Timer = { Class = LinuxTimer SleepTime = 1000 Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }"
" Class = LinuxTimer " " +DDB = { Class = GAMDataSource Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }"
" SleepTime = 10000 "
" Signals = {"
" Counter = { Type = uint32 }"
" Time = { Type = uint32 }"
" }"
" }"
" +DDB = {"
" Class = GAMDataSource "
" Signals = { Counter = { Type = uint32 } }"
" }"
" +DAMS = { Class = TimingDataSource }" " +DAMS = { Class = TimingDataSource }"
" }" " }"
" +States = {" " +States = {"
" Class = ReferenceContainer " " Class = ReferenceContainer "
" +State1 = {" " +State1 = { Class = RealTimeState +Threads = { Class = ReferenceContainer +Thread1 = { Class = RealTimeThread Functions = {GAM1} } } }"
" Class = RealTimeState "
" +Threads = {"
" Class = ReferenceContainer "
" +Thread1 = {"
" Class = RealTimeThread "
" Functions = {GAM1} "
" }"
" }"
" }"
" }"
" +Scheduler = {"
" Class = GAMScheduler "
" TimingDataSource = DAMS "
" }" " }"
" +Scheduler = { Class = GAMScheduler TimingDataSource = DAMS }"
"}"; "}";
void RunValidationTest() { void RunValidationTest() {
printf("--- MARTe2 100Hz Trace Validation Test ---\n"); printf("--- MARTe2 1kHz Lossless Trace Validation Test ---\n");
ObjectRegistryDatabase::Instance()->Purge(); ObjectRegistryDatabase::Instance()->Purge();
ConfigurationDatabase cdb; ConfigurationDatabase cdb;
StreamString ss = config_text; StreamString ss = simple_config;
ss.Seek(0); ss.Seek(0);
StandardParser parser(ss, cdb); StandardParser parser(ss, cdb);
if (!parser.Parse()) { assert(parser.Parse());
printf("ERROR: Failed to parse configuration\n");
return;
}
if (!ObjectRegistryDatabase::Instance()->Initialise(cdb)) { cdb.MoveToRoot();
printf("ERROR: Failed to initialise ObjectRegistryDatabase.\n"); uint32 n = cdb.GetNumberOfChildren();
return; for (uint32 i=0; i<n; i++) {
} const char8* name = cdb.GetChildName(i);
ConfigurationDatabase child;
ReferenceT<DebugService> service = ObjectRegistryDatabase::Instance()->Find("DebugService"); cdb.MoveRelative(name);
if (!service.IsValid()) { cdb.Copy(child);
printf("ERROR: DebugService not found\n"); cdb.MoveToAncestor(1u);
return;
}
ReferenceT<RealTimeApplication> app = ObjectRegistryDatabase::Instance()->Find("App");
if (!app.IsValid()) {
printf("ERROR: App not found\n");
return;
}
if (!app->ConfigureApplication()) {
printf("ERROR: Failed to configure application\n");
return;
}
if (app->PrepareNextState("State1") != ErrorManagement::NoError) {
printf("ERROR: Failed to prepare state State1\n");
return;
}
if (app->StartNextStateExecution() != ErrorManagement::NoError) {
printf("ERROR: Failed to start execution\n");
return;
}
printf("Application and DebugService are active.\n");
Sleep::MSec(1000);
// DIRECT ACTIVATION: Use the public TraceSignal method
printf("Activating trace directly...\n");
// We try multiple potential paths to be safe
uint32 traceCount = 0;
traceCount += service->TraceSignal("App.Data.Timer.Counter", true, 1);
traceCount += service->TraceSignal("Timer.Counter", true, 1);
traceCount += service->TraceSignal("Counter", true, 1);
printf("Trace enabled (Matched Aliases: %u)\n", traceCount);
// 4. Setup UDP Listener
BasicUDPSocket listener;
if (!listener.Open()) { printf("ERROR: Failed to open UDP socket\n"); return; }
if (!listener.Listen(8081)) { printf("ERROR: Failed to listen on UDP 8081\n"); return; }
// 5. Validate for 10 seconds
printf("Validating telemetry for 10 seconds...\n");
uint32 lastVal = 0;
bool first = true;
uint32 packetCount = 0;
uint32 discontinuityCount = 0;
float64 startTime = HighResolutionTimer::Counter() * HighResolutionTimer::Period();
float64 globalTimeout = startTime + 30.0;
while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTime) < 10.0) {
if (HighResolutionTimer::Counter() * HighResolutionTimer::Period() > globalTimeout) {
printf("CRITICAL ERROR: Global test timeout reached.\n");
break;
}
char buffer[2048];
uint32 size = 2048;
TimeoutType timeout(200);
if (listener.Read(buffer, size, timeout)) { StreamString className;
child.Read("Class", className);
Reference ref(className.Buffer(), GlobalObjectsDatabase::Instance()->GetStandardHeap());
ref->SetName(name);
assert(ref->Initialise(child));
ObjectRegistryDatabase::Instance()->Insert(ref);
}
Reference serviceGeneric = ObjectRegistryDatabase::Instance()->Find("DebugService");
Reference appGeneric = ObjectRegistryDatabase::Instance()->Find("App");
if (!serviceGeneric.IsValid() || !appGeneric.IsValid()) {
printf("ERROR: Objects NOT FOUND even without prefix\n");
return;
}
DebugService *service = dynamic_cast<DebugService*>(serviceGeneric.operator->());
RealTimeApplication *app = dynamic_cast<RealTimeApplication*>(appGeneric.operator->());
assert(service);
assert(app);
if (!app->ConfigureApplication()) {
printf("ERROR: ConfigureApplication failed.\n");
return;
}
assert(app->PrepareNextState("State1") == ErrorManagement::NoError);
assert(app->StartNextStateExecution() == ErrorManagement::NoError);
printf("Application started at 1kHz. Enabling Traces...\n");
Sleep::MSec(500);
// The registered name in DebugBrokerWrapper depends on GetFullObjectName
// With App as root, it should be App.Data.Timer.Counter
service->TraceSignal("App.Data.Timer.Counter", true, 1);
BasicUDPSocket listener;
listener.Open();
listener.Listen(8081);
printf("Validating for 10 seconds...\n");
uint32 lastCounter = 0;
bool first = true;
uint32 totalSamples = 0;
uint32 discontinuities = 0;
uint32 totalPackets = 0;
float64 startTest = HighResolutionTimer::Counter() * HighResolutionTimer::Period();
while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTest) < 10.0) {
char buffer[4096];
uint32 size = 4096;
if (listener.Read(buffer, size, TimeoutType(100))) {
totalPackets++;
TraceHeader *h = (TraceHeader*)buffer; TraceHeader *h = (TraceHeader*)buffer;
if (h->magic == 0xDA7A57AD && h->count > 0) { if (h->magic != 0xDA7A57AD) continue;
uint32 offset = sizeof(TraceHeader);
// Packet format: [Header][ID:4][Size:4][Value:N] uint32 offset = sizeof(TraceHeader);
for (uint32 i=0; i<h->count; i++) {
uint32 sigId = *(uint32*)(&buffer[offset]);
uint32 val = *(uint32*)(&buffer[offset + 8]); uint32 val = *(uint32*)(&buffer[offset + 8]);
if (!first) { if (sigId == 0) {
if (val != lastVal + 1) { if (!first) {
discontinuityCount++; if (val != lastCounter + 1) {
discontinuities++;
}
} }
lastCounter = val;
totalSamples++;
} }
lastVal = val;
first = false;
packetCount++;
if (packetCount % 200 == 0) { uint32 sigSize = *(uint32*)(&buffer[offset + 4]);
printf("Received %u packets... Current Value: %u\n", packetCount, val); offset += (8 + sigSize);
}
} }
first = false;
} }
} }
printf("Test Finished.\n"); printf("\n--- Test Results ---\n");
printf("Total Packets Received: %u (Expected ~1000)\n", packetCount); printf("Total UDP Packets: %u\n", totalPackets);
printf("Discontinuities: %u\n", discontinuityCount); printf("Total Counter Samples: %u\n", totalSamples);
printf("Counter Discontinuities: %u\n", discontinuities);
float64 actualFreq = (float64)packetCount / 10.0;
printf("Average Frequency: %.2f Hz\n", actualFreq);
if (packetCount < 100) { if (totalSamples < 9000) {
printf("FAILURE: Almost no packets received. Telemetry is broken.\n"); printf("FAILURE: Underflow - samples missing (%u).\n", totalSamples);
} else if (packetCount < 800) { } else if (discontinuities > 10) {
printf("WARNING: Too few packets received (Expected 1000, Got %u).\n", packetCount); printf("FAILURE: Excessive discontinuities detected! (%u)\n", discontinuities);
} else if (discontinuityCount > 20) {
printf("FAILURE: Too many discontinuities (%u).\n", discontinuityCount);
} else { } else {
printf("VALIDATION SUCCESSFUL!\n"); printf("VALIDATION SUCCESSFUL: 1kHz Lossless Tracing Verified.\n");
} }
app->StopCurrentStateExecution(); app->StopCurrentStateExecution();
listener.Close();
ObjectRegistryDatabase::Instance()->Purge(); ObjectRegistryDatabase::Instance()->Purge();
} }

View File

@@ -525,12 +525,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "byteorder-lite" name = "byteorder-lite"
version = "0.1.0" version = "0.1.0"
@@ -1798,16 +1792,15 @@ dependencies = [
name = "marte_debug_gui" name = "marte_debug_gui"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"byteorder",
"chrono", "chrono",
"crossbeam-channel", "crossbeam-channel",
"eframe", "eframe",
"egui",
"egui_plot", "egui_plot",
"once_cell",
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "socket2",
] ]
[[package]] [[package]]
@@ -1859,17 +1852,6 @@ dependencies = [
"simd-adler32", "simd-adler32",
] ]
[[package]]
name = "mio"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
dependencies = [
"libc",
"wasi",
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "moxcms" name = "moxcms"
version = "0.7.11" version = "0.7.11"
@@ -2891,12 +2873,12 @@ dependencies = [
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.6.2" version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.60.2", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -3081,34 +3063,6 @@ dependencies = [
"zerovec", "zerovec",
] ]
[[package]]
name = "tokio"
version = "1.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86"
dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
]
[[package]]
name = "tokio-macros"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "toml_datetime" name = "toml_datetime"
version = "0.7.5+spec-1.1.0" version = "0.7.5+spec-1.1.0"

View File

@@ -5,12 +5,11 @@ edition = "2021"
[dependencies] [dependencies]
eframe = "0.31.0" eframe = "0.31.0"
egui = "0.31.0"
egui_plot = "0.31.0" egui_plot = "0.31.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
byteorder = "1.4"
chrono = "0.4" chrono = "0.4"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
regex = "1.12.3" regex = "1.10"
socket2 = { version = "0.5", features = ["all"] }
once_cell = "1.21.3"

File diff suppressed because it is too large Load Diff