Commit 4a8f21f6 authored by Andrey Filippov's avatar Andrey Filippov

CLAUDE: Add fopen_nadir_compare.py — tile-proc vs COLMAP nadir comparison

Generates a multi-slice float32 TIFF (NaN-padded, ImageJ-compatible):
  Slice 1: tile-processor disparity D scattered to nadir image pixels
           (from *-NADIR-DISPARITY-MAPS.tiff, averaged across all scenes)
  Slice 2: COLMAP sparse Z_cam projected to the same nadir pixel grid
           (optional --dense PLY for OpenMVS dense cloud)
  Slice 3: spatial superposition (tile-proc priority; COLMAP where missing;
           average at overlapping pixels)
  Slice 4: tile-processor scene-count per pixel (--counts flag)

Uses the COLMAP reference-image pose (auto-detected as centroid of camera
centres, or specified via --ref-image) to project 3D points. Handles
OPENCV lens distortion model (--no-distortion to skip).

Intended for evaluating depth-fusion approaches (tile processor + SIFT):
  python3 scripts/fopen_nadir_compare.py \
      --disp   <ts>-NADIR-DISPARITY-MAPS.tiff \
      --colmap sparse/0_txt/ [--dense scene_dense.ply] \
      --out    nadir_compare.tif --counts
Co-authored-by: 's avatarClaude <claude@elphel.com>
parent 00c33f1a
#!/usr/bin/env python3
"""
fopen_nadir_compare.py - Tile-processor vs COLMAP elevation comparison (nadir view)
Reads the imagej-elphel NADIR-DISPARITY-MAPS.tiff (906 = 302 scenes × 3 channels
pX / pY / D per tile) and a COLMAP sparse-model TXT export, then produces a
multi-slice float32 TIFF with NaN for empty pixels so ImageJ can compute per-pixel
statistics while ignoring empties.
Output slices
─────────────
1 Tile-processor : disparity D scattered onto nadir image pixels
(one dot per tile center, averaged across all scenes)
2 COLMAP sparse : camera-frame depth (Z_cam) projected to nadir image pixels
using the reference-image camera pose
3 Superposition : tile-proc value where available; COLMAP where tile-proc is
missing; average of both where both overlap the same pixel
4 Count-tp : (optional) number of tile-proc scenes that contributed to
each pixel (useful to see coverage / multi-scene averaging)
Usage
─────
python3 fopen_nadir_compare.py \\
--disp <path>/<ts>-NADIR-DISPARITY-MAPS.tiff \\
--colmap <workspace>/sparse/0_txt/ \\
[--ref-image frame_0189.png] # auto-detected if omitted \\
[--dense <workspace>/scene_opencv_dense.ply] \\
[--out <path>/nadir_compare.tif] \\
[--no-distortion] # skip OPENCV undistortion step \\
[--counts] # add count slice
Coordinate conventions
──────────────────────
Tile-processor : pX / pY are already nadir-image pixel coordinates
(0-639, 0-511); D is disparity (higher = closer to camera).
COLMAP : 3D points transformed to the reference camera frame;
Z_cam (positive = in front of camera = below for nadir camera)
is used as the elevation proxy. Points behind the camera
(Z_cam ≤ 0) are discarded.
Both slices are in their own natural units. The superposition keeps values
unconverted — use ImageJ Z-project / Statistics to compare distributions.
"""
import argparse
import math
import sys
from pathlib import Path
import numpy as np
from PIL import Image
# ──────────────────────────────────────────────────────────────
# I/O helpers
# ──────────────────────────────────────────────────────────────
def load_disp_maps(path):
"""Return (n_scenes, 3, H_tiles, W_tiles) float32 array from NADIR-DISPARITY-MAPS.tiff.
Channel 0 = pX, 1 = pY, 2 = D. NaN where tile has no data."""
img = Image.open(path)
n = img.n_frames
if n % 3 != 0:
raise ValueError(f"Expected frame count divisible by 3, got {n}")
n_scenes = n // 3
img.seek(0)
h, w = np.array(img).shape
data = np.full((n_scenes, 3, h, w), np.nan, dtype=np.float32)
for i in range(n):
img.seek(i)
data[i // 3, i % 3] = np.array(img, dtype=np.float32)
return data # (n_scenes, 3, H, W)
def parse_cameras_txt(path):
"""Return dict camera_id -> {'fx','fy','cx','cy','k1','k2','p1','p2','model',...}."""
cameras = {}
with open(path) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split()
cam_id = int(parts[0])
model = parts[1]
w, h = int(parts[2]), int(parts[3])
params = list(map(float, parts[4:]))
cam = {'model': model, 'width': w, 'height': h,
'fx': params[0], 'fy': params[1],
'cx': params[2], 'cy': params[3]}
if model == 'OPENCV' and len(params) >= 8:
cam.update({'k1': params[4], 'k2': params[5],
'p1': params[6], 'p2': params[7]})
elif model in ('PINHOLE', 'SIMPLE_PINHOLE'):
cam.update({'k1': 0.0, 'k2': 0.0, 'p1': 0.0, 'p2': 0.0})
cameras[cam_id] = cam
return cameras
def quat_to_rot(qw, qx, qy, qz):
"""Quaternion (w,x,y,z) → 3×3 rotation matrix (row-major)."""
n = math.sqrt(qw*qw + qx*qx + qy*qy + qz*qz)
qw, qx, qy, qz = qw/n, qx/n, qy/n, qz/n
return np.array([
[1-2*(qy*qy+qz*qz), 2*(qx*qy-qz*qw), 2*(qx*qz+qy*qw)],
[2*(qx*qy+qz*qw), 1-2*(qx*qx+qz*qz), 2*(qy*qz-qx*qw)],
[2*(qx*qz-qy*qw), 2*(qy*qz+qx*qw), 1-2*(qx*qx+qy*qy)],
], dtype=np.float64)
def parse_images_txt(path):
"""Return list of image dicts with keys: id, name, R (3×3), t (3,), cam_id."""
images = []
with open(path) as f:
lines = [l.rstrip() for l in f if not l.startswith('#') and l.strip()]
i = 0
while i + 1 < len(lines):
parts = lines[i].split()
img_id = int(parts[0])
qw, qx, qy, qz = map(float, parts[1:5])
tx, ty, tz = map(float, parts[5:8])
cam_id = int(parts[8])
name = parts[9]
R = quat_to_rot(qw, qx, qy, qz)
t = np.array([tx, ty, tz], dtype=np.float64)
images.append({'id': img_id, 'name': name, 'R': R, 't': t, 'cam_id': cam_id})
i += 2 # skip POINTS2D line
return images
def parse_points3d_txt(path):
"""Return (N,3) float64 array of world XYZ and (N,) float64 array of errors."""
xyz_list, err_list = [], []
with open(path) as f:
for line in f:
if line.startswith('#') or not line.strip():
continue
parts = line.split()
# POINT3D_ID X Y Z R G B ERROR TRACK[]
xyz_list.append([float(parts[1]), float(parts[2]), float(parts[3])])
err_list.append(float(parts[7]))
return np.array(xyz_list, dtype=np.float64), np.array(err_list, dtype=np.float64)
def load_ply_xyz(path):
"""Minimal PLY loader — returns (N,3) float64 XYZ from a PLY with x y z properties."""
path = Path(path)
with open(path, 'rb') as f:
# Parse header
header_lines = []
while True:
line = f.readline().decode('ascii', errors='replace').strip()
header_lines.append(line)
if line == 'end_header':
break
n_vertices = 0
props = []
for hl in header_lines:
if hl.startswith('element vertex'):
n_vertices = int(hl.split()[-1])
elif hl.startswith('property'):
props.append(hl.split()[-1])
if n_vertices == 0:
return np.zeros((0, 3), dtype=np.float64)
xi = props.index('x'); yi = props.index('y'); zi = props.index('z')
dtype_map = {'float': np.float32, 'double': np.float64,
'uchar': np.uint8, 'int': np.int32, 'uint': np.uint32}
type_str = [hl.split()[2] for hl in header_lines if hl.startswith('property')]
dt_list = [(p, dtype_map.get(t, np.float32)) for p, t in zip(props, type_str)]
struct_dt = np.dtype([(n, t) for n, t in dt_list])
raw = np.frombuffer(f.read(n_vertices * struct_dt.itemsize), dtype=struct_dt)
xyz = np.column_stack([raw['x'].astype(np.float64),
raw['y'].astype(np.float64),
raw['z'].astype(np.float64)])
return xyz
# ──────────────────────────────────────────────────────────────
# Geometry
# ──────────────────────────────────────────────────────────────
def camera_center(R, t):
"""World position of camera = -R^T @ t."""
return -R.T @ t
def auto_select_reference(images):
"""Return the image whose camera center is closest to the centroid of all centres."""
centers = np.array([camera_center(im['R'], im['t']) for im in images])
centroid = centers.mean(axis=0)
dists = np.linalg.norm(centers - centroid, axis=1)
return images[int(np.argmin(dists))]
def project_opencv(xyz_cam, cam, apply_distortion=True):
"""
Project Nx3 camera-frame points to image pixels using OPENCV model.
Returns (pX, pY) arrays; points with Z<=0 or outside reasonable range get NaN.
"""
Z = xyz_cam[:, 2]
valid = Z > 1e-6 # positive depth, avoid division by ~0
Z_safe = np.where(valid, Z, 1.0) # never divide by zero
x = np.where(valid, xyz_cam[:, 0] / Z_safe, np.nan) # normalised coords
y = np.where(valid, xyz_cam[:, 1] / Z_safe, np.nan)
if apply_distortion and 'k1' in cam:
k1, k2 = cam['k1'], cam['k2']
p1, p2 = cam['p1'], cam['p2']
r2 = x*x + y*y
radial = 1 + k1*r2 + k2*r2*r2
xd = x*radial + 2*p1*x*y + p2*(r2 + 2*x*x)
yd = y*radial + p1*(r2 + 2*y*y) + 2*p2*x*y
else:
xd, yd = x, y
pX = cam['fx'] * xd + cam['cx']
pY = cam['fy'] * yd + cam['cy']
# Clamp to a range that won't overflow int32 on rounding
W, H = cam['width'], cam['height']
margin = max(W, H) * 10
pX = np.where(np.abs(pX) < margin, pX, np.nan)
pY = np.where(np.abs(pY) < margin, pY, np.nan)
return pX, pY
# ──────────────────────────────────────────────────────────────
# Scatter helpers
# ──────────────────────────────────────────────────────────────
def scatter_to_image(pX_flat, pY_flat, values_flat, width, height):
"""
Scatter (pX, pY, value) points onto a float32 image by averaging.
Returns float32 ndarray (height, width) with NaN for empty pixels.
"""
accum = np.zeros((height, width), dtype=np.float64)
counts = np.zeros((height, width), dtype=np.int32)
mask = np.isfinite(pX_flat) & np.isfinite(pY_flat) & np.isfinite(values_flat)
px_i = np.round(pX_flat[mask]).astype(np.int32)
py_i = np.round(pY_flat[mask]).astype(np.int32)
vals = values_flat[mask]
in_bounds = (px_i >= 0) & (px_i < width) & (py_i >= 0) & (py_i < height)
px_i, py_i, vals = px_i[in_bounds], py_i[in_bounds], vals[in_bounds]
np.add.at(accum, (py_i, px_i), vals)
np.add.at(counts, (py_i, px_i), 1)
out = np.full((height, width), np.nan, dtype=np.float32)
hit = counts > 0
out[hit] = (accum[hit] / counts[hit]).astype(np.float32)
return out, counts.astype(np.float32)
def superpose(slice_tp, slice_colmap):
"""
Combine two float32 (H,W) images:
- pixel has only tp → tp value
- pixel has only colmap→ colmap value (normalised to tp scale if possible)
- pixel has both → average
- pixel has neither → NaN
Note: the two slices are in different units (disparity vs depth).
To allow direct superposition for overlap counting, values are used as-is.
"""
out = np.full_like(slice_tp, np.nan)
has_tp = np.isfinite(slice_tp)
has_co = np.isfinite(slice_colmap)
both = has_tp & has_co
only_tp = has_tp & ~has_co
only_co = ~has_tp & has_co
out[only_tp] = slice_tp[only_tp]
out[only_co] = slice_colmap[only_co]
out[both] = (slice_tp[both] + slice_colmap[both]) / 2
return out
# ──────────────────────────────────────────────────────────────
# Save multi-slice float32 TIFF
# ──────────────────────────────────────────────────────────────
def save_multipage_tiff(path, slices, labels=None):
"""Save a list of float32 2D arrays as a multi-page TIFF. NaN is preserved."""
pages = [Image.fromarray(s.astype(np.float32), mode='F') for s in slices]
pages[0].save(path, save_all=True, append_images=pages[1:])
print(f"Saved {len(slices)}-slice TIFF → {path}")
if labels:
for i, (s, lbl) in enumerate(zip(slices, labels)):
fin = s[np.isfinite(s)]
if len(fin):
print(f" slice {i+1} [{lbl}]: {len(fin):,} pixels, "
f"min={fin.min():.4f}, max={fin.max():.4f}, "
f"mean={fin.mean():.4f}")
else:
print(f" slice {i+1} [{lbl}]: all NaN")
# ──────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument('--disp', required=True,
help='Path to *-NADIR-DISPARITY-MAPS.tiff from imagej-elphel')
ap.add_argument('--colmap', required=True,
help='COLMAP TXT sparse model directory (cameras.txt, images.txt, points3D.txt)')
ap.add_argument('--ref-image', default=None,
help='COLMAP image name for the reference nadir frame (e.g. frame_0189.png). '
'Auto-detected (centroid of camera centres) if omitted.')
ap.add_argument('--dense', default=None,
help='Optional dense PLY from OpenMVS DensifyPointCloud')
ap.add_argument('--out', default=None,
help='Output TIFF path (default: <disp_stem>_compare.tif)')
ap.add_argument('--no-distortion', action='store_true',
help='Skip OPENCV lens-distortion step when projecting COLMAP points')
ap.add_argument('--counts', action='store_true',
help='Append a 4th slice: tile-processor scene-count per pixel')
ap.add_argument('--max-repro-err', type=float, default=None,
help='Discard COLMAP points with reprojection error above this threshold')
args = ap.parse_args()
disp_path = Path(args.disp)
colmap_dir = Path(args.colmap)
out_path = Path(args.out) if args.out else \
disp_path.with_name(disp_path.stem + '_compare.tif')
# ── Load tile-processor disparity maps ───────────────────
print(f"Loading tile-processor disparity maps: {disp_path}")
disp = load_disp_maps(disp_path) # (n_scenes, 3, H_t, W_t)
n_scenes, _, H_t, W_t = disp.shape
print(f" {n_scenes} scenes, tile grid {W_t}×{H_t}")
# ── Load COLMAP model ─────────────────────────────────────
print(f"Loading COLMAP model: {colmap_dir}")
cameras = parse_cameras_txt(colmap_dir / 'cameras.txt')
images = parse_images_txt (colmap_dir / 'images.txt')
pts3d, pts_err = parse_points3d_txt(colmap_dir / 'points3D.txt')
print(f" {len(images)} images, {len(pts3d):,} 3D points")
if args.max_repro_err is not None and len(pts3d):
keep = pts_err <= args.max_repro_err
print(f" Filtering by reprojection error ≤ {args.max_repro_err}: "
f"{keep.sum():,}/{len(pts3d):,} kept")
pts3d = pts3d[keep]
# ── Select reference image ────────────────────────────────
if args.ref_image:
ref_imgs = [im for im in images if im['name'] == args.ref_image]
if not ref_imgs:
sys.exit(f"ERROR: --ref-image '{args.ref_image}' not found in images.txt")
ref_img = ref_imgs[0]
else:
ref_img = auto_select_reference(images)
print(f" Auto-selected reference image: {ref_img['name']} (id={ref_img['id']})")
cam = cameras[ref_img['cam_id']]
W_im = cam['width']
H_im = cam['height']
print(f" Camera: {cam['model']} {W_im}×{H_im} fx={cam['fx']:.2f} fy={cam['fy']:.2f} "
f"cx={cam['cx']:.2f} cy={cam['cy']:.2f}")
# ── Tile-processor slice ──────────────────────────────────
print("Building tile-processor disparity slice …")
pX_all = disp[:, 0].ravel() # all scenes, pX for every tile
pY_all = disp[:, 1].ravel() # pY
D_all = disp[:, 2].ravel() # disparity
slice_tp, count_tp = scatter_to_image(pX_all, pY_all, D_all, W_im, H_im)
print(f" {np.isfinite(slice_tp).sum():,} pixels populated")
# ── COLMAP sparse slice ───────────────────────────────────
print("Building COLMAP sparse slice …")
R_ref, t_ref = ref_img['R'], ref_img['t']
# Transform world → reference camera frame
xyz_cam = (pts3d @ R_ref.T) + t_ref[None, :] # (N,3)
# Z_cam is the depth in camera frame (positive = in front of camera)
Z_cam = xyz_cam[:, 2]
pX_c, pY_c = project_opencv(xyz_cam, cam,
apply_distortion=not args.no_distortion)
# Use Z_cam as elevation proxy (smaller = closer to camera = higher elevation)
slice_colmap, _ = scatter_to_image(pX_c, pY_c, Z_cam, W_im, H_im)
print(f" {np.isfinite(slice_colmap).sum():,} pixels populated")
# ── Dense PLY slice (optional, replaces sparse COLMAP slice) ─
if args.dense:
print(f"Loading dense PLY: {args.dense}")
pts_dense = load_ply_xyz(args.dense)
print(f" {len(pts_dense):,} points")
xyz_cam_d = (pts_dense @ R_ref.T) + t_ref[None, :]
pX_d, pY_d = project_opencv(xyz_cam_d, cam,
apply_distortion=not args.no_distortion)
slice_dense, _ = scatter_to_image(pX_d, pY_d, xyz_cam_d[:, 2], W_im, H_im)
print(f" {np.isfinite(slice_dense).sum():,} pixels populated")
else:
slice_dense = None
# ── Superposition ─────────────────────────────────────────
# Note: TP uses disparity D (≈1/depth), COLMAP uses Z_cam (depth).
# They are in different units — the superposition pixel indicates
# SPATIAL coverage overlap, not value agreement.
# Use ImageJ's "Z Project > Average" on slices 1&2 to compare distributions.
print("Building superposition slice …")
colmap_for_super = slice_dense if slice_dense is not None else slice_colmap
slice_super = superpose(slice_tp, colmap_for_super)
print(f" {np.isfinite(slice_super).sum():,} pixels populated")
# ── Assemble output ───────────────────────────────────────
slices = [slice_tp, slice_colmap]
labels = ['tile-proc disparity D', 'COLMAP Z_cam (sparse)']
if slice_dense is not None:
slices.append(slice_dense)
labels.append('COLMAP Z_cam (dense)')
slices.append(slice_super)
labels.append('superposition')
if args.counts:
slices.append(count_tp.astype(np.float32))
labels.append('tile-proc scene count')
save_multipage_tiff(str(out_path), slices, labels)
# Coverage summary
tp_pix = np.isfinite(slice_tp).sum()
co_pix = np.isfinite(colmap_for_super).sum()
both_pix = (np.isfinite(slice_tp) & np.isfinite(colmap_for_super)).sum()
total = W_im * H_im
print(f"\nCoverage summary ({W_im}×{H_im} = {total:,} pixels):")
print(f" Tile-processor : {tp_pix:6,} ({100*tp_pix/total:.1f}%)")
print(f" COLMAP : {co_pix:6,} ({100*co_pix/total:.1f}%)")
print(f" Both overlap : {both_pix:6,} ({100*both_pix/total:.1f}%)")
print(f" Either source : {np.isfinite(slice_super).sum():6,} "
f"({100*np.isfinite(slice_super).sum()/total:.1f}%)")
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment