Commit f0e7dc49 authored by Andrey Filippov's avatar Andrey Filippov

CLAUDE: Add lwir_map scripts (renamed from fopen_nc_summary/fopen_overlay)

- scripts/lwir_map/lwir_scene_summary.py  (was fopen_nc_summary.py)
- scripts/lwir_map/lwir_map_overlay.py    (was fopen_overlay.py)
- scripts/lwir_map/README.md              (pipeline doc + known limitations)
- Updated catalog.json and scripts/README.md with new names/paths

Scripts read imagej-elphel linked LWIR output and generate per-scene
TSV + GeoJSON/KML/SVG map overlays for satellite atlas navigation.
Not FOPEN-specific; usable for any airplane/drone LWIR acquisition.
Co-authored-by: 's avatarClaude <claude@elphel.com>
parent 1579c9b3
......@@ -8,7 +8,7 @@ This file is generated from `scripts/catalog.json`.
2) Run:
```bash
/home/elphel/git/imagej-elphel/scripts/catalog_to_md.py
/home/elphel/wksp2/imagej-elphel/scripts/catalog_to_md.py
```
---
......@@ -75,6 +75,55 @@ Outputs:
Tags: csv, models, aggregation, plot
## lwir_scene_summary.py
Read imagej-elphel linked LWIR scene output and write a per-scene TSV with GPS corners, AGL, heading, forestation, and ground-plane columns.
Path: `scripts/lwir_map/lwir_scene_summary.py`
See also: `scripts/lwir_map/README.md` for full pipeline documentation.
Example:
```bash
/home/elphel/wksp2/imagej-elphel/scripts/lwir_map/lwir_scene_summary.py /home/elphel/lwir16-proc/NC/linked/linked_1763232117-1763234145-v88 -o /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv
```
Inputs:
- Linked view root containing timestamp and *-index directories
- *-elevations_histogram.csv and *-egomotion.csv under each scene version directory
- *-ground_planes.csv under each following *-index version directory
Outputs:
- TSV file with ~65 columns: status, group metadata, scene IDs, timestamps, GPS corners, heading, AGL, forestation, ground-plane data
Tags: lwir, map, atlas, summary, tsv
## lwir_map_overlay.py
Generate KML, GeoJSON, SVG, and bounds TSV map overlays from a LWIR scene summary TSV.
Path: `scripts/lwir_map/lwir_map_overlay.py`
See also: `scripts/lwir_map/README.md` for full pipeline documentation.
Example:
```bash
/home/elphel/wksp2/imagej-elphel/scripts/lwir_map/lwir_map_overlay.py /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv --out-prefix /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-overlay --svg-fill none --svg-color-by agl_m --svg-label-mode first-per-group --svg-direction-cue broken-right
```
Inputs:
- Summary TSV generated by lwir_scene_summary.py
Outputs:
- Bounds TSV with min/max latitude and longitude
- GeoJSON polygon overlay
- KML polygon and label overlay
- SVG vector page with projected rectangles, configurable labels, fill, color source, and image-right direction cue
Tags: lwir, map, atlas, overlay, kml, geojson, svg
## rag_index.py
Build a local RAG index from attic/CODEX/rag_sources using fastembed + hnswlib.
......
{
"version": 1,
"docs": [
{
"name": "lwir_map/README.md",
"path": "scripts/lwir_map/README.md",
"purpose": "Pipeline doc for LWIR scene atlas: purpose, 4-step workflow, script options, known limitations (tilt during turns), and future improvements."
},
{
"name": "mcp-http-howto.md",
"path": "scripts/mcp-http-howto.md",
......@@ -47,6 +52,43 @@
"owner": "codex",
"created": "2026-02-03"
},
{
"name": "lwir_scene_summary.py",
"path": "scripts/lwir_map/lwir_scene_summary.py",
"purpose": "Read imagej-elphel linked LWIR scene output and write a per-scene TSV with GPS corners, AGL, heading, forestation, and ground-plane columns.",
"inputs": [
"Linked view root containing timestamp and *-index directories",
"*-elevations_histogram.csv and *-egomotion.csv under each scene version directory",
"*-ground_planes.csv under each following *-index version directory"
],
"outputs": [
"TSV file with ~65 columns: status, group metadata, scene IDs, timestamps, GPS corners, heading, AGL, forestation, ground-plane data"
],
"example": "/home/elphel/wksp2/imagej-elphel/scripts/lwir_map/lwir_scene_summary.py /home/elphel/lwir16-proc/NC/linked/linked_1763232117-1763234145-v88 -o /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv",
"tags": ["lwir", "map", "atlas", "summary", "tsv"],
"dependencies": [],
"owner": "codex",
"created": "2026-05-14"
},
{
"name": "lwir_map_overlay.py",
"path": "scripts/lwir_map/lwir_map_overlay.py",
"purpose": "Generate KML, GeoJSON, SVG, and bounds TSV map overlays from a LWIR scene summary TSV.",
"inputs": [
"Summary TSV generated by lwir_scene_summary.py"
],
"outputs": [
"Bounds TSV with min/max latitude and longitude",
"GeoJSON polygon overlay",
"KML polygon and label overlay",
"SVG vector page with projected rectangles, configurable labels, fill, color source, and image-right direction cue"
],
"example": "/home/elphel/wksp2/imagej-elphel/scripts/lwir_map/lwir_map_overlay.py /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv --out-prefix /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-overlay --svg-fill none --svg-color-by agl_m --svg-label-mode first-per-group --svg-direction-cue broken-right",
"tags": ["lwir", "map", "atlas", "overlay", "kml", "geojson", "svg"],
"dependencies": [],
"owner": "codex",
"created": "2026-05-14"
},
{
"name": "rag_index.py",
"path": "scripts/rag_index.py",
......
# lwir_map — LWIR Scene Map Tools
Utilities for building a navigable satellite-image atlas from imagej-elphel
LWIR scene output. Useful after any aerial/drone LWIR acquisition to quickly
locate thousands of captured frames on a map without opening individual files.
---
## Purpose
imagej-elphel produces per-scene directories under a linked root such as:
```
/home/elphel/lwir16-proc/NC/linked/linked_1763232117-1763234145-v88/
```
These scripts read that output and produce:
1. A spreadsheet-friendly **TSV summary** — one row per scene, with GPS
corners, AGL, heading, forestation estimate, ground-plane data, and
status flags.
2. **Map overlay files** (GeoJSON, KML, SVG, bounds TSV) ready for the
foliage-internal atlas rendering pipeline.
---
## Pipeline
```
imagej-elphel linked output
▼ lwir_scene_summary.py
summaries/<dataset>-summary.tsv
▼ lwir_map_overlay.py
summaries/<dataset>-overlay.geojson (+ .kml .svg .bounds.tsv)
▼ foliage-internal/scripts/download_basemap.py
atlas/<dataset>/basemap.mbtiles + metadata.json
▼ foliage-internal/scripts/render_overlay_atlas_pdf.py
atlas/<dataset>/atlas-4x4.pdf ← final product
```
Suggested output root: `/home/elphel/lwir16-proc/<site>/`
---
## Scripts
### `lwir_scene_summary.py`
Reads a linked scene root and writes a TSV with ~65 columns per scene.
Key columns: `status`, `scene_id`, `scene_timestamp_s`, `center_lat/long`,
`corner{1-4}_lat/long`, `rect_x_axis_heading_deg`, `agl_m`,
`forestation_percent`, `gp_*` (ground plane), `hfov_deg`, `footprint_x/y_m`.
```bash
scripts/lwir_map/lwir_scene_summary.py \
/home/elphel/lwir16-proc/NC/linked/linked_1763232117-1763234145-v88 \
-o /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv
```
### `lwir_map_overlay.py`
Reads the TSV and generates map overlay files.
```bash
scripts/lwir_map/lwir_map_overlay.py \
/home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-summary.tsv \
--out-prefix /home/elphel/lwir16-proc/NC/summaries/linked_1763232117-1763234145-v88-overlay \
--svg-fill none \
--svg-color-by agl_m \
--svg-label-mode first-per-group \
--svg-direction-cue broken-right
```
Key options:
| Option | Values | Default |
|---|---|---|
| `--svg-fill` | `color`, `none` | `color` |
| `--svg-color-by` | `forestation_percent`, `agl_m`, `status` | `forestation_percent` |
| `--svg-label-mode` | `every`, `none`, `first-per-group`, `last-per-group` | `every` |
| `--svg-direction-cue` | `none`, `broken-right`, `open-right` | `broken-right` |
| `--ok-only` | flag | off |
The `broken-right` direction cue omits the middle third of the image-right
edge (corner1→corner2 side) as a flight-direction indicator.
---
## Generated outputs (NC dataset, 2026-05-14)
Summary TSV: 2193 scenes, 169 index groups, 2067 ok / 126 missing.
| File | Description |
|---|---|
| `*-summary.tsv` | Per-scene metadata table |
| `*-overlay.geojson` | GeoJSON polygon footprints |
| `*-overlay.kml` | KML for Google Earth |
| `*-overlay.svg` | Standalone SVG (equirectangular, A0) |
| `*-overlay.bounds.tsv` | Coverage bbox (~6.5 km × 4.0 km, NC site) |
| `*-overlay-agl-nofill-sparse.*` | Sparse variant: AGL color, no fill, first-per-group labels |
---
## Known limitations / future improvements
### Tilt during turns (high priority)
Rectangles are plotted centred on the aircraft GPS position with no attitude
correction. During turns the aircraft banks, shifting the actual footprint
laterally from the nadir point. For straight-and-level flight this error is
small; in tight turns it can be significant.
**Fix**: read roll/pitch from egomotion (or IMU) and project the four corners
through the actual camera-to-ground ray using the known AGL and sensor FoV.
`lwir_scene_summary.py` already reads egomotion CSV; roll/pitch columns would
need to be extracted and the footprint geometry updated in `_compute_corners()`.
### Other known gaps
- Forestation fraction uses a fixed elevation threshold; making it
configurable per-dataset would improve accuracy.
- The SVG equirectangular projection is separate from the Web Mercator
projection used by the foliage-internal rendering scripts; unifying them
would simplify the pipeline.
- No support yet for multi-sensor rigs with per-sensor pointing offsets.
#!/usr/bin/env python3
"""Generate vector/geospatial overlays from a FOPEN summary TSV."""
import argparse
import csv
import html
import json
import math
from pathlib import Path
EARTH_RADIUS_M = 6378137.0
def parse_float(value):
if value is None:
return None
value = str(value).strip()
if not value:
return None
try:
return float(value)
except ValueError:
return None
def read_summary(path):
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f, delimiter="\t")
return list(reader)
def geometry_from_row(row):
coords = []
for idx in range(1, 5):
lat = parse_float(row.get(f"corner{idx}_lat"))
lon = parse_float(row.get(f"corner{idx}_long"))
if lat is None or lon is None:
return None
coords.append((lat, lon))
coords.append(coords[0])
return coords
def center_from_row(row):
lat = parse_float(row.get("center_lat"))
lon = parse_float(row.get("center_long"))
if lat is None or lon is None:
return None
return lat, lon
def collect_features(rows, include_missing):
features = []
for row in rows:
coords = geometry_from_row(row)
center = center_from_row(row)
if coords is None or center is None:
continue
if not include_missing and row.get("status") != "ok":
continue
features.append((row, coords, center))
return features
def bounds(features):
min_lat = min(lat for _, coords, _ in features for lat, _ in coords[:-1])
max_lat = max(lat for _, coords, _ in features for lat, _ in coords[:-1])
min_lon = min(lon for _, coords, _ in features for _, lon in coords[:-1])
max_lon = max(lon for _, coords, _ in features for _, lon in coords[:-1])
return min_lat, max_lat, min_lon, max_lon
def project_factory(min_lat, max_lat, min_lon, max_lon):
mid_lat = 0.5 * (min_lat + max_lat)
cos_mid = math.cos(math.radians(mid_lat))
def project(lat, lon):
x = math.radians(lon - min_lon) * EARTH_RADIUS_M * cos_mid
y = math.radians(max_lat - lat) * EARTH_RADIUS_M
return x, y
width_m = math.radians(max_lon - min_lon) * EARTH_RADIUS_M * cos_mid
height_m = math.radians(max_lat - min_lat) * EARTH_RADIUS_M
return project, width_m, height_m, mid_lat
def value_color(value, value_min=0.0, value_max=100.0):
if value is None:
return "#808080"
if value_max <= value_min:
t = 0.5
else:
t = max(0.0, min(1.0, (value - value_min) / (value_max - value_min)))
# Brown/yellow through green, intentionally subdued for satellite-map overlays.
r0, g0, b0 = 185, 130, 55
r1, g1, b1 = 35, 150, 70
r = round(r0 + t * (r1 - r0))
g = round(g0 + t * (g1 - g0))
b = round(b0 + t * (b1 - b0))
return f"#{r:02x}{g:02x}{b:02x}"
def row_properties(row):
keys = [
"scene_id",
"scene_label",
"status",
"group_index",
"index_id",
"agl_m",
"forestation_percent",
"center_lat",
"center_long",
"flight_heading_deg",
"rect_x_axis_heading_deg",
"heading_source",
"has_elevations_histogram",
"has_egomotion",
"has_ground_plane_row",
"warnings",
"linked_path",
]
props = {}
for key in keys:
value = row.get(key, "")
number = parse_float(value)
props[key] = number if number is not None and key not in {"scene_id", "scene_label", "status", "index_id", "heading_source", "warnings", "linked_path"} else value
return props
def feature_key(row):
return row.get("index_id") or row.get("group_index") or row.get("scene_id", "")
def label_keys(features, label_mode):
if label_mode not in {"first-per-group", "last-per-group"}:
return set()
selected = {}
for idx, (row, _, _) in enumerate(features):
key = feature_key(row)
if label_mode == "first-per-group" and key in selected:
continue
selected[key] = idx
return set(selected.values())
def color_range(features, color_by):
if color_by == "status":
return None, None
if color_by == "forestation_percent":
return 0.0, 100.0
values = [parse_float(row.get(color_by)) for row, _, _ in features]
values = [value for value in values if value is not None]
if not values:
return 0.0, 100.0
return min(values), max(values)
def row_color(row, color_by, value_min, value_max):
if color_by == "status":
return "#1f77b4" if row.get("status") == "ok" else "#b00020"
return value_color(parse_float(row.get(color_by)), value_min, value_max)
def point_between(p0, p1, fraction):
return (
p0[0] + fraction * (p1[0] - p0[0]),
p0[1] + fraction * (p1[1] - p0[1]),
)
def svg_path_from_points(points, direction_cue):
if direction_cue == "none":
point_text = " ".join(f"{x:.3f},{y:.3f}" for x, y in points)
return "polygon", point_text
c1, c2, c3, c4 = points
if direction_cue == "open-right":
path = f"M {c2[0]:.3f},{c2[1]:.3f} L {c3[0]:.3f},{c3[1]:.3f} L {c4[0]:.3f},{c4[1]:.3f} L {c1[0]:.3f},{c1[1]:.3f}"
return "path", path
c1_stub = point_between(c1, c2, 1.0 / 3.0)
c2_stub = point_between(c2, c1, 1.0 / 3.0)
path = (
f"M {c2[0]:.3f},{c2[1]:.3f} "
f"L {c3[0]:.3f},{c3[1]:.3f} "
f"L {c4[0]:.3f},{c4[1]:.3f} "
f"L {c1[0]:.3f},{c1[1]:.3f} "
f"L {c1_stub[0]:.3f},{c1_stub[1]:.3f} "
f"M {c2[0]:.3f},{c2[1]:.3f} "
f"L {c2_stub[0]:.3f},{c2_stub[1]:.3f}"
)
return "path", path
def write_bounds(path, features, min_lat, max_lat, min_lon, max_lon, width_m, height_m, mid_lat):
centers = [center for _, _, center in features]
min_center_lat = min(lat for lat, _ in centers)
max_center_lat = max(lat for lat, _ in centers)
min_center_lon = min(lon for _, lon in centers)
max_center_lon = max(lon for _, lon in centers)
with open(path, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f, delimiter="\t", lineterminator="\n")
writer.writerow(["name", "value"])
writer.writerow(["features", len(features)])
writer.writerow(["corner_min_lat", f"{min_lat:.10f}"])
writer.writerow(["corner_max_lat", f"{max_lat:.10f}"])
writer.writerow(["corner_min_long", f"{min_lon:.10f}"])
writer.writerow(["corner_max_long", f"{max_lon:.10f}"])
writer.writerow(["center_min_lat", f"{min_center_lat:.10f}"])
writer.writerow(["center_max_lat", f"{max_center_lat:.10f}"])
writer.writerow(["center_min_long", f"{min_center_lon:.10f}"])
writer.writerow(["center_max_long", f"{max_center_lon:.10f}"])
writer.writerow(["mid_lat", f"{mid_lat:.10f}"])
writer.writerow(["width_m", f"{width_m:.3f}"])
writer.writerow(["height_m", f"{height_m:.3f}"])
writer.writerow(["aspect_w_over_h", f"{width_m / height_m:.6f}"])
def write_geojson(path, features):
collection = {
"type": "FeatureCollection",
"features": [],
}
for row, coords, center in features:
props = row_properties(row)
props["label_lat"] = center[0]
props["label_long"] = center[1]
collection["features"].append(
{
"type": "Feature",
"properties": props,
"geometry": {
"type": "Polygon",
"coordinates": [[[lon, lat] for lat, lon in coords]],
},
}
)
path.write_text(json.dumps(collection, indent=2) + "\n", encoding="utf-8")
def write_kml(path, features):
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<kml xmlns="http://www.opengis.net/kml/2.2">',
" <Document>",
" <name>FOPEN NC rectangles</name>",
" <Style id=\"rect-ok\"><LineStyle><color>cc00ffff</color><width>1</width></LineStyle><PolyStyle><color>2200ff80</color></PolyStyle></Style>",
" <Style id=\"rect-missing\"><LineStyle><color>cc0000ff</color><width>1</width></LineStyle><PolyStyle><color>220000ff</color></PolyStyle></Style>",
" <Style id=\"label\"><IconStyle><scale>0</scale></IconStyle><LabelStyle><scale>0.55</scale><color>ffffffff</color></LabelStyle></Style>",
]
for row, coords, center in features:
label = html.escape(row.get("scene_label", row.get("scene_id", "")))
desc = html.escape(
f"scene_id={row.get('scene_id','')}\n"
f"AGL={row.get('agl_m','')} m\n"
f"forestation={row.get('forestation_percent','')} %\n"
f"warnings={row.get('warnings','')}"
)
style = "rect-ok" if row.get("status") == "ok" else "rect-missing"
coord_text = " ".join(f"{lon:.10f},{lat:.10f},0" for lat, lon in coords)
lines.extend(
[
" <Placemark>",
f" <name>{label}</name>",
f" <description>{desc}</description>",
f" <styleUrl>#{style}</styleUrl>",
" <Polygon><outerBoundaryIs><LinearRing>",
f" <coordinates>{coord_text}</coordinates>",
" </LinearRing></outerBoundaryIs></Polygon>",
" </Placemark>",
" <Placemark>",
f" <name>{label}</name>",
" <styleUrl>#label</styleUrl>",
f" <Point><coordinates>{center[1]:.10f},{center[0]:.10f},0</coordinates></Point>",
" </Placemark>",
]
)
lines.extend([" </Document>", "</kml>"])
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def write_svg(path, features, page_width_mm, page_height_mm, label_every, label_mode, svg_fill, color_by, direction_cue):
min_lat, max_lat, min_lon, max_lon = bounds(features)
project, width_m, height_m, mid_lat = project_factory(min_lat, max_lat, min_lon, max_lon)
margin_m = max(width_m, height_m) * 0.025
view_x = -margin_m
view_y = -margin_m
view_w = width_m + 2.0 * margin_m
view_h = height_m + 2.0 * margin_m
if page_height_mm is None:
page_height_mm = page_width_mm * view_h / view_w
selected_label_keys = label_keys(features, label_mode)
value_min, value_max = color_range(features, color_by)
lines = [
'<?xml version="1.0" encoding="UTF-8"?>',
f'<svg xmlns="http://www.w3.org/2000/svg" width="{page_width_mm:.3f}mm" height="{page_height_mm:.3f}mm" viewBox="{view_x:.3f} {view_y:.3f} {view_w:.3f} {view_h:.3f}">',
" <title>FOPEN NC reference-scene rectangles</title>",
" <desc>Projected using local equirectangular coordinates. Units in viewBox are meters.</desc>",
" <style>",
" .frame { fill: #f7f7f2; stroke: #222; stroke-width: 4; }",
" .rect { fill-opacity: 0.18; stroke-opacity: 0.8; stroke-width: 1.25; }",
" .missing { stroke-dasharray: 8 5; }",
" .label { font-family: DejaVu Sans, Arial, sans-serif; font-size: 14px; fill: #111; paint-order: stroke; stroke: #fff; stroke-width: 3px; stroke-linejoin: round; }",
" .meta { font-family: DejaVu Sans, Arial, sans-serif; font-size: 28px; fill: #111; }",
" </style>",
f' <rect class="frame" x="0" y="0" width="{width_m:.3f}" height="{height_m:.3f}"/>',
" <g id=\"rectangles\">",
]
for idx, (row, coords, center) in enumerate(features, start=1):
points = []
for lat, lon in coords[:-1]:
x, y = project(lat, lon)
points.append((x, y))
color = row_color(row, color_by, value_min, value_max)
fill = "none" if svg_fill == "none" or direction_cue != "none" else color
css_class = "rect" if row.get("status") == "ok" else "rect missing"
element, geometry = svg_path_from_points(points, direction_cue)
if element == "polygon":
lines.append(f' <polygon class="{css_class}" points="{geometry}" fill="{fill}" stroke="{color}"/>')
else:
lines.append(f' <path class="{css_class}" d="{geometry}" fill="none" stroke="{color}"/>')
if (
label_mode == "every" and label_every > 0 and idx % label_every == 0
) or (
label_mode in {"first-per-group", "last-per-group"} and (idx - 1) in selected_label_keys
):
cx, cy = project(center[0], center[1])
label = html.escape(row.get("scene_label") or row.get("scene_id", ""))
lines.append(f' <text class="label" x="{cx:.3f}" y="{cy:.3f}" text-anchor="middle" dominant-baseline="central">{label}</text>')
lines.extend(
[
" </g>",
f' <text class="meta" x="0" y="{-0.35 * margin_m:.3f}">FOPEN NC rectangles: {len(features)} scenes, bounds {min_lat:.6f},{min_lon:.6f} to {max_lat:.6f},{max_lon:.6f}</text>',
f' <text class="meta" x="0" y="{height_m + 0.55 * margin_m:.3f}">Approx. coverage {width_m:.0f} m x {height_m:.0f} m, mid-lat {mid_lat:.6f}</text>',
"</svg>",
]
)
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return width_m, height_m, mid_lat
def main():
parser = argparse.ArgumentParser(description="Generate FOPEN map overlay files from a summary TSV.")
parser.add_argument("summary_tsv", type=Path, help="Summary TSV from fopen_nc_summary.py")
parser.add_argument("--out-prefix", type=Path, required=True, help="Output prefix without extension")
parser.add_argument("--ok-only", action="store_true", help="Only include rows whose summary status is ok")
parser.add_argument("--page-width-mm", type=float, default=1189.0, help="SVG page width in mm (default A0 landscape width)")
parser.add_argument("--page-height-mm", type=float, default=841.0, help="SVG page height in mm (default A0 landscape height)")
parser.add_argument("--label-every", type=int, default=1, help="Label every Nth rectangle in SVG; 0 disables labels")
parser.add_argument(
"--svg-fill",
choices=["color", "none"],
default="color",
help="SVG rectangle fill mode (default: color)",
)
parser.add_argument(
"--svg-color-by",
choices=["forestation_percent", "agl_m", "status"],
default="forestation_percent",
help="SVG frame/fill color source (default: forestation_percent)",
)
parser.add_argument(
"--svg-label-mode",
choices=["every", "none", "first-per-group", "last-per-group"],
default="every",
help="SVG label selection mode; --label-every applies only to 'every' (default: every)",
)
parser.add_argument(
"--svg-direction-cue",
choices=["none", "broken-right", "open-right"],
default="none",
help="SVG direction cue on the image-right side from corner1 to corner2 (default: none)",
)
args = parser.parse_args()
rows = read_summary(args.summary_tsv)
features = collect_features(rows, include_missing=not args.ok_only)
if not features:
raise SystemExit("ERROR: no rows with complete rectangle geometry")
out_prefix = args.out_prefix.resolve()
out_prefix.parent.mkdir(parents=True, exist_ok=True)
min_lat, max_lat, min_lon, max_lon = bounds(features)
_, width_m, height_m, mid_lat = project_factory(min_lat, max_lat, min_lon, max_lon)
write_bounds(out_prefix.with_suffix(".bounds.tsv"), features, min_lat, max_lat, min_lon, max_lon, width_m, height_m, mid_lat)
write_geojson(out_prefix.with_suffix(".geojson"), features)
write_kml(out_prefix.with_suffix(".kml"), features)
write_svg(
out_prefix.with_suffix(".svg"),
features,
args.page_width_mm,
args.page_height_mm,
args.label_every,
args.svg_label_mode,
args.svg_fill,
args.svg_color_by,
args.svg_direction_cue,
)
print(f"features: {len(features)}")
print(f"bounds: lat {min_lat:.10f}..{max_lat:.10f}, long {min_lon:.10f}..{max_lon:.10f}")
print(f"coverage_m: {width_m:.1f} x {height_m:.1f}")
print(f"wrote: {out_prefix.with_suffix('.bounds.tsv')}")
print(f"wrote: {out_prefix.with_suffix('.geojson')}")
print(f"wrote: {out_prefix.with_suffix('.kml')}")
print(f"wrote: {out_prefix.with_suffix('.svg')}")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""Create a spreadsheet-friendly summary for FOPEN NC linked result directories."""
import argparse
import csv
import math
import os
import sys
from dataclasses import dataclass
from pathlib import Path
EARTH_RADIUS_M = 6378137.0
@dataclass
class EgoRow:
scene_index: int
timestamp_s: float
pose: tuple
lat: float
lon: float
alt: float
raw: list
def parse_float(value):
if value is None:
return None
value = str(value).strip()
if not value:
return None
try:
return float(value)
except ValueError:
return None
def parse_int(value):
number = parse_float(value)
if number is None:
return None
try:
return int(number)
except (TypeError, ValueError):
return None
def read_rows(path):
with open(path, newline="", encoding="utf-8") as f:
sample = f.read(4096)
f.seek(0)
delimiter = "\t" if "\t" in sample else ","
return list(csv.reader(f, delimiter=delimiter))
def scene_id_base(name):
return name[:-6] if name.endswith("-index") else name
def scene_id_to_seconds(scene_id):
base = scene_id_base(scene_id)
if "_" not in base:
return None
try:
sec, frac = base.split("_", 1)
return float(f"{sec}.{frac}")
except ValueError:
return None
def scene_label(scene_id):
base = scene_id_base(scene_id)
if "_" not in base:
return base
sec, frac = base.split("_", 1)
try:
sec_mod = int(sec) % 10000
except ValueError:
return base
tenth = (frac + "0")[0]
return f"{sec_mod:04d}.{tenth}"
def fmt(value, digits=9):
if value is None:
return ""
if isinstance(value, int):
return str(value)
if isinstance(value, float):
if math.isnan(value) or math.isinf(value):
return ""
return f"{value:.{digits}g}"
return str(value)
def first_header_index(header, name, occurrence=1):
seen = 0
for idx, value in enumerate(header):
if value == name:
seen += 1
if seen == occurrence:
return idx
return None
def parse_elevations_histogram(path, threshold_m):
rows = read_rows(path)
if len(rows) < 2:
return {}, "empty_elevation_histogram"
agl_m = parse_float(rows[1][0] if rows[1] else None)
hist = []
for row in rows[1:]:
if len(row) < 3:
continue
elevation = parse_float(row[1])
fraction_lower = parse_float(row[2])
if elevation is None or fraction_lower is None:
continue
hist.append((elevation, fraction_lower))
if not hist:
return {"agl_m": agl_m}, "no_histogram_bins"
hist.sort(key=lambda item: item[0])
fraction_lower_at_threshold = None
if threshold_m <= hist[0][0]:
fraction_lower_at_threshold = hist[0][1]
elif threshold_m >= hist[-1][0]:
fraction_lower_at_threshold = hist[-1][1]
else:
for (e0, f0), (e1, f1) in zip(hist, hist[1:]):
if e0 <= threshold_m <= e1:
if e1 == e0:
fraction_lower_at_threshold = f1
else:
alpha = (threshold_m - e0) / (e1 - e0)
fraction_lower_at_threshold = f0 + alpha * (f1 - f0)
break
forestation_fraction = None
if fraction_lower_at_threshold is not None:
forestation_fraction = max(0.0, min(1.0, 1.0 - fraction_lower_at_threshold))
return {
"agl_m": agl_m,
"hist_min_elev_m": hist[0][0],
"hist_max_elev_m": hist[-1][0],
"fraction_lower_at_threshold": fraction_lower_at_threshold,
"forestation_fraction": forestation_fraction,
}, ""
def parse_egomotion(path, scene_seconds):
rows = read_rows(path)
if not rows:
return {}, "empty_egomotion"
header = rows[0]
idx_scene = first_header_index(header, "#")
idx_timestamp = first_header_index(header, "timestamp")
idx_x = first_header_index(header, "x(m)")
idx_y = first_header_index(header, "y(m)")
idx_z = first_header_index(header, "z(m)")
idx_a = first_header_index(header, "a(rad)")
idx_tilt = first_header_index(header, "tilt(rad)")
idx_roll = first_header_index(header, "roll(rad)")
idx_lat = first_header_index(header, "lat", 1)
idx_lon = first_header_index(header, "long", 1)
idx_alt = first_header_index(header, "alt", 1)
required = [idx_scene, idx_timestamp, idx_x, idx_y, idx_z, idx_a, idx_tilt, idx_roll, idx_lat, idx_lon, idx_alt]
if any(idx is None for idx in required):
return {}, "missing_egomotion_columns"
max_idx = max(required)
ego_rows = []
for row in rows[1:]:
if len(row) <= max_idx:
continue
scene_index = parse_int(row[idx_scene])
timestamp_s = parse_float(row[idx_timestamp])
pose = tuple(parse_float(row[idx]) for idx in (idx_x, idx_y, idx_z, idx_a, idx_tilt, idx_roll))
lat = parse_float(row[idx_lat])
lon = parse_float(row[idx_lon])
alt = parse_float(row[idx_alt])
if scene_index is None or timestamp_s is None or any(v is None for v in pose):
continue
if lat is None or lon is None:
continue
ego_rows.append(EgoRow(scene_index, timestamp_s, pose, lat, lon, alt, row))
if not ego_rows:
return {}, "no_numeric_egomotion_rows"
ref = None
for row in ego_rows:
if all(v == 0.0 for v in row.pose):
ref = row
ref_source = "zero_pose"
break
warnings = []
if ref is None:
if scene_seconds is not None:
ref = min(ego_rows, key=lambda r: abs(r.timestamp_s - scene_seconds))
ref_source = "nearest_timestamp"
warnings.append("zero_pose_not_found")
else:
ref = ego_rows[len(ego_rows) // 2]
ref_source = "middle_row"
warnings.append("zero_pose_not_found")
first = ego_rows[0]
last = ego_rows[-1]
segment_heading_deg = bearing_deg(first.lat, first.lon, last.lat, last.lon)
heading_deg, heading_source, heading_from, heading_to, heading_warning = local_heading(ego_rows, ref)
if heading_warning:
warnings.append(heading_warning)
image_x_axis_deg = None if heading_deg is None else (heading_deg + 180.0) % 360.0
return {
"ego_row_count": len(ego_rows),
"first": first,
"last": last,
"ref": ref,
"ref_source": ref_source,
"segment_heading_deg": segment_heading_deg,
"heading_source": heading_source,
"heading_from": heading_from,
"heading_to": heading_to,
"flight_heading_deg": heading_deg,
"image_x_axis_deg": image_x_axis_deg,
}, ";".join(warnings)
def bearing_deg(lat1, lon1, lat2, lon2):
if None in (lat1, lon1, lat2, lon2):
return None
if lat1 == lat2 and lon1 == lon2:
return None
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dlam = math.radians(lon2 - lon1)
x = math.sin(dlam) * math.cos(phi2)
y = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(dlam)
return (math.degrees(math.atan2(x, y)) + 360.0) % 360.0
def local_heading(ego_rows, ref):
by_scene_index = {row.scene_index: row for row in ego_rows}
before = by_scene_index.get(ref.scene_index - 1)
after = by_scene_index.get(ref.scene_index + 1)
source = "ref_pm1_scene_index"
warning = ""
if before is None or after is None:
try:
pos = next(i for i, row in enumerate(ego_rows) if row is ref)
except StopIteration:
pos = -1
before = ego_rows[pos - 1] if pos > 0 else None
after = ego_rows[pos + 1] if 0 <= pos < len(ego_rows) - 1 else None
source = "adjacent_rows"
warning = "heading_used_adjacent_rows"
if before is not None and after is not None:
return bearing_deg(before.lat, before.lon, after.lat, after.lon), source, before, after, warning
if after is not None:
return bearing_deg(ref.lat, ref.lon, after.lat, after.lon), "ref_to_next_row", ref, after, "heading_used_one_neighbor"
if before is not None:
return bearing_deg(before.lat, before.lon, ref.lat, ref.lon), "prev_row_to_ref", before, ref, "heading_used_one_neighbor"
if len(ego_rows) >= 2:
return bearing_deg(ego_rows[0].lat, ego_rows[0].lon, ego_rows[-1].lat, ego_rows[-1].lon), "first_to_last_fallback", ego_rows[0], ego_rows[-1], "heading_used_first_last"
return None, "unavailable", None, None, "heading_unavailable"
def footprint_m(agl_m, hfov_deg):
if agl_m is None:
return None, None
vfov_deg = (512.0 / 640.0) * hfov_deg
width_m = 2.0 * agl_m * math.tan(math.radians(hfov_deg) / 2.0)
height_m = 2.0 * agl_m * math.tan(math.radians(vfov_deg) / 2.0)
return width_m, height_m
def rectangle_corners(lat, lon, width_m, height_m, x_axis_heading_deg):
if None in (lat, lon, width_m, height_m, x_axis_heading_deg):
return [None] * 8
lat_rad = math.radians(lat)
cos_lat = math.cos(lat_rad)
if abs(cos_lat) < 1.0e-12:
return [None] * 8
def axis_unit(heading_deg):
heading = math.radians(heading_deg)
return math.sin(heading), math.cos(heading) # east, north
xe, xn = axis_unit(x_axis_heading_deg)
ye, yn = axis_unit((x_axis_heading_deg + 90.0) % 360.0)
corners = []
for sx, sy in ((1, 1), (1, -1), (-1, -1), (-1, 1)):
east = sx * 0.5 * width_m * xe + sy * 0.5 * height_m * ye
north = sx * 0.5 * width_m * xn + sy * 0.5 * height_m * yn
clat = lat + math.degrees(north / EARTH_RADIUS_M)
clon = lon + math.degrees(east / (EARTH_RADIUS_M * cos_lat))
corners.extend([clat, clon])
return corners
def parse_ground_planes(path):
if not path.exists():
return {}, "missing_ground_planes"
rows = read_rows(path)
if not rows:
return {}, "empty_ground_planes"
header = rows[0]
by_scene = {}
for row in rows[1:]:
if not row:
continue
key = row[0].strip()
if not key:
continue
values = {}
for idx, name in enumerate(header):
if idx >= len(row):
continue
values[name] = row[idx]
by_scene[key] = values
return by_scene, ""
def choose_version_dir(scene_dir, version):
preferred = scene_dir / version
if preferred.is_dir():
return preferred, version, ""
versions = sorted(p.name for p in scene_dir.iterdir() if p.is_dir() and p.name.startswith("v"))
if not versions:
return preferred, version, "missing_version_dir"
return scene_dir / versions[-1], versions[-1], f"used_fallback_version:{versions[-1]}"
def build_groups(linked_root):
entries = []
for entry in os.scandir(linked_root):
if entry.name.startswith("."):
continue
if not entry.is_dir(follow_symlinks=True):
continue
entries.append(entry.name)
entries.sort()
groups = []
pending = []
for name in entries:
if name.endswith("-index"):
groups.append((len(groups) + 1, name, pending))
pending = []
else:
pending.append(name)
if pending:
groups.append((len(groups) + 1, "", pending))
return groups
def make_row(linked_root, version, group_index, index_id, group_scenes, scene_id, args, ground_rows, group_warning):
warnings = [w for w in [group_warning] if w]
scene_dir = linked_root / scene_id
scene_seconds = scene_id_to_seconds(scene_id)
version_dir, version_used, version_warning = choose_version_dir(scene_dir, version)
if version_warning:
warnings.append(version_warning)
hist_path = version_dir / f"{scene_id}-elevations_histogram.csv"
ego_path = version_dir / f"{scene_id}-egomotion.csv"
has_elevations_histogram = hist_path.exists()
has_egomotion = ego_path.exists()
hist, hist_warning = ({}, "missing_elevations_histogram")
if has_elevations_histogram:
hist, hist_warning = parse_elevations_histogram(hist_path, args.forest_threshold_m)
if hist_warning:
warnings.append(hist_warning)
ego, ego_warning = ({}, "missing_egomotion")
if has_egomotion:
ego, ego_warning = parse_egomotion(ego_path, scene_seconds)
if ego_warning:
warnings.append(ego_warning)
agl_m = hist.get("agl_m")
width_m, height_m = footprint_m(agl_m, args.hfov_deg)
ref = ego.get("ref")
first = ego.get("first")
last = ego.get("last")
heading_from = ego.get("heading_from")
heading_to = ego.get("heading_to")
center_lat = ref.lat if ref else None
center_lon = ref.lon if ref else None
center_alt = ref.alt if ref else None
x_axis_heading = ego.get("image_x_axis_deg")
corners = rectangle_corners(center_lat, center_lon, width_m, height_m, x_axis_heading)
gp = ground_rows.get(scene_id, {})
has_ground_plane_row = bool(gp)
if index_id and not has_ground_plane_row:
warnings.append("missing_ground_plane_row")
group_start = group_scenes[0] if group_scenes else ""
group_end = group_scenes[-1] if group_scenes else ""
return {
"status": "missing" if warnings else "ok",
"group_index": group_index,
"index_id": index_id,
"index_timestamp_s": scene_id_to_seconds(index_id) if index_id else None,
"group_size": len(group_scenes),
"group_start_id": group_start,
"group_end_id": group_end,
"scene_id": scene_id,
"scene_label": scene_label(scene_id),
"scene_timestamp_s": scene_seconds,
"version": version_used,
"has_version_dir": version_dir.is_dir(),
"has_elevations_histogram": has_elevations_histogram,
"has_egomotion": has_egomotion,
"has_ground_plane_row": has_ground_plane_row,
"agl_m": agl_m,
"hfov_deg": args.hfov_deg,
"vfov_deg": (512.0 / 640.0) * args.hfov_deg,
"footprint_x_m": width_m,
"footprint_y_m": height_m,
"forest_threshold_m": args.forest_threshold_m,
"fraction_lower_at_threshold": hist.get("fraction_lower_at_threshold"),
"forestation_fraction": hist.get("forestation_fraction"),
"forestation_percent": None if hist.get("forestation_fraction") is None else 100.0 * hist["forestation_fraction"],
"hist_min_elev_m": hist.get("hist_min_elev_m"),
"hist_max_elev_m": hist.get("hist_max_elev_m"),
"ego_row_count": ego.get("ego_row_count"),
"ref_source": ego.get("ref_source"),
"ref_scene_index": ref.scene_index if ref else None,
"ref_timestamp_s": ref.timestamp_s if ref else None,
"center_lat": center_lat,
"center_long": center_lon,
"center_alt_m": center_alt,
"first_scene_index": first.scene_index if first else None,
"first_timestamp_s": first.timestamp_s if first else None,
"first_lat": first.lat if first else None,
"first_long": first.lon if first else None,
"last_scene_index": last.scene_index if last else None,
"last_timestamp_s": last.timestamp_s if last else None,
"last_lat": last.lat if last else None,
"last_long": last.lon if last else None,
"segment_heading_deg": ego.get("segment_heading_deg"),
"heading_source": ego.get("heading_source"),
"heading_from_scene_index": heading_from.scene_index if heading_from else None,
"heading_from_timestamp_s": heading_from.timestamp_s if heading_from else None,
"heading_from_lat": heading_from.lat if heading_from else None,
"heading_from_long": heading_from.lon if heading_from else None,
"heading_to_scene_index": heading_to.scene_index if heading_to else None,
"heading_to_timestamp_s": heading_to.timestamp_s if heading_to else None,
"heading_to_lat": heading_to.lat if heading_to else None,
"heading_to_long": heading_to.lon if heading_to else None,
"flight_heading_deg": ego.get("flight_heading_deg"),
"rect_x_axis_heading_deg": x_axis_heading,
"corner1_lat": corners[0],
"corner1_long": corners[1],
"corner2_lat": corners[2],
"corner2_long": corners[3],
"corner3_lat": corners[4],
"corner3_long": corners[5],
"corner4_lat": corners[6],
"corner4_long": corners[7],
"gp_good": parse_float(gp.get("Good")),
"gp_x_m": parse_float(gp.get("X")),
"gp_y_m": parse_float(gp.get("Y")),
"gp_z_m": parse_float(gp.get("Z")),
"gp_lat": parse_float(gp.get("Latitude")),
"gp_long": parse_float(gp.get("Longitude")),
"gp_alt_m": parse_float(gp.get("Altitude")),
"gp_disp_center": parse_float(gp.get("disp_center")),
"linked_path": str(scene_dir),
"warnings": ";".join(warnings),
}
def write_summary(rows, out_path):
headers = [
"status",
"group_index",
"index_id",
"index_timestamp_s",
"group_size",
"group_start_id",
"group_end_id",
"scene_id",
"scene_label",
"scene_timestamp_s",
"version",
"has_version_dir",
"has_elevations_histogram",
"has_egomotion",
"has_ground_plane_row",
"agl_m",
"hfov_deg",
"vfov_deg",
"footprint_x_m",
"footprint_y_m",
"forest_threshold_m",
"fraction_lower_at_threshold",
"forestation_fraction",
"forestation_percent",
"hist_min_elev_m",
"hist_max_elev_m",
"ego_row_count",
"ref_source",
"ref_scene_index",
"ref_timestamp_s",
"center_lat",
"center_long",
"center_alt_m",
"first_scene_index",
"first_timestamp_s",
"first_lat",
"first_long",
"last_scene_index",
"last_timestamp_s",
"last_lat",
"last_long",
"segment_heading_deg",
"heading_source",
"heading_from_scene_index",
"heading_from_timestamp_s",
"heading_from_lat",
"heading_from_long",
"heading_to_scene_index",
"heading_to_timestamp_s",
"heading_to_lat",
"heading_to_long",
"flight_heading_deg",
"rect_x_axis_heading_deg",
"corner1_lat",
"corner1_long",
"corner2_lat",
"corner2_long",
"corner3_lat",
"corner3_long",
"corner4_lat",
"corner4_long",
"gp_good",
"gp_x_m",
"gp_y_m",
"gp_z_m",
"gp_lat",
"gp_long",
"gp_alt_m",
"gp_disp_center",
"linked_path",
"warnings",
]
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, delimiter="\t", lineterminator="\n")
writer.writerow(headers)
for row in rows:
writer.writerow([fmt(row.get(header)) for header in headers])
def main():
parser = argparse.ArgumentParser(
description="Summarize FOPEN NC linked result directories into a TSV for LibreOffice Calc."
)
parser.add_argument(
"linked_root",
type=Path,
help="Linked view root containing timestamp and *-index directories.",
)
parser.add_argument("-o", "--out", type=Path, required=True, help="Output TSV path.")
parser.add_argument("--version", default="v88", help="Preferred result version directory (default: v88).")
parser.add_argument(
"--forest-threshold-m",
type=float,
default=1.0,
help="Elevation threshold above ground used for forestation fraction (default: 1.0).",
)
parser.add_argument("--hfov-deg", type=float, default=32.0, help="Camera horizontal FoV in degrees.")
args = parser.parse_args()
linked_root = args.linked_root.resolve()
if not linked_root.is_dir():
print(f"ERROR: linked root is not a directory: {linked_root}", file=sys.stderr)
return 1
rows = []
groups = build_groups(linked_root)
for group_index, index_id, group_scenes in groups:
ground_rows = {}
group_warning = ""
if index_id:
index_base = scene_id_base(index_id)
index_dir = linked_root / index_id
index_version_dir, _, index_version_warning = choose_version_dir(index_dir, args.version)
ground_path = index_version_dir / f"{index_base}-ground_planes.csv"
ground_rows, ground_warning = parse_ground_planes(ground_path)
group_warning = ";".join(w for w in [index_version_warning, ground_warning] if w)
else:
group_warning = "missing_following_index"
for scene_id in group_scenes:
rows.append(
make_row(
linked_root,
args.version,
group_index,
index_id,
group_scenes,
scene_id,
args,
ground_rows,
group_warning,
)
)
write_summary(rows, args.out.resolve())
warning_rows = sum(1 for row in rows if row.get("warnings"))
print(f"wrote {len(rows)} rows to {args.out.resolve()}")
print(f"groups: {len(groups)}; rows with warnings: {warning_rows}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment