Commit 782ef529 authored by Andrey Filippov's avatar Andrey Filippov

CLAUDE: initial import — DNN training/eval/export (migrated from imagej-elphel-internal/c5p_dnn)

L1 RawFCN + L2 ConvGRU(torus), synthetic data gen, training/eval, infer_server,
and export_torchscript.py (self-contained TorchScript for native LibTorch inference).
GPLv3 (Elphel norm); headers on all .py/.sh; LICENSE = GPLv3. runs/ checkpoints untracked.
Co-Authored-By: 's avatarClaude Opus 4.8 (1M context) <noreply@anthropic.com>
parents
Pipeline #3823 canceled with stages
__pycache__/
*.pyc
runs/
*.venv/
export_venv/
*.tif
# C5P DNN front-end — design & decisions
CUAS real-time detector DNN front-end: an **all-convolutional (FCN)** per-pixel target estimator
that replaces the C5P matched-filter velocity bank. Trained on synthetic Gaussian-noise patches,
deployed inside `CuasDetectRT` (ImageJ) via ONNX Runtime (Java, CPU EP). Produces, per pixel, a
velocity posterior over an 11×11 grid (±1.25 px/frame, 0.25 step) + detection confidence `s` +
sub-pixel (dx,dy) offset.
## Files
- `synth.py` — on-the-fly training patches. `generate_sample` (center / off-center / noise classes),
half-cosine bump targets, per-frame constant velocity sampled on a disk, SNR-swept.
- `model.py``RawFCN` (24×24×N → 1×1×124 via valid convs + 2 maxpools, slid densely as an FCN);
`fcn_loss` (det BCE + velocity soft-target CE + offset MSE); `vel_bias_loss` (batch-moment de-bias).
- `train.py` — training loop + ONNX export (dynamic H/W axes). The CLI defaults ARE the "weighted" recipe.
- `velocity_bias.py` — gain-vs-SNR diagnostic (predicted vs true velocity, fit per SNR bin).
- `gen_synth_cuas.py` — builds the full-frame `*-CUAS-SYNTHETIC-CUAS.tiff` velocity-reference grid
for in-ImageJ testing (radial layout, one velocity cell per target). Writes a `.groundtruth.json`.
- `compare_dnn_truth.py` — compares saved DNN output vs the real UAS_log.csv (offset/velocity/time).
- `baselines.py`, `export_onnx.py`, `make_testvec.py`, `viz_*.py`, `extract_B.py` — support/diagnostics.
## Key decisions & findings (2026-06-15)
1. **Runtime temporal depth N.** `CuasDnnInfer` reads N from the ONNX input shape `[B,N,H,W]` and
exposes `getNFrames()`; `CuasDetectRT` uses it instead of a hardcoded 8 → 8- vs 9-frame models swap
purely by changing `curt_dnn_model`. (Committed: imagej-elphel `9d06cce7`.)
2. **Velocity training DOMAIN vs output grid.** `synth.py` confines training velocity to a *disk* of
radius `vmax_px`. It was 1.0, but the output grid runs to ±1.25 (radius-5 cells), so every cell with
|v|>1.0 was untrained → underestimate + asymmetric ghosts at the corners. Diagnosed via the synthetic
grid (the diagonal (1.0,1.0)=|v|1.414 cell got projected onto the trained boundary → argmax (0.75,0.75)).
Fix: train with **vmax_px ≈ 1.4** (covers the ±1.25 on-axis with margin; only super-physical diagonal
corners stay untrained). `velocity_bias.py` couldn't catch this — it samples the same disk.
3. **Velocity SCALE de-bias (`vel_bias_loss`).** Softmax-centroid velocity shrinks as confidence drops
(centroid of a noise-broadened, grid-bounded posterior regresses toward 0): gain 0.97 clean → 0.76 at
SNR=1. **The recurrent layer reduces variance, not bias** — so the bias must be removed in the DNN.
`vel_bias_loss`: per equal-population bin of a conditioning var, pooled least-squares gain-through-origin
of predicted vs true velocity, penalize `(gain−1)²`. This pins the MEAN scale to 1 per bin WITHOUT
penalizing per-sample variance (variance is information-limited; left for the recurrent to average).
Result: gain → ~1.0 across SNR (RMSE preserved at low SNR — the intended signature).
- `--bias_by snr` (default): clean training label; the correction is baked into the weights so the var
need not exist at inference. **Best for the in-loss term.**
- `--bias_by s` (confidence): cause-agnostic — corrects on the net's own uncertainty regardless of WHY
it's low (Gaussian noise vs clutter vs close targets), so it may transfer to non-Gaussian degraders on
REAL data. Near-identical to snr on Gaussian; the real-data A/B (tile-streak, two close targets) is the
only discriminator. If it doesn't transfer, augment synth with those factors.
4. **Half-pixel registration.** Training referenced the patch center at `(W−1)/2 = 11.5` (even patch),
but deployment (`CuasDnnInfer.inferROI`) puts the ROI/output pixel at index `half = P/2 = 12`
(`ix = cx + i − half`, patch spans `[cx−12, cx+11]`). The 0.5 gap was a systematic ½-px position bias.
Fix: set `cx0 = cy0 = P/2` in `generate_sample` (matches the deployment reference; even patch kept).
The 24-patch asymmetry (12 left / 11 right) is exactly what deployment already imposes, so training now
matches it — no odd-25-patch architecture change needed.
5. **Temporal sync (exact).** The DNN output is anchored at the **newest** frame of its N-frame window
(`window[0] = framesD[newest]`, training labels frame i=0 = newest). The output is tagged
`ts[newest] + " f"+newest`, so `f<n>` IS the motion frame (= level-slice index). No ±-frame slack.
6. **Radial synthetic grid math.** `gen_synth_cuas.py` radial layout: cell (vx,vy) node = `center + 30·(vx,vy)`,
velocity `(vx/4, vy/4)` px/frame, `pos = node + v·t`. So the **effective grid spacing = 30 + t/4 px**
(breathes outward 0.25 px/frame). Clean integer-pixel grids occur at `t ≡ 0 (mod 4)`: t=8→32px, 12→33px, …
Between them, sub-pixel offsets fan out per velocity. center = (320.5, 256.5) on a 640×512 frame.
7. **ONNX deploy convention.** torch exports external-data ONNX (`model.onnx` + `model.onnx.data`); the
`.onnx` references the sidecar by its export-time name, so the pair **cannot be renamed flat** (renaming
loads the wrong sidecar → size-mismatch error). Deploy each model in **its own subdir** with the canonical
`model.onnx` / `model.onnx.data`. Cache: `~/.cache/c5p_dnn/<name>/model.onnx`.
8. **Ghostbuster (untrained-corner velocities).** The velocity grid (radius `vel_radius`=5 cells,
corners to R≈7) extends past the trained disk (R = `vmax_px`·`vel_decimate` ≈ 5.6 at vmax 1.4), so
the untrained corner cells emit spurious velocity sidelobes (ghosts) of non-trivial strength (field
value up to ~0.09 vs ~0.15 real) that would confuse the recurrent. `CuasDetectRT.dnnGhostbust` zeros
velocity cells with cell-radius > `curt_dnn_vmax·vel_decimate`; if a pixel's PEAK lands in that region
the whole detection is discarded (field=0, **s=0**). Applied to the DNN field + offset before save and
the recurrent feed. **`curt_dnn_vmax` (px/frame, default 1.4) must match the loaded model's training
`vmax_px`** (PM models = 1.4, `m9_base` = 1.0); too low over-masks trained cells, too high leaves ghosts.
(imagej-elphel `0bd16311`.)
9. **Larger attention area (32-patch) — kills trajectory-alias ghosts (T4).** Widened the receptive
field to a 32-patch: `RawFCN(patch=32)` = 6 conv3 + 2 pool (32→30→28→14→12→10→5→3→1; pools on even
sizes so the cx0=P/2 centering holds), ~119k params. Grows off-center suppression reach
`off_max = P/2-margin-1` from 9→13 px, covering the alias reach `vmax·(N-1) ≈ 11.2 px`. On the real
synthetic grid the ghost field dropped **0.16 → ~0.003** (~50×, essentially gone). NB: the single-
static `ghost_probe.py` did NOT reproduce the ghost (both 24/32 suppress a lone static off-center
target) — the real alias needs the multi-target/conditioning context — but the wider RF fixed it
regardless. Java: `inferROI` patch is configurable via `CuasDnnInfer.setPatch`; new param
**`curt_dnn_patch`** (default 24; set 32 for this model) — MUST match the loaded model. Deployed
`~/.cache/c5p_dnn/m9_p32_s/model.onnx`. IMPORTANT: `curt_dnn_vmax` (ghostbuster) must be ≥ the max
real-target velocity you want to KEEP, not just the training vmax — e.g. (4,4)=|v|1.414 (cell-R 5.657)
needs vmax≥1.42, so use 1.5 (rmax 6.0) to keep (4,4) while still killing far corners (5,5)=7.07.
Training disk was 1.4, so (4,4) is marginally extrapolated; a retrain at vmax_px≈1.45–1.5 trains the
diagonal corners cleanly.
## Training recipe (DGX GB10, NGC pytorch:25.10-py3, ~35 it/s, ~3 min / 6000 steps)
```
python train.py --steps 6000 --nframes 9 --vmax 1.4 --w_bias 10 [--bias_by s] --out runs/<name>
```
Defaults (frac_pos 0.4, frac_off 0.4, w_vel 1.0, w_off 0.3, snr 1 8, patch 24, vel_radius 5, vel_decimate 4,
sigma_v 0.9) = the weighted recipe. Sync scripts to `elphel@192.168.0.62:~/c5p_dnn/`, run in the container.
## Deployed models (`~/.cache/c5p_dnn/`) — "perfect-match" set (half-pixel fix + vmax 1.4 + de-bias)
- `m9_pm/model.onnx` — 9-frame, snr-de-bias
- `m9_pm_s/model.onnx` — 9-frame, s-de-bias
(supersede `m9_dbias*`, which lacked the half-pixel fix; `m9_base` = 9-frame no-de-bias baseline.)
## Open items
- Real-data A/B: snr vs s de-bias on the tile-streak and two-close-targets cases (the s-transfer test).
- Connect the DNN field to the recurrent layer (weights, as-is vs splat); tune re-sharpen.
- (Done — ghostbuster, decision 8.) Corner ghost sidelobes masked at readout.
- Explore in-network recurrent (extra layers / memory) — cf. Andrey's predecessor 2-stage Siamese
tile-disparity CNN (spatial neighbour context; multi-scale 1×1/3×3/5×5 loss).
# C5P DNN v2 — Two-stage MF + learned Hough-vote (2026-06-18, Andrey + Claude)
Redesign agreed after the v1 findings: softmax competition (not 121-resolution) kills ghosts;
the ghostbuster is structurally backwards (clips near-limit reals, keeps under-read fast ghosts);
the trajectory-alias velocity ramp is **structure, not noise**; low-SNR needs **spatial** evidence
integration (the spatial analog of the temporal recurrent). This architecture turns the alias into
the detector.
## Core idea
A true target (head H at newest frame, velocity V, tail T = H − V·(N−1)) makes the per-pixel MF field
fire a **predictable pattern** around H: a neighbor pixel P reports (tail-anchored, exact for tail-only)
`V_P = V + (P − H)/(N−1)` — slope 1/(N−1), the ramp we measured. The invariant: every pixel of the
target back-projects to the **same tail**: `P − V_P·(N−1) = T`. So aliased neighbors don't scatter —
they all point at one place. Accumulating those (s-weighted) votes recovers the target from many weak
detections (~√N gain) — far more noise-resilient than one pixel's s.
## Stage 1 — local matched filter (per-pixel, neighbor-agnostic)
- In: N-frame conditioned patch. Out per pixel: continuous **(Vx, Vy, s)** + K latent "secret-message" channels.
- **No velocity grid** — continuous V (the v1 grid-softmax's competition role moves to Stage 2). Resolves the
grid-vs-reg tension: continuous velocity here, competition there.
- **Velocity range ±2.5–3 from the start** (Andrey 2026-06-18: higher range is needed/beneficial; design wide now to
avoid a narrow→wide re-iteration). Continuous V → NO grid-resolution penalty (unlike v1's wider grid: decimate-2
step-0.5 gave centroid RMSE 0.06 vs 0.03 — here it's just a wider tanh bound). Wider per-DNN range ⇒ FEWER pyramid
levels to stack.
- RF ≈ trajectory reach `vmax·(N−1)` + bump. At vmax 3: **N=9 → reach 24 → patch ~52**; N=5 → reach 12 → patch ~28.
Lock N with the range (T5 coupling) — **lean N=9** (temporal depth = the low-SNR lever; the bigger RF is CNN-shared,
so amortized). Still drops v1's neighbor-suppression margin → smaller than a v1-style net at the same vmax.
- Training: **center-positives only** (report the trajectory through me) + noise negatives (det=0). **Drop the
off-center negatives** — we WANT the alias ramp, not suppression. Trained on causal MB (RT bias absorbed, per
the −0.36→+0.02 result). CNN-shared features (cheaper than literal per-pixel MF, like the 3d3 coarse layer).
## Stage 2 — learned Hough vote (spatial competition + aggregation)
- In: the Stage-1 (Vx,Vy,s)[+latent] field. Each pixel casts an s-weighted vote at `T_P = P − V_P·(N−1)`
through a **learned vote kernel**; accumulate → soft-argmax → per-target detection. Recover head = T + V·(N−1).
- **Physics as init + soft bound, not hard-coded:** seed toward the tail-anchored ramp, clamp the deviation to
the measured spread (the real all-frames MF spread is NARROWER than tail-only, and MB shifts it); let it learn
the rest.
- **Subsumes both** the v1 velocity-softmax competition AND the ghostbuster: a ghost has one voter → loses; a real
target gets coherent votes → wins. No output-velocity clipping (that was backwards).
## Loss schedule — dictated reference → end-to-end (the agreed methodology)
Structure lives in the ARCHITECTURE (differentiable Stage1→Stage2); "dictatorship" is just the aux-loss weight:
1. **Reference (hard):** deep supervision — aux loss pins Stage-1 (Vx,Vy,s) to the MF targets + Stage-2 final loss.
Interpretable, data-efficient, and the permanent **debugging substrate** (v1 was only debuggable because we could
read intermediates).
2. **Anneal:** lower the Stage-1 aux weight; let the K latent channels carry richer inter-stage features.
3. **End-to-end:** final loss only (+ small aux as regularizer). **Compare to the reference**; keep the gain if it
beats it, else the reference stands.
Training order: Stage 1 alone (freezable) → Stage 2 on its field → end-to-end.
## Pairs with the recurrent (T3)
Stage-2 vote = spatial weak-evidence integration; recurrent = temporal. Same principle; eventually one framework.
## To decide at implementation
N and RF size; K latent channels; vote-accumulator sub-pixel resolution; whether Stage 2 is conv/attention/explicit
scatter-add; how the head-recovery (T + V·(N−1)) feeds the output/recurrent. Build the reference first, measure
against v1 (m9_p32_grid121_cblur etc.) on the same synthetic harness.
This diff is collapsed.
# imagej_elphel_dnn
DNN companion to [imagej-elphel](https://git.elphel.com/Elphel/imagej-elphel) for tile-processor
motion detection / ranging. GPLv3.
## Layers
- **L1 — `RawFCN`** (`model.py`): fully-convolutional patch net `[B,N,P,P] -> [B,C,1,1]`, slid densely over
the full frame (no FC layers). Per-tile detection logit + velocity field. Trained on synthetic sequences.
- **L2 — `Layer2Net` / ConvGRU on a torus** (`layer2.py`): learned track-before-detect over the L1 field.
## Workflow
- Training / eval / synthetic data generation: `train.py`, `layer2_train*.py`, `gen_synth_cuas.py`, `synth.py`, eval scripts.
- Inference (dev/remote): `infer_server.py` (+ `run_infer_server.sh`) — PyTorch server.
- **Deployment export:** `export_torchscript.py` produces a self-contained TorchScript `.pt` (weights + graph),
loaded natively (no Python) by LibTorch in [tile_processor_gpu](https://git.elphel.com/Elphel/tile_processor_gpu)
and bundled as a resource in imagej-elphel (`resources/cuas_dnn/`). Build-once on a dev box (PyTorch);
deployment needs only the NVIDIA driver + libtorch runtime + the bundled `.pt`.
Checkpoints (`runs/`) are training outputs and are not tracked.
# baselines.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""
Phase-correlation and matched-filter velocity baselines for the C5P DNN benchmark. # By Claude on 06/13/2026
Pure numpy (no torch) so they can be unit-tested anywhere. Both estimate (vx,vy) px/frame
from an [N,H,W] patch stack (frame 0 = newest; target near center). These are the bars the
network must match (PC) and beat (matched filter = our current linear front-end).
"""
import numpy as np
def _parabolic(c, l, r):
"""Sub-sample peak offset in [-0.5,0.5] from 3 samples (left, center, right), c is max."""
d = (l - r)
den = 2.0 * (l - 2.0 * c + r)
return (d / den) if abs(den) > 1e-12 else 0.0
def pc_shift(a, b):
"""Phase correlation: estimate the shift (sx,sy) of `a` relative to `b` (sub-pixel).
Positive sx means `a`'s content is at larger x than `b`'s."""
H, W = a.shape
Fa = np.fft.fft2(a); Fb = np.fft.fft2(b)
R = Fa * np.conj(Fb)
R /= (np.abs(R) + 1e-9) # whiten -> phase correlation (fat zero eps)
r = np.fft.ifft2(R).real
py, px = np.unravel_index(np.argmax(r), r.shape)
# sub-pixel parabolic on each axis (wrap neighbors)
dx = _parabolic(r[py, px], r[py, (px - 1) % W], r[py, (px + 1) % W])
dy = _parabolic(r[py, px], r[(py - 1) % H, px], r[(py + 1) % H, px])
sx = (px + dx); sy = (py + dy)
if sx > W / 2: sx -= W
if sy > H / 2: sy -= H
return sx, sy
def pc_velocity(frames):
"""NAIVE PC (kept for contrast, DO NOT use as the bar): per-pair whitening then spatial
shift-averaging - amplifies noise, fails at low SNR. See pc_velocity_fd for the real one."""
N = frames.shape[0]
sx = sy = 0.0
for i in range(N - 1):
dx, dy = pc_shift(frames[i], frames[i + 1])
sx += dx; sy += dy
return sx / (N - 1), sy / (N - 1)
def pc_velocity_fd(frames, baseline=1, fatzero=1e-2):
"""Andrey's coherent PC: SUM the cross-power spectra of same-baseline pairs in the # By Claude on 06/13/2026
frequency domain BEFORE normalization, normalize once (fat-zero regularized), ifft ->
one correlation surface from all pairs jointly. For constant velocity every consecutive
pair encodes the same per-frame shift, so they add coherently (~(N-1)x SNR) and a single
whitening then localizes the peak. baseline>1 uses longer-baseline pairs (bigger, better-
resolved displacement for slow targets - the 'increase pairs/baseline when speed is low').
Returns (vx,vy) px/frame. (Masked iterative + tracking-camera refinement not included
here - this is the core FD-combined estimate; refinement would push it further.)"""
N, H, W = frames.shape
F = np.fft.fft2(frames, axes=(-2, -1)) # [N,H,W] complex
R = np.zeros((H, W), dtype=complex)
for i in range(N - baseline):
R += F[i] * np.conj(F[i + baseline]) # combine in FD BEFORE normalization
R /= (np.abs(R) + fatzero * np.abs(R).max()) # single whitening (fat zero)
r = np.fft.ifft2(R).real
py, px = np.unravel_index(np.argmax(r), r.shape)
dx = _parabolic(r[py, px], r[py, (px - 1) % W], r[py, (px + 1) % W])
dy = _parabolic(r[py, px], r[(py - 1) % H, px], r[(py + 1) % H, px])
sx = px + dx; sy = py + dy
if sx > W / 2: sx -= W
if sy > H / 2: sy -= H
return sx / baseline, sy / baseline # peak shift = v*baseline
def _bump(cx, cy, H, W, radial=False):
ys = np.arange(H)[:, None] - cy; xs = np.arange(W)[None, :] - cx
if radial:
rr = np.sqrt(xs * xs + ys * ys)
return np.where(rr < 1.5, np.cos(np.pi / 3.0 * rr), 0.0)
bx = np.where(np.abs(xs) < 1.5, np.cos(np.pi / 3.0 * np.abs(xs)), 0.0)
by = np.where(np.abs(ys) < 1.5, np.cos(np.pi / 3.0 * np.abs(ys)), 0.0)
return bx * by
def mf_velocity(frames, vel_radius=5, vel_decimate=4, radial=False, parabolic=True):
"""Matched-filter velocity (= the C5P statistic at the patch center): correlate the
stack with the swept bump for each velocity cell, argmax over the grid (+ parabolic
sub-cell). Returns (vx,vy) px/frame and the peak response."""
N, H, W = frames.shape
cx0 = (W - 1) / 2.0; cy0 = (H - 1) / 2.0
vdim = 2 * vel_radius + 1
resp = np.empty((vdim, vdim), dtype=np.float64)
for iy, vyc in enumerate(range(-vel_radius, vel_radius + 1)):
for ix, vxc in enumerate(range(-vel_radius, vel_radius + 1)):
vx = vxc / vel_decimate; vy = vyc / vel_decimate
s = 0.0
for i in range(N):
t = _bump(cx0 - vx * i, cy0 - vy * i, H, W, radial)
s += float((frames[i] * t).sum())
resp[iy, ix] = s
iy, ix = np.unravel_index(np.argmax(resp), resp.shape)
vyc, vxc = iy - vel_radius, ix - vel_radius
if parabolic and 0 < ix < vdim - 1 and 0 < iy < vdim - 1:
vxc += _parabolic(resp[iy, ix], resp[iy, ix - 1], resp[iy, ix + 1])
vyc += _parabolic(resp[iy, ix], resp[iy - 1, ix], resp[iy + 1, ix])
return vxc / vel_decimate, vyc / vel_decimate, float(resp[iy, ix])
if __name__ == "__main__":
import synth
rng = np.random.default_rng(7)
print("velRMSE px/fr (PCnaive = bad strawman, PCfd = Andrey's coherent FD-combined, MF = matched filter)")
for snr in [2.0, 3.0, 5.0, 8.0, 100.0]:
en = []; efd = []; efd4 = []; emf = []
for _ in range(200):
f, lab = synth.generate_sample(rng, snr=snr, target=True)
vxn, vyn = pc_velocity(f)
vxf, vyf = pc_velocity_fd(f, baseline=1)
vxf4, vyf4 = pc_velocity_fd(f, baseline=4) # longer baseline for slow targets
vxm, vym, _ = mf_velocity(f)
en.append(np.hypot(vxn - lab["vx"], vyn - lab["vy"]))
efd.append(np.hypot(vxf - lab["vx"], vyf - lab["vy"]))
efd4.append(np.hypot(vxf4 - lab["vx"], vyf4 - lab["vy"]))
emf.append(np.hypot(vxm - lab["vx"], vym - lab["vy"]))
rms = lambda e: np.sqrt(np.mean(np.square(e)))
print(f"snr={snr:6.1f} PCnaive={rms(en):.4f} PCfd(b1)={rms(efd):.4f} "
f"PCfd(b4)={rms(efd4):.4f} MF={rms(emf):.4f}")
# build_combo3.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Build the standard combo3_hyper.tif from per-type eval tiffs. By Claude on 06/24/2026.
The locked L2 viewing standard (Andrey, 2026-06-24): ImageJ hyperstack axes TZCYX,
C=type / Z=frame / T=series, grayscale, each 32x32 frame 2x2-tiled to 64x64 (seam at
center cross), per-slice labels "TYPE sN fF". C=type keeps ImageJ's display range
per-channel so scrubbing frame(Z)/series(T) holds a common contrast.
Channel order matches gap_eval.py/clean_eval_fwhm.py outputs:
gap evals (5 types): L2_det, L1_s, input, truth, signal
clean/easy (3 types): L2_det, L1_s, truth
Reads whatever subset is present in --dir, in that fixed order.
Usage: build_combo3.py --dir runs/l1views/mhc_eval --T 128
build_combo3.py --dir runs/l1views/mhc_easy --T 120
"""
import argparse, os, numpy as np, tifffile
ORDER = ["L2_det", "L1_s", "input", "truth", "signal"] # fixed C order
def tile2x2(a): # (...,32,32) -> (...,64,64), seam at center cross
return np.tile(a, (1, 1, 2, 2)) if a.ndim == 4 else np.tile(a, (2, 2))
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--dir", required=True)
ap.add_argument("--T", type=int, default=128, help="frames per series")
ap.add_argument("--out", default=None)
a = ap.parse_args()
# Default filename = folder basename, so open windows get distinct/descriptive ImageJ
# titles (e.g. mhgb_train12_hyper.tif), not a generic combo3_hyper.tif. By Claude 06/24/2026.
out = a.out or os.path.join(a.dir, os.path.basename(os.path.normpath(a.dir)) + "_hyper.tif")
types = [t for t in ORDER if os.path.exists(os.path.join(a.dir, f"{t}.tif"))]
if not types:
raise SystemExit(f"no per-type tiffs found in {a.dir}")
stacks = []
for t in types:
s = tifffile.imread(os.path.join(a.dir, f"{t}.tif")).astype(np.float32) # (T*Z,32,32)
nser = s.shape[0] // a.T
s = s.reshape(nser, a.T, s.shape[1], s.shape[2]) # (series,frame,32,32)
s = tile2x2(s) # (series,frame,64,64)
stacks.append(s)
nser, nfr = stacks[0].shape[0], stacks[0].shape[1]
C = len(types)
vol = np.stack(stacks, axis=2) # (T=series, Z=frame, C=type, Y, X)
# ImageJ Labels: C fastest, then Z(frame), then T(series)
labels = [f"{types[c]} s{t} f{z}" for t in range(nser) for z in range(nfr) for c in range(C)]
tifffile.imwrite(out, vol, imagej=True,
metadata={"axes": "TZCYX", "mode": "grayscale", "Labels": labels})
print(f"wrote {out} TZCYX={vol.shape} types={types} series={nser} frames={nfr}")
if __name__ == "__main__":
main()
# clean_eval_fwhm.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Clean FWHM eval: trained L2 vs L1, on NO-GAP data, bright frames only. By Claude 06/23/2026"""
import argparse, numpy as np, torch, synth, layer2_data as L1D
from layer2 import Layer2Net
def fwhm_at(field, cx, cy, G):
# roll truth to center, measure half-max width of center row & col (sub-pixel, clean interp)
f=np.roll(np.roll(field,int(round(G//2-cy)),0),int(round(G//2-cx)),1); c=G//2
def w(line):
pk=float(line[c])
if pk<=1e-6: return np.nan
h=pk/2
# right crossing
i=c
while i<len(line)-1 and line[i]>=h: i+=1
r=(i-1)+(line[i-1]-h)/(line[i-1]-line[i]) if line[i-1]>line[i] else float(i-1)
# left
i=c
while i>0 and line[i]>=h: i-=1
l=(i+1)-(line[i+1]-h)/(line[i+1]-line[i]) if line[i+1]>line[i] else float(i+1)
return r-l
return np.nanmean([w(f[c,:]),w(f[:,c])])
ap=argparse.ArgumentParser()
ap.add_argument("--l1",default="runs/weighted9_pm/model.pt")
ap.add_argument("--model",required=True); ap.add_argument("--out",required=True)
ap.add_argument("--T",type=int,default=120); ap.add_argument("--G",type=int,default=32)
a=ap.parse_args(); import os; os.makedirs(a.out,exist_ok=True)
dev="cuda" if torch.cuda.is_available() else "cpu"
net1,N,_=L1D._load_l1(a.l1,dev)
ck=torch.load(a.model,map_location=dev); ar=ck["args"]
net=Layer2Net(ch_in=3,ch_hidden=ar["ch"],grid=a.G,vmax=ar["vmax"]).to(dev)
net.load_state_dict(ck["model"]); net.eval()
rng=np.random.default_rng(777)
# CLEAN no-gap data (matches training)
frames,pos,vel,present=L1D.render_run(rng,T=a.T,G=a.G,vmax=ar["vmax"],snr=ar["snr"],gaps=False)
seq=L1D.gen_field_sequence(net1,frames,pos,a.G,N,dev)
with torch.no_grad():
det,_=net(torch.from_numpy(seq[None]).to(dev))
l2=torch.sigmoid(det)[0,:,0].cpu().numpy()
l1s=seq[:,0]
fl1,fl2,fp_away=[],[],[]
for t in range(a.T):
if not present[t]: continue
cx,cy=pos[t]; ci,cj=int(round(cy))%a.G,int(round(cx))%a.G
if l1s[t,ci,cj]>0.5: fl1.append(fwhm_at(l1s[t],cx,cy,a.G)) # L1 bright frames
if l2[t].max()>0.5:
fl2.append(fwhm_at(l2[t],cx,cy,a.G))
m=np.ones((a.G,a.G),bool); yy,xx=np.ogrid[:a.G,:a.G]
m[(((xx-cj+a.G/2)%a.G-a.G/2)**2+((yy-ci+a.G/2)%a.G-a.G/2)**2)<=9]=False
fp_away.append(float(l2[t][m].max()))
synth.save_tiff_stack(l1s,f"{a.out}/L1_s.tif"); synth.save_tiff_stack(l2,f"{a.out}/L2_det.tif")
tr=np.zeros((a.T,a.G,a.G),np.float32)
for t in range(a.T):
if present[t]: tr[t]=L1D.halfcos_bump_torus(pos[t,0],pos[t,1],a.G)
synth.save_tiff_stack(tr,f"{a.out}/truth.tif")
print(f"=== CLEAN no-gap eval of {a.model} ===")
print(f"present {int(present.sum())}/{a.T} L1 locked {len(fl1)} L2 locked {len(fl2)}")
print(f"FWHM @ target (bright frames): L1 ~ {np.nanmean(fl1):.2f} px L2 ~ {np.nanmean(fl2):.2f} px")
print(f"L2 max-FP away-from-target (locked frames): mean {np.mean(fp_away):.2f} max {np.max(fp_away):.2f}")
This diff is collapsed.
#!/usr/bin/env python3
# dense_check.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Validate full-res shift-and-stitch == per-pixel patch inference. By Claude on 06/20/2026.
Run inside the NGC container (needs model.pt + model.py).
The stride-4 RawFCN, run densely on a whole frame, emits a 1/4-res grid. To recover the
full-res per-input-pixel field (what the Java CPU inferROI produces, CuasDnnInfer.java:111),
we run the dense net on the S*S=16 (S=4) sub-pixel shifts of the zero-padded frame and
interleave the outputs. This proves that reconstruction bit-matches the per-pixel reference
and pins the padding/offset alignment before it goes into the server + Java."""
import sys
import torch
import torch.nn.functional as F
from model import RawFCN
def load(run_dir):
ck = torch.load(run_dir + "/model.pt", map_location="cpu", weights_only=False)
a = ck.get("args", {}) or {}
m = RawFCN(n_frames=a.get("nframes", 8), vel_radius=a.get("vel_radius", 5),
patch=a.get("patch", 24), velocity_mode=a.get("velocity_mode", "grid"),
vmax=a.get("vmax", 1.4))
m.load_state_dict(ck["model"])
m.eval().cuda()
return m, a.get("nframes", 8), a.get("patch", 24)
@torch.no_grad()
def per_pixel(m, x, P, region):
# Reference (mirrors CPU inferROI): for each pixel in region, run the net on its P×P
# patch (zero-padded at borders), keep the channel vector. All patches batched -> 1 forward.
# x: [N, H, W] on GPU. region = (y0, x0, h, w).
N, H, W = x.shape
half = P // 2
y0, x0, h, w = region
xp = F.pad(x, (half, half, half, half)) # [N, H+P, W+P] zero border (== CPU fill)
patches = torch.empty(h * w, N, P, P, device=x.device, dtype=x.dtype) # [npix, N, P, P]
for q in range(h * w):
cy, cx = y0 + q // w, x0 + q % w # pixel center in ORIGINAL coords
patches[q] = xp[:, cy:cy + P, cx:cx + P] # xp[cy:cy+P] == original rows [cy-half, cy+half)
out = m(patches) # [npix, C, 1, 1]
return out[:, :, 0, 0].reshape(h, w, -1) # [h, w, C]
@torch.no_grad()
def shift_stitch(m, x, P, S=4):
# Pad by half so a valid dense pass aligns output cell (oi,oj) to input pixel (S*oi, S*oj).
# Then 16 shifts (sy,sx in 0..S-1) interleave into the full-res [C,H,W] field.
N, H, W = x.shape
half = P // 2
xp = F.pad(x, (half, half, half, half)) # [N, H+P, W+P]
C = m.out_ch
full = torch.zeros(C, H, W, device=x.device, dtype=x.dtype) # [C, H, W] full-res field
for sy in range(S):
for sx in range(S):
xs = xp[:, sy:, sx:] # shift the padded frame by (sy,sx)
y = m(xs[None])[0] # [C, oH, oW] 1/4-res dense grid
oh = min(y.shape[1], (H - sy + S - 1) // S) # cells that land inside [0,H)/[0,W)
ow = min(y.shape[2], (W - sx + S - 1) // S)
full[:, sy::S, sx::S] = y[:, :oh, :ow] # interleave at stride S
return full.permute(1, 2, 0) # [H, W, C]
def compare(m, P, N, tag):
# one alignment check under the current backend/precision settings
torch.manual_seed(0)
H, W = 40, 48
dt = next(m.parameters()).dtype # match model precision (fp32 / fp64)
x = torch.randn(N, H, W, dtype=dt).cuda() # [N, H, W] random conditioned stack
region = (P // 2, P // 2, 8, 8) # interior region (y0,x0,h,w)
ref = per_pixel(m, x, P, region) # [8, 8, C]
full = shift_stitch(m, x, P) # [H, W, C]
y0, x0, h, w = region
sub = full[y0:y0 + h, x0:x0 + w] # [8, 8, C]
d = (ref - sub).abs().max().item()
rng = ref.abs().max().item()
print(f" [{tag}] max|diff|={d:.3e} out~{rng:.2f} rel={d/max(rng,1e-12):.1e} "
f"{'MATCH' if d < 1e-4 else 'mismatch'}")
if __name__ == "__main__":
run = sys.argv[1] if len(sys.argv) > 1 else "runs/weighted9_pm_s"
m, N, P = load(run)
# 1) default: cuDNN on, fp32 -> small-patch and dense convs may pick different algos
torch.backends.cudnn.enabled = True
compare(m, P, N, "cuDNN fp32")
# 2) cuDNN OFF: both paths use the same aten kernels -> isolates algorithm-choice noise
torch.backends.cudnn.enabled = False
compare(m, P, N, "cuDNN OFF fp32")
# 3) fp64: near-exact arithmetic -> proves the ALIGNMENT (geometry) is right
torch.backends.cudnn.enabled = True
compare(m.double(), P, N, "fp64")
# diag_clean.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Why is learned S jittery on clean input? Compare learned S vs EXACT MF path-sum. By Claude 06/18/2026
Usage: python diag_clean.py [model.pt] (default runs/stage1_mfs/model.pt)"""
import sys, numpy as np, torch
import synth, stage2 as S2
from model import RawFCN
dev = "cuda" if torch.cuda.is_available() else "cpu"
N, vmax, P = 9, 2.8, 52; half, Nm1, HW = P // 2, N - 1, 140
ckpt = sys.argv[1] if len(sys.argv) > 1 else "/work/runs/stage1_mfs/model.pt"
print("model:", ckpt)
s1 = RawFCN(n_frames=N, patch=P, velocity_mode="reg", vmax=vmax).to(dev)
s1.load_state_dict(torch.load(ckpt, map_location=dev)["model"]); s1.eval()
def render(V, amp, noise, seed=0):
rng = np.random.default_rng(seed)
fr = rng.standard_normal((N, HW, HW)).astype(np.float32) if noise else np.zeros((N, HW, HW), np.float32)
Hx = HW / 2 + V * Nm1 * 0.5; Hy = HW / 2; nb = 4; subs = np.arange(nb) * (1.0 / nb)
for i in range(N):
acc = np.zeros((HW, HW))
for ss in subs: acc += synth.halfcos_bump(Hx - V * (i + ss), Hy, HW, HW)
fr[i] += (amp * acc / nb).astype(np.float32)
return fr, Hx, Hy
def rough(a): return float(np.std(np.diff(a))) # pixel-to-pixel roughness (0 = perfectly smooth)
for noise, lbl in [(False, "CLEAN (no noise)"), (True, "NOISY amp=5")]:
fr, Hx, Hy = render(1.0, 5, noise)
s_t, vx_t, vy_t = S2.stage1_dense(s1, fr, dev=dev, mf_s=True)
mf = S2.mf_sum(torch.from_numpy(fr).to(dev), vx_t, vy_t, half, N) # EXACT MF along the net's own velocity
s = s_t.cpu().numpy(); vx = vx_t.cpu().numpy(); mfn = mf.cpu().numpy()
yf = int(round(Hy - half)); hxf = Hx - half; xs = np.arange(int(hxf) - 14, int(hxf) + 15)
print(f"\n===== {lbl} =====")
print(" x-off | S_learned Vx_learned S_exactMF")
for x in xs[::2]:
print(" %+4d | %8.2f %+6.3f %8.2f" % (x - hxf, s[yf, x], vx[yf, x], mfn[yf, x]))
print(" roughness(diff-std): S_learned=%.3f S_exactMF=%.3f Vx_learned=%.4f"
% (rough(s[yf, xs]), rough(mfn[yf, xs]), rough(vx[yf, xs])))
# eval_mfs.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Eval the MF-S v2 pipeline (Stage-1 native-S weight + retrained refine). By Claude on 06/18/2026"""
import numpy as np, torch
import stage2 as S2
from model import RawFCN
dev="cuda" if torch.cuda.is_available() else "cpu"; N,vmax,HW=9,2.8,140; Nm1=N-1; half=26
s1=RawFCN(n_frames=N,patch=52,velocity_mode="reg",vmax=vmax).to(dev)
s1.load_state_dict(torch.load("/work/runs/stage1_mfs/model.pt",map_location=dev)["model"]); s1.eval()
net=S2.VoteRefine().to(dev); net.load_state_dict(torch.load("/work/runs/stage2_mfs/model.pt",map_location=dev)["model"]); net.eval()
def peaks(p,th,r=2):
out=[]; H_,W_=p.shape
for y in range(r,H_-r):
for x in range(r,W_-r):
if p[y,x]>th and p[y,x]>=p[y-r:y+r+1,x-r:x+r+1].max()-1e-6: out.append((x,y,p[y,x]))
return out
for TH in (0.5,0.7):
ndet=0;ntot=0;errs=[];gh=[];rng=np.random.default_rng(123)
for t in range(15):
fr,tg=S2.gen_field(rng,HW,4,N,vmax,[3.0,8.0]); s,vx,vy=S2.stage1_dense(s1,fr,dev=dev,mf_s=True)
aS,aVx,aVy=S2.vote_scatter(s,vx,vy,Nm1); nrm=aS.max().clamp(min=1e-6); aS=aS/nrm;aVx=aVx/nrm;aVy=aVy/nrm
with torch.no_grad(): p=torch.sigmoid(net(aS,aVx,aVy)[0]).cpu().numpy()
F_=p.shape[0]; pk=peaks(p,TH)
tt=[((hx-tvx*Nm1)-half,(hy-tvy*Nm1)-half) for hx,hy,tvx,tvy in tg]; tt=[(x,y) for x,y in tt if 0<=x<F_ and 0<=y<F_]
for tx,ty in tt:
ntot+=1; near=[np.hypot(px-tx,py-ty) for px,py,pv in pk if np.hypot(px-tx,py-ty)<8]
if near: ndet+=1; errs.append(min(near))
for px,py,pv in pk:
if all(np.hypot(px-tx,py-ty)>=8 for tx,ty in tt): gh.append(pv)
print("th=%.2f: det %d/%d (%.0f%%) locerr %.2f | TRUE ghosts(>8px) %d max %.3f"%(TH,ndet,ntot,100*ndet/ntot,np.median(errs) if errs else -1,len(gh),max(gh) if gh else 0))
# export_onnx.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Export an existing model.pt checkpoint to ONNX (no retrain). # By Claude on 06/13/2026
Usage: python export_onnx.py /work/runs/weighted/model.pt"""
import sys, torch
from model import RawFCN
ckpt = sys.argv[1]
ck = torch.load(ckpt, map_location="cpu")
a = ck["args"]
N = a.get("nframes", 8); P = a.get("patch", 24); vr = a.get("vel_radius", 5)
m = RawFCN(n_frames=N, vel_radius=vr)
m.load_state_dict(ck["model"]); m.eval()
onnx_path = ckpt[:-3] + ".onnx" if ckpt.endswith(".pt") else ckpt + ".onnx"
dummy = torch.zeros(1, N, P, P)
torch.onnx.export(m, dummy, onnx_path,
input_names=["frames"], output_names=["out"],
dynamic_axes={"frames": {0: "B", 2: "H", 3: "W"}, "out": {0: "B", 2: "Hout", 3: "Wout"}},
opset_version=17)
print(f"exported {onnx_path} (frames[B,{N},H,W] -> out[B,{m.out_ch},Hout,Wout])")
# export_refine.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Export a trained Stage-2 VoteRefine checkpoint to a self-contained ONNX. By Claude on 06/18/2026
CuasStage2Infer feeds a single [1,3,H,W] tensor (accS, accVx, accVy stacked) and reads [1,3,H,W]
(det logit + Vx,Vy). So we export the inner conv stack (VoteRefine.net), which is exactly that
map. dynamo=False keeps weights INLINE (no external .onnx.data) - Java's ORT load wants one file.
Usage: python export_refine.py /work/runs/stage2_mfs/model.pt
"""
import sys, torch
from stage2 import VoteRefine
ckpt = sys.argv[1]
ck = torch.load(ckpt, map_location="cpu")
net = VoteRefine()
net.load_state_dict(ck["model"]); net.eval()
onnx_path = ckpt[:-3] + ".onnx" if ckpt.endswith(".pt") else ckpt + ".onnx"
dummy = torch.zeros(1, 3, 64, 64) # [B,3,H,W]; H,W dynamic below
torch.onnx.export(net.net, dummy, onnx_path,
input_names=["acc"], output_names=["out"],
dynamic_axes={"acc": {0: "B", 2: "H", 3: "W"}, "out": {0: "B", 2: "H", 3: "W"}},
opset_version=17, dynamo=False)
print(f"exported {onnx_path} (acc[B,3,H,W] -> out[B,3,H,W])")
#!/usr/bin/env python3
# extract_B.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
# By Claude on 06/12/2026. Extract the measured 4D ambiguity function B from a
# synthetic-run -C5P-RECT (or -RECT) rendering, verify it against ground truth,
# and solve the regularized 4D deconvolution D (Wiener) with a
# condensation-vs-noise-gain sweep.
#
# RECT layout: ROI pixel (px,py) -> 12x12 block at [1+py*12 : 12+py*12,
# 1+px*12 : 12+px*12] (11x11 velocity cells + 1px NaN gap grid).
import argparse, json
import numpy as np
import tifffile
def main():
ap = argparse.ArgumentParser()
ap.add_argument('--rect', required=True, help='-RECT.tiff rendering (synthetic run)')
ap.add_argument('--synth', required=True, help='the -CUAS-SYNTHETIC-CUAS.tiff (for frame index mapping)')
ap.add_argument('--gt', required=True, help='ground truth json')
ap.add_argument('--roi', default='160,96,320,320', help='SYNTH_ROI x,y,w,h')
ap.add_argument('--srad', type=int, default=8, help='spatial PSF half-extent, px')
ap.add_argument('--slice', type=int, default=-1, help='RECT slice (0-based); -1 = auto: newest frame t%%4==0, mid-file')
args = ap.parse_args()
gt = json.load(open(args.gt))
vrad = gt['vel_radius']; vdim = 2*vrad + 1
rx, ry, rw, rh = [int(v) for v in args.roi.split(',')]
with tifffile.TiffFile(args.synth) as tf:
synth_labels = tf.imagej_metadata['Labels'][1:] # frame t = index
with tifffile.TiffFile(args.rect) as tf:
rect_labels = tf.imagej_metadata['Labels']
nsl = len(tf.pages)
# map each rect slice to frame index of its newest data
frames = [synth_labels.index(l) for l in rect_labels]
sl = args.slice
if sl < 0: # newest frame on the 4-frame phase grid (all targets pixel-centered)
cands = [i for i, t in enumerate(frames) if t % 4 == 0 and 20 <= t]
sl = cands[len(cands)//2] if cands else nsl//2
t = frames[sl]
img = tf.asarray(key=sl)
print('rect slices %d, using slice %d (label %s, frame t=%d)' % (nsl, sl, rect_labels[sl], t))
srad = args.srad
sdim = 2*srad + 1
psfs = {}
for tg in gt['targets']:
vx_c, vy_c = tg['v_cells']
vx, vy = tg['v_pix_per_frame']
x = tg['node'][0] + vx*t
y = tg['node'][1] + vy*t
px = int(np.floor(x)) - rx
py = int(np.floor(y)) - ry
if not (srad <= px < rw-srad and srad <= py < rh-srad):
continue
# extract [sdim, sdim, vdim, vdim] patch (spatial dy, dx, vel vy, vx)
P = np.zeros((sdim, sdim, vdim, vdim), np.float32)
for dy in range(-srad, srad+1):
for dx in range(-srad, srad+1):
bx = px+dx; by = py+dy
P[dy+srad, dx+srad] = img[by*12:by*12+11, bx*12:bx*12+11]
psfs[(vx_c, vy_c)] = P
print('extracted %d target PSFs' % len(psfs))
# --- sanity: blob center at (dp=0, v=v_true)? and +-v symmetry of the static one
P0 = psfs.get((0, 0))
if P0 is not None:
c = P0[srad, srad]
iy, ix = np.unravel_index(np.argmax(c), c.shape)
print('static target: center-pixel argmax at vel (%+d,%+d) (expect 0,0), peak %.2f'
% (ix-vrad, iy-vrad, c[iy, ix]))
row = c[vrad]
asym = np.abs(row - row[::-1]).max() / row.max()
print('static target vx row:', np.round(row, 1))
print(' max +-vx asymmetry: %.3f (relative)' % asym)
# --- shift-invariance in velocity: align central targets' PSFs and compare
keys = [k for k in psfs if max(abs(k[0]), abs(k[1])) <= 2]
aligned = []
for (i, j) in keys:
P = psfs[(i, j)]
A = np.roll(np.roll(P, -j, axis=2), -i, axis=3) # shift v_true to center
# mask cells rolled across the velocity border
M = np.ones((vdim, vdim), bool)
M = np.roll(np.roll(M, -j, axis=0), -i, axis=1)
aligned.append((A, M))
ref = aligned[len(aligned)//2][0]
devs = []
for A, M in aligned:
d = (np.abs(A - ref)[:, :, M]).max() / ref.max()
devs.append(d)
print('velocity shift-invariance over %d central targets: max rel deviation %.3f'
% (len(keys), max(devs)))
# --- average PSF (central targets) = B; Wiener D sweep
B = np.mean([A for A, _ in aligned], axis=0).astype(np.float64)
B /= B.max()
# desired G: separable half-cosine, +-1 cell in all four dims
g1 = np.array([0.5, 1.0, 0.5])
G = np.zeros_like(B)
cs, cv = srad, vrad
for a in range(-1, 2):
for b in range(-1, 2):
for cc in range(-1, 2):
for d in range(-1, 2):
G[cs+a, cs+b, cv+cc, cv+d] = g1[a+1]*g1[b+1]*g1[cc+1]*g1[d+1]
Bf = np.fft.fftn(np.fft.ifftshift(B))
Gf = np.fft.fftn(np.fft.ifftshift(G))
print('\nlambda resid noise_gain out_extent(cells>10%)')
for lam in (1e-4, 1e-3, 1e-2, 0.03, 0.1, 0.3):
Df = np.conj(Bf)*Gf / (np.abs(Bf)**2 + lam)
D = np.real(np.fft.fftshift(np.fft.ifftn(Df)))
out = np.real(np.fft.fftshift(np.fft.ifftn(Df*Bf)))
resid = np.sqrt(((out-G)**2).sum()/ (G**2).sum())
ngain = np.sqrt((D**2).sum())
extent = int((out > 0.1*out.max()).sum())
print('%7.0e %7.3f %8.3f %d (G itself: %d)'
% (lam, resid, ngain, extent, int((G > 0.1).sum())))
np.save(args.rect + '.B.npy', B)
print('\nsaved averaged B to', args.rect + '.B.npy')
if __name__ == '__main__':
main()
# gap_eval.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Gap-L2 eval on HELD-OUT modulated synthetic: sharpness + coast + hallucination. By Claude 06/23/2026"""
import argparse, numpy as np, torch, synth, layer2_data as L1D
from layer2 import Layer2Net
def fwhm_at(field,cx,cy,G):
f=np.roll(np.roll(field,int(round(G//2-cy)),0),int(round(G//2-cx)),1); c=G//2
def w(l):
pk=float(l[c])
if pk<=1e-6: return np.nan
h=pk/2; i=c
while i<len(l)-1 and l[i]>=h: i+=1
r=(i-1)+(l[i-1]-h)/(l[i-1]-l[i]) if l[i-1]>l[i] else float(i-1)
i=c
while i>0 and l[i]>=h: i-=1
L=(i+1)-(l[i+1]-h)/(l[i+1]-l[i]) if l[i+1]>l[i] else float(i+1)
return r-L
return np.nanmean([w(f[c,:]),w(f[:,c])])
ap=argparse.ArgumentParser()
ap.add_argument("--l1",default="runs/weighted9_pm/model.pt"); ap.add_argument("--model",required=True)
ap.add_argument("--out",required=True); ap.add_argument("--T",type=int,default=128); ap.add_argument("--G",type=int,default=32)
ap.add_argument("--seeds",default="3000,3001,3002,3003,3004,3005")
ap.add_argument("--train_n",type=int,default=0,help="if >0: reproduce first N TRAINING sequences (one rng seed 0, sequential, == build_cache) instead of --seeds")
a=ap.parse_args(); import os; os.makedirs(a.out,exist_ok=True)
dev="cuda" if torch.cuda.is_available() else "cpu"
net1,N,_=L1D._load_l1(a.l1,dev)
ck=torch.load(a.model,map_location=dev); ar=ck["args"]
net=Layer2Net(ch_in=3,ch_hidden=ar["ch"],grid=a.G,vmax=ar["vmax"]).to(dev); net.load_state_dict(ck["model"]); net.eval()
G=a.G; seeds=[int(s) for s in a.seeds.split(",")]
# train_n mode: ONE rng(0) generating N sequences sequentially == build_cache training data. By Claude 06/23
train_rng = np.random.default_rng(0) if a.train_n>0 else None
items = list(range(a.train_n)) if a.train_n>0 else seeds
INs,L1s,L2s,TRs,SGs=[],[],[],[],[]
fl1,fl2,s_bright,s_gap,fp_dark=[],[],[],[],[]
for sd in items:
rng = train_rng if a.train_n>0 else np.random.default_rng(sd)
fr,pos,vel,pres,sig=L1D.render_run(rng,T=a.T,G=G,vmax=1.4,snr=6.0,gaps=True,bp_lo=6,bp_hi=18,duty_offset=0.2,starter_len=8,return_signal=True)
seq=L1D.gen_field_sequence(net1,fr,pos,G,N,dev)
with torch.no_grad():
det,_=net(torch.from_numpy(seq[None]).to(dev)); l2=torch.sigmoid(det)[0,:,0].cpu().numpy()
l1s=seq[:,0]
tr=np.zeros((a.T,G,G),np.float32)
for t in range(a.T):
if pres[t]: tr[t]=L1D.halfcos_bump_torus(pos[t,0],pos[t,1],G)
INs.append(fr);L1s.append(l1s);L2s.append(l2);TRs.append(tr);SGs.append(np.broadcast_to(sig[:,None,None],(a.T,G,G)).astype(np.float32))
for t in range(a.T):
if not pres[t]: continue
cx,cy=pos[t]; ci,cj=int(round(cy))%G,int(round(cx))%G
l1v=l1s[t,ci,cj]; l2v=float(l2[t,max(0,ci-1):ci+2,max(0,cj-1):cj+2].max())
if sig[t]>0.5: # BRIGHT frame
s_bright.append(l2v)
if l1v>0.5: fl1.append(fwhm_at(l1s[t],cx,cy,G))
if l2[t].max()>0.5: fl2.append(fwhm_at(l2[t],cx,cy,G))
else: # GAP frame (present, L1 starved)
s_gap.append(l2v)
m=np.ones((G,G),bool); yy,xx=np.ogrid[:G,:G]
m[(((xx-cj+G/2)%G-G/2)**2+((yy-ci+G/2)%G-G/2)**2)<=9]=False
fp_dark.append(float(l2[t][m].max()))
def cat(L): return np.concatenate(L,0)
for nm,st in [("input",INs),("L1_s",L1s),("L2_det",L2s),("truth",TRs),("signal",SGs)]:
synth.save_tiff_stack(cat(st).astype(np.float32),f"{a.out}/{nm}.tif")
_src = f"TRAINING seqs 0..{a.train_n-1}" if a.train_n>0 else f"held-out seeds {seeds}"
print(f"=== GAP-L2 eval ({a.model}) {_src} ===")
print(f"FWHM @ target (bright): L1 ~ {np.nanmean(fl1):.2f} L2 ~ {np.nanmean(fl2):.2f} px")
print(f"COAST L2 s@truth: BRIGHT frames ~ {np.mean(s_bright):.2f} GAP frames ~ {np.mean(s_gap):.2f} (want gap HIGH = coasting)")
print(f"HALLUCINATION L2 max away-from-target on GAP frames: mean {np.mean(fp_dark):.2f} max {np.max(fp_dark):.2f}")
This diff is collapsed.
#!/usr/bin/env python3
# ghost_probe.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Trajectory-alias ghost probe. # By Claude on 06/16/2026
A STATIC bright target placed at offset d from the patch center (= the output/ROI pixel) should be
SUPPRESSED (det s -> 0): it's a neighbour's target, not a fast target through me. The failure mode
(seen in the clean synthetic grid) is the net hallucinating a FAST velocity whose tail lands on the
static blob -> a ghost detection at d in the untrained off-center band. This sweeps d and prints s +
the argmax velocity, so we can compare the 24-patch (off_max=9) vs 32-patch (off_max=13) models:
the wider RF + extended off-center suppression should drive s->~0 out to a larger d."""
import argparse, numpy as np, torch, synth
from model import RawFCN
def sigmoid(z): return 1.0 / (1.0 + np.exp(-z))
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("ck")
ap.add_argument("--nframes", type=int, default=9)
ap.add_argument("--patch", type=int, default=24)
ap.add_argument("--vel_radius", type=int, default=5)
ap.add_argument("--vel_decimate", type=int, default=4)
ap.add_argument("--amp", type=float, default=8.0) # static target peak (training: snr*bump, snr 1..8)
a = ap.parse_args()
dev = "cuda" if torch.cuda.is_available() else "cpu"
ck = torch.load(a.ck, map_location=dev)
m = RawFCN(n_frames=a.nframes, vel_radius=a.vel_radius, patch=a.patch).to(dev)
m.load_state_dict(ck["model"]); m.eval()
P = a.patch; cx = P / 2.0; cy = P / 2.0 # deployment reference = P/2
n = 2 * a.vel_radius + 1; step = 1.0 / a.vel_decimate
ix = np.arange(n * n); vxc = (ix % n - a.vel_radius) * step; vyc = (ix // n - a.vel_radius) * step
print(f"model {a.ck} patch={a.patch} amp={a.amp} off_max={P/2 - 2 - 1:.0f}px")
print(f" STATIC target at offset d along +x; want s->0 as d grows (ghost = high s + fast v)")
print(f"{'d':>4} {'s':>7} {'argmax v (cells)':>17} {'|v| px':>7}")
for d in range(0, P // 2):
frames = np.empty((a.nframes, P, P), dtype=np.float32)
bump = (a.amp * synth.halfcos_bump(cx + d, cy, P, P)).astype(np.float32)
for i in range(a.nframes):
frames[i] = bump # static: identical every frame
with torch.no_grad():
out = m(torch.from_numpy(frames[None]).to(dev))[0, :, 0, 0].cpu().numpy()
s = sigmoid(out[0]); k = int(np.argmax(out[1:1 + n * n]))
vx, vy = vxc[k], vyc[k]
print(f"{d:4d} {s:7.3f} ({vx:+.2f},{vy:+.2f}) {np.hypot(vx, vy):6.3f}")
This diff is collapsed.
# l1_samples.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Multi-sample L1-output viewer, 2x2-tiled (64x64) only. By Claude on 06/23/2026
Runs frozen L1 over several synthetic gap-runs and writes 2x2-tiled stacks so Andrey can scrub L1
behavior across samples in Fiji (seam through the center cross => torus continuity visible; no need
for the single 32x32). Per quantity, pages = nsamples*T concatenated. Channels: input, L1 s, truth
marker, signal (1=bump rendered / 0=gap). The point: SEE how L1's s-field clears noise near the
target when present, and how noise returns in a gap (coast-a-gap = higher noise)."""
import argparse
import numpy as np
import torch
import synth
import layer2_data as L1D
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--l1", default="runs/weighted9_pm/model.pt")
ap.add_argument("--T", type=int, default=64); ap.add_argument("--G", type=int, default=32)
ap.add_argument("--nsamples", type=int, default=6)
ap.add_argument("--vmax", type=float, default=1.4); ap.add_argument("--snr", type=float, default=6.0)
ap.add_argument("--gaps", action="store_true")
ap.add_argument("--bp_lo", type=int, default=3); ap.add_argument("--bp_hi", type=int, default=9)
ap.add_argument("--duty_offset", type=float, default=-0.3); ap.add_argument("--starter_len", type=int, default=8)
ap.add_argument("--out", default="runs/l1_samples")
a = ap.parse_args()
import os; os.makedirs(a.out, exist_ok=True)
dev = "cuda" if torch.cuda.is_available() else "cpu"
net, N, meta = L1D._load_l1(a.l1, dev)
G, T = a.G, a.T
rkw = dict(gaps=a.gaps, bp_lo=a.bp_lo, bp_hi=a.bp_hi, duty_offset=a.duty_offset, starter_len=a.starter_len)
def tile2x2(st): # [T,G,G] -> [T,2G,2G]
return np.tile(st, (1, 2, 2))
inputs, sfields, truths, signals = [], [], [], []
print(f"L1 {a.l1} N={N}; {a.nsamples} samples T={T} gaps={a.gaps}", flush=True)
for k in range(a.nsamples):
rng = np.random.default_rng(100 + k) # distinct, reproducible per sample
frames, pos, vel, present, signal = L1D.render_run(rng, T=T, G=G, vmax=vmax_(a), snr=a.snr,
return_signal=True, **rkw)
seq = L1D.gen_field_sequence(net, frames, pos, G, N, dev) # [T,3,G,G]
truth = np.zeros((T, G, G), np.float32)
for t in range(T):
if present[t]:
truth[t] = L1D.halfcos_bump_torus(pos[t, 0], pos[t, 1], G)
inputs.append(tile2x2(frames))
sfields.append(tile2x2(seq[:, 0]))
truths.append(tile2x2(truth))
signals.append(tile2x2(np.broadcast_to(signal[:, None, None], (T, G, G)).astype(np.float32)))
ng = int((present > 0).sum()); ngap = int(((present > 0) & (signal < 0.5)).sum())
print(f" sample {k}: present {ng}/{T}, gap {ngap}", flush=True)
# concatenate samples along the page axis -> one scrubable stack per quantity
synth.save_tiff_stack(np.concatenate(inputs, 0), f"{a.out}/input_2x2.tif")
synth.save_tiff_stack(np.concatenate(sfields, 0), f"{a.out}/s_2x2.tif")
synth.save_tiff_stack(np.concatenate(truths, 0), f"{a.out}/truth_2x2.tif")
synth.save_tiff_stack(np.concatenate(signals, 0), f"{a.out}/signal_2x2.tif")
print(f"wrote {a.out}/{{input,s,truth,signal}}_2x2.tif "
f"({a.nsamples}x{T}={a.nsamples*T} pages, 64x64, 32-bit; sample k = pages [k*{T},(k+1)*{T}))", flush=True)
def vmax_(a):
return a.vmax
if __name__ == "__main__":
main()
# l2_fp_analysis.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""L2 false-positive + P_d analysis vs UAS flight-log truth, per pyramid level. By Claude on 06/24/2026.
PLUMBING/CORRECTNESS first pass (not final numbers). For one sequence dir (the center's vNNN dir), it:
- finds every *-OFFSET-<model>.tiff (level from "-LEVn-"; an untagged legacy file = level 0),
- reads only the s-channel pages (label starts with "s:") one at a time (memory-safe; LEV0 is ~3GB),
- matches each frame to the UAS truth in *-UAS_DATA.tsv by timestamp,
- applies the clean-sky ROI geometry (see project_l2_fp_measurement), counts FP "blobs" (local maxima
of s above threshold, numpy-only) per pixel-hectare in the HIGH/LOW sky zones, excluding a disk around
the UAS truth on IN-FoV frames, and scores P_d (UAS detected within that disk),
- sweeps the s-threshold and prints a per-level table.
UAS not always on screen is handled by the TSV `status` (IN FoV / OUT OF FoV / no entry): P_d only on IN-FoV
frames; other frames contribute to FP only. A sequence with no IN-FoV frames -> pure FP (P_d = n/a).
Run: /home/elphel/.venvs/c5p/bin/python l2_fp_analysis.py --dir <.../center/vNNN> [--model mexhat_gaps_boost40]
"""
import argparse, glob, os, re, numpy as np, tifffile
# --- clean-sky FP geometry (per-sequence; the 620->560m UAS clip family) -----------------------------
ROI = (42, 45, 555, 270) # clean ROI x,y,w,h -> x in [42,597), y in [45,315)
SIGN = (27, 200, 123, 135) # bomb-sign + rotation artefacts exclusion x,y,w,h (right edge 150: artefacts reach x=149). By Claude 06/25/2026
SKY_SPLIT = 230 # high sky y<230, low sky y>=230
HECTARE = 100.0 * 100.0 # 10000 px
def build_zone_masks(H, W):
base = np.zeros((H, W), bool)
x0, y0, w, h = ROI
base[y0:y0 + h, x0:x0 + w] = True
sx, sy, sw, sh = SIGN
base[sy:sy + sh, sx:sx + sw] = False
high = base.copy(); high[SKY_SPLIT:, :] = False
low = base.copy(); low[:SKY_SPLIT, :] = False
return high, low
def local_maxima(s, thr):
"""boolean map: strict-enough 3x3 local maxima with s > thr (one mark per blob). numpy-only."""
from numpy.lib.stride_tricks import sliding_window_view
sp = np.pad(s, 1, mode="constant", constant_values=-np.inf)
nmax = sliding_window_view(sp, (3, 3)).max(axis=(2, 3)) # 3x3 neighborhood max
return (s >= nmax) & (s > thr)
def norm_ts(label):
"""'s:1773135520_851518-0 f8' or '1773135519.534413' -> '1773135519.534413' (bare ts string)."""
t = label.replace("_", ".")
m = re.search(r"\d{5,10}\.\d{6}", t)
return m.group(0) if m else t
def load_truth(tsv_path):
"""ts(str, rounded) -> (status, px, py). Only rows with a parseable ts."""
truth = {}
with open(tsv_path) as f:
header = f.readline()
for line in f:
c = line.rstrip("\n").split("\t")
if len(c) < 5:
continue
ts = norm_ts(c[1])
try:
key = round(float(ts), 6)
except ValueError:
continue
status = c[2]
px = float(c[3]) if c[3] else np.nan
py = float(c[4]) if c[4] else np.nan
truth[key] = (status, px, py)
return truth
def level_of(path):
m = re.search(r"-LEV(\d+)-", os.path.basename(path))
return int(m.group(1)) if m else 0 # untagged legacy file = level 0
def analyze_level(tiff_path, truth, thresholds, disk_r, dbg=False):
"""Return per-threshold dict of accumulated FP blobs / valid hectares / P_d counts."""
disk_r2 = disk_r * disk_r
acc = {t: dict(fp_hi=0, fp_lo=0, ha_hi=0.0, ha_lo=0.0, pd_hit=0, pd_tot=0) for t in thresholds}
matched = 0
unmatched = 0
with tifffile.TiffFile(tiff_path) as tf:
labels = (tf.imagej_metadata or {}).get("Labels") or []
H, W = tf.pages[0].shape
high0, low0 = build_zone_masks(H, W)
yy, xx = np.ogrid[:H, :W]
for i, page in enumerate(tf.pages):
lab = labels[i] if i < len(labels) else ""
if not lab.startswith("s:"): # s-channel pages only
continue
key = None
try:
key = round(float(norm_ts(lab)), 6)
except ValueError:
pass
tr = truth.get(key)
if tr is None:
unmatched += 1
else:
matched += 1
s = np.asarray(page.asarray(), dtype=np.float32)
valid = np.isfinite(s)
s = np.where(valid, s, -np.inf)
# target disk (only on IN-FoV frames)
in_fov = (tr is not None) and (tr[0] == "IN FoV") and np.isfinite(tr[1]) and np.isfinite(tr[2])
if in_fov:
px, py = tr[1], tr[2]
disk = ((xx - px) ** 2 + (yy - py) ** 2) <= disk_r2
else:
disk = np.zeros((H, W), bool)
hi = high0 & valid
lo = low0 & valid
for t in thresholds:
peaks = local_maxima(s, t)
# FP = peaks in zone, outside target disk
acc[t]["fp_hi"] += int(np.count_nonzero(peaks & hi & ~disk))
acc[t]["fp_lo"] += int(np.count_nonzero(peaks & lo & ~disk))
acc[t]["ha_hi"] += np.count_nonzero(hi) / HECTARE
acc[t]["ha_lo"] += np.count_nonzero(lo) / HECTARE
if in_fov:
acc[t]["pd_tot"] += 1
if np.any((s > t) & disk):
acc[t]["pd_hit"] += 1
return acc, matched, unmatched
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--dir", required=True, help="center vNNN dir with -OFFSET tiffs + -UAS_DATA.tsv")
ap.add_argument("--model", default="mexhat_gaps_boost40")
ap.add_argument("--disk", type=float, default=6.0, help="target-disk radius (px) for P_d / FP exclusion")
ap.add_argument("--thr", default="0.1,0.15,0.2,0.25,0.3,0.35,0.4,0.45,0.5,0.6")
a = ap.parse_args()
thresholds = [float(x) for x in a.thr.split(",")]
tsvs = glob.glob(os.path.join(a.dir, "*-UAS_DATA.tsv"))
truth = load_truth(tsvs[0]) if tsvs else {}
print(f"truth: {'<none>' if not tsvs else os.path.basename(tsvs[0])} rows={len(truth)} "
f"IN-FoV={sum(1 for v in truth.values() if v[0]=='IN FoV')}")
tiffs = sorted(glob.glob(os.path.join(a.dir, f"*-OFFSET-{a.model}.tiff")), key=level_of)
if not tiffs:
raise SystemExit(f"no *-OFFSET-{a.model}.tiff in {a.dir}")
seq = os.path.basename(tiffs[0]).split("-SUBAVG")[0] # center name, for cross-sequence concat
rows = [] # reusable summary rows (raw counts + derived), one per (level, threshold)
for f in tiffs:
lev = level_of(f)
acc, matched, unmatched = analyze_level(f, truth, thresholds, a.disk)
print(f"\n=== LEV{lev} (frames matched={matched} unmatched={unmatched}) {os.path.basename(f)[:48]}... ===")
print(f"{'thr':>5} {'FP/ha_hi':>9} {'FP/ha_lo':>9} {'P_d':>6}")
for t in thresholds:
d = acc[t]
fph = d["fp_hi"] / d["ha_hi"] if d["ha_hi"] > 0 else float("nan")
fpl = d["fp_lo"] / d["ha_lo"] if d["ha_lo"] > 0 else float("nan")
pd = (d["pd_hit"] / d["pd_tot"]) if d["pd_tot"] > 0 else float("nan")
print(f"{t:5.2f} {fph:9.3f} {fpl:9.3f} {pd:6.2f}")
rows.append((seq, lev, t, d["fp_hi"], d["ha_hi"], fph, d["fp_lo"], d["ha_lo"], fpl,
d["pd_hit"], d["pd_tot"], pd, matched, unmatched))
# reusable summary CSV in the same dir (raw counts kept so densities/P_d can be re-aggregated
# across sequences without re-reading the tiffs). By Claude on 06/24/2026
out = os.path.join(a.dir, f"{seq}-L2FP-{a.model}.csv")
with open(out, "w") as fo:
fo.write(f"# L2 FP/P_d summary model={a.model} disk_r={a.disk}px "
f"ROI={ROI} SIGN={SIGN} sky_split_y={SKY_SPLIT} hectare={int(HECTARE)}px\n")
fo.write("seq,level,thr,fp_hi,ha_hi,fp_per_ha_hi,fp_lo,ha_lo,fp_per_ha_lo,pd_hit,pd_tot,pd,matched,unmatched\n")
for r in rows:
fo.write(",".join(str(x) for x in r) + "\n")
print(f"\nsummary -> {out}")
if __name__ == "__main__":
main()
This diff is collapsed.
This diff is collapsed.
# layer2_eval.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""C5P Layer-2 testing/visualization on same-generator input. By Claude 06/22/2026
Loads a trained Layer2Net + frozen L1, generates a same-generator test sequence (render_run ->
frozen-L1 field), runs L2, and writes 32-bit float TIFF stacks AND 2x2-tiled versions (seam =
center cross, same check we used for L1) so the L2 output can be watched the same way:
L1_s = L1 input confidence field
L2_det = L2 track-before-detect output (sigmoid)
L2_s_v = L2 detection masked-overlay of |V| (optional sanity of velocity field)
truth = target-shape bump at truth position
Also prints per-frame lock / FP metrics. Run on DGX:
python layer2_eval.py --l2 runs/l2_v1/model.pt --T 120 --out runs/l2_v1/test
"""
import argparse
import numpy as np
import torch
import synth
import layer2_data as L1D
from layer2 import Layer2Net
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--l1", default="runs/weighted9_pm/model.pt")
ap.add_argument("--l2", default="runs/l2_v1/model.pt")
ap.add_argument("--T", type=int, default=120); ap.add_argument("--seed", type=int, default=777)
ap.add_argument("--snr", type=float, default=6.0); ap.add_argument("--out", default="runs/l2_v1/test")
a = ap.parse_args()
import os; os.makedirs(a.out, exist_ok=True)
dev = "cuda" if torch.cuda.is_available() else "cpu"
net1, N, _ = L1D._load_l1(a.l1, dev)
ck = torch.load(a.l2, map_location=dev); la = ck.get("args", {})
G = la.get("G", 32); vmax = la.get("vmax", 1.4)
net2 = Layer2Net(ch_in=3, ch_hidden=la.get("ch", 24), grid=G, vmax=vmax).to(dev)
net2.load_state_dict(ck["model"]); net2.eval()
print(f"L1={a.l1} (N={N}) L2={a.l2} (ch={la.get('ch',24)}, G={G}) dev={dev}", flush=True)
rng = np.random.default_rng(a.seed)
frames, pos, vel, present = L1D.render_run(rng, T=a.T, G=G, vmax=vmax, snr=a.snr)
seq = L1D.gen_field_sequence(net1, frames, pos, G, N, dev) # [T,3,G,G]
with torch.no_grad():
det_logit, velo = net2(torch.from_numpy(seq[None]).to(dev))
l2det = torch.sigmoid(det_logit)[0, :, 0].cpu().numpy() # [T,G,G]
l2vx = velo[0, :, 0].cpu().numpy(); l2vy = velo[0, :, 1].cpu().numpy() # [T,G,G] px/level-frame
truth = np.zeros((a.T, G, G), np.float32)
for t in range(a.T):
if present[t]:
truth[t] = L1D.halfcos_bump_torus(pos[t, 0], pos[t, 1], G)
L1_s = seq[:, 0]
stacks = {"L1_s": L1_s, "L2_det": l2det, "L2_Vx": l2vx, "L2_Vy": l2vy, "truth": truth}
for nm, st in stacks.items():
synth.save_tiff_stack(st, f"{a.out}/{nm}.tif")
synth.save_tiff_stack(np.tile(st, (1, 2, 2)), f"{a.out}/{nm}_2x2.tif")
print(f"wrote {a.out}/{{L1_s,L2_det,L2_Vx,L2_Vy,truth}}{{,_2x2}}.tif ({a.T} pages, 32-bit float)", flush=True)
# velocity accuracy: L2 (Vx,Vy) read at the detected peak vs truth velocity
verr = []
for t in range(a.T):
if not present[t]:
continue
pj = int(np.argmax(l2det[t])); py, px = divmod(pj, G)
verr.append((l2vx[t, py, px] - vel[t, 0], l2vy[t, py, px] - vel[t, 1])) # vel is per-frame [T,2]
verr = np.array(verr[3:]) # skip pre-lock
vp = vel[present > 0]
print(f"truth vel (mean over present) = ({vp[:,0].mean():+.3f},{vp[:,1].mean():+.3f}) px/level-frame; "
f"L2 vel@peak error: mean |dV| {np.hypot(verr[:,0],verr[:,1]).mean():.3f} "
f"(bias {verr[:,0].mean():+.3f},{verr[:,1].mean():+.3f})", flush=True)
L1D._tiled_montage(np.tile(l2det, (1, 2, 2)), G, f"{a.out}/L2_det_2x2.png")
# L1 -> L2 -> truth comparison montage (the cloud->clean-blob transformation)
import matplotlib; matplotlib.use("Agg"); import matplotlib.pyplot as plt
idx = np.linspace(0, a.T - 1, 7).astype(int)
fig, axs = plt.subplots(3, 7, figsize=(15, 6.5))
for j, t in enumerate(idx):
for r, (img, lab) in enumerate([(L1_s[t], "L1 s"), (l2det[t], "L2 det"), (truth[t], "truth")]):
axs[r, j].imshow(img, vmin=0, vmax=1, cmap="magma", origin="upper")
axs[r, j].set_xticks([]); axs[r, j].set_yticks([])
if j == 0: axs[r, j].set_ylabel(lab, fontsize=11)
axs[0, j].set_title(f"f{t} {'tgt' if present[t] else 'noise'}", fontsize=9)
fig.suptitle("L1 input (clouds) -> L2 detection (clean) -> truth")
fig.tight_layout(); fig.savefig(f"{a.out}/compare.png", dpi=90); plt.close(fig)
print(f"wrote {a.out}/compare.png", flush=True)
# per-frame metrics: s@truth (lock) vs HONEST FP = max bg excluding a radius-R disk around truth
yy, xx = np.mgrid[0:G, 0:G]; R = 4.0
print("\nframe present s@truth FP(>4px from tgt) note", flush=True)
locked = None; onset = int(np.argmax(present)) if present.any() else 0
fp_present = []
for t in range(a.T):
core = truth[t] > 0.3
sat = float(l2det[t][core].mean()) if core.any() else float('nan')
if present[t]:
dx = (xx - pos[t, 0] + G / 2) % G - G / 2; dy = (yy - pos[t, 1] + G / 2) % G - G / 2
far = (dx * dx + dy * dy) > R * R # exclude target neighborhood
else:
far = np.ones((G, G), bool)
fp = float(l2det[t][far].max()) if far.any() else 0.0
if present[t]: fp_present.append(fp)
note = ""
if present[t] and locked is None and sat > 0.5:
locked = t; note = f"<- LOCK (+{t - onset} fr)"
if t % 8 == 0 or note:
print(f"{t:4d} {'tgt' if present[t] else 'noise':5s} {sat:6.3f} {fp:6.3f} {note}", flush=True)
print(f"\nlock frame: {locked} (onset {onset}); FP on locked frames: "
f"mean {np.mean(fp_present[2:] or [0]):.3f} max {np.max(fp_present[2:] or [0]):.3f}", flush=True)
if __name__ == "__main__":
main()
# layer2_gapcheck.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""C5P Layer-2 gap-coast diagnostic — the direct test for the wild-test failure. By Claude 06/22/2026
The 2026-06-22 wild test found L2 drops the track the instant the L1 signal does (v1 had no gaps in
training, so the recurrence never learned to coast). This script builds a DETERMINISTIC run with a
single, explicit signal GAP in the middle of an otherwise clean straight track — no clutter, no
death, no maneuver — runs frozen L1 then trained L2, and reports, frame by frame:
L1 s@truth — frozen-L1 confidence AT the (moving) truth position. COLLAPSES inside the gap.
L2 s@truth — trained-L2 confidence AT the truth position. Should COAST (stay high).
L2 pos-err — wrapped distance from L2 peak to truth (px). Should stay small in the gap.
PASS = L2 holds the track through the gap while L1 has gone dark. That is the memory doing its job.
Run on DGX: python layer2_gapcheck.py --l2 runs/l2_v2/model.pt --gap 18 26
"""
import argparse
import numpy as np
import torch
import synth
import layer2_data as L1D
from layer2 import Layer2Net
def render_gap(T, G, vmax, snr, gap, onset=8, seed=0):
"""Clean straight track with ONE explicit signal gap [gap0,gap1). By Claude 06/22
Returns frames[T,G,G], pos[T,2] (truth, defined from onset), vel[2], present[T], gapmask[T].
The target keeps MOVING through the gap (truth pos advances); only the rendered bump is removed,
so the frozen 9-frame L1 window is starved while truth-present stays 1."""
rng = np.random.default_rng(seed)
frames = rng.standard_normal((T, G, G)).astype(np.float32)
pos = np.full((T, 2), np.nan, np.float32)
present = np.zeros(T, np.float32)
gapmask = np.zeros(T, bool); gapmask[gap[0]:gap[1]] = True
vx, vy = L1D._sample_velocity(rng, vmax)
x0 = rng.uniform(0, G); y0 = rng.uniform(0, G)
for t in range(onset, T):
k = t - onset
cx = (x0 + vx * k) % G; cy = (y0 + vy * k) % G
pos[t] = (cx, cy); present[t] = 1.0
if not gapmask[t]: # gap => bump NOT rendered (L1 starves)
frames[t] += snr * L1D.halfcos_bump_torus(cx, cy, G)
return frames, pos, np.array([vx, vy], np.float32), present, gapmask
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--l1", default="runs/weighted9_pm/model.pt")
ap.add_argument("--l2", default="runs/l2_v2/model.pt")
ap.add_argument("--T", type=int, default=40); ap.add_argument("--gap", type=int, nargs=2, default=[18, 26])
ap.add_argument("--snr", type=float, default=6.0); ap.add_argument("--seed", type=int, default=0)
ap.add_argument("--out", default="runs/l2_v2/gapcheck")
a = ap.parse_args()
import os; os.makedirs(a.out, exist_ok=True)
dev = "cuda" if torch.cuda.is_available() else "cpu"
net1, N, _ = L1D._load_l1(a.l1, dev)
ck = torch.load(a.l2, map_location=dev); la = ck.get("args", {})
G = la.get("G", 32); vmax = la.get("vmax", 1.4)
net2 = Layer2Net(ch_in=3, ch_hidden=la.get("ch", 24), grid=G, vmax=vmax).to(dev)
net2.load_state_dict(ck["model"]); net2.eval()
print(f"L1={a.l1} (N={N}) L2={a.l2} (ch={la.get('ch',24)}, G={G}) gap={a.gap} dev={dev}", flush=True)
frames, pos, vel, present, gap = render_gap(a.T, G, vmax, a.snr, a.gap, seed=a.seed)
seq = L1D.gen_field_sequence(net1, frames, pos, G, N, dev) # [T,3,G,G]
with torch.no_grad():
det_logit, _ = net2(torch.from_numpy(seq[None]).to(dev))
l2 = torch.sigmoid(det_logit)[0, :, 0].cpu().numpy() # [T,G,G]
l1s = seq[:, 0]
truth = np.zeros((a.T, G, G), np.float32)
for t in range(a.T):
if present[t]: truth[t] = L1D.halfcos_bump_torus(pos[t, 0], pos[t, 1], G)
for nm, st in [("L1_s", l1s), ("L2_det", l2), ("truth", truth)]:
synth.save_tiff_stack(st, f"{a.out}/{nm}.tif")
print(f"wrote {a.out}/{{L1_s,L2_det,truth}}.tif ({a.T} pages)\n", flush=True)
print("frame present in_gap L1 s@truth L2 s@truth L2 pos-err", flush=True)
pre, ingap = [], []
for t in range(a.T):
core = truth[t] > 0.3
l1v = float(l1s[t][core].mean()) if core.any() else float('nan')
l2v = float(l2[t][core].mean()) if core.any() else float('nan')
pj = int(np.argmax(l2[t])); py, px = divmod(pj, G)
perr = L1D.wrapped_err(np.array([px, py], float), pos[t], G) if present[t] else float('nan')
flag = "GAP" if gap[t] else ("tgt" if present[t] else "noise")
print(f"{t:4d} {int(present[t])} {flag:5s} {l1v:7.3f} {l2v:7.3f} {perr:6.2f}", flush=True)
if present[t] and not gap[t] and t < a.gap[0]:
pre.append((l1v, l2v))
if gap[t]:
ingap.append((l1v, l2v, perr))
pre = np.array(pre); ingap = np.array(ingap)
l1_drop = ingap[:, 0].mean(); l2_hold = ingap[:, 1].mean(); perr_gap = np.nanmean(ingap[:, 2])
print(f"\nbefore gap (locked): L1 s@truth {pre[:,0].mean():.3f} L2 s@truth {pre[:,1].mean():.3f}", flush=True)
print(f"inside gap : L1 s@truth {l1_drop:.3f} L2 s@truth {l2_hold:.3f} L2 pos-err {perr_gap:.2f}px", flush=True)
# PASS: L1 collapsed in the gap (signal truly gone) AND L2 coasted (held high, near truth)
coasted = (l1_drop < 0.5) and (l2_hold > 0.5) and (perr_gap < 4.0)
print("COAST PASS — L2 holds the track through the gap while L1 goes dark." if coasted
else "COAST FAIL — L2 did not coast (still follows the live L1 signal). Train w/ stronger gaps.",
flush=True)
if __name__ == "__main__":
main()
# layer2_train.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""C5P Layer-2 v1 training: track-before-detect recurrent over frozen-L1 fields. By Claude 06/22/2026
(a) first end-to-end L2 train on the CLEAN steady runs (Andrey 06/21). Layer 1 is FROZEN, so the
field sequences are PRE-COMPUTED ONCE into a cache (stage2.py pattern), then Layer2Net trains fast
on the cache via BPTT. Supervision per frame:
- target PRESENT -> det = toroidal Gaussian bump at truth pos; (Vx,Vy) at the bump support;
- target ABSENT (noise prefix) -> det = 0 everywhere == the false-positive-suppression signal.
The net must (i) stay quiet on the noise prefix + clutter clouds, (ii) lock onto the faded-in
target as coherent evidence accumulates (survival), (iii) hold + report velocity across seams.
Step rate = raw per-level-frame (decimation / stride-2-overlap parked for later). Run on DGX:
python layer2_train.py --l1 runs/weighted9_pm/model.pt --nseq 128 --T 48 --steps 4000 --out runs/l2_v1
"""
import argparse
import numpy as np
import torch
import torch.nn.functional as F
import synth
import layer2_data as L1D
from layer2 import Layer2Net, bump_target, layer2_loss
def build_cache(net, N, nseq, T, G, vmax, snr, dev, seed=0, render_kw=None):
"""Pre-compute nseq frozen-L1 field sequences + truth. By Claude 06/22
Returns lists of: seq[T,3,G,G], pos[T,2], vel[2], present[T] (all torch on dev).
render_kw overrides render_run realism knobs (e.g. --v1 zeroes gaps/clutter). By Claude 06/22"""
rng = np.random.default_rng(seed)
render_kw = render_kw or {}
cache = []
for k in range(nseq):
frames, pos, vel, present = L1D.render_run(rng, T=T, G=G, vmax=vmax, snr=snr, **render_kw)
seq = L1D.gen_field_sequence(net, frames, pos, G, N, dev) # [T,3,G,G]
cache.append((torch.from_numpy(seq).to(dev),
torch.from_numpy(np.nan_to_num(pos)).float().to(dev),
torch.from_numpy(vel).to(dev),
torch.from_numpy(present).to(dev)))
if (k + 1) % 32 == 0:
print(f" cache {k+1}/{nseq}", flush=True)
return cache
def make_targets(batch, G, dev, sigma=1.5):
"""Stack a minibatch -> (seq, det_t, vel_t, present). By Claude 06/22
det_t: bump at truth, zeroed on absent frames. vel_t: constant per-seq velocity broadcast."""
seq = torch.stack([b[0] for b in batch], 0) # [B,T,3,G,G]
pos = torch.stack([b[1] for b in batch], 0) # [B,T,2]
vel = torch.stack([b[2] for b in batch], 0) # [B,T,2] per-frame (v2: maneuver)
present = torch.stack([b[3] for b in batch], 0) # [B,T] truth-present (1 thru gaps)
det_t = bump_target(pos, G, sigma=sigma, device=dev) # [B,T,1,G,G]
det_t = det_t * present[:, :, None, None, None] # zero where no target (prefix/death)
B, T = present.shape
vel_t = vel[:, :, :, None, None].expand(B, T, 2, G, G).contiguous() # [B,T,2,G,G]
return seq, det_t, vel_t, present
def evaluate(net, batch, G, dev, sigma=1.5):
"""Per-seq lock/FP metrics over a held-out batch. By Claude 06/22
sigma must match training supervision so the 'core' mask matches the bump. By Claude 06/22"""
seq, det_t, vel_t, present = make_targets(batch, G, dev, sigma=sigma)
with torch.no_grad():
det_logit, vel = net(seq)
p = torch.sigmoid(det_logit)[:, :, 0] # [B,T,G,G]
pres = present.bool()
core = det_t[:, :, 0] > 0.3 # [B,T,G,G] truth bump core
peak = float(p[core].mean()) if core.any() else 0.0 # s at truth (want ->1)
absent = ~pres # noise-prefix frames
bg_absent = float(p[absent].amax()) if absent.any() else 0.0 # worst FP on noise (want ->0)
# background on PRESENT frames, away from the target (clutter-cloud FP)
bg_present = float((p * (~core) * pres[:, :, None, None]).amax())
return peak, bg_absent, bg_present
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--l1", default="runs/weighted9_pm/model.pt")
ap.add_argument("--nseq", type=int, default=128); ap.add_argument("--T", type=int, default=48)
ap.add_argument("--G", type=int, default=32); ap.add_argument("--vmax", type=float, default=1.4)
ap.add_argument("--snr", type=float, default=6.0); ap.add_argument("--steps", type=int, default=4000)
ap.add_argument("--bs", type=int, default=8); ap.add_argument("--lr", type=float, default=2e-3)
ap.add_argument("--ch", type=int, default=24); ap.add_argument("--out", default="runs/l2_v1")
# --- loss / supervision knobs (Option A artifact check). By Claude 06/22 ---
ap.add_argument("--sigma", type=float, default=1.5, help="supervision bump sigma (A: try 0.7-0.8)")
ap.add_argument("--pos_weight", type=float, default=30.0, help="BCE positive weight (A: try 3-8 AFTER sigma)")
ap.add_argument("--v1", action="store_true", help="v1-equivalent data: no gaps/maneuver/abrupt/death/clutter (isolate the sigma sweep)")
a = ap.parse_args()
import os; os.makedirs(a.out, exist_ok=True)
dev = "cuda" if torch.cuda.is_available() else "cpu"
# --v1 zeroes the v2 realism knobs so Option A varies ONLY the supervision width. By Claude 06/22
render_kw = dict(p_gap=0.0, p_maneuver=0.0, p_abrupt=0.0, p_death=0.0, clutter_n=(0, 0)) if a.v1 else {}
net1, N, _ = L1D._load_l1(a.l1, dev)
print(f"L1 {a.l1} N={N}; precomputing {a.nseq} field sequences (T={a.T}, G={a.G}) "
f"sigma={a.sigma} pos_weight={a.pos_weight} v1={a.v1}...", flush=True)
cache = build_cache(net1, N, a.nseq, a.T, a.G, a.vmax, a.snr, dev, render_kw=render_kw)
nval = max(8, a.nseq // 8); val = cache[:nval]; train = cache[nval:]
print(f"cache: {len(train)} train / {len(val)} val sequences", flush=True)
net = Layer2Net(ch_in=3, ch_hidden=a.ch, grid=a.G, vmax=a.vmax).to(dev)
opt = torch.optim.Adam(net.parameters(), a.lr)
nparams = sum(p.numel() for p in net.parameters())
print(f"Layer2Net {nparams} params; training {a.steps} steps bs={a.bs}", flush=True)
rng = np.random.default_rng(1)
for step in range(1, a.steps + 1):
idx = rng.integers(0, len(train), a.bs)
seq, det_t, vel_t, present = make_targets([train[i] for i in idx], a.G, dev, sigma=a.sigma)
det_logit, vel = net(seq)
loss, comp = layer2_loss(det_logit, vel, det_t, vel_t, pos_weight=a.pos_weight)
opt.zero_grad(); loss.backward(); opt.step()
if step % 250 == 0 or step == 1:
peak, bga, bgp = evaluate(net, val, a.G, dev, sigma=a.sigma)
print(f"step {step:5d} det {comp['det']:.4f} vel {comp['vel']:.4f} | "
f"val: s@truth {peak:.3f} max-FP(noise) {bga:.3f} max-FP(clutter) {bgp:.3f}", flush=True)
torch.save({"model": net.state_dict(), "args": vars(a)}, f"{a.out}/model.pt")
print(f"saved {a.out}/model.pt", flush=True)
# eval viz: run trained L2 on a fresh sequence, dump tiffs to watch track-before-detect
rng2 = np.random.default_rng(777)
frames, pos, vel, present = L1D.render_run(rng2, T=120, G=a.G, vmax=a.vmax, snr=a.snr)
seq = L1D.gen_field_sequence(net1, frames, pos, a.G, N, dev)
with torch.no_grad():
det_logit, velo = net(torch.from_numpy(seq[None]).to(dev))
l2s = torch.sigmoid(det_logit)[0, :, 0].cpu().numpy() # [T,G,G] L2 detection
truth = np.zeros((120, a.G, a.G), np.float32)
for t in range(120):
if present[t]: truth[t] = L1D.halfcos_bump_torus(pos[t, 0], pos[t, 1], a.G)
synth.save_tiff_stack(seq[:, 0], f"{a.out}/L1_s.tif") # L1 input field
synth.save_tiff_stack(l2s, f"{a.out}/L2_det.tif") # L2 track-before-detect output
synth.save_tiff_stack(truth, f"{a.out}/truth.tif")
print(f"wrote {a.out}/{{L1_s,L2_det,truth}}.tif (120 pages) — compare L2_det vs L1_s vs truth", flush=True)
if __name__ == "__main__":
main()
This diff is collapsed.
This diff is collapsed.
# layer2p.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""C5P Layer-2 PARAMETRIC head (position + presence), known-width readout. By Claude on 06/22/2026
Endpoint of the "we KNOW the target is FWHM 2.0" decision (Andrey 06/22). Instead of the dense
sigmoid-bump detector (Layer2Net), the net emits, per frame, ONLY:
- a spatial-SOFTMAX "where" map over the GxG torus -> sub-pixel position (toroidal centroid),
- a scalar "whether" presence logit,
- an expected velocity (softmax-weighted Vx,Vy).
The detection image, if needed, is RENDERED as a fixed FWHM-2 blob at the predicted position,
scaled by presence -> the output width is correct BY CONSTRUCTION; a "wrong skirt" cannot occur.
Why softmax (not sigmoid/BCE): a softmax map's total mass is FIXED at 1, so the net cannot lower
its loss by spreading into a skirt -- it MUST concentrate. That removes BOTH the fat-skirt failure
of the dense head AND the rare-positive class imbalance that forced pos_weight=30 (Andrey 06/22:
"it does not spread wide, so ratio of positives is not a concern"). Single-target by construction.
Reuses ConvGRUCellTorus + the toroidal Gaussian from layer2.py; the deployed dense Layer2Net is
left untouched (the inference server keeps loading it + runs/l2_v1). By Claude on 06/22/2026.
"""
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from layer2 import ConvGRUCellTorus, bump_target
def _torus_centroid(P, G):
"""Sub-pixel toroidal centroid of a probability map. By Claude on 06/22/2026
P: [B, G, G] (>=0, sums to 1 over the GxG grid). Returns pos [B, 2] = (x, y) in cells, in
[0, G). Uses the CIRCULAR mean (map each index to an angle, average sin/cos, atan2 back) so a
peak straddling the wrap seam averages to the seam -- NOT to the middle of the grid as a plain
weighted mean would. x = last axis (columns), y = first spatial axis (rows)."""
dev = P.device
ang = (2.0 * np.pi / G) * torch.arange(G, device=dev, dtype=P.dtype) # [G] cell index -> angle
cos_x = torch.cos(ang)[None, None, :] # [1,1,G] over columns (x)
sin_x = torch.sin(ang)[None, None, :]
cos_y = torch.cos(ang)[None, :, None] # [1,G,1] over rows (y)
sin_y = torch.sin(ang)[None, :, None]
Cx = (P * cos_x).sum(dim=(-1, -2)); Sx = (P * sin_x).sum(dim=(-1, -2)) # [B] each
Cy = (P * cos_y).sum(dim=(-1, -2)); Sy = (P * sin_y).sum(dim=(-1, -2))
x = torch.atan2(Sx, Cx) % (2.0 * np.pi) * (G / (2.0 * np.pi)) # [B] in [0,G)
y = torch.atan2(Sy, Cy) % (2.0 * np.pi) * (G / (2.0 * np.pi))
return torch.stack([x, y], dim=-1) # [B, 2]
class Layer2NetP(nn.Module):
"""Recurrent track-before-detect with a PARAMETRIC (position + presence) readout. By Claude 06/22
forward(seq) -> per frame: log-softmax where-map [B,T,G,G], position [B,T,2] (cells, sub-pixel),
presence logit [B,T], velocity [B,T,2] (px/level-frame). Same ConvGRU recurrence as Layer2Net.
"""
def __init__(self, ch_in=3, ch_hidden=24, grid=32, vmax=1.4, k=3):
super().__init__()
self.ch_hidden = ch_hidden
self.grid = grid
self.vmax = vmax
self.cell = ConvGRUCellTorus(ch_in, ch_hidden, k=k)
self.score_head = nn.Conv2d(ch_hidden, 1, 1) # -> spatial logits, softmaxed to "where"
self.vel_head = nn.Conv2d(ch_hidden, 2, 1) # -> per-cell velocity field
self.pres_head = nn.Linear(ch_hidden, 1) # -> scalar "whether" from pooled hidden
def init_hidden(self, B, device, dtype):
return torch.zeros(B, self.ch_hidden, self.grid, self.grid, device=device, dtype=dtype)
def decode(self, h):
# h: [B, Ch, G, G]
B, _, G, _ = h.shape
score = self.score_head(h).view(B, G * G) # [B, G*G] spatial logits
logP = F.log_softmax(score, dim=1).view(B, G, G) # [B, G, G] log "where" map (sums to 1)
P = logP.exp() # [B, G, G] probability map
pos = _torus_centroid(P, G) # [B, 2] sub-pixel (x,y) in cells
pooled = h.mean(dim=(-1, -2)) # [B, Ch] global context for presence
pres_logit = self.pres_head(pooled)[:, 0] # [B] "is a target present this frame"
vel_field = self.vmax * torch.tanh(self.vel_head(h)) # [B, 2, G, G] bounded velocity field
vel = (P[:, None] * vel_field).sum(dim=(-1, -2)) # [B, 2] expected velocity under "where"
return logP, pos, pres_logit, vel
def forward(self, seq, h=None):
# seq: [B, T, Cin, G, G]
B, T = seq.shape[0], seq.shape[1]
if h is None:
h = self.init_hidden(B, seq.device, seq.dtype)
logPs, poss, press, vels = [], [], [], []
for t in range(T): # BPTT unrolls this loop
h = self.cell(seq[:, t], h)
logP, pos, pres, vel = self.decode(h)
logPs.append(logP); poss.append(pos); press.append(pres); vels.append(vel)
return (torch.stack(logPs, 1), # [B,T,G,G] log where-map
torch.stack(poss, 1), # [B,T,2] position (cells)
torch.stack(press, 1), # [B,T] presence logit
torch.stack(vels, 1)) # [B,T,2] velocity
def _wrap(d, G):
"""Toroidal signed difference into (-G/2, G/2]. By Claude on 06/22/2026"""
return (d + G / 2.0) % G - G / 2.0
def layer2p_loss(logP, pos, pres_logit, vel, pos_t, present, vel_t, G,
sigma_where=0.85, w_pos=0.2, w_pres=1.0, w_vel=0.3, pres_pos_weight=2.0):
"""Parametric loss. By Claude on 06/22/2026
logP [B,T,G,G] log-softmax where-map pos [B,T,2] predicted (x,y) cells
pres_logit [B,T] presence logit vel [B,T,2] predicted velocity
pos_t [B,T,2] truth position (cells) present [B,T] 1=target present
vel_t [B,T,2] truth velocity
Terms (NO pos_weight on a spatial map -- softmax already fixes total mass, so no skirt / no
class imbalance; the only scalar weight is on the per-frame presence BCE):
- WHERE : soft cross-entropy of the softmax map vs a SHARP toroidal-Gaussian truth target
Q (sigma_where ~ FWHM 2). Concentrates one peak at truth; cannot win by spreading.
- POS : toroidal MSE on the sub-pixel centroid (sub-cell refinement of WHERE).
- PRES : BCE on the per-frame presence scalar (present vs absent/gap frames).
- VEL : MSE of expected velocity, on present frames only.
WHERE/POS/VEL are masked to present frames; PRES is supervised every frame."""
B, T = present.shape
pres = present.float()
m = present.bool()
# WHERE: build a sharp normalized target distribution Q at truth, cross-entropy = -sum Q*logP.
Q = bump_target(pos_t, G, sigma=sigma_where, device=logP.device)[:, :, 0] # [B,T,G,G] (peak 1)
Q = Q / Q.sum(dim=(-1, -2), keepdim=True).clamp_min(1e-8) # normalize -> sums to 1
ce = -(Q * logP).sum(dim=(-1, -2)) # [B,T] cross-entropy
l_where = ce[m].mean() if m.any() else logP.sum() * 0.0
# POS: sub-pixel toroidal MSE on present frames.
dxy = _wrap(pos - pos_t, G) # [B,T,2]
l_pos = (dxy[m] ** 2).mean() if m.any() else pos.sum() * 0.0
# PRES: per-frame presence BCE (scalar imbalance handled by a small pos_weight, NOT spatial).
pw = torch.tensor(pres_pos_weight, device=pres_logit.device)
l_pres = F.binary_cross_entropy_with_logits(pres_logit, pres, pos_weight=pw)
# VEL: expected-velocity MSE on present frames.
l_vel = F.mse_loss(vel[m], vel_t[m]) if m.any() else vel.sum() * 0.0
total = l_where + w_pos * l_pos + w_pres * l_pres + w_vel * l_vel
return total, {"where": float(l_where.detach()), "pos": float(l_pos.detach()),
"pres": float(l_pres.detach()), "vel": float(l_vel.detach())}
def render_blob(pos, pres, G, sigma=0.85):
"""Render the KNOWN-width detection image from (position, presence). By Claude on 06/22/2026
pos [T,2] cells, pres [T] in [0,1] -> [T,G,G] = pres * toroidal Gaussian(FWHM=2.355*sigma) at pos.
Width is fixed here, so the output FWHM is correct by construction."""
T = pos.shape[0]
p = torch.as_tensor(pos, dtype=torch.float32).view(1, T, 2)
g = bump_target(p, G, sigma=sigma, device="cpu")[0, :, 0].numpy() # [T,G,G] peak 1
return g * np.asarray(pres, dtype=np.float32)[:, None, None]
# make_testvec.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Save a fixed (input, raw-output) pair for Java-vs-PyTorch verification. # By Claude on 06/13/2026
python make_testvec.py /work/runs/weighted/model.pt /work/runs/weighted/testvec"""
import sys, numpy as np, torch
import synth
from model import RawFCN
ck = torch.load(sys.argv[1], map_location="cpu"); a = ck["args"]
N = a.get("nframes", 8); P = a.get("patch", 24); vr = a.get("vel_radius", 5)
m = RawFCN(n_frames=N, vel_radius=vr); m.load_state_dict(ck["model"]); m.eval()
rng = np.random.default_rng(999)
f, lab = synth.generate_sample(rng, N=N, H=P, W=P, snr=6.0, place="center")
with torch.no_grad():
out = m(torch.from_numpy(f[None])).reshape(-1).numpy() # [124] raw network output
f.astype('<f4').tofile(sys.argv[2] + "_in.bin") # [N,H,W] row-major LE float32
out.astype('<f4').tofile(sys.argv[2] + "_out.bin") # [124] LE float32
print(f"testvec N={N} P={P} outlen={out.size} true vx={lab['vx']:+.3f} vy={lab['vy']:+.3f} "
f"det_logit={out[0]:.4f}")
This diff is collapsed.
#!/usr/bin/env python3
# nettest.py - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Raw-socket throughput tester (stdlib only). By Claude on 06/20/2026.
server: nettest.py server [port]
client: nettest.py client HOST PORT MB DIR (DIR=up client->server | down server->client)
Receiver times wall-clock from first to last byte and reports MB/s + Gbit/s (no encryption,
single bulk stream -> clean link throughput)."""
import socket, struct, sys, time
CHUNK = 1 << 20 # 1 MiB
def recv_timed(conn, nbytes):
got = 0
buf = bytearray(CHUNK)
t0 = None
while got < nbytes:
n = conn.recv_into(buf, min(CHUNK, nbytes - got))
if not n:
break
if t0 is None:
t0 = time.perf_counter()
got += n
dt = time.perf_counter() - t0 if t0 else 0.0
return got, dt
def send_all(conn, nbytes):
block = b"\0" * CHUNK
sent = 0
while sent < nbytes:
sent += conn.send(block[:min(CHUNK, nbytes - sent)])
return sent
def server(port):
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("0.0.0.0", port))
s.listen(1)
print(f"nettest server on :{port}", flush=True)
while True:
c, a = s.accept()
c.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
hdr = c.recv(16)
if len(hdr) < 16:
c.close(); continue
direction, nbytes = struct.unpack(">qq", hdr) # 0=up(server recvs) 1=down(server sends)
if direction == 0:
got, dt = recv_timed(c, nbytes)
mbps = got / 1e6 / dt if dt else 0
print(f" UP recv {got/1e6:.0f}MB {dt*1e3:.1f}ms = {mbps:.0f} MB/s ({mbps*8/1000:.2f} Gbit/s)", flush=True)
c.sendall(struct.pack(">d", dt))
else:
send_all(c, nbytes)
finally:
c.close()
def client(host, port, mb, direction):
nbytes = mb * (1 << 20)
c = socket.socket()
c.connect((host, port))
c.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
d = 0 if direction == "up" else 1
c.sendall(struct.pack(">qq", d, nbytes))
if d == 0:
send_all(c, nbytes)
dt, = struct.unpack(">d", c.recv(8))
mbps = nbytes / 1e6 / dt if dt else 0
print(f"UP client->server {mb}MB: {mbps:.0f} MB/s = {mbps*8/1000:.2f} Gbit/s (server-timed)")
else:
got, dt = recv_timed(c, nbytes)
mbps = got / 1e6 / dt if dt else 0
print(f"DOWN server->client {got/1e6:.0f}MB: {mbps:.0f} MB/s = {mbps*8/1000:.2f} Gbit/s")
c.close()
if __name__ == "__main__":
if sys.argv[1] == "server":
server(int(sys.argv[2]) if len(sys.argv) > 2 else 5578)
else:
client(sys.argv[2], int(sys.argv[3]), int(sys.argv[4]), sys.argv[5])
This diff is collapsed.
#!/usr/bin/env bash
# run_infer_server.sh - part of imagej_elphel_dnn (Elphel DNN: tile-processor motion detection / ranging)
#
# Copyright (C) 2026 Elphel, Inc.
#
# -----------------------------------------------------------------------------
# imagej_elphel_dnn is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
# Start/stop the CUAS DGX inference server (PyTorch RawFCN, cuDNN) in the NGC container.
# By Claude on 06/20/2026. Run on the DGX (elphel@192.168.0.62).
# start|stop|logs|status env: RUN=runs/<model> RUN2=runs/<l2> PORT=5577
set -euo pipefail
NAME=cuas_infer
IMG=nvcr.io/nvidia/pytorch:25.10-py3
CODE=/home/elphel/c5p_dnn
RUN="${RUN:-runs/weighted9_pm_s}"
RUN2="${RUN2:-}" # optional Layer-2 run dir; empty -> L1-only. By Claude 06/22/2026
PORT="${PORT:-5577}"
case "${1:-start}" in
start)
docker rm -f "$NAME" >/dev/null 2>&1 || true
L2ARG=""; [ -n "$RUN2" ] && L2ARG="--l2run $RUN2"
docker run -d --name "$NAME" --gpus all --network host \
-v "$CODE":/work -w /work "$IMG" \
python infer_server.py --run "$RUN" $L2ARG --port "$PORT" >/dev/null
echo "started $NAME (run=$RUN l2=${RUN2:-off} port=$PORT)"; sleep 3; docker logs "$NAME"
;;
stop) docker rm -f "$NAME" >/dev/null 2>&1 && echo "stopped" || echo "not running" ;;
logs) docker logs --tail 60 "$NAME" ;;
status) docker ps --filter "name=$NAME" --format "{{.Names}} {{.Status}}" ;;
*) echo "usage: $0 {start|stop|logs|status} (env: RUN=, RUN2=, PORT=)"; exit 1 ;;
esac
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment