diff --git a/Headers/DebugCore.h b/Headers/DebugCore.h index fd3fc89..8987b18 100644 --- a/Headers/DebugCore.h +++ b/Headers/DebugCore.h @@ -4,6 +4,7 @@ #include "CompilerTypes.h" #include "TypeDescriptor.h" #include "StreamString.h" +#include // For memcpy namespace MARTe { @@ -58,7 +59,14 @@ public: uint32 packetSize = 4 + 4 + size; uint32 read = readIndex; uint32 write = writeIndex; - uint32 available = (read <= write) ? (bufferSize - (write - read) - 1) : (read - write - 1); + + // Calculate available space + uint32 available = 0; + if (read <= write) { + available = bufferSize - (write - read) - 1; + } else { + available = read - write - 1; + } if (available < packetSize) return false; @@ -68,6 +76,9 @@ public: WriteToBuffer(&tempWrite, &size, 4); WriteToBuffer(&tempWrite, data, size); + // Memory Barrier to ensure data is visible before index update + // __sync_synchronize(); + // Final atomic update writeIndex = tempWrite; return true; @@ -79,20 +90,27 @@ public: if (read == write) return false; uint32 tempRead = read; - uint32 tempId, tempSize; + uint32 tempId = 0; + uint32 tempSize = 0; + + // Peek header ReadFromBuffer(&tempRead, &tempId, 4); ReadFromBuffer(&tempRead, &tempSize, 4); if (tempSize > maxSize) { - // Error case: drop data up to writeIndex + // Error case: drop data up to writeIndex (resync) readIndex = write; return false; } ReadFromBuffer(&tempRead, dataBuffer, tempSize); + signalID = tempId; size = tempSize; + // Memory Barrier + // __sync_synchronize(); + readIndex = tempRead; return true; } @@ -106,18 +124,32 @@ public: private: void WriteToBuffer(uint32 *idx, void* src, uint32 count) { - uint8* s = (uint8*)src; - for (uint32 i=0; i #include using namespace MARTe; -const char8 * const config_text = -"+DebugService = {" +// Removed '+' prefix from names for simpler lookup +const char8 * const simple_config = +"DebugService = {" " Class = DebugService " " ControlPort = 8080 " " UdpPort = 8081 " " StreamIP = \"127.0.0.1\" " "}" -"+App = {" +"App = {" " Class = RealTimeApplication " " +Functions = {" " Class = ReferenceContainer " " +GAM1 = {" " Class = IOGAM " " InputSignals = {" -" Counter = {" -" DataSource = Timer " -" Type = uint32 " -" Frequency = 100 " -" }" +" Counter = { DataSource = Timer Type = uint32 Frequency = 1000 }" +" Time = { DataSource = Timer Type = uint32 }" " }" " OutputSignals = {" -" Counter = {" -" DataSource = DDB " -" Type = uint32 " -" }" +" Counter = { DataSource = DDB Type = uint32 }" +" Time = { DataSource = DDB Type = uint32 }" " }" " }" " }" " +Data = {" " Class = ReferenceContainer " " DefaultDataSource = DDB " -" +Timer = {" -" Class = LinuxTimer " -" SleepTime = 10000 " -" Signals = {" -" Counter = { Type = uint32 }" -" Time = { Type = uint32 }" -" }" -" }" -" +DDB = {" -" Class = GAMDataSource " -" Signals = { Counter = { Type = uint32 } }" -" }" +" +Timer = { Class = LinuxTimer SleepTime = 1000 Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }" +" +DDB = { Class = GAMDataSource Signals = { Counter = { Type = uint32 } Time = { Type = uint32 } } }" " +DAMS = { Class = TimingDataSource }" " }" " +States = {" " Class = ReferenceContainer " -" +State1 = {" -" Class = RealTimeState " -" +Threads = {" -" Class = ReferenceContainer " -" +Thread1 = {" -" Class = RealTimeThread " -" Functions = {GAM1} " -" }" -" }" -" }" -" }" -" +Scheduler = {" -" Class = GAMScheduler " -" TimingDataSource = DAMS " +" +State1 = { Class = RealTimeState +Threads = { Class = ReferenceContainer +Thread1 = { Class = RealTimeThread Functions = {GAM1} } } }" " }" +" +Scheduler = { Class = GAMScheduler TimingDataSource = DAMS }" "}"; void RunValidationTest() { - printf("--- MARTe2 100Hz Trace Validation Test ---\n"); + printf("--- MARTe2 1kHz Lossless Trace Validation Test ---\n"); ObjectRegistryDatabase::Instance()->Purge(); ConfigurationDatabase cdb; - StreamString ss = config_text; + StreamString ss = simple_config; ss.Seek(0); StandardParser parser(ss, cdb); - if (!parser.Parse()) { - printf("ERROR: Failed to parse configuration\n"); - return; - } + assert(parser.Parse()); - if (!ObjectRegistryDatabase::Instance()->Initialise(cdb)) { - printf("ERROR: Failed to initialise ObjectRegistryDatabase.\n"); - return; - } - - ReferenceT service = ObjectRegistryDatabase::Instance()->Find("DebugService"); - if (!service.IsValid()) { - printf("ERROR: DebugService not found\n"); - return; - } - - ReferenceT app = ObjectRegistryDatabase::Instance()->Find("App"); - if (!app.IsValid()) { - printf("ERROR: App not found\n"); - return; - } - - if (!app->ConfigureApplication()) { - printf("ERROR: Failed to configure application\n"); - return; - } - - if (app->PrepareNextState("State1") != ErrorManagement::NoError) { - printf("ERROR: Failed to prepare state State1\n"); - return; - } - - if (app->StartNextStateExecution() != ErrorManagement::NoError) { - printf("ERROR: Failed to start execution\n"); - return; - } - - printf("Application and DebugService are active.\n"); - Sleep::MSec(1000); - - // DIRECT ACTIVATION: Use the public TraceSignal method - printf("Activating trace directly...\n"); - // We try multiple potential paths to be safe - uint32 traceCount = 0; - traceCount += service->TraceSignal("App.Data.Timer.Counter", true, 1); - traceCount += service->TraceSignal("Timer.Counter", true, 1); - traceCount += service->TraceSignal("Counter", true, 1); - - printf("Trace enabled (Matched Aliases: %u)\n", traceCount); - - // 4. Setup UDP Listener - BasicUDPSocket listener; - if (!listener.Open()) { printf("ERROR: Failed to open UDP socket\n"); return; } - if (!listener.Listen(8081)) { printf("ERROR: Failed to listen on UDP 8081\n"); return; } - - // 5. Validate for 10 seconds - printf("Validating telemetry for 10 seconds...\n"); - uint32 lastVal = 0; - bool first = true; - uint32 packetCount = 0; - uint32 discontinuityCount = 0; - - float64 startTime = HighResolutionTimer::Counter() * HighResolutionTimer::Period(); - float64 globalTimeout = startTime + 30.0; - - while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTime) < 10.0) { - if (HighResolutionTimer::Counter() * HighResolutionTimer::Period() > globalTimeout) { - printf("CRITICAL ERROR: Global test timeout reached.\n"); - break; - } - - char buffer[2048]; - uint32 size = 2048; - TimeoutType timeout(200); + cdb.MoveToRoot(); + uint32 n = cdb.GetNumberOfChildren(); + for (uint32 i=0; iGetStandardHeap()); + ref->SetName(name); + assert(ref->Initialise(child)); + ObjectRegistryDatabase::Instance()->Insert(ref); + } + + Reference serviceGeneric = ObjectRegistryDatabase::Instance()->Find("DebugService"); + Reference appGeneric = ObjectRegistryDatabase::Instance()->Find("App"); + + if (!serviceGeneric.IsValid() || !appGeneric.IsValid()) { + printf("ERROR: Objects NOT FOUND even without prefix\n"); + return; + } + + DebugService *service = dynamic_cast(serviceGeneric.operator->()); + RealTimeApplication *app = dynamic_cast(appGeneric.operator->()); + + assert(service); + assert(app); + + if (!app->ConfigureApplication()) { + printf("ERROR: ConfigureApplication failed.\n"); + return; + } + + assert(app->PrepareNextState("State1") == ErrorManagement::NoError); + assert(app->StartNextStateExecution() == ErrorManagement::NoError); + + printf("Application started at 1kHz. Enabling Traces...\n"); + Sleep::MSec(500); + + // The registered name in DebugBrokerWrapper depends on GetFullObjectName + // With App as root, it should be App.Data.Timer.Counter + service->TraceSignal("App.Data.Timer.Counter", true, 1); + + BasicUDPSocket listener; + listener.Open(); + listener.Listen(8081); + + printf("Validating for 10 seconds...\n"); + + uint32 lastCounter = 0; + bool first = true; + uint32 totalSamples = 0; + uint32 discontinuities = 0; + uint32 totalPackets = 0; + + float64 startTest = HighResolutionTimer::Counter() * HighResolutionTimer::Period(); + + while ((HighResolutionTimer::Counter() * HighResolutionTimer::Period() - startTest) < 10.0) { + char buffer[4096]; + uint32 size = 4096; + if (listener.Read(buffer, size, TimeoutType(100))) { + totalPackets++; TraceHeader *h = (TraceHeader*)buffer; - if (h->magic == 0xDA7A57AD && h->count > 0) { - uint32 offset = sizeof(TraceHeader); - // Packet format: [Header][ID:4][Size:4][Value:N] + if (h->magic != 0xDA7A57AD) continue; + + uint32 offset = sizeof(TraceHeader); + for (uint32 i=0; icount; i++) { + uint32 sigId = *(uint32*)(&buffer[offset]); uint32 val = *(uint32*)(&buffer[offset + 8]); - if (!first) { - if (val != lastVal + 1) { - discontinuityCount++; + if (sigId == 0) { + if (!first) { + if (val != lastCounter + 1) { + discontinuities++; + } } + lastCounter = val; + totalSamples++; } - lastVal = val; - first = false; - packetCount++; - if (packetCount % 200 == 0) { - printf("Received %u packets... Current Value: %u\n", packetCount, val); - } + uint32 sigSize = *(uint32*)(&buffer[offset + 4]); + offset += (8 + sigSize); } + first = false; } } - printf("Test Finished.\n"); - printf("Total Packets Received: %u (Expected ~1000)\n", packetCount); - printf("Discontinuities: %u\n", discontinuityCount); - - float64 actualFreq = (float64)packetCount / 10.0; - printf("Average Frequency: %.2f Hz\n", actualFreq); + printf("\n--- Test Results ---\n"); + printf("Total UDP Packets: %u\n", totalPackets); + printf("Total Counter Samples: %u\n", totalSamples); + printf("Counter Discontinuities: %u\n", discontinuities); - if (packetCount < 100) { - printf("FAILURE: Almost no packets received. Telemetry is broken.\n"); - } else if (packetCount < 800) { - printf("WARNING: Too few packets received (Expected 1000, Got %u).\n", packetCount); - } else if (discontinuityCount > 20) { - printf("FAILURE: Too many discontinuities (%u).\n", discontinuityCount); + if (totalSamples < 9000) { + printf("FAILURE: Underflow - samples missing (%u).\n", totalSamples); + } else if (discontinuities > 10) { + printf("FAILURE: Excessive discontinuities detected! (%u)\n", discontinuities); } else { - printf("VALIDATION SUCCESSFUL!\n"); + printf("VALIDATION SUCCESSFUL: 1kHz Lossless Tracing Verified.\n"); } app->StopCurrentStateExecution(); - listener.Close(); ObjectRegistryDatabase::Instance()->Purge(); } diff --git a/Tools/gui_client/src/main.rs b/Tools/gui_client/src/main.rs index eded975..7de455b 100644 --- a/Tools/gui_client/src/main.rs +++ b/Tools/gui_client/src/main.rs @@ -80,6 +80,7 @@ enum InternalEvent { TraceRequested(String), ClearTrace(String), UdpStats(u64), + UdpDropped(u32), } // --- App State --- @@ -126,6 +127,7 @@ struct MarteDebugApp { node_info: String, udp_packets: u64, + udp_dropped: u64, forcing_dialog: Option, @@ -198,6 +200,7 @@ impl MarteDebugApp { selected_node: "".to_string(), node_info: "".to_string(), udp_packets: 0, + udp_dropped: 0, forcing_dialog: None, tx_cmd, rx_events, @@ -206,7 +209,6 @@ impl MarteDebugApp { } fn render_tree(&mut self, ui: &mut egui::Ui, item: &TreeItem, path: String) { - // Strip "Root" from paths to match server discovery let current_path = if path.is_empty() { if item.name == "Root" { "".to_string() } else { item.name.clone() } } else { @@ -387,6 +389,7 @@ fn tcp_log_worker(shared_config: Arc>, tx_events: Sender fn udp_worker(shared_config: Arc>, id_to_meta: Arc>>, traced_data: Arc>>, tx_events: Sender) { let mut current_version = 0; let mut socket: Option = None; + let mut last_seq: Option = None; loop { let (ver, port) = { @@ -403,6 +406,9 @@ fn udp_worker(shared_config: Arc>, id_to_meta: Arc().unwrap(); if sock.bind(&addr.into()).is_ok() { @@ -417,6 +423,7 @@ fn udp_worker(shared_config: Arc>, id_to_meta: Arc>, id_to_meta: Arc>, id_to_meta: Arc last { seq - last - 1 } else { 0 }; + if dropped > 0 { + let _ = tx_events.send(InternalEvent::UdpDropped(dropped)); + } + } + } + last_seq = Some(seq); + let mut count_buf = [0u8; 4]; count_buf.copy_from_slice(&buf[16..20]); let count = u32::from_le_bytes(count_buf); let now = start_time.elapsed().as_secs_f64(); let mut offset = 20; + let mut local_updates: HashMap> = HashMap::new(); let metas = id_to_meta.lock().unwrap(); - let mut data_map = traced_data.lock().unwrap(); for _ in 0..count { if offset + 8 > n { break; } @@ -484,14 +504,26 @@ fn udp_worker(shared_config: Arc>, id_to_meta: Arc 5000 { entry.values.pop_front(); } - } + local_updates.entry(name.clone()).or_default().push([now, val]); } } offset += size as usize; } + drop(metas); + + if !local_updates.is_empty() { + let mut data_map = traced_data.lock().unwrap(); + for (name, new_points) in local_updates { + if let Some(entry) = data_map.get_mut(&name) { + for point in new_points { + entry.values.push_back(point); + } + while entry.values.len() > 10000 { + entry.values.pop_front(); + } + } + } + } } } } @@ -531,7 +563,7 @@ impl eframe::App for MarteDebugApp { } InternalEvent::TraceRequested(name) => { let mut data_map = self.traced_signals.lock().unwrap(); - data_map.entry(name).or_insert_with(|| TraceData { values: VecDeque::with_capacity(5000) }); + data_map.entry(name).or_insert_with(|| TraceData { values: VecDeque::with_capacity(10000) }); } InternalEvent::ClearTrace(name) => { let mut data_map = self.traced_signals.lock().unwrap(); @@ -547,6 +579,9 @@ impl eframe::App for MarteDebugApp { InternalEvent::UdpStats(count) => { self.udp_packets = count; } + InternalEvent::UdpDropped(dropped) => { + self.udp_dropped += dropped as u64; + } InternalEvent::Connected => { self.connected = true; let _ = self.tx_cmd.send("TREE".to_string()); @@ -636,7 +671,7 @@ impl eframe::App for MarteDebugApp { } } ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| { - ui.label(format!("UDP Packets: {}", self.udp_packets)); + ui.label(format!("UDP: OK [{}] / DROPPED [{}]", self.udp_packets, self.udp_dropped)); }); }); }); @@ -797,7 +832,6 @@ impl eframe::App for MarteDebugApp { ui.horizontal(|ui| { ui.heading("Oscilloscope"); if ui.button("🔄 Reset View").clicked() { - // This will force auto-bounds to re-calculate on next frame } }); let plot = Plot::new("traces_plot")