Initial commit

This commit is contained in:
Martino Ferrari
2026-02-24 13:15:57 +01:00
commit 607e1b4dd1
11 changed files with 3057 additions and 0 deletions

81
hmi/cli_client.py Normal file
View File

@@ -0,0 +1,81 @@
import serial
import argparse
import sys
import time
def handle_comm(ser, cmd):
ser.reset_input_buffer()
ser.write((cmd + "\n").encode())
time.sleep(0.1)
start = time.time()
while time.time() - start < 1.0:
line = ser.readline().decode(errors='replace').strip()
if line and any(x in line for x in ["ACK", "PONG", "ERR", "READY"]):
return line
return "[No Response]"
def main():
parser = argparse.ArgumentParser(description='ESP32-P4 Waveform Gen CLI')
parser.add_argument('--port', default='/dev/ttyACM0', help='Serial port')
parser.add_argument('--interactive', action='store_true', help='Interactive mode')
# State Commands
parser.add_argument('--enable', action='store_true', help='Enable power stage')
parser.add_argument('--disable', action='store_true', help='Force both outputs LOW')
parser.add_argument('--on', action='store_true', help='Manual ON (A=0, B=1)')
parser.add_argument('--off', action='store_true', help='Manual OFF (A=1, B=0)')
parser.add_argument('--mod-on', action='store_true', help='Start modulation')
parser.add_argument('--mod-off', action='store_true', help='Stop modulation (return to OFF)')
parser.add_argument('--freq', type=int, help='Set frequency (10-5000 Hz)')
parser.add_argument('--ping', action='store_true')
args = parser.parse_args()
try:
ser = serial.Serial()
ser.port = args.port
ser.baudrate = 115200
ser.timeout = 0.5
ser.dtr = False
ser.rts = False
ser.open()
if args.interactive:
print(f"Connected to {args.port}.")
print("Commands: ENABLE, DISABLE, ON, OFF, MOD_ON, MOD_OFF, F <hz>, PING, exit")
while True:
try:
cmd = input(">> ").strip()
if cmd.lower() in ['exit', 'quit']: break
if not cmd: continue
print(f"Response: {handle_comm(ser, cmd)}")
except KeyboardInterrupt:
break
ser.close()
return
# Determine which command to send
cmd = None
if args.ping: cmd = "PING"
elif args.enable: cmd = "ENABLE"
elif args.disable: cmd = "DISABLE"
elif args.on: cmd = "ON"
elif args.off: cmd = "OFF"
elif args.mod_on: cmd = "MOD_ON"
elif args.mod_off: cmd = "MOD_OFF"
elif args.freq is not None: cmd = f"F {args.freq}"
if cmd:
print(f"Sending: {cmd}")
print(f"Response: {handle_comm(ser, cmd)}")
else:
print("Error: No action specified. Use --help for options or --interactive.")
ser.close()
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == '__main__':
main()

123
hmi/hmi.py Normal file
View File

@@ -0,0 +1,123 @@
import threading
import time
import tkinter as tk
from tkinter import messagebox, ttk
import serial
import serial.tools.list_ports
class WaveformGenHMI:
def __init__(self, root):
self.root = root
self.root.title("ESP32-P4 Waveform Generator")
self.root.geometry("400x300")
self.ser = None
# UI Elements
ttk.Label(root, text="Serial Port:").grid(
row=0, column=0, padx=5, pady=5, sticky="w"
)
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(root, textvariable=self.port_var)
self.port_combo.grid(row=0, column=1, padx=5, pady=5)
self.refresh_ports()
ttk.Button(root, text="Connect", command=self.toggle_connect).grid(
row=0, column=2, padx=5, pady=5
)
ttk.Label(root, text="GPIO Pin:").grid(
row=1, column=0, padx=5, pady=5, sticky="w"
)
self.pin_entry = ttk.Entry(root)
self.pin_entry.insert(0, "21")
self.pin_entry.grid(row=1, column=1, padx=5, pady=5)
ttk.Label(root, text="Frequency (Hz):").grid(
row=2, column=0, padx=5, pady=5, sticky="w"
)
self.freq_slider = tk.Scale(
root, from_=0, to=5000, orient=tk.HORIZONTAL, length=200
)
self.freq_slider.grid(row=2, column=1, columnspan=2, padx=5, pady=5)
self.freq_entry = ttk.Entry(root, width=10)
self.freq_entry.insert(0, "1000")
self.freq_entry.grid(row=3, column=1, padx=5, pady=5, sticky="w")
ttk.Button(root, text="Apply", command=self.send_config).grid(
row=3, column=2, padx=5, pady=5
)
self.status_label = ttk.Label(root, text="Disconnected", foreground="red")
self.status_label.grid(row=4, column=0, columnspan=3, pady=10)
# Sync slider and entry
self.freq_slider.bind("<Motion>", lambda e: self.sync_entry_to_slider())
self.freq_entry.bind("<Return>", lambda e: self.sync_slider_to_entry())
def refresh_ports(self):
ports = [p.device for p in serial.tools.list_ports.comports()]
self.port_combo["values"] = ports
if ports:
self.port_combo.current(0)
def sync_entry_to_slider(self):
self.freq_entry.delete(0, tk.END)
self.freq_entry.insert(0, str(self.freq_slider.get()))
def sync_slider_to_entry(self):
try:
val = int(self.freq_entry.get())
if 0 <= val <= 5000:
self.freq_slider.set(val)
except ValueError:
pass
def toggle_connect(self):
if self.ser and self.ser.is_open:
self.ser.close()
self.status_label.config(text="Disconnected", foreground="red")
else:
try:
self.ser = serial.Serial(self.port_var.get(), 115200, timeout=1)
self.status_label.config(
text=f"Connected to {self.port_var.get()}", foreground="green"
)
except Exception as e:
messagebox.showerror("Error", f"Failed to connect: {e}")
def send_config(self):
if not self.ser or not self.ser.is_open:
messagebox.showwarning("Warning", "Not connected to serial port")
return
try:
pin = int(self.pin_entry.get())
freq = int(self.freq_entry.get())
if not (0 <= pin <= 54):
raise ValueError("Pin must be 0-54")
if not (0 <= freq <= 5000):
raise ValueError("Frequency must be 0-5000")
cmd = f"SET {pin} {freq}"
self.ser.write(cmd.encode())
# Optional: read response
line = self.ser.readline().decode().strip()
if line:
print(f"ESP32: {line}")
except ValueError as e:
messagebox.showerror("Error", str(e))
except Exception as e:
messagebox.showerror("Error", f"Serial communication error: {e}")
if __name__ == "__main__":
root = tk.Tk()
app = WaveformGenHMI(root)
root.mainloop()

138
hmi/hmi_web.py Normal file
View File

@@ -0,0 +1,138 @@
from flask import Flask, render_template_string, request, jsonify
import serial
import serial.tools.list_ports
import time
app = Flask(__name__)
ser = None
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>ESP32-P4 Waveform Gen Dual</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body { font-family: sans-serif; max-width: 600px; margin: 20px auto; padding: 20px; background: #f4f4f9; }
.card { background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin-bottom: 20px; }
.channel { border-left: 5px solid #007bff; padding-left: 15px; margin: 20px 0; }
.channel.b { border-left-color: #28a745; }
.group { margin-bottom: 15px; }
label { display: block; margin-bottom: 5px; font-weight: bold; }
input[type="number"], select { width: 100%; padding: 8px; box-sizing: border-box; }
button { background: #007bff; color: white; border: none; padding: 10px 15px; border-radius: 4px; cursor: pointer; margin-right: 5px; }
button.off { background: #dc3545; }
button.on { background: #28a745; }
.status { padding: 10px; border-radius: 4px; font-size: 0.9em; margin-top: 10px; }
.connected { background: #d4edda; color: #155724; }
.disconnected { background: #f8d7da; color: #721c24; }
</style>
</head>
<body>
<div class="card">
<h2>Connection</h2>
<select id="port">
{% for port in ports %}
<option value="{{ port }}">{{ port }}</option>
{% endfor %}
</select>
<button onclick="connect()" style="margin-top:10px; width:100%">Connect</button>
<div id="status" class="status disconnected">Disconnected</div>
</div>
<div class="card channel">
<h3>Channel A (Pin 33)</h3>
<div class="group">
<label>Frequency (Hz)</label>
<input type="range" id="freq_a_slider" min="0" max="5000" value="10" oninput="document.getElementById('freq_a').value=this.value">
<input type="number" id="freq_a" value="10" min="0" max="5000">
</div>
<button onclick="sendCmd('A ' + document.getElementById('freq_a').value)">Set Freq</button>
<button class="on" onclick="sendCmd('A ON')">Enable</button>
<button class="off" onclick="sendCmd('A OFF')">Disable</button>
</div>
<div class="card channel b">
<h3>Channel B (Pin 32)</h3>
<div class="group">
<label>Frequency (Hz)</label>
<input type="range" id="freq_b_slider" min="0" max="5000" value="5000" oninput="document.getElementById('freq_b').value=this.value">
<input type="number" id="freq_b" value="5000" min="0" max="5000">
</div>
<button onclick="sendCmd('B ' + document.getElementById('freq_b').value)">Set Freq</button>
<button class="on" onclick="sendCmd('B ON')">Enable</button>
<button class="off" onclick="sendCmd('B OFF')">Disable</button>
</div>
<div class="card">
<div id="response" style="font-family:monospace; font-size:0.8em;">Log waiting...</div>
</div>
<script>
async function connect() {
const port = document.getElementById('port').value;
const res = await fetch('/connect', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({port})
});
const data = await res.json();
const div = document.getElementById('status');
div.innerText = data.message;
div.className = 'status ' + (data.success ? 'connected' : 'disconnected');
}
async function sendCmd(cmd) {
const res = await fetch('/cmd', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({cmd})
});
const data = await res.json();
document.getElementById('response').innerText = "ESP: " + (data.response || "No response");
}
</script>
</body>
</html>
"""
@app.route('/')
def index():
ports = [p.device for p in serial.tools.list_ports.comports()]
return render_template_string(HTML_TEMPLATE, ports=ports)
@app.route('/connect', methods=['POST'])
def connect():
global ser
port = request.json.get('port')
try:
if ser: ser.close()
ser = serial.Serial(port, 115200, timeout=0.1)
return jsonify(success=True, message=f"Connected to {port}")
except Exception as e:
return jsonify(success=False, message=str(e))
@app.route('/cmd', methods=['POST'])
def cmd():
global ser
if not ser: return jsonify(success=False, response="Not connected")
command = request.json.get('cmd')
try:
# Clear buffer
ser.reset_input_buffer()
ser.write((command + "\n").encode())
# Wait a bit longer for the ESP to process and respond
time.sleep(0.2)
# Read the line response
resp = ser.read_all().decode(errors='replace').strip()
if not resp:
# Try one more read in case it's slow
time.sleep(0.3)
resp = ser.read_all().decode(errors='replace').strip()
return jsonify(success=True, response=resp or "Empty response")
except Exception as e:
return jsonify(success=False, response=str(e))
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)

1
hmi/requirements.txt Normal file
View File

@@ -0,0 +1 @@
pyserial