Improved overall features

This commit is contained in:
Martino Ferrari
2026-03-03 21:41:59 +01:00
parent e6102ba433
commit a941563749
10 changed files with 689 additions and 103 deletions

View File

@@ -1,15 +1,18 @@
#include "BasicTCPSocket.h"
#include "ClassRegistryItem.h"
#include "ConfigurationDatabase.h"
#include "DataSourceI.h"
#include "DebugBrokerWrapper.h"
#include "DebugService.h"
#include "GAM.h"
#include "GlobalObjectsDatabase.h"
#include "HighResolutionTimer.h"
#include "ObjectBuilder.h"
#include "ObjectRegistryDatabase.h"
#include "StreamString.h"
#include "TimeoutType.h"
#include "TypeConversion.h"
#include "ReferenceT.h"
namespace MARTe {
@@ -84,6 +87,7 @@ DebugService::DebugService()
threadService(binderServer), streamerService(binderStreamer) {
controlPort = 0;
streamPort = 8081;
logPort = 8082;
streamIP = "127.0.0.1";
isServer = false;
suppressTimeoutLogs = true;
@@ -106,6 +110,7 @@ DebugService::~DebugService() {
for (uint32 i = 0; i < signals.Size(); i++) {
delete signals[i];
}
this->Purge();
}
bool DebugService::Initialise(StructuredDataI &data) {
@@ -132,6 +137,15 @@ bool DebugService::Initialise(StructuredDataI &data) {
(void)data.Read("UdpPort", port);
streamPort = (uint16)port;
}
port = 8082;
if (data.Read("LogPort", port)) {
logPort = (uint16)port;
} else {
(void)data.Read("TcpLogPort", port);
logPort = (uint16)port;
}
StreamString tempIP;
if (data.Read("StreamIP", tempIP)) {
streamIP = tempIP;
@@ -175,6 +189,34 @@ bool DebugService::Initialise(StructuredDataI &data) {
return false;
if (streamerService.Start() != ErrorManagement::NoError)
return false;
if (logPort > 0) {
Reference tcpLogger(
"TcpLogger", GlobalObjectsDatabase::Instance()->GetStandardHeap());
if (tcpLogger.IsValid()) {
ConfigurationDatabase loggerConfig;
loggerConfig.Write("Port", (uint32)logPort);
if (tcpLogger->Initialise(loggerConfig)) {
this->Insert(tcpLogger);
Reference loggerService(
"LoggerService",
GlobalObjectsDatabase::Instance()->GetStandardHeap());
if (loggerService.IsValid()) {
ConfigurationDatabase serviceConfig;
serviceConfig.Write("CPUs", (uint32)1);
ReferenceContainer *lc =
dynamic_cast<ReferenceContainer *>(loggerService.operator->());
if (lc != NULL_PTR(ReferenceContainer *)) {
lc->Insert(tcpLogger);
}
if (loggerService->Initialise(serviceConfig)) {
this->Insert(loggerService);
}
}
}
}
}
}
return true;
}
@@ -430,6 +472,24 @@ ErrorManagement::ErrorType DebugService::Streamer(ExecutionInfo &info) {
uint32 packetOffset = 0;
uint32 sequenceNumber = 0;
while (info.GetStage() == ExecutionInfo::MainStage) {
// Poll monitored signals
uint64 currentTimeMs = (uint64)((float64)HighResolutionTimer::Counter() *
HighResolutionTimer::Period() * 1000.0);
mutex.FastLock();
for (uint32 i = 0; i < monitoredSignals.Size(); i++) {
if (currentTimeMs >= (monitoredSignals[i].lastPollTime + monitoredSignals[i].periodMs)) {
monitoredSignals[i].lastPollTime = currentTimeMs;
uint64 ts = (uint64)((float64)HighResolutionTimer::Counter() *
HighResolutionTimer::Period() * 1000000.0);
void *address = NULL_PTR(void *);
if (monitoredSignals[i].dataSource->GetSignalMemoryBuffer(monitoredSignals[i].signalIdx, 0, address)) {
traceBuffer.Push(monitoredSignals[i].internalID, ts, (uint8 *)address, monitoredSignals[i].size);
}
}
}
mutex.FastUnLock();
uint32 id, size;
uint64 ts;
uint8 sampleData[1024];
@@ -536,7 +596,47 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
}
} else if (token == "DISCOVER")
Discover(client);
else if (token == "CONFIG")
else if (token == "SERVICE_INFO") {
if (client) {
StreamString resp;
resp.Printf("OK SERVICE_INFO TCP_CTRL:%u UDP_STREAM:%u TCP_LOG:%u STATE:%s\n",
controlPort, streamPort, logPort, isPaused ? "PAUSED" : "RUNNING");
uint32 s = resp.Size();
(void)client->Write(resp.Buffer(), s);
}
} else if (token == "MONITOR") {
StreamString subToken;
if (cmd.GetToken(subToken, delims, term) && subToken == "SIGNAL") {
StreamString name, period;
if (cmd.GetToken(name, delims, term) && cmd.GetToken(period, delims, term)) {
uint32 p = 100;
AnyType pVal(UnsignedInteger32Bit, 0u, &p);
AnyType pStr(CharString, 0u, period.Buffer());
(void)TypeConvert(pVal, pStr);
uint32 count = RegisterMonitorSignal(name.Buffer(), p);
if (client) {
StreamString resp;
resp.Printf("OK MONITOR %u\n", count);
uint32 s = resp.Size();
(void)client->Write(resp.Buffer(), s);
}
}
}
} else if (token == "UNMONITOR") {
StreamString subToken;
if (cmd.GetToken(subToken, delims, term) && subToken == "SIGNAL") {
StreamString name;
if (cmd.GetToken(name, delims, term)) {
uint32 count = UnmonitorSignal(name.Buffer());
if (client) {
StreamString resp;
resp.Printf("OK UNMONITOR %u\n", count);
uint32 s = resp.Size();
(void)client->Write(resp.Buffer(), s);
}
}
}
} else if (token == "CONFIG")
ServeConfig(client);
else if (token == "PAUSE") {
SetPaused(true);
@@ -554,7 +654,7 @@ void DebugService::HandleCommand(StreamString cmd, BasicTCPSocket *client) {
StreamString json;
json = "{\"Name\": \"Root\", \"Class\": \"ObjectRegistryDatabase\", "
"\"Children\": [\n";
(void)ExportTree(ObjectRegistryDatabase::Instance(), json);
(void)ExportTree(ObjectRegistryDatabase::Instance(), json, NULL_PTR(const char8 *));
json += "\n]}\nOK TREE\n";
uint32 s = json.Size();
if (client)
@@ -747,7 +847,7 @@ void DebugService::InfoNode(const char8 *path, BasicTCPSocket *client) {
}
uint32 DebugService::ExportTree(ReferenceContainer *container,
StreamString &json) {
StreamString &json, const char8 *pathPrefix) {
if (container == NULL_PTR(ReferenceContainer *))
return 0;
uint32 size = container->Size();
@@ -761,6 +861,14 @@ uint32 DebugService::ExportTree(ReferenceContainer *container,
const char8 *cname = child->GetName();
if (cname == NULL_PTR(const char8 *))
cname = "unnamed";
StreamString currentPath;
if (pathPrefix != NULL_PTR(const char8 *)) {
currentPath.Printf("%s.%s", pathPrefix, cname);
} else {
currentPath = cname;
}
nodeJson += "{\"Name\": \"";
EscapeJson(cname, nodeJson);
nodeJson += "\", \"Class\": \"";
@@ -775,7 +883,7 @@ uint32 DebugService::ExportTree(ReferenceContainer *container,
nodeJson += ", \"Children\": [\n";
uint32 subCount = 0u;
if (inner != NULL_PTR(ReferenceContainer *))
subCount += ExportTree(inner, nodeJson);
subCount += ExportTree(inner, nodeJson, currentPath.Buffer());
if (ds != NULL_PTR(DataSourceI *)) {
uint32 nSignals = ds->GetNumberOfSignals();
for (uint32 j = 0u; j < nSignals; j++) {
@@ -790,12 +898,22 @@ uint32 DebugService::ExportTree(ReferenceContainer *container,
(void)ds->GetSignalNumberOfDimensions(j, dims);
uint32 elems = 0u;
(void)ds->GetSignalNumberOfElements(j, elems);
StreamString signalFullPath;
signalFullPath.Printf("%s.%s", currentPath.Buffer(), sname.Buffer());
bool traceable = false;
bool forcable = false;
(void)IsInstrumented(signalFullPath.Buffer(), traceable, forcable);
nodeJson += "{\"Name\": \"";
EscapeJson(sname.Buffer(), nodeJson);
nodeJson += "\", \"Class\": \"Signal\", \"Type\": \"";
EscapeJson(stype ? stype : "Unknown", nodeJson);
nodeJson.Printf("\", \"Dimensions\": %d, \"Elements\": %u}", dims,
nodeJson.Printf("\", \"Dimensions\": %d, \"Elements\": %u", dims,
elems);
nodeJson.Printf(", \"IsTraceable\": %s, \"IsForcable\": %s}",
traceable ? "true" : "false",
forcable ? "true" : "false");
}
}
if (gam != NULL_PTR(GAM *)) {
@@ -812,12 +930,23 @@ uint32 DebugService::ExportTree(ReferenceContainer *container,
(void)gam->GetSignalNumberOfDimensions(InputSignals, j, dims);
uint32 elems = 0u;
(void)gam->GetSignalNumberOfElements(InputSignals, j, elems);
StreamString signalFullPath;
signalFullPath.Printf("%s.In.%s", currentPath.Buffer(),
sname.Buffer());
bool traceable = false;
bool forcable = false;
(void)IsInstrumented(signalFullPath.Buffer(), traceable, forcable);
nodeJson += "{\"Name\": \"In.";
EscapeJson(sname.Buffer(), nodeJson);
nodeJson += "\", \"Class\": \"InputSignal\", \"Type\": \"";
EscapeJson(stype ? stype : "Unknown", nodeJson);
nodeJson.Printf("\", \"Dimensions\": %u, \"Elements\": %u}", dims,
nodeJson.Printf("\", \"Dimensions\": %u, \"Elements\": %u", dims,
elems);
nodeJson.Printf(", \"IsTraceable\": %s, \"IsForcable\": %s}",
traceable ? "true" : "false",
forcable ? "true" : "false");
}
uint32 nOut = gam->GetNumberOfOutputSignals();
for (uint32 j = 0u; j < nOut; j++) {
@@ -832,12 +961,23 @@ uint32 DebugService::ExportTree(ReferenceContainer *container,
(void)gam->GetSignalNumberOfDimensions(OutputSignals, j, dims);
uint32 elems = 0u;
(void)gam->GetSignalNumberOfElements(OutputSignals, j, elems);
StreamString signalFullPath;
signalFullPath.Printf("%s.Out.%s", currentPath.Buffer(),
sname.Buffer());
bool traceable = false;
bool forcable = false;
(void)IsInstrumented(signalFullPath.Buffer(), traceable, forcable);
nodeJson += "{\"Name\": \"Out.";
EscapeJson(sname.Buffer(), nodeJson);
nodeJson += "\", \"Class\": \"OutputSignal\", \"Type\": \"";
EscapeJson(stype ? stype : "Unknown", nodeJson);
nodeJson.Printf("\", \"Dimensions\": %u, \"Elements\": %u}", dims,
nodeJson.Printf("\", \"Dimensions\": %u, \"Elements\": %u", dims,
elems);
nodeJson.Printf(", \"IsTraceable\": %s, \"IsForcable\": %s}",
traceable ? "true" : "false",
forcable ? "true" : "false");
}
}
nodeJson += "\n]";
@@ -908,13 +1048,121 @@ uint32 DebugService::TraceSignal(const char8 *name, bool enable,
return count;
}
bool DebugService::IsInstrumented(const char8 *fullPath, bool &traceable,
bool &forcable) {
mutex.FastLock();
bool found = false;
for (uint32 i = 0; i < aliases.Size(); i++) {
if (aliases[i].name == fullPath ||
SuffixMatch(aliases[i].name.Buffer(), fullPath)) {
found = true;
break;
}
}
mutex.FastUnLock();
traceable = found;
forcable = found;
return found;
}
uint32 DebugService::RegisterMonitorSignal(const char8 *path, uint32 periodMs) {
mutex.FastLock();
uint32 count = 0;
// Check if already monitored
for (uint32 j = 0; j < monitoredSignals.Size(); j++) {
if (monitoredSignals[j].path == path) {
monitoredSignals[j].periodMs = periodMs;
mutex.FastUnLock();
return 1;
}
}
// Path resolution: find the DataSource object
StreamString fullPath = path;
fullPath.Seek(0);
char8 term;
Vec<StreamString> parts;
StreamString token;
while (fullPath.GetToken(token, ".", term)) {
parts.Push(token);
token = "";
}
if (parts.Size() >= 2) {
StreamString signalName = parts[parts.Size() - 1u];
StreamString dsPath;
for (uint32 i = 0; i < parts.Size() - 1u; i++) {
dsPath += parts[i];
if (i < parts.Size() - 2u)
dsPath += ".";
}
ReferenceT<DataSourceI> ds =
ObjectRegistryDatabase::Instance()->Find(dsPath.Buffer());
if (ds.IsValid()) {
uint32 idx = 0;
if (ds->GetSignalIndex(idx, signalName.Buffer())) {
MonitoredSignal m;
m.dataSource = ds;
m.signalIdx = idx;
m.path = path;
m.periodMs = periodMs;
m.lastPollTime = 0;
m.size = 0;
(void)ds->GetSignalByteSize(idx, m.size);
if (m.size == 0)
m.size = 4;
// Use high-bit for polled signals to avoid conflict with brokered ones
m.internalID = 0x80000000 | monitoredSignals.Size();
// Re-use existing ID if signal is also instrumented via broker
for (uint32 i = 0; i < aliases.Size(); i++) {
if (aliases[i].name == path ||
SuffixMatch(aliases[i].name.Buffer(), path)) {
m.internalID = signals[aliases[i].signalIndex]->internalID;
break;
}
}
monitoredSignals.Push(m);
count = 1;
}
}
}
mutex.FastUnLock();
return count;
}
uint32 DebugService::UnmonitorSignal(const char8 *path) {
mutex.FastLock();
uint32 count = 0;
for (uint32 i = 0; i < monitoredSignals.Size(); i++) {
if (monitoredSignals[i].path == path ||
SuffixMatch(monitoredSignals[i].path.Buffer(), path)) {
(void)monitoredSignals.Remove(i);
i--;
count++;
}
}
mutex.FastUnLock();
return count;
}
void DebugService::Discover(BasicTCPSocket *client) {
if (client) {
StreamString header = "{\n \"Signals\": [\n";
uint32 s = header.Size();
(void)client->Write(header.Buffer(), s);
mutex.FastLock();
uint32 total = 0;
for (uint32 i = 0; i < aliases.Size(); i++) {
if (total > 0) {
uint32 commaSize = 2;
(void)client->Write(",\n", commaSize);
}
StreamString line;
DebugSignalInfo *sig = signals[aliases[i].signalIndex];
const char8 *typeName =
@@ -924,11 +1172,39 @@ void DebugService::Discover(BasicTCPSocket *client) {
typeName ? typeName : "Unknown");
EnrichWithConfig(aliases[i].name.Buffer(), line);
line += "}";
if (i < aliases.Size() - 1)
line += ",";
line += "\n";
s = line.Size();
(void)client->Write(line.Buffer(), s);
total++;
}
// Export monitored signals not already in aliases
for (uint32 i = 0; i < monitoredSignals.Size(); i++) {
bool found = false;
for (uint32 j = 0; j < aliases.Size(); j++) {
if (aliases[j].name == monitoredSignals[i].path) {
found = true;
break;
}
}
if (!found) {
if (total > 0) {
uint32 commaSize = 2;
(void)client->Write(",\n", commaSize);
}
StreamString line;
const char8 *typeName = TypeDescriptor::GetTypeNameFromTypeDescriptor(
monitoredSignals[i].dataSource->GetSignalType(
monitoredSignals[i].signalIdx));
line.Printf(" {\"name\": \"%s\", \"id\": %u, \"type\": \"%s\"",
monitoredSignals[i].path.Buffer(),
monitoredSignals[i].internalID,
typeName ? typeName : "Unknown");
EnrichWithConfig(monitoredSignals[i].path.Buffer(), line);
line += "}";
s = line.Size();
(void)client->Write(line.Buffer(), s);
total++;
}
}
mutex.FastUnLock();
StreamString footer = " ]\n}\nOK DISCOVER\n";