better perf

This commit is contained in:
Martino Ferrari
2026-02-25 16:51:07 +01:00
parent aaf69c0949
commit dfb399bbba
12 changed files with 713 additions and 633 deletions

View File

@@ -13,13 +13,13 @@ use socket2::{Socket, Domain, Type, Protocol};
use regex::Regex;
use once_cell::sync::Lazy;
use rfd::FileDialog;
use arrow::array::{Float64Array, Array};
use arrow::array::Float64Array;
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
static APP_START_TIME: Lazy<std::time::Instant> = Lazy::new(std::time::Instant::now);
static BASE_TELEM_TS: Lazy<Mutex<Option<u64>>> = Lazy::new(|| Mutex::new(None));
// --- Models ---
@@ -157,6 +157,7 @@ enum InternalEvent {
UdpDropped(u32),
RecordPathChosen(String, String), // SignalName, FilePath
RecordingError(String, String), // SignalName, ErrorMessage
TelemMatched(u32), // Signal ID
}
// --- App State ---
@@ -209,6 +210,7 @@ struct MarteDebugApp {
node_info: String,
udp_packets: u64,
udp_dropped: u64,
telem_match_count: HashMap<u32, u64>,
forcing_dialog: Option<ForcingDialog>,
style_editor: Option<(usize, usize)>,
tx_cmd: Sender<String>,
@@ -246,7 +248,7 @@ impl MarteDebugApp {
log_filters: LogFilters { show_debug: true, show_info: true, show_warning: true, show_error: true, paused: false, content_regex: "".to_string() },
show_left_panel: true, show_right_panel: true, show_bottom_panel: true,
selected_node: "".to_string(), node_info: "".to_string(),
udp_packets: 0, udp_dropped: 0,
udp_packets: 0, udp_dropped: 0, telem_match_count: HashMap::new(),
forcing_dialog: None, style_editor: None,
tx_cmd, rx_events, internal_tx,
shared_x_range: None,
@@ -339,13 +341,38 @@ fn tcp_command_worker(shared_config: Arc<Mutex<ConnectionConfig>>, rx_cmd: Recei
if *stop_flag_reader.lock().unwrap() { break; }
let trimmed = line.trim();
if trimmed.is_empty() { line.clear(); continue; }
if !in_json && trimmed.starts_with("{") { in_json = true; json_acc.clear(); }
if in_json {
json_acc.push_str(trimmed);
if trimmed == "OK DISCOVER" { in_json = false; let json_clean = json_acc.trim_end_matches("OK DISCOVER").trim(); if let Ok(resp) = serde_json::from_str::<DiscoverResponse>(json_clean) { let _ = tx_events_inner.send(InternalEvent::Discovery(resp.signals)); } json_acc.clear(); }
else if trimmed == "OK TREE" { in_json = false; let json_clean = json_acc.trim_end_matches("OK TREE").trim(); if let Ok(resp) = serde_json::from_str::<TreeItem>(json_clean) { let _ = tx_events_inner.send(InternalEvent::Tree(resp)); } json_acc.clear(); }
else if trimmed == "OK INFO" { in_json = false; let json_clean = json_acc.trim_end_matches("OK INFO").trim(); let _ = tx_events_inner.send(InternalEvent::NodeInfo(json_clean.to_string())); json_acc.clear(); }
} else { let _ = tx_events_inner.send(InternalEvent::CommandResponse(trimmed.to_string())); }
if trimmed.contains("OK DISCOVER") {
in_json = false;
let json_clean = json_acc.split("OK DISCOVER").next().unwrap_or("").trim();
match serde_json::from_str::<DiscoverResponse>(json_clean) {
Ok(resp) => { let _ = tx_events_inner.send(InternalEvent::Discovery(resp.signals)); }
Err(e) => { let _ = tx_events_inner.send(InternalEvent::InternalLog(format!("Discovery JSON Error: {} | Payload: {}", e, json_clean))); }
}
json_acc.clear();
}
else if trimmed.contains("OK TREE") {
in_json = false;
let json_clean = json_acc.split("OK TREE").next().unwrap_or("").trim();
match serde_json::from_str::<TreeItem>(json_clean) {
Ok(resp) => { let _ = tx_events_inner.send(InternalEvent::Tree(resp)); }
Err(e) => { let _ = tx_events_inner.send(InternalEvent::InternalLog(format!("Tree JSON Error: {}", e))); }
}
json_acc.clear();
}
else if trimmed.contains("OK INFO") {
in_json = false;
let json_clean = json_acc.split("OK INFO").next().unwrap_or("").trim();
let _ = tx_events_inner.send(InternalEvent::NodeInfo(json_clean.to_string()));
json_acc.clear();
}
} else {
let _ = tx_events_inner.send(InternalEvent::CommandResponse(trimmed.to_string()));
}
line.clear();
}
});
@@ -411,10 +438,13 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
let mut current_version = 0;
let mut socket: Option<UdpSocket> = None;
let mut last_seq: Option<u32> = None;
let mut last_warning_time = std::time::Instant::now();
loop {
let (ver, port) = { let config = shared_config.lock().unwrap(); (config.version, config.udp_port.clone()) };
if ver != current_version || socket.is_none() {
current_version = ver;
{ let mut base = BASE_TELEM_TS.lock().unwrap(); *base = None; }
if port.is_empty() { socket = None; continue; }
let port_num: u16 = port.parse().unwrap_or(8081);
let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).ok();
@@ -445,18 +475,40 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
if let Some(last) = last_seq { if seq != last + 1 && seq > last { let _ = tx_events.send(InternalEvent::UdpDropped(seq - last - 1)); } }
last_seq = Some(seq);
let count = u32::from_le_bytes(buf[16..20].try_into().unwrap());
let now = APP_START_TIME.elapsed().as_secs_f64();
let mut offset = 20;
let (mut local_updates, mut last_values): (HashMap<String, Vec<[f64; 2]>>, HashMap<String, f64>) = (HashMap::new(), HashMap::new());
let mut local_updates: HashMap<String, Vec<[f64; 2]>> = HashMap::new();
let mut last_values: HashMap<String, f64> = HashMap::new();
let metas = id_to_meta.lock().unwrap();
if metas.is_empty() && count > 0 && last_warning_time.elapsed().as_secs() > 5 {
let _ = tx_events.send(InternalEvent::InternalLog("UDP received but Metadata empty. Still discovering?".to_string()));
last_warning_time = std::time::Instant::now();
}
for _ in 0..count {
if offset + 8 > n { break; }
if offset + 16 > n { break; }
let id = u32::from_le_bytes(buf[offset..offset+4].try_into().unwrap());
let size = u32::from_le_bytes(buf[offset+4..offset+8].try_into().unwrap());
offset += 8;
let ts_raw = u64::from_le_bytes(buf[offset+4..offset+12].try_into().unwrap());
let size = u32::from_le_bytes(buf[offset+12..offset+16].try_into().unwrap());
offset += 16;
if offset + size as usize > n { break; }
let data_slice = &buf[offset..offset + size as usize];
let mut base_ts_guard = BASE_TELEM_TS.lock().unwrap();
if base_ts_guard.is_none() { *base_ts_guard = Some(ts_raw); }
let base = base_ts_guard.unwrap();
let ts_s = if ts_raw >= base {
(ts_raw - base) as f64 / 1000000.0
} else {
0.0 // Avoid huge jitter wrap-around
};
drop(base_ts_guard);
if let Some(meta) = metas.get(&id) {
let _ = tx_events.send(InternalEvent::TelemMatched(id));
let t = meta.sig_type.as_str();
let val = match size {
1 => { if t.contains('u') { data_slice[0] as f64 } else { (data_slice[0] as i8) as f64 } },
@@ -465,7 +517,10 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
8 => { let b = data_slice[0..8].try_into().unwrap(); if t.contains("float") { f64::from_le_bytes(b) } else if t.contains('u') { u64::from_le_bytes(b) as f64 } else { i64::from_le_bytes(b) as f64 } },
_ => 0.0,
};
for name in &meta.names { local_updates.entry(name.clone()).or_default().push([now, val]); last_values.insert(name.clone(), val); }
for name in &meta.names {
local_updates.entry(name.clone()).or_default().push([ts_s, val]);
last_values.insert(name.clone(), val);
}
}
offset += size as usize;
}
@@ -474,7 +529,10 @@ fn udp_worker(shared_config: Arc<Mutex<ConnectionConfig>>, id_to_meta: Arc<Mutex
let mut data_map = traced_data.lock().unwrap();
for (name, new_points) in local_updates {
if let Some(entry) = data_map.get_mut(&name) {
for point in new_points { entry.values.push_back(point); if let Some(tx) = &entry.recording_tx { let _ = tx.send(point); } }
for point in new_points {
entry.values.push_back(point);
if let Some(tx) = &entry.recording_tx { let _ = tx.send(point); }
}
if let Some(lv) = last_values.get(&name) { entry.last_value = *lv; }
while entry.values.len() > 100000 { entry.values.pop_front(); }
}
@@ -490,10 +548,22 @@ impl eframe::App for MarteDebugApp {
while let Ok(event) = self.rx_events.try_recv() {
match event {
InternalEvent::Log(log) => { if !self.log_filters.paused { self.logs.push_back(log); if self.logs.len() > 2000 { self.logs.pop_front(); } } }
InternalEvent::Discovery(signals) => { let mut metas = self.id_to_meta.lock().unwrap(); metas.clear(); for s in &signals { let meta = metas.entry(s.id).or_insert_with(|| SignalMetadata { names: Vec::new(), sig_type: s.sig_type.clone() }); if !meta.names.contains(&s.name) { meta.names.push(s.name.clone()); } } }
InternalEvent::Discovery(signals) => {
let mut metas = self.id_to_meta.lock().unwrap();
metas.clear();
for s in &signals {
let meta = metas.entry(s.id).or_insert_with(|| SignalMetadata { names: Vec::new(), sig_type: s.sig_type.clone() });
if !meta.names.contains(&s.name) { meta.names.push(s.name.clone()); }
}
self.logs.push_back(LogEntry { time: Local::now().format("%H:%M:%S").to_string(), level: "GUI_INFO".to_string(), message: format!("Discovery complete: {} signals mapped", signals.len()) });
}
InternalEvent::Tree(tree) => { self.app_tree = Some(tree); }
InternalEvent::NodeInfo(info) => { self.node_info = info; }
InternalEvent::TraceRequested(name) => { let mut data_map = self.traced_signals.lock().unwrap(); data_map.entry(name).or_insert_with(|| TraceData { values: VecDeque::with_capacity(10000), last_value: 0.0, recording_tx: None, recording_path: None }); }
InternalEvent::TraceRequested(name) => {
let mut data_map = self.traced_signals.lock().unwrap();
data_map.entry(name.clone()).or_insert_with(|| TraceData { values: VecDeque::with_capacity(10000), last_value: 0.0, recording_tx: None, recording_path: None });
self.logs.push_back(LogEntry { time: Local::now().format("%H:%M:%S").to_string(), level: "GUI_INFO".to_string(), message: format!("Trace requested for: {}", name) });
}
InternalEvent::ClearTrace(name) => { let mut data_map = self.traced_signals.lock().unwrap(); data_map.remove(&name); for plot in &mut self.plots { plot.signals.retain(|s| s.source_name != name); } }
InternalEvent::UdpStats(count) => { self.udp_packets = count; }
InternalEvent::UdpDropped(dropped) => { self.udp_dropped += dropped as u64; }
@@ -501,6 +571,7 @@ impl eframe::App for MarteDebugApp {
InternalEvent::Disconnected => { self.connected = false; }
InternalEvent::InternalLog(msg) => { self.logs.push_back(LogEntry { time: Local::now().format("%H:%M:%S").to_string(), level: "GUI_ERROR".to_string(), message: msg }); }
InternalEvent::CommandResponse(resp) => { self.logs.push_back(LogEntry { time: Local::now().format("%H:%M:%S").to_string(), level: "CMD_RESP".to_string(), message: resp }); }
InternalEvent::TelemMatched(id) => { *self.telem_match_count.entry(id).or_insert(0) += 1; }
InternalEvent::RecordPathChosen(name, path) => {
let mut data_map = self.traced_signals.lock().unwrap();
if let Some(entry) = data_map.get_mut(&name) {
@@ -585,6 +656,7 @@ impl eframe::App for MarteDebugApp {
ui.label("Logs:"); ui.text_edit_singleline(&mut self.config.log_port); ui.end_row();
});
if ui.button("🔄 Apply").clicked() { self.config.version += 1; *self.shared_config.lock().unwrap() = self.config.clone(); ui.close_menu(); }
if ui.button("📡 Re-Discover").clicked() { let _ = self.tx_cmd.send("DISCOVER".to_string()); ui.close_menu(); }
if ui.button("❌ Off").clicked() { self.config.version += 1; let mut cfg = self.config.clone(); cfg.ip = "".to_string(); *self.shared_config.lock().unwrap() = cfg; ui.close_menu(); }
});
ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| { ui.label(format!("UDP: OK[{}] DROP[{}]", self.udp_packets, self.udp_dropped)); });
@@ -642,10 +714,16 @@ impl eframe::App for MarteDebugApp {
let regex = if !self.log_filters.content_regex.is_empty() { Regex::new(&self.log_filters.content_regex).ok() } else { None };
egui::ScrollArea::vertical().stick_to_bottom(true).auto_shrink([false, false]).show(ui, |ui| {
for log in &self.logs {
let show = match log.level.as_str() { "Debug" => self.log_filters.show_debug, "Information" => self.log_filters.show_info, "Warning" => self.log_filters.show_warning, "FatalError" | "OSError" | "ParametersError" => self.log_filters.show_error, _ => true };
let show = match log.level.as_str() {
"Debug" => self.log_filters.show_debug,
"Information" | "GUI_INFO" | "GUI_WARN" | "CMD_RESP" => self.log_filters.show_info,
"Warning" => self.log_filters.show_warning,
"FatalError" | "OSError" | "ParametersError" | "GUI_ERROR" | "REC_ERROR" => self.log_filters.show_error,
_ => true
};
if !show { continue; }
if let Some(re) = &regex { if !re.is_match(&log.message) && !re.is_match(&log.level) { continue; } }
let color = match log.level.as_str() { "FatalError" | "OSError" | "ParametersError" => egui::Color32::from_rgb(255, 100, 100), "Warning" => egui::Color32::from_rgb(255, 255, 100), "Information" => egui::Color32::from_rgb(100, 255, 100), "Debug" => egui::Color32::from_rgb(100, 100, 255), "GUI_ERROR" => egui::Color32::from_rgb(255, 50, 255), "CMD_RESP" => egui::Color32::from_rgb(255, 255, 255), _ => egui::Color32::WHITE };
let color = match log.level.as_str() { "FatalError" | "OSError" | "ParametersError" | "GUI_ERROR" | "REC_ERROR" => egui::Color32::from_rgb(255, 100, 100), "Warning" | "GUI_WARN" => egui::Color32::from_rgb(255, 255, 100), "Information" | "GUI_INFO" => egui::Color32::from_rgb(100, 255, 100), "Debug" => egui::Color32::from_rgb(100, 100, 255), "CMD_RESP" => egui::Color32::from_rgb(255, 255, 255), _ => egui::Color32::WHITE };
ui.horizontal_wrapped(|ui| { ui.label(egui::RichText::new(&log.time).color(egui::Color32::GRAY).monospace()); ui.label(egui::RichText::new(format!("[{}]", log.level)).color(color).strong()); ui.add(egui::Label::new(&log.message).wrap()); });
}
});
@@ -662,9 +740,26 @@ impl eframe::App for MarteDebugApp {
ui.group(|ui| {
ui.horizontal(|ui| { ui.label(egui::RichText::new(&plot_inst.id).strong()); ui.selectable_value(&mut plot_inst.plot_type, PlotType::Normal, "Series"); ui.selectable_value(&mut plot_inst.plot_type, PlotType::LogicAnalyzer, "Logic"); if ui.button("🗑").clicked() { to_remove = Some(p_idx); } });
let mut plot = Plot::new(&plot_inst.id).height(plot_height - 40.0).show_axes([true, true]);
plot = plot.x_axis_formatter(|mark, _range| {
let val = mark.value;
let hours = (val / 3600.0) as u32;
let mins = ((val % 3600.0) / 60.0) as u32;
let secs = val % 60.0;
format!("{:02}:{:02}:{:05.2}", hours, mins, secs)
});
let data_map = self.traced_signals.lock().unwrap();
let mut latest_t = 0.0;
for sig_cfg in &plot_inst.signals {
if let Some(data) = data_map.get(&sig_cfg.source_name) {
if let Some(last) = data.values.back() { if last[0] > latest_t { latest_t = last[0]; } }
}
}
if self.scope.enabled {
let window_s = self.scope.window_ms / 1000.0;
let center_t = if self.scope.mode == AcquisitionMode::Triggered { if self.scope.trigger_active { self.scope.last_trigger_time } else { APP_START_TIME.elapsed().as_secs_f64() } } else { APP_START_TIME.elapsed().as_secs_f64() };
let center_t = if self.scope.mode == AcquisitionMode::Triggered { if self.scope.trigger_active { self.scope.last_trigger_time } else { latest_t } } else { latest_t };
let x_min = center_t - (self.scope.pre_trigger_percent / 100.0) * window_s;
plot = plot.include_x(x_min).include_x(x_min + window_s);
if !self.scope.paused { plot = plot.auto_bounds(egui::Vec2b::new(true, true)); }
@@ -672,23 +767,25 @@ impl eframe::App for MarteDebugApp {
if let Some(range) = self.shared_x_range { if !plot_inst.auto_bounds { plot = plot.include_x(range[0]).include_x(range[1]); } }
if plot_inst.auto_bounds { plot = plot.auto_bounds(egui::Vec2b::new(true, true)); }
}
let plot_resp = plot.show(ui, |plot_ui| {
if !self.scope.enabled && !plot_inst.auto_bounds { if let Some(range) = self.shared_x_range { let bounds = plot_ui.plot_bounds(); plot_ui.set_plot_bounds(PlotBounds::from_min_max([range[0], bounds.min()[1]], [range[1], bounds.max()[1]])); } }
if self.scope.enabled && self.scope.mode == AcquisitionMode::Triggered && self.scope.trigger_active { plot_ui.vline(VLine::new(self.scope.last_trigger_time).color(egui::Color32::YELLOW).style(LineStyle::Dashed { length: 5.0 })); }
let data_map = self.traced_signals.lock().unwrap();
for (s_idx, sig_cfg) in plot_inst.signals.iter().enumerate() {
if let Some(data) = data_map.get(&sig_cfg.source_name) {
let mut points = Vec::new();
for [t, v] in &data.values {
let points_iter = data.values.iter().rev().take(5000).rev().map(|[t, v]| {
let mut final_v = *v * sig_cfg.gain + sig_cfg.offset;
if plot_inst.plot_type == PlotType::LogicAnalyzer { final_v = (s_idx as f64 * 1.5) + (if final_v > 0.5 { 1.0 } else { 0.0 }); }
points.push([*t, final_v]);
}
plot_ui.line(Line::new(PlotPoints::from(points)).name(&sig_cfg.label).color(sig_cfg.color));
[*t, final_v]
});
plot_ui.line(Line::new(PlotPoints::from_iter(points_iter)).name(&sig_cfg.label).color(sig_cfg.color));
}
}
if p_idx == 0 || current_range.is_none() { let b = plot_ui.plot_bounds(); current_range = Some([b.min()[0], b.max()[0]]); }
});
drop(data_map);
if plot_resp.response.hovered() && ctx.input(|i| i.pointer.any_released()) {
if let Some(dropped) = ctx.data_mut(|d| d.get_temp::<String>(egui::Id::new("drag_signal"))) {
let color = Self::next_color(plot_inst.signals.len());