Commit fb16f180 authored by Andrey Filippov

Python helper scripts, mostly using MCP to control the program

parent e55621a0
# Project scripts

This file is generated from `scripts/catalog.json`.

## How to register a new script

1) Add an entry in `scripts/catalog.json` under `scripts`.
2) Run:

```bash
/home/elphel/git/imagej-elphel/scripts/catalog_to_md.py
```
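
A minimal catalog entry, for orientation (the `my_new_script.py` name and field values below are placeholders, not a registered script; see the entries in `scripts/catalog.json` for the exact field set):

```json
{
  "name": "my_new_script.py",
  "path": "scripts/my_new_script.py",
  "purpose": "One-line description of what the script does.",
  "inputs": ["what the script reads"],
  "outputs": ["what the script writes"],
  "example": "/home/elphel/git/imagej-elphel/scripts/my_new_script.py",
  "tags": ["example"],
  "dependencies": [],
  "owner": "codex",
  "created": "2026-02-03"
}
```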
---
## mcp_filter_list_by_model_version.py

Filter a .list file to keep only sequences with an existing model version subdir.

Path: `scripts/mcp_filter_list_by_model_version.py`

Example:

```bash
/home/elphel/git/imagej-elphel/scripts/mcp_filter_list_by_model_version.py v89
```

Inputs:
- MCP server (http://127.0.0.1:48888)
- list file (default: /media/elphel/btrfs-data/lwir16-proc/NC/lists/nc_site.list)
- model version (from MCP dialog or CLI argument)

Outputs:
- Filtered .list file (default: /home/elphel/git/imagej-elphel/attic/CODEX/lists/nc_site-v89.list)

Dependencies:
- MCP server
- -Delphel.mcp.allowed.configdir

Tags: mcp, list, models, filter
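
To sanity-check that the MCP server is reachable before running, a minimal sketch (it polls the same `/mcp/dialog` endpoint the script itself uses; any JSON reply means the server is up):

```python
import json
import urllib.request

# One GET to the MCP dialog endpoint; raises on connection failure.
with urllib.request.urlopen("http://127.0.0.1:48888/mcp/dialog", timeout=2.0) as resp:
    reply = json.loads(resp.read().decode("utf-8"))
print("MCP server OK; active dialog present:", reply.get("dialog") is not None)
```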
## field_calib_aggregate.py

Aggregate FIELD_CALIBRATION.corr-xml into CSV with per-sensor angles and noise metrics.

Path: `scripts/field_calib_aggregate.py`

Example:

```bash
/home/elphel/git/imagej-elphel/scripts/field_calib_aggregate.py /media/elphel/btrfs-data/lwir16-proc/NC/models/1763232117-1763234145 -o /home/elphel/CODEX/tmp/field_calib_1763232117-1763234145.csv
```

Inputs:
- Models root directory containing *-FIELD_CALIBRATION.corr-xml

Outputs:
- CSV file with per-sensor azimuth/tilt/roll columns and stats

Tags: csv, models, aggregation, plot
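
The `plot` tag refers to downstream plotting of the generated CSV. A minimal sketch (assumes matplotlib is installed; `plot_noise.py` is a hypothetical name, and the loop stops at the blank row that precedes the STAT footer the script appends):

```python
#!/usr/bin/env python3
# plot_noise.py <csv> - plot the per-scene 'noise' column from the aggregated CSV
import csv
import sys

import matplotlib.pyplot as plt

noise = []
with open(sys.argv[1], newline="") as f:
    rows = csv.reader(f)
    header = next(rows)
    col = header.index("noise")
    for row in rows:
        if not row or not row[0] or row[0] == "STAT":  # blank row / STAT footer
            break
        noise.append(float(row[col]))

plt.plot(noise)
plt.xlabel("scene index (time-ordered)")
plt.ylabel("noise (RMS of high-pass angle residuals)")
plt.show()
```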
{
  "version": 1,
  "scripts": [
    {
      "name": "mcp_filter_list_by_model_version.py",
      "path": "scripts/mcp_filter_list_by_model_version.py",
      "purpose": "Filter a .list file to keep only sequences with an existing model version subdir.",
      "inputs": [
        "MCP server (http://127.0.0.1:48888)",
        "list file (default: /media/elphel/btrfs-data/lwir16-proc/NC/lists/nc_site.list)",
        "model version (from MCP dialog or CLI argument)"
      ],
      "outputs": [
        "Filtered .list file (default: /home/elphel/git/imagej-elphel/attic/CODEX/lists/nc_site-v89.list)"
      ],
      "example": "/home/elphel/git/imagej-elphel/scripts/mcp_filter_list_by_model_version.py v89",
      "tags": ["mcp", "list", "models", "filter"],
      "dependencies": ["MCP server", "-Delphel.mcp.allowed.configdir"],
      "owner": "codex",
      "created": "2026-02-03"
    },
    {
      "name": "field_calib_aggregate.py",
      "path": "scripts/field_calib_aggregate.py",
      "purpose": "Aggregate FIELD_CALIBRATION.corr-xml into CSV with per-sensor angles and noise metrics.",
      "inputs": [
        "Models root directory containing *-FIELD_CALIBRATION.corr-xml"
      ],
      "outputs": [
        "CSV file with per-sensor azimuth/tilt/roll columns and stats"
      ],
      "example": "/home/elphel/git/imagej-elphel/scripts/field_calib_aggregate.py /media/elphel/btrfs-data/lwir16-proc/NC/models/1763232117-1763234145 -o /home/elphel/CODEX/tmp/field_calib_1763232117-1763234145.csv",
      "tags": ["csv", "models", "aggregation", "plot"],
      "dependencies": [],
      "owner": "codex",
      "created": "2026-02-03"
    }
  ]
}
#!/usr/bin/env python3
import json
from pathlib import Path

HERE = Path(__file__).resolve().parent
CATALOG = HERE / "catalog.json"
OUT = HERE / "README.md"

with CATALOG.open("r", encoding="utf-8") as f:
    data = json.load(f)

lines = [
    "# Project scripts",
    "",
    "This file is generated from `scripts/catalog.json`.",
    "",
    "## How to register a new script",
    "",
    "1) Add an entry in `scripts/catalog.json` under `scripts`.",
    "2) Run:",
    "",
    "```bash",
    str(HERE / "catalog_to_md.py"),
    "```",
    "",
    "---",
    "",
]

for s in data.get("scripts", []):
    lines.append(f"## {s.get('name')}")
    lines.append("")
    lines.append(s.get("purpose", ""))
    lines.append("")
    if s.get("path"):
        lines.append(f"Path: `{s['path']}`")
        lines.append("")
    if s.get("example"):
        lines.append("Example:")
        lines.append("")
        lines.append("```bash")
        lines.append(s["example"])
        lines.append("```")
        lines.append("")
    if s.get("inputs"):
        lines.append("Inputs:")
        for i in s["inputs"]:
            lines.append(f"- {i}")
        lines.append("")
    if s.get("outputs"):
        lines.append("Outputs:")
        for o in s["outputs"]:
            lines.append(f"- {o}")
        lines.append("")
    if s.get("dependencies"):
        lines.append("Dependencies:")
        for d in s["dependencies"]:
            lines.append(f"- {d}")
        lines.append("")
    if s.get("tags"):
        lines.append("Tags: " + ", ".join(s["tags"]))
        lines.append("")

OUT.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
#!/usr/bin/env python3
# NOTE (for future Java port):
# - Scan for files ending with "-FIELD_CALIBRATION.corr-xml"
# - Parse <entry key="PREFIX.extrinsic_corr_{azimuth|tilt|roll}{sensor}">value</entry>
# - Output CSV with columns: scene_id, noise, azimuth0, tilt0, roll0, azimuth1, tilt1, roll1, ... (sensor-major order)
# - scene_id is the parent directory name of the file (timestamp)
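# Illustrative entry as it appears in the corr-xml (hypothetical values; the
# EYESIS_DCT_AUX prefix matches the --prefix default below):
#   <entry key="EYESIS_DCT_AUX.extrinsic_corr_azimuth0">1.234E-4</entry>
#   <entry key="EYESIS_DCT_AUX.extrinsic_corr_tilt0">-5.6E-5</entry>
#   <entry key="EYESIS_DCT_AUX.extrinsic_corr_roll15">7.8E-5</entry>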
import argparse
import csv
import os
import re
import sys
import xml.etree.ElementTree as ET
KEY_RE = re.compile(r'^(?P<prefix>\w+)\.extrinsic_corr_(?P<kind>azimuth|tilt|roll)(?P<sensor>\d+)$')

def iter_files(root):
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            if name.endswith('-FIELD_CALIBRATION.corr-xml'):
                yield os.path.join(dirpath, name)

def parse_file(path):
    try:
        tree = ET.parse(path)
    except Exception as e:
        return None, f"parse_error: {e}"
    root = tree.getroot()
    data = {}  # (prefix, sensor) -> {kind: value}
    for entry in root.findall('entry'):
        key = entry.get('key')
        if not key:
            continue
        m = KEY_RE.match(key)
        if not m:
            continue
        prefix = m.group('prefix')
        kind = m.group('kind')
        sensor = int(m.group('sensor'))
        try:
            value = float((entry.text or '').strip())
        except Exception:
            continue
        data.setdefault((prefix, sensor), {})[kind] = value
    return data, None

def scene_id_from_path(path):
    # Folder name just above the file is the scene timestamp
    # Example: .../1763232535_880101/1763232535_880101-FIELD_CALIBRATION.corr-xml
    return os.path.basename(os.path.dirname(path))

def main():
    ap = argparse.ArgumentParser(description='Aggregate FIELD_CALIBRATION corr-xml into CSV')
    ap.add_argument('root', help='Root directory to scan (e.g., /media/elphel/.../models/1763232117-1763234145)')
    ap.add_argument('-o', '--out', required=True, help='Output CSV path')
    ap.add_argument('--prefix', default='EYESIS_DCT_AUX', help='Prefix filter (default: EYESIS_DCT_AUX). Use ALL for any.')
    ap.add_argument('--window', type=int, default=10, help='Running average window for noise (default: 10)')
    args = ap.parse_args()
    row_count = 0
    errors = 0
    # scene_id -> {(prefix, sensor): {kind: value}}
    by_scene = {}
    for path in iter_files(args.root):
        data, err = parse_file(path)
        if err:
            errors += 1
            continue
        if not data:
            continue
        sid = scene_id_from_path(path)
        by_scene.setdefault(sid, {})
        for key, vals in data.items():
            by_scene[sid][key] = vals
    # Build headers: angle + sensor number
    sensors = list(range(16))
    angles = ['azimuth', 'tilt', 'roll']
    headers = ['scene_id', 'noise', 'noise_azimuth', 'noise_tilt', 'noise_roll'] + [f"{a}{s}" for s in sensors for a in angles]
    with open(args.out, 'w', newline='') as f:
        w = csv.writer(f)
        w.writerow(headers)

        # Sort scene IDs by numeric timestamp if possible (replace '_' with '.')
        def scene_key(s):
            try:
                return float(s.replace('_', '.'))
            except Exception:
                return s
        sorted_sids = sorted(by_scene.keys(), key=scene_key)

        # Precompute running averages per column
        cols = [f"{a}{s}" for s in sensors for a in angles]
        col_values = {c: [] for c in cols}  # list aligned with sorted_sids
        for sid in sorted_sids:
            scene_data = by_scene[sid]
            for c in cols:
                # c like azimuth3 -> angle+sensor
                a = ''.join([ch for ch in c if ch.isalpha()])
                s = int(''.join([ch for ch in c if ch.isdigit()]))
                val = None
                if args.prefix == 'ALL':
                    for (prefix, sensor), v in scene_data.items():
                        if sensor == s and a in v:
                            val = v[a]
                            break
                else:
                    v = scene_data.get((args.prefix, s))
                    if v is not None:
                        val = v.get(a)
                col_values[c].append(val)

        def running_avg(vals, idx, window):
            # centered window over non-null values
            if window <= 1:
                return vals[idx] if vals[idx] is not None else None
            half = window // 2
            start = max(0, idx - half)
            end = min(len(vals) - 1, idx + (window - half - 1))
            acc = 0.0
            n = 0
            for j in range(start, end + 1):
                v = vals[j]
                if v is None:
                    continue
                acc += v
                n += 1
            return (acc / n) if n else None

        for i, sid in enumerate(sorted_sids):
            row = [sid]
            scene_data = by_scene[sid]
            # compute noise (RMS), total + per-angle with per-angle sample counts
            noise_sum_by_angle = {a: 0.0 for a in angles}
            noise_count_by_angle = {a: 0 for a in angles}
            for c in cols:
                v = col_values[c][i]
                if v is None:
                    continue
                avg = running_avg(col_values[c], i, args.window)
                if avg is None:
                    continue
                hp = v - avg
                # angle name is alpha prefix of column
                a = ''.join([ch for ch in c if ch.isalpha()])
                if a in noise_sum_by_angle:
                    noise_sum_by_angle[a] += hp * hp
                    noise_count_by_angle[a] += 1
            az_rms = (noise_sum_by_angle['azimuth'] / noise_count_by_angle['azimuth']) ** 0.5 if noise_count_by_angle['azimuth'] else 0.0
            tilt_rms = (noise_sum_by_angle['tilt'] / noise_count_by_angle['tilt']) ** 0.5 if noise_count_by_angle['tilt'] else 0.0
            roll_rms = (noise_sum_by_angle['roll'] / noise_count_by_angle['roll']) ** 0.5 if noise_count_by_angle['roll'] else 0.0
            noise = ((az_rms * az_rms + tilt_rms * tilt_rms + roll_rms * roll_rms) / 3.0) ** 0.5
            row.append(noise)
            row.append(az_rms)
            row.append(tilt_rms)
            row.append(roll_rms)
            for s in sensors:
                for a in angles:
                    val = None
                    # prefer matching prefix if specified
                    if args.prefix == 'ALL':
                        # pick first available prefix
                        for (prefix, sensor), vals in scene_data.items():
                            if sensor == s and a in vals:
                                val = vals[a]
                                break
                    else:
                        vals = scene_data.get((args.prefix, s))
                        if vals is not None:
                            val = vals.get(a)
                    row.append(val)
            w.writerow(row)
            row_count += 1

        # Append stats at the bottom (name/value/comment in first 3 columns)
        w.writerow([])
        w.writerow(["STAT", "name", "value", "comment"])

        # Collect stats
        def stddev(vals):
            if not vals:
                return None
            mean = sum(vals) / len(vals)
            var = sum((v - mean) ** 2 for v in vals) / len(vals)
            return var ** 0.5

        angle_stats = {a: [] for a in angles}  # (std, sensor, n)
        overall = []
        for s in sensors:
            for a in angles:
                vals = []
                # rebuild vals from by_scene
                for sid in by_scene.keys():
                    scene_data = by_scene[sid]
                    if args.prefix == 'ALL':
                        found = None
                        for (prefix, sensor), v in scene_data.items():
                            if sensor == s and a in v:
                                found = v[a]
                                break
                        if found is not None:
                            vals.append(found)
                    else:
                        v = scene_data.get((args.prefix, s))
                        if v is not None and a in v:
                            vals.append(v[a])
                sd = stddev(vals)
                if sd is not None:
                    angle_stats[a].append((sd, s, len(vals)))
                    overall.append((sd, a, s, len(vals)))
        for a in angles:
            if angle_stats[a]:
                angle_stats[a].sort(reverse=True)
                sd, s, n = angle_stats[a][0]
                w.writerow(["STAT", f"worst_{a}_sensor", s, f"std={sd:.6g}, n={n}"])
        overall.sort(reverse=True)
        w.writerow(["STAT", "top5_by_std", "", "format: angle sensor std n"])
        for sd, a, s, n in overall[:5]:
            w.writerow(["STAT", f"{a}{s}", f"{sd:.6g}", f"n={n}"])
    print(f"Wrote {row_count} rows to {args.out}")
    if errors:
        print(f"Skipped {errors} files due to parse errors", file=sys.stderr)

if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import json
import os
import re
import sys
import time
import urllib.parse
import urllib.request
MCP_BASE = "http://127.0.0.1:48888"
LIST_PATH = "/media/elphel/btrfs-data/lwir16-proc/NC/lists/nc_site.list"
OUT_PATH = "/home/elphel/git/imagej-elphel/attic/CODEX/lists/nc_site-v89.list"
LABEL_VERSION = "x3d model version"
LABEL_DIALOG = "Setup CLT Batch parameters"

def http_get(path, params=None, timeout=2.0):
    url = MCP_BASE + path
    if params:
        url += "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8")

def http_post(path, data, timeout=2.0):
    url = MCP_BASE + path
    body = urllib.parse.urlencode(data).encode("utf-8")
    req = urllib.request.Request(url, data=body, method="POST")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")

def parse_json(text):
    return json.loads(text)

def get_version_from_mcp():
    http_post("/mcp/button", {"label": LABEL_DIALOG})
    dialog = None
    for _ in range(25):
        dialog = parse_json(http_get("/mcp/dialog"))
        if dialog.get("ok") and dialog.get("dialog") is not None:
            break
        time.sleep(0.2)
    if not dialog or dialog.get("dialog") is None:
        raise RuntimeError("No active MCP dialog")
    dialog_id = dialog["dialog"].get("id")
    fields = dialog["dialog"].get("fields", [])
    version = None
    for f in fields:
        if f.get("label") == LABEL_VERSION:
            version = f.get("default")
            break
    if version is None:
        raise RuntimeError("x3d model version not found in MCP dialog")
    # cancel dialog to avoid applying defaults
    if dialog_id:
        http_post("/mcp/dialog/submit", {"id": dialog_id, "ok": "0"})
    else:
        http_post("/mcp/dialog/submit", {"ok": "0"})
    return version

def read_list_file():
    data = parse_json(http_get("/mcp/fs/read", {"path": LIST_PATH, "offset": 0, "maxBytes": 1024 * 1024}, timeout=30.0))
    if not data.get("ok"):
        raise RuntimeError("Failed to read list file")
    return data.get("data", "")

def parse_set_paths(list_text):
    base_path = os.path.dirname(LIST_PATH)
    root_dir = None
    linked_models = None
    x3d_dir = None
    for raw in list_text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        tokens = re.split(r"[\s,;=]+", line)
        if len(tokens) >= 2 and tokens[0].upper() == "SET":
            key = tokens[1]
            val = tokens[2] if len(tokens) >= 3 else ""
            if key == "rootDirectory":
                root_dir = val
            elif key == "linkedModels":
                linked_models = val
            elif key == "x3dDirectory":
                x3d_dir = val
    if root_dir:
        base_path = os.path.normpath(os.path.join(base_path, root_dir))
    chosen = x3d_dir or linked_models
    if not chosen:
        raise RuntimeError("x3dDirectory or linkedModels not found in list file")
    models_dir = os.path.normpath(os.path.join(base_path, chosen))
    return models_dir
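
# Illustrative .list header consumed by parse_set_paths()/parse_sequences()
# (hypothetical values; non-SET lines start with a sequence timestamp):
#   SET rootDirectory ../..
#   SET linkedModels  linked_models
#   SET x3dDirectory  models
#   1763232535_880101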

def parse_sequences(list_text):
    seqs = []
    lines = list_text.splitlines()
    for idx, raw in enumerate(lines):
        stripped = raw.lstrip()
        if not stripped or stripped.startswith("#"):
            continue
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        tokens = re.split(r"[\s,;=]+", line)
        if len(tokens) == 0:
            continue
        if tokens[0].upper() == "SET":
            continue
        seqs.append((tokens[0], idx))
    return seqs, lines

def glob_v_dirs(version, models_dir):
    pattern = f"{models_dir}/**/{version}"
    data = parse_json(http_get("/mcp/fs/glob", {"pattern": pattern, "max": 200000, "maxDepth": 4}, timeout=30.0))
    if not data.get("ok"):
        raise RuntimeError("Failed to glob version dirs")
    return data.get("matches", [])

def main():
    version = sys.argv[1] if len(sys.argv) > 1 else get_version_from_mcp()
    list_text = read_list_file()
    models_dir = parse_set_paths(list_text)
    seqs, lines = parse_sequences(list_text)
    if not seqs:
        raise RuntimeError("No sequences found")
    matches = glob_v_dirs(version, models_dir)
    model_dir_norm = os.path.normpath(models_dir)
    scene_ts = []
    for m in matches:
        m_norm = os.path.normpath(m)
        if not m_norm.startswith(model_dir_norm + os.sep):
            continue
        parent = os.path.dirname(m_norm)
        ts = os.path.basename(parent)
        if ts:
            scene_ts.append(ts)
    scene_ts = sorted(set(scene_ts))
    keep = set()
    if scene_ts:
        i = 0
        for idx, (seq, line_idx) in enumerate(seqs):
            next_seq = seqs[idx + 1][0] if idx + 1 < len(seqs) else None
            while i < len(scene_ts) and scene_ts[i] < seq:
                i += 1
            if i < len(scene_ts):
                if next_seq is None or scene_ts[i] < next_seq:
                    keep.add(seq)
    out_lines = []
    for raw in lines:
        stripped = raw.lstrip()
        if not stripped:
            out_lines.append(raw)
            continue
        if stripped.startswith("#"):
            out_lines.append(raw)
            continue
        line_no_comment = raw.split("#", 1)[0].strip()
        if not line_no_comment:
            out_lines.append(raw)
            continue
        tokens = re.split(r"[\s,;=]+", line_no_comment)
        if tokens and tokens[0].upper() == "SET":
            out_lines.append(raw)
            continue
        seq = tokens[0]
        if seq in keep:
            out_lines.append(raw)
        else:
            out_lines.append("#" + raw)
    os.makedirs(os.path.dirname(OUT_PATH), exist_ok=True)
    with open(OUT_PATH, "w", encoding="utf-8") as f:
        f.write("\n".join(out_lines) + ("\n" if out_lines else ""))
    print(f"Version: {version}")
    print(f"Models dir: {models_dir}")
    print(f"Sequences kept: {len(keep)} / {len(seqs)}")
    print(f"Output: {OUT_PATH}")

if __name__ == "__main__":
    main()
@@ -1833,6 +1833,7 @@ public class EyesisCorrectionParameters {
 				if ((i != KEY_INDEX_UAS_LOGS) && (i != KEY_INDEX_SKY_MASK)) { // cuasUasLogs, cuasSkyMask are files, not directories
 					if (!dir_file.exists()) {
 						System.out.println(KEY_DIRS[i]+" directory "+dir_path.toString()+" does not exist, ignoring "+seq_str);
+						System.out.println("Please run:\nmkdir -p "+dir_path.toString());
 						return null;
 					}
 				}