Implemetned better buffering and high frequency tracing

This commit is contained in:
Martino Ferrari
2026-02-23 12:00:14 +01:00
parent 253a4989f9
commit 6b1fc59fc0
4 changed files with 193 additions and 162 deletions

View File

@@ -4,6 +4,7 @@
#include "CompilerTypes.h"
#include "TypeDescriptor.h"
#include "StreamString.h"
#include <cstring> // For memcpy
namespace MARTe {
@@ -58,7 +59,14 @@ public:
uint32 packetSize = 4 + 4 + size;
uint32 read = readIndex;
uint32 write = writeIndex;
uint32 available = (read <= write) ? (bufferSize - (write - read) - 1) : (read - write - 1);
// Calculate available space
uint32 available = 0;
if (read <= write) {
available = bufferSize - (write - read) - 1;
} else {
available = read - write - 1;
}
if (available < packetSize) return false;
@@ -68,6 +76,9 @@ public:
WriteToBuffer(&tempWrite, &size, 4);
WriteToBuffer(&tempWrite, data, size);
// Memory Barrier to ensure data is visible before index update
// __sync_synchronize();
// Final atomic update
writeIndex = tempWrite;
return true;
@@ -79,20 +90,27 @@ public:
if (read == write) return false;
uint32 tempRead = read;
uint32 tempId, tempSize;
uint32 tempId = 0;
uint32 tempSize = 0;
// Peek header
ReadFromBuffer(&tempRead, &tempId, 4);
ReadFromBuffer(&tempRead, &tempSize, 4);
if (tempSize > maxSize) {
// Error case: drop data up to writeIndex
// Error case: drop data up to writeIndex (resync)
readIndex = write;
return false;
}
ReadFromBuffer(&tempRead, dataBuffer, tempSize);
signalID = tempId;
size = tempSize;
// Memory Barrier
// __sync_synchronize();
readIndex = tempRead;
return true;
}
@@ -106,18 +124,32 @@ public:
private:
void WriteToBuffer(uint32 *idx, void* src, uint32 count) {
uint8* s = (uint8*)src;
for (uint32 i=0; i<count; i++) {
buffer[*idx] = s[i];
*idx = (*idx + 1) % bufferSize;
uint32 current = *idx;
uint32 spaceToEnd = bufferSize - current;
if (count <= spaceToEnd) {
std::memcpy(&buffer[current], src, count);
*idx = (current + count) % bufferSize;
} else {
std::memcpy(&buffer[current], src, spaceToEnd);
uint32 remaining = count - spaceToEnd;
std::memcpy(&buffer[0], (uint8*)src + spaceToEnd, remaining);
*idx = remaining;
}
}
void ReadFromBuffer(uint32 *idx, void* dst, uint32 count) {
uint8* d = (uint8*)dst;
for (uint32 i=0; i<count; i++) {
d[i] = buffer[*idx];
*idx = (*idx + 1) % bufferSize;
uint32 current = *idx;
uint32 spaceToEnd = bufferSize - current;
if (count <= spaceToEnd) {
std::memcpy(dst, &buffer[current], count);
*idx = (current + count) % bufferSize;
} else {
std::memcpy(dst, &buffer[current], spaceToEnd);
uint32 remaining = count - spaceToEnd;
std::memcpy((uint8*)dst + spaceToEnd, &buffer[0], remaining);
*idx = remaining;
}
}

View File

@@ -101,7 +101,7 @@ bool DebugService::Initialise(StructuredDataI & data) {
}
if (isServer) {
if (!traceBuffer.Init(1024 * 1024)) return false;
if (!traceBuffer.Init(8 * 1024 * 1024)) return false;
PatchRegistry();

View File

@@ -7,201 +7,166 @@
#include "BasicTCPSocket.h"
#include "RealTimeApplication.h"
#include "GlobalObjectsDatabase.h"
#include "RealTimeLoader.h"
#include <assert.h>
#include <stdio.h>
using namespace MARTe;
const char8 * const config_text =
"+DebugService = {"
// Removed '+' prefix from names for simpler lookup
const char8 * const simple_config =
"DebugService = {"
" Class = DebugService "
" ControlPort = 8080 "
" UdpPort = 8081 "
" StreamIP = \"127.0.0.1\" "
"}"
"+App = {"
"App = {"
" Class = RealTimeApplication "
" +Functions = {"
" Class = ReferenceContainer "
" +GAM1 = {"
" Class = IOGAM "
" InputSignals = {"
" Counter = {"
" DataSource = Timer "
" Type = uint32 "
" Frequency = 100 "
" }"
" Counter = { DataSource = Timer Type = uint32 Frequency = 1000 }"
" Time = { DataSource = Timer Type = uint32 }"
" }"
" OutputSignals = {"
" Counter = {"
" DataSource = DDB "
" Type = uint32 "
" }"
" Counter = { DataSource = DDB Type = uint32 }"
" Time = { DataSource = DDB Type = uint32 }"
" }"
" }"
" }"
" +Data = {"
" Class = ReferenceContainer "
" DefaultDataSource = DDB "
" +Timer = {"
" Class = LinuxTimer "
" SleepTime = 10000 "
" Signals = {"
" Counter = { Type = uint32 }"
" Time = { Type = uint32 }"
" }"
" }"
" +DDB = {"
" Class = GAMDataSource "
" Signals = { Counter = { Type = uint32 } }"
" }"
" +Timer = { Class = LinuxTimer SleepTime = 1000 Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }"
" +DDB = { Class = GAMDataSource Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }"
" +DAMS = { Class = TimingDataSource }"
" }"
" +States = {"
" Class = ReferenceContainer "
" +State1 = {"
" Class = RealTimeState "
" +Threads = {"
" Class = ReferenceContainer "
" +Thread1 = {"
" Class = RealTimeThread "
" Functions = {GAM1} "
" }"
" }"
" }"
" }"
" +Scheduler = {"
" Class = GAMScheduler "
" TimingDataSource = DAMS "
" +State1 = { Class = RealTimeState +Threads = { Class = ReferenceContainer +Thread1 = { Class = RealTimeThread Functions = {GAM1} } } }"
" }"
" +Scheduler = { Class = GAMScheduler TimingDataSource = DAMS }"
"}";
void RunValidationTest() {
printf("--- MARTe2 100Hz Trace Validation Test ---\n");
printf("--- MARTe2 1kHz Lossless Trace Validation Test ---\n");
ObjectRegistryDatabase::Instance()->Purge();
ConfigurationDatabase cdb;
StreamString ss = config_text;
StreamString ss = simple_config;
ss.Seek(0);
StandardParser parser(ss, cdb);
if (!parser.Parse()) {
printf("ERROR: Failed to parse configuration\n");
assert(parser.Parse());
cdb.MoveToRoot();
uint32 n = cdb.GetNumberOfChildren();
for (uint32 i=0; i<n; i++) {
const char8* name = cdb.GetChildName(i);
ConfigurationDatabase child;
cdb.MoveRelative(name);
cdb.Copy(child);
cdb.MoveToAncestor(1u);
StreamString className;
child.Read("Class", className);
Reference ref(className.Buffer(), GlobalObjectsDatabase::Instance()->GetStandardHeap());
ref->SetName(name);
assert(ref->Initialise(child));
ObjectRegistryDatabase::Instance()->Insert(ref);
}
Reference serviceGeneric = ObjectRegistryDatabase::Instance()->Find("DebugService");
Reference appGeneric = ObjectRegistryDatabase::Instance()->Find("App");
if (!serviceGeneric.IsValid() || !appGeneric.IsValid()) {
printf("ERROR: Objects NOT FOUND even without prefix\n");
return;
}
if (!ObjectRegistryDatabase::Instance()->Initialise(cdb)) {
printf("ERROR: Failed to initialise ObjectRegistryDatabase.\n");
return;
}
DebugService *service = dynamic_cast<DebugService*>(serviceGeneric.operator->());
RealTimeApplication *app = dynamic_cast<RealTimeApplication*>(appGeneric.operator->());
ReferenceT<DebugService> service = ObjectRegistryDatabase::Instance()->Find("DebugService");
if (!service.IsValid()) {
printf("ERROR: DebugService not found\n");
return;
}
ReferenceT<RealTimeApplication> app = ObjectRegistryDatabase::Instance()->Find("App");
if (!app.IsValid()) {
printf("ERROR: App not found\n");
return;
}
assert(service);
assert(app);
if (!app->ConfigureApplication()) {
printf("ERROR: Failed to configure application\n");
printf("ERROR: ConfigureApplication failed.\n");
return;
}
if (app->PrepareNextState("State1") != ErrorManagement::NoError) {
printf("ERROR: Failed to prepare state State1\n");
return;
}
assert(app->PrepareNextState("State1") == ErrorManagement::NoError);
assert(app->StartNextStateExecution() == ErrorManagement::NoError);
if (app->StartNextStateExecution() != ErrorManagement::NoError) {
printf("ERROR: Failed to start execution\n");
return;
}
printf("Application started at 1kHz. Enabling Traces...\n");
Sleep::MSec(500);
printf("Application and DebugService are active.\n");
Sleep::MSec(1000);
// The registered name in DebugBrokerWrapper depends on GetFullObjectName
// With App as root, it should be App.Data.Timer.Counter
service->TraceSignal("App.Data.Timer.Counter", true, 1);
// DIRECT ACTIVATION: Use the public TraceSignal method
printf("Activating trace directly...\n");
// We try multiple potential paths to be safe
uint32 traceCount = 0;
traceCount += service->TraceSignal("App.Data.Timer.Counter", true, 1);
traceCount += service->TraceSignal("Timer.Counter", true, 1);
traceCount += service->TraceSignal("Counter", true, 1);
printf("Trace enabled (Matched Aliases: %u)\n", traceCount);
// 4. Setup UDP Listener
BasicUDPSocket listener;
if (!listener.Open()) { printf("ERROR: Failed to open UDP socket\n"); return; }
if (!listener.Listen(8081)) { printf("ERROR: Failed to listen on UDP 8081\n"); return; }
listener.Open();
listener.Listen(8081);
// 5. Validate for 10 seconds
printf("Validating telemetry for 10 seconds...\n");
uint32 lastVal = 0;
printf("Validating for 10 seconds...\n");
uint32 lastCounter = 0;
bool first = true;
uint32 packetCount = 0;
uint32 discontinuityCount = 0;
uint32 totalSamples = 0;
uint32 discontinuities = 0;
uint32 totalPackets = 0;
float64 startTime = HighResolutionTimer::Counter() * HighResolutionTimer::Period();
float64 globalTimeout = startTime + 30.0;
float64 startTest = HighResolutionTimer::Counter() * HighResolutionTimer::Period();
while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTime) < 10.0) {
if (HighResolutionTimer::Counter() * HighResolutionTimer::Period() > globalTimeout) {
printf("CRITICAL ERROR: Global test timeout reached.\n");
break;
}
char buffer[2048];
uint32 size = 2048;
TimeoutType timeout(200);
if (listener.Read(buffer, size, timeout)) {
while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTest) < 10.0) {
char buffer[4096];
uint32 size = 4096;
if (listener.Read(buffer, size, TimeoutType(100))) {
totalPackets++;
TraceHeader *h = (TraceHeader*)buffer;
if (h->magic == 0xDA7A57AD && h->count > 0) {
uint32 offset = sizeof(TraceHeader);
// Packet format: [Header][ID:4][Size:4][Value:N]
if (h->magic != 0xDA7A57AD) continue;
uint32 offset = sizeof(TraceHeader);
for (uint32 i=0; i<h->count; i++) {
uint32 sigId = *(uint32*)(&buffer[offset]);
uint32 val = *(uint32*)(&buffer[offset + 8]);
if (!first) {
if (val != lastVal + 1) {
discontinuityCount++;
if (sigId == 0) {
if (!first) {
if (val != lastCounter + 1) {
discontinuities++;
}
}
lastCounter = val;
totalSamples++;
}
lastVal = val;
first = false;
packetCount++;
if (packetCount % 200 == 0) {
printf("Received %u packets... Current Value: %u\n", packetCount, val);
}
uint32 sigSize = *(uint32*)(&buffer[offset + 4]);
offset += (8 + sigSize);
}
first = false;
}
}
printf("Test Finished.\n");
printf("Total Packets Received: %u (Expected ~1000)\n", packetCount);
printf("Discontinuities: %u\n", discontinuityCount);
printf("\n--- Test Results ---\n");
printf("Total UDP Packets: %u\n", totalPackets);
printf("Total Counter Samples: %u\n", totalSamples);
printf("Counter Discontinuities: %u\n", discontinuities);
float64 actualFreq = (float64)packetCount / 10.0;
printf("Average Frequency: %.2f Hz\n", actualFreq);
if (packetCount < 100) {
printf("FAILURE: Almost no packets received. Telemetry is broken.\n");
} else if (packetCount < 800) {
printf("WARNING: Too few packets received (Expected 1000, Got %u).\n", packetCount);
} else if (discontinuityCount > 20) {
printf("FAILURE: Too many discontinuities (%u).\n", discontinuityCount);
if (totalSamples < 9000) {
printf("FAILURE: Underflow - samples missing (%u).\n", totalSamples);
} else if (discontinuities > 10) {
printf("FAILURE: Excessive discontinuities detected! (%u)\n", discontinuities);
} else {
printf("VALIDATION SUCCESSFUL!\n");
printf("VALIDATION SUCCESSFUL: 1kHz Lossless Tracing Verified.\n");
}
app->StopCurrentStateExecution();
listener.Close();
ObjectRegistryDatabase::Instance()->Purge();
}

View File

@@ -80,6 +80,7 @@ enum InternalEvent {
TraceRequested(String),
ClearTrace(String),
UdpStats(u64),
UdpDropped(u32),
}
// --- App State ---
@@ -126,6 +127,7 @@ struct MarteDebugApp {
node_info: String,
udp_packets: u64,
udp_dropped: u64,
forcing_dialog: Option<ForcingDialog>,
@@ -198,6 +200,7 @@ impl MarteDebugApp {
selected_node: "".to_string(),
node_info: "".to_string(),
udp_packets: 0,
udp_dropped: 0,
forcing_dialog: None,
tx_cmd,
rx_events,
@@ -206,7 +209,6 @@ impl MarteDebugApp {
}
fn render_tree(&mut self, ui: &mut egui::Ui, item: &TreeItem, path: String) {
// Strip "Root" from paths to match server discovery
let current_path = if path.is_empty() {
if item.name == "Root" { "".to_string() } else { item.name.clone() }
} else {
@@ -387,6 +389,7 @@ fn tcp_log_worker(shared_config: Arc<Mutex<ConnectionConfig>>, tx_events: Sender
fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex<HashMap<u32, SignalMetadata>>>, traced_data: Arc<Mutex<HashMap<String, TraceData>>>, tx_events: Sender<InternalEvent>) {
let mut current_version = 0;
let mut socket: Option<UdpSocket> = None;
let mut last_seq: Option<u32> = None;
loop {
let (ver, port) = {
@@ -404,6 +407,9 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
let _ = sock.set_reuse_port(true);
// CRITICAL: Increase receive buffer to 10MB to avoid drops at 100kHz
let _ = sock.set_recv_buffer_size(10 * 1024 * 1024);
let addr = format!("0.0.0.0:{}", port_num).parse::<std::net::SocketAddr>().unwrap();
if sock.bind(&addr.into()).is_ok() {
socket = Some(sock.into());
@@ -417,6 +423,7 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
continue;
}
let _ = socket.as_ref().unwrap().set_read_timeout(Some(std::time::Duration::from_millis(500)));
last_seq = None;
}
let s = socket.as_ref().unwrap();
@@ -431,7 +438,7 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
if let Ok(n) = s.recv(&mut buf) {
total_packets += 1;
if (total_packets % 100) == 0 {
if (total_packets % 500) == 0 {
let _ = tx_events.send(InternalEvent::UdpStats(total_packets));
}
@@ -440,14 +447,27 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
let mut magic_buf = [0u8; 4]; magic_buf.copy_from_slice(&buf[0..4]);
if u32::from_le_bytes(magic_buf) != 0xDA7A57AD { continue; }
// Sequence check
let mut seq_buf = [0u8; 4]; seq_buf.copy_from_slice(&buf[4..8]);
let seq = u32::from_le_bytes(seq_buf);
if let Some(last) = last_seq {
if seq != last + 1 {
let dropped = if seq > last { seq - last - 1 } else { 0 };
if dropped > 0 {
let _ = tx_events.send(InternalEvent::UdpDropped(dropped));
}
}
}
last_seq = Some(seq);
let mut count_buf = [0u8; 4]; count_buf.copy_from_slice(&buf[16..20]);
let count = u32::from_le_bytes(count_buf);
let now = start_time.elapsed().as_secs_f64();
let mut offset = 20;
let mut local_updates: HashMap<String, Vec<[f64; 2]>> = HashMap::new();
let metas = id_to_meta.lock().unwrap();
let mut data_map = traced_data.lock().unwrap();
for _ in 0..count {
if offset + 8 > n { break; }
@@ -484,14 +504,26 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
};
for name in &meta.names {
if let Some(entry) = data_map.get_mut(name) {
entry.values.push_back([now, val]);
if entry.values.len() > 5000 { entry.values.pop_front(); }
}
local_updates.entry(name.clone()).or_default().push([now, val]);
}
}
offset += size as usize;
}
drop(metas);
if !local_updates.is_empty() {
let mut data_map = traced_data.lock().unwrap();
for (name, new_points) in local_updates {
if let Some(entry) = data_map.get_mut(&name) {
for point in new_points {
entry.values.push_back(point);
}
while entry.values.len() > 10000 {
entry.values.pop_front();
}
}
}
}
}
}
}
@@ -531,7 +563,7 @@ impl eframe::App for MarteDebugApp {
}
InternalEvent::TraceRequested(name) => {
let mut data_map = self.traced_signals.lock().unwrap();
data_map.entry(name).or_insert_with(|| TraceData { values: VecDeque::with_capacity(5000) });
data_map.entry(name).or_insert_with(|| TraceData { values: VecDeque::with_capacity(10000) });
}
InternalEvent::ClearTrace(name) => {
let mut data_map = self.traced_signals.lock().unwrap();
@@ -547,6 +579,9 @@ impl eframe::App for MarteDebugApp {
InternalEvent::UdpStats(count) => {
self.udp_packets = count;
}
InternalEvent::UdpDropped(dropped) => {
self.udp_dropped += dropped as u64;
}
InternalEvent::Connected => {
self.connected = true;
let _ = self.tx_cmd.send("TREE".to_string());
@@ -636,7 +671,7 @@ impl eframe::App for MarteDebugApp {
}
}
ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
ui.label(format!("UDP Packets: {}", self.udp_packets));
ui.label(format!("UDP: OK [{}] / DROPPED [{}]", self.udp_packets, self.udp_dropped));
});
});
});
@@ -797,7 +832,6 @@ impl eframe::App for MarteDebugApp {
ui.horizontal(|ui| {
ui.heading("Oscilloscope");
if ui.button("🔄 Reset View").clicked() {
// This will force auto-bounds to re-calculate on next frame
}
});
let plot = Plot::new("traces_plot")