Commit 00a639ff authored by Andrey Filippov's avatar Andrey Filippov

CLAUDE: Add printer_kvm.py — Ender 3 physical stylus control for phone touchscreen

Serial (USB/CH340G, 115200 baud) G-code driver for Creality Ender 3.
Provides tap(), swipe(), home(), calibrate() over /dev/ttyUSB0.
Part of the NanoKVM + Ender 3 physical phone-control project.
Co-authored-by: 's avatarClaude <claude@elphel.com>
parent 9942ae46
#!/usr/bin/env python3
"""
printer_kvm.py — Physical phone touchscreen control via Creality Ender 3.
The Ender 3 is mounted with a silicone stylus attached to its carriage.
Agents call tap()/swipe() with phone screen pixel coordinates; this script
converts them to printer XY positions and drives Z up/down for each touch.
Design doc: attic/CLAUDE/printer_kvm_design.md
Usage (CLI test):
python3 scripts/printer_kvm.py --port /dev/ttyUSB0 tap 540 960
python3 scripts/printer_kvm.py --port /dev/ttyUSB0 swipe 100 500 900 500
python3 scripts/printer_kvm.py --port /dev/ttyUSB0 calibrate
python3 scripts/printer_kvm.py --port /dev/ttyUSB0 home
"""
import argparse
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Tuple
try:
import serial
import serial.tools.list_ports
except ImportError:
serial = None # noqa: N816 — checked at runtime
log = logging.getLogger("printer_kvm")
# ---------------------------------------------------------------------------
# Safety envelope defaults (mm, printer frame)
# Ender 3 build volume: 220 x 220 x 250 mm.
# Override via SafetyEnvelope or --envelope JSON flag.
# ---------------------------------------------------------------------------
@dataclass
class SafetyEnvelope:
x_min: float = 10.0
x_max: float = 100.0 # phone ≈ 70 mm wide + margins
y_min: float = 30.0
y_max: float = 185.0 # phone ≈ 150 mm tall + margins
z_press: float = -0.4 # max downward travel (firmware endstop matches)
z_contact: float = 0.0 # screen surface
z_hover: float = 1.5 # slow-approach height
z_safe: float = 12.0 # fast XY travel height
# Feed rates (mm/min)
feed_xy: float = 3000.0
feed_z_approach: float = 120.0 # gentle press
feed_z_retract: float = 600.0
feed_z_travel: float = 1200.0
def validate(self) -> None:
assert self.x_min < self.x_max, "x_min must be < x_max"
assert self.y_min < self.y_max, "y_min must be < y_max"
assert self.z_press <= self.z_contact <= self.z_hover < self.z_safe
assert self.feed_xy > 0 and self.feed_z_approach > 0
def clamp_xy(self, x: float, y: float) -> Tuple[float, float]:
cx = max(self.x_min, min(self.x_max, x))
cy = max(self.y_min, min(self.y_max, y))
if (cx, cy) != (x, y):
raise SafetyError(
f"XY ({x:.3f}, {y:.3f}) outside envelope "
f"[{self.x_min},{self.x_max}] x [{self.y_min},{self.y_max}]"
)
return cx, cy
@dataclass
class ScreenCalibration:
"""Affine mapping from phone pixels to printer XY (mm)."""
# phone screen dimensions (pixels)
screen_w: int = 1080
screen_h: int = 2400
# printer XY of the phone's top-left and bottom-right screen corners
tl_mm: Tuple[float, float] = (12.0, 32.0) # pixel (0, 0)
br_mm: Tuple[float, float] = (82.0, 178.0) # pixel (screen_w, screen_h)
def to_mm(self, px_u: float, px_v: float) -> Tuple[float, float]:
sx = (self.br_mm[0] - self.tl_mm[0]) / self.screen_w
sy = (self.br_mm[1] - self.tl_mm[1]) / self.screen_h
x = self.tl_mm[0] + sx * px_u
y = self.tl_mm[1] + sy * px_v
return x, y
def to_dict(self) -> dict:
return {
"screen_w": self.screen_w,
"screen_h": self.screen_h,
"tl_mm": list(self.tl_mm),
"br_mm": list(self.br_mm),
}
@classmethod
def from_dict(cls, d: dict) -> "ScreenCalibration":
c = cls()
c.screen_w = d["screen_w"]
c.screen_h = d["screen_h"]
c.tl_mm = tuple(d["tl_mm"])
c.br_mm = tuple(d["br_mm"])
return c
class PrinterKVMError(RuntimeError):
pass
class SafetyError(PrinterKVMError):
pass
class NotConnectedError(PrinterKVMError):
pass
# ---------------------------------------------------------------------------
# Low-level serial transport
# ---------------------------------------------------------------------------
class GCodeSerial:
OK_RE = re.compile(r"^ok", re.IGNORECASE)
ERROR_RE = re.compile(r"^error", re.IGNORECASE)
def __init__(self, port: str, baud: int = 115200, timeout: float = 10.0):
if serial is None:
raise ImportError("pyserial is required: pip install pyserial")
self._ser = serial.Serial(port, baud, timeout=timeout)
time.sleep(2.0) # Marlin bootloader delay
self._flush_startup()
def _flush_startup(self) -> None:
deadline = time.monotonic() + 3.0
while time.monotonic() < deadline:
line = self._ser.readline().decode("ascii", errors="replace").strip()
if line:
log.debug("startup: %s", line)
def send(self, cmd: str, wait_ok: bool = True) -> list[str]:
cmd = cmd.strip()
log.debug(">> %s", cmd)
self._ser.write((cmd + "\n").encode())
if not wait_ok:
return []
return self._read_until_ok()
def _read_until_ok(self, timeout: float = 30.0) -> list[str]:
lines = []
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
raw = self._ser.readline()
line = raw.decode("ascii", errors="replace").strip()
if not line:
continue
log.debug("<< %s", line)
lines.append(line)
if self.OK_RE.match(line):
return lines
if self.ERROR_RE.match(line):
raise PrinterKVMError(f"Firmware error: {line}")
raise PrinterKVMError("Timeout waiting for 'ok'")
def close(self) -> None:
if self._ser and self._ser.is_open:
self._ser.close()
# ---------------------------------------------------------------------------
# Main control class
# ---------------------------------------------------------------------------
class PrinterKVM:
"""
Controls a Creality Ender 3 to physically tap a phone touchscreen.
Quick start:
kvm = PrinterKVM()
kvm.connect("/dev/ttyUSB0")
kvm.home()
kvm.tap(540, 960) # centre of a 1080×1920 screen
kvm.disconnect()
"""
DEFAULT_CALIB_PATH = Path(__file__).parent.parent / "attic/CLAUDE/printer_kvm_calib.json"
def __init__(
self,
envelope: Optional[SafetyEnvelope] = None,
calibration: Optional[ScreenCalibration] = None,
):
self._gcs: Optional[GCodeSerial] = None
self.envelope = envelope or SafetyEnvelope()
self.envelope.validate()
self.calib = calibration or ScreenCalibration()
# ------------------------------------------------------------------
# Connection management
# ------------------------------------------------------------------
def connect(self, port: str, baud: int = 115200) -> None:
self._gcs = GCodeSerial(port, baud)
log.info("Connected to %s @ %d", port, baud)
self._init_printer()
def disconnect(self) -> None:
if self._gcs:
self._safe_park()
self._gcs.send("M84") # disable steppers
self._gcs.close()
self._gcs = None
log.info("Disconnected")
def __enter__(self) -> "PrinterKVM":
return self
def __exit__(self, *_) -> None:
self.disconnect()
def _require_connection(self) -> GCodeSerial:
if self._gcs is None:
raise NotConnectedError("Call connect() first")
return self._gcs
def _init_printer(self) -> None:
gcs = self._require_connection()
gcs.send("M104 S0") # heater off
gcs.send("M140 S0") # bed heater off
gcs.send("M211 S1") # enable software endstops
# Apply Z min endstop at z_press via G-code endstop offset would require
# EEPROM changes; we enforce it in software instead (see _clamp_z).
# ------------------------------------------------------------------
# Homing and position reference
# ------------------------------------------------------------------
def home(self) -> None:
"""Home all axes (G28). Raises to z_safe afterward."""
gcs = self._require_connection()
log.info("Homing...")
gcs.send("G28", wait_ok=True) # can take ~60 s
gcs.send(f"G0 Z{self.envelope.z_safe:.3f} F{self.envelope.feed_z_travel:.0f}")
log.info("Homed")
def zero_z_here(self) -> None:
"""
Set Z=0 at the current stylus position (paper-method contact plane).
Move the carriage to the screen surface manually first.
"""
gcs = self._require_connection()
gcs.send("G92 Z0")
log.info("Z origin set at current position")
# ------------------------------------------------------------------
# Raw move (safety-checked)
# ------------------------------------------------------------------
def _clamp_z(self, z: float) -> float:
lo, hi = self.envelope.z_press, self.envelope.z_safe
if not (lo - 0.001 <= z <= hi + 0.001):
raise SafetyError(f"Z {z:.3f} outside [{lo}, {hi}]")
return max(lo, min(hi, z))
def move_to(self, x: float, y: float, z: Optional[float] = None,
feed: Optional[float] = None) -> None:
"""Move to absolute printer XY (and optionally Z) position."""
gcs = self._require_connection()
x, y = self.envelope.clamp_xy(x, y)
parts = [f"G0 X{x:.3f} Y{y:.3f}"]
if z is not None:
z = self._clamp_z(z)
parts[0] = f"G1 X{x:.3f} Y{y:.3f} Z{z:.3f}"
f = feed or self.envelope.feed_xy
parts.append(f"F{f:.0f}")
gcs.send(" ".join(parts))
def move_z(self, z: float, feed: Optional[float] = None) -> None:
"""Move Z axis only."""
gcs = self._require_connection()
z = self._clamp_z(z)
f = feed or self.envelope.feed_z_travel
gcs.send(f"G1 Z{z:.3f} F{f:.0f}")
def _safe_park(self) -> None:
try:
gcs = self._require_connection()
gcs.send(f"G0 Z{self.envelope.z_safe:.3f} F{self.envelope.feed_z_travel:.0f}")
except Exception: # noqa: BLE001
pass
# ------------------------------------------------------------------
# Touch primitives
# ------------------------------------------------------------------
def _do_tap(self, x_mm: float, y_mm: float,
press_z: Optional[float] = None,
dwell_ms: int = 80) -> None:
"""Internal: tap at printer XY coordinates."""
gcs = self._require_connection()
env = self.envelope
tap_z = self._clamp_z(press_z if press_z is not None else env.z_press)
# 1. Move to target XY at safe Z
gcs.send(f"G0 X{x_mm:.3f} Y{y_mm:.3f} Z{env.z_safe:.3f} F{env.feed_xy:.0f}")
# 2. Lower to hover
gcs.send(f"G1 Z{env.z_hover:.3f} F{env.feed_z_retract:.0f}")
# 3. Press
gcs.send(f"G1 Z{tap_z:.3f} F{env.feed_z_approach:.0f}")
# 4. Dwell
if dwell_ms > 0:
gcs.send(f"G4 P{dwell_ms}")
# 5. Retract to hover
gcs.send(f"G1 Z{env.z_hover:.3f} F{env.feed_z_retract:.0f}")
# 6. Rise to safe
gcs.send(f"G0 Z{env.z_safe:.3f} F{env.feed_z_travel:.0f}")
def tap(self, px_u: float, px_v: float,
dwell_ms: int = 80,
press_z: Optional[float] = None) -> None:
"""
Tap phone screen at pixel coordinates (px_u, px_v).
Args:
px_u: horizontal pixel (0 = left edge)
px_v: vertical pixel (0 = top edge)
dwell_ms: contact dwell time in milliseconds
press_z: override press depth (mm from z_contact); default z_press
"""
x_mm, y_mm = self.calib.to_mm(px_u, px_v)
self.envelope.clamp_xy(x_mm, y_mm) # validate before moving
log.info("tap px=(%.0f,%.0f) → mm=(%.2f,%.2f)", px_u, px_v, x_mm, y_mm)
self._do_tap(x_mm, y_mm, press_z=press_z, dwell_ms=dwell_ms)
def long_press(self, px_u: float, px_v: float, duration_ms: int = 800) -> None:
"""Long-press at pixel (px_u, px_v) for duration_ms milliseconds."""
self.tap(px_u, px_v, dwell_ms=duration_ms)
def swipe(self,
px_u1: float, px_v1: float,
px_u2: float, px_v2: float,
duration_ms: int = 400,
steps: int = 20) -> None:
"""
Swipe from (px_u1, px_v1) to (px_u2, px_v2) across the screen.
The stylus stays in contact (at z_contact) throughout the move,
then retracts. Total motion time ≈ duration_ms.
Args:
px_u1, px_v1: start pixel
px_u2, px_v2: end pixel
duration_ms: total swipe time in ms
steps: number of intermediate waypoints
"""
gcs = self._require_connection()
env = self.envelope
x1, y1 = self.calib.to_mm(px_u1, px_v1)
x2, y2 = self.calib.to_mm(px_u2, px_v2)
self.envelope.clamp_xy(x1, y1)
self.envelope.clamp_xy(x2, y2)
# Feed rate for the swipe segment (mm/min)
dx = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
swipe_feed = max(60.0, min(env.feed_xy, dx / (duration_ms / 60_000.0)))
log.info(
"swipe px=(%.0f,%.0f)→(%.0f,%.0f) %.0f ms feed=%.0f",
px_u1, px_v1, px_u2, px_v2, duration_ms, swipe_feed,
)
# Approach
gcs.send(f"G0 X{x1:.3f} Y{y1:.3f} Z{env.z_safe:.3f} F{env.feed_xy:.0f}")
gcs.send(f"G1 Z{env.z_hover:.3f} F{env.feed_z_retract:.0f}")
gcs.send(f"G1 Z{env.z_contact:.3f} F{env.feed_z_approach:.0f}")
# Swipe
gcs.send(f"G1 X{x2:.3f} Y{y2:.3f} F{swipe_feed:.0f}")
# Retract
gcs.send(f"G1 Z{env.z_hover:.3f} F{env.feed_z_retract:.0f}")
gcs.send(f"G0 Z{env.z_safe:.3f} F{env.feed_z_travel:.0f}")
def scroll_up(self, px_u: float, px_v: float, distance_px: float = 600,
duration_ms: int = 400) -> None:
"""Scroll up (swipe finger upward = content moves down)."""
self.swipe(px_u, px_v + distance_px / 2, px_u, px_v - distance_px / 2,
duration_ms=duration_ms)
def scroll_down(self, px_u: float, px_v: float, distance_px: float = 600,
duration_ms: int = 400) -> None:
"""Scroll down (swipe finger downward = content moves up)."""
self.swipe(px_u, px_v - distance_px / 2, px_u, px_v + distance_px / 2,
duration_ms=duration_ms)
# ------------------------------------------------------------------
# Emergency stop
# ------------------------------------------------------------------
def emergency_stop(self) -> None:
"""Send M112 (firmware halt). Requires power cycle + reconnect."""
if self._gcs:
try:
self._gcs.send("M112", wait_ok=False)
finally:
self._gcs.close()
self._gcs = None
log.critical("Emergency stop issued — power cycle printer to recover")
# ------------------------------------------------------------------
# Status / diagnostics
# ------------------------------------------------------------------
def get_position(self) -> dict:
"""Return current XYZ position as reported by M114."""
gcs = self._require_connection()
lines = gcs.send("M114")
# Marlin: "X:10.00 Y:20.00 Z:5.00 E:0.00 Count ..."
pos = {}
for line in lines:
m = re.search(r"X:([\d.\-]+)\s+Y:([\d.\-]+)\s+Z:([\d.\-]+)", line)
if m:
pos = {"x": float(m.group(1)), "y": float(m.group(2)),
"z": float(m.group(3))}
break
return pos
def get_endstop_states(self) -> str:
"""Return endstop state string from M119."""
gcs = self._require_connection()
lines = gcs.send("M119")
return "\n".join(lines)
# ------------------------------------------------------------------
# Calibration I/O
# ------------------------------------------------------------------
def save_calibration(self, path: Optional[Path] = None) -> None:
p = Path(path or self.DEFAULT_CALIB_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
data = {
"calibration": self.calib.to_dict(),
"envelope": {
k: v for k, v in vars(self.envelope).items()
},
}
p.write_text(json.dumps(data, indent=2))
log.info("Calibration saved to %s", p)
def load_calibration(self, path: Optional[Path] = None) -> None:
p = Path(path or self.DEFAULT_CALIB_PATH)
data = json.loads(p.read_text())
self.calib = ScreenCalibration.from_dict(data["calibration"])
for k, v in data.get("envelope", {}).items():
setattr(self.envelope, k, v)
self.envelope.validate()
log.info("Calibration loaded from %s", p)
# ------------------------------------------------------------------
# Interactive calibration wizard
# ------------------------------------------------------------------
def calibrate_interactive(self) -> None:
"""
Walk the user through touching four on-screen targets and recording
the printer XY positions. Updates self.calib in place.
"""
gcs = self._require_connection()
print("\n=== PrinterKVM calibration wizard ===")
print("You will be prompted to position the stylus over four screen corners.")
print("Jog using your preferred G-code sender, then press Enter here.\n")
def _record_point(label: str) -> Tuple[float, float]:
input(f" Position stylus over {label}, then press Enter...")
pos = self.get_position()
print(f" → X={pos['x']:.3f} Y={pos['y']:.3f}")
return pos["x"], pos["y"]
sw = int(input("Phone screen width (px): "))
sh = int(input("Phone screen height (px): "))
tl = _record_point("top-left corner (pixel 0,0)")
_record_point("top-right corner") # for validation only
_record_point("bottom-left corner") # for validation only
br = _record_point(f"bottom-right corner (pixel {sw},{sh})")
self.calib = ScreenCalibration(
screen_w=sw, screen_h=sh,
tl_mm=tl, br_mm=br,
)
print("\nCalibration complete.")
save = input("Save to disk? [Y/n] ").strip().lower()
if save != "n":
self.save_calibration()
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Physical phone control via Ender 3 printer stylus."
)
p.add_argument("--port", default="/dev/ttyUSB0", help="Serial port")
p.add_argument("--baud", type=int, default=115200)
p.add_argument("--calib", help="Calibration JSON path")
p.add_argument("--verbose", "-v", action="store_true")
sub = p.add_subparsers(dest="cmd", required=True)
sub.add_parser("home", help="Home all axes")
sub.add_parser("calibrate", help="Run interactive calibration wizard")
sub.add_parser("pos", help="Print current position")
t = sub.add_parser("tap", help="Tap a pixel coordinate")
t.add_argument("u", type=float, help="Horizontal pixel")
t.add_argument("v", type=float, help="Vertical pixel")
t.add_argument("--dwell", type=int, default=80)
lp = sub.add_parser("longpress", help="Long-press a pixel coordinate")
lp.add_argument("u", type=float)
lp.add_argument("v", type=float)
lp.add_argument("--duration", type=int, default=800)
sw = sub.add_parser("swipe", help="Swipe between two pixel coordinates")
sw.add_argument("u1", type=float)
sw.add_argument("v1", type=float)
sw.add_argument("u2", type=float)
sw.add_argument("v2", type=float)
sw.add_argument("--duration", type=int, default=400)
sub.add_parser("estop", help="Emergency stop (M112)")
return p
def main() -> None:
args = _build_arg_parser().parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s %(message)s",
)
kvm = PrinterKVM()
if args.calib:
kvm.load_calibration(args.calib)
elif PrinterKVM.DEFAULT_CALIB_PATH.exists():
kvm.load_calibration()
if args.cmd == "estop":
# Try to connect and halt even without a calibration
kvm.connect(args.port, args.baud)
kvm.emergency_stop()
return
kvm.connect(args.port, args.baud)
try:
if args.cmd == "home":
kvm.home()
elif args.cmd == "calibrate":
kvm.home()
kvm.calibrate_interactive()
elif args.cmd == "pos":
print(kvm.get_position())
elif args.cmd == "tap":
kvm.tap(args.u, args.v, dwell_ms=args.dwell)
elif args.cmd == "longpress":
kvm.long_press(args.u, args.v, duration_ms=args.duration)
elif args.cmd == "swipe":
kvm.swipe(args.u1, args.v1, args.u2, args.v2, duration_ms=args.duration)
finally:
kvm.disconnect()
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment