Commit 92c178fd authored by Andrey Filippov's avatar Andrey Filippov

moved unrelated scripts to codex-elphel repo

parent e6bd52a4
......@@ -27,18 +27,6 @@ Host daemon workflow that removes repeated approval prompts by using a local fil
Path: `scripts/eyesis-host-daemon-howto.md`
### email_send.py
Send email via SSH+sendmail on community.elphel.com by default (set ELPHEL_SEND_MODE=smtp to force SMTP).
Path: `scripts/email_send.py`
### email_fetch.py
Fetch email via IMAP (mail.elphel.com) and save messages to files.
Path: `scripts/email_fetch.py`
---
## mcp_filter_list_by_model_version.py
......
......@@ -10,16 +10,6 @@
"name": "eyesis-host-daemon-howto.md",
"path": "scripts/eyesis-host-daemon-howto.md",
"purpose": "Host daemon workflow that removes repeated approval prompts by using a local file queue."
},
{
"name": "email_send.py",
"path": "scripts/email_send.py",
"purpose": "Send email via SSH+sendmail on community.elphel.com by default (set ELPHEL_SEND_MODE=smtp to force SMTP)."
},
{
"name": "email_fetch.py",
"path": "scripts/email_fetch.py",
"purpose": "Fetch email via IMAP (mail.elphel.com) and save messages to files."
}
],
"scripts": [
......
#!/usr/bin/env python3
import email
import imaplib
import os
import re
from email.header import decode_header
from pathlib import Path
IMAP_HOST = os.environ.get("ELPHEL_IMAP_HOST", "mail.elphel.com")
IMAP_PORT = int(os.environ.get("ELPHEL_IMAP_PORT", "993"))
IMAP_USER = os.environ.get("ELPHEL_IMAP_USER", "codex@elphel.com")
IMAP_PASS = os.environ.get("ELPHEL_IMAP_PASS")
MAILBOX = os.environ.get("ELPHEL_IMAP_BOX", "INBOX")
OUT_DIR = Path(os.environ.get("EMAIL_OUT_DIR", "attic/email_inbox"))
SEARCH = os.environ.get("EMAIL_SEARCH", "ALL")
def _decode(value):
if value is None:
return ""
parts = decode_header(value)
out = ""
for text, enc in parts:
if isinstance(text, bytes):
out += text.decode(enc or "utf-8", errors="replace")
else:
out += text
return out
def _safe_subject(subject):
return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in subject)[:80] or "message"
def _write_index(index_path, values):
line = ",".join(v.replace(",", " ") for v in values)
index_path.parent.mkdir(parents=True, exist_ok=True)
with index_path.open("a", encoding="utf-8") as f:
f.write(line + "\n")
def _decoded_text_parts(msg):
parts = []
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() != "text/plain":
continue
payload = part.get_payload(decode=True)
if payload is None:
continue
charset = part.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
else:
payload = msg.get_payload(decode=True)
if payload is not None:
charset = msg.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
return "\n".join(parts)
def _extract_original_headers(text):
if not text:
return "", "", ""
# Find forwarded blocks and keep the deepest match.
pattern = re.compile(
r"Forwarded message.*?From:\s*(?P<from>.+?)\r?\n"
r"(?:To:\s*.+?\r?\n)?"
r"Date:\s*(?P<date>.+?)\r?\n"
r"Subject:\s*(?P<subject>.+?)\r?\n",
re.IGNORECASE | re.DOTALL,
)
matches = list(pattern.finditer(text))
if not matches:
return "", "", ""
last = matches[-1]
original_from = " ".join(last.group("from").split())
original_date = " ".join(last.group("date").split())
original_subject = " ".join(last.group("subject").split())
return original_from, original_subject, original_date
def fetch():
if not IMAP_PASS:
raise SystemExit("Set ELPHEL_IMAP_PASS")
with imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT) as imap:
imap.login(IMAP_USER, IMAP_PASS)
imap.select(MAILBOX)
status, data = imap.search(None, SEARCH)
if status != "OK":
raise SystemExit("IMAP search failed")
ids = data[0].split()
for idx, msg_id in enumerate(ids, start=1):
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subject = _decode(msg.get("Subject"))
date = _decode(msg.get("Date"))
from_ = _decode(msg.get("From"))
to_ = _decode(msg.get("To"))
cc_ = _decode(msg.get("Cc"))
text = _decoded_text_parts(msg)
original_from, original_subject, original_date = _extract_original_headers(text)
date_dir = OUT_DIR / (date[:10].replace(" ", "_") if date else "unknown-date")
date_dir.mkdir(parents=True, exist_ok=True)
seq = f"{idx:04d}"
fname = f"{seq}_{_safe_subject(subject)}.eml"
out_path = date_dir / fname
out_path.write_bytes(raw)
meta_path = date_dir / f"{seq}_meta.txt"
meta_path.write_text(
f"From: {from_}\n"
f"To: {to_}\n"
f"Cc: {cc_}\n"
f"Date: {date}\n"
f"Subject: {subject}\n"
f"Original-From: {original_from}\n"
f"Original-Date: {original_date}\n"
f"Original-Subject: {original_subject}\n",
encoding="utf-8",
)
_write_index(
OUT_DIR / "index.csv",
[date, from_, to_, cc_, subject, original_from, original_date, original_subject, str(out_path)],
)
if __name__ == "__main__":
fetch()
#!/usr/bin/env python3
import os
import smtplib
import subprocess
import email.utils
from email.message import EmailMessage
from pathlib import Path
SMTP_HOST = os.environ.get("ELPHEL_SMTP_HOST", "mail.elphel.com")
SMTP_PORT = int(os.environ.get("ELPHEL_SMTP_PORT", "465"))
SMTP_USER = os.environ.get("ELPHEL_SMTP_USER", "codex@elphel.com")
SMTP_PASS = os.environ.get("ELPHEL_SMTP_PASS")
# ELPHEL_SEND_MODE=ssh_sendmail (default) or smtp.
# Defaulting to ssh_sendmail avoids sending from the local LAN IP, which can trigger
# recipient-side policy/reputation blocks.
SEND_MODE = os.environ.get("ELPHEL_SEND_MODE", "ssh_sendmail").lower()
SEND_SSH_HOST = os.environ.get("ELPHEL_SEND_SSH_HOST", "community.elphel.com")
SEND_SSH_USER = os.environ.get("ELPHEL_SEND_SSH_USER", "elphel5")
SEND_SSH_PORT = os.environ.get("ELPHEL_SEND_SSH_PORT", "")
OUT_DIR = os.environ.get("EMAIL_OUT_DIR", "attic/email_outbox")
def _safe_subject(subject):
return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in subject)[:80] or "message"
def _write_index(index_path, values):
line = ",".join(v.replace(",", " ") for v in values)
index_path.parent.mkdir(parents=True, exist_ok=True)
with index_path.open("a", encoding="utf-8") as f:
f.write(line + "\n")
def _send_via_smtp(msg, recipients):
if not SMTP_PASS:
raise SystemExit("Set ELPHEL_SMTP_PASS")
with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT) as s:
s.login(SMTP_USER, SMTP_PASS)
s.send_message(msg, to_addrs=recipients)
def _send_via_ssh_sendmail(msg):
ssh_target = f"{SEND_SSH_USER}@{SEND_SSH_HOST}"
ssh_cmd = ["ssh"]
if SEND_SSH_PORT:
ssh_cmd += ["-p", SEND_SSH_PORT]
ssh_cmd += [ssh_target, "/usr/sbin/sendmail", "-t", "-f", SMTP_USER]
try:
subprocess.run(ssh_cmd, input=msg.as_bytes(), check=True)
except FileNotFoundError:
raise SystemExit("ssh not found; install openssh-client or use SMTP mode")
def send_email(subject, body, to_addrs, cc_addrs=None):
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = SMTP_USER
msg["To"] = ", ".join(to_addrs)
if cc_addrs:
msg["Cc"] = ", ".join(cc_addrs)
msg.set_content(body)
recipients = list(to_addrs) + (list(cc_addrs) if cc_addrs else [])
out_dir = Path(OUT_DIR)
date_dir = out_dir / email.utils.formatdate(localtime=True)[:10].replace(" ", "_")
date_dir.mkdir(parents=True, exist_ok=True)
safe_subject = _safe_subject(subject)
seq = f"{len(list(date_dir.glob('*.eml'))) + 1:04d}"
out_path = date_dir / f"{seq}_{safe_subject}.eml"
out_path.write_bytes(msg.as_bytes())
meta_path = date_dir / f"{seq}_meta.txt"
meta_path.write_text(
f"From: {SMTP_USER}\nTo: {', '.join(to_addrs)}\nCc: {', '.join(cc_addrs or [])}\nSubject: {subject}\n",
encoding="utf-8",
)
_write_index(out_dir / "index.csv", [email.utils.formatdate(localtime=True), SMTP_USER, ",".join(to_addrs), ",".join(cc_addrs or []), subject, str(out_path)])
if SEND_MODE == "ssh_sendmail":
_send_via_ssh_sendmail(msg)
else:
_send_via_smtp(msg, recipients)
def main():
subject = os.environ.get("EMAIL_SUBJECT", "MCP access test")
body = os.environ.get("EMAIL_BODY", "Hi,\n\nTest message.\n")
to_addrs = os.environ.get("EMAIL_TO", "").split(",")
cc_addrs = os.environ.get("EMAIL_CC", "").split(",") if os.environ.get("EMAIL_CC") else []
to_addrs = [a.strip() for a in to_addrs if a.strip()]
cc_addrs = [a.strip() for a in cc_addrs if a.strip()]
if not to_addrs:
raise SystemExit("Set EMAIL_TO=addr1,addr2")
send_email(subject, body, to_addrs, cc_addrs)
if __name__ == "__main__":
main()
#!/usr/bin/env bash
set -euo pipefail
TARGET_DEFAULT="elphel@192.168.0.224"
TARGET="${TARGET:-$TARGET_DEFAULT}"
MODE="remote"
KWIN_ONLY=0
FORCE_DISPLAY="${DISPLAY_OVERRIDE:-}"
FORCE_XAUTH="${XAUTHORITY_OVERRIDE:-}"
SSH_CONNECT_TIMEOUT="${SSH_CONNECT_TIMEOUT:-7}"
usage() {
cat <<'EOF'
Recover KDE desktop responsiveness without reboot.
Default mode:
Connect to elphel@192.168.0.224 over SSH and run recovery there.
Usage:
recover_kde_gui.sh [options]
Options:
--target USER@HOST SSH destination (default: elphel@192.168.0.224)
--local Run recovery on current machine (no SSH)
--kwin-only Restart only kwin service (skip plasmashell restart)
--display VALUE Force DISPLAY (example: :0)
--xauthority PATH Force XAUTHORITY path
-h, --help Show this help
Examples:
recover_kde_gui.sh
recover_kde_gui.sh --target elphel@192.168.0.224
recover_kde_gui.sh --local --display :0
recover_kde_gui.sh --local --kwin-only
EOF
}
log() {
printf '[%s] %s\n' "$(date '+%F %T')" "$*"
}
detect_proc_env() {
local pid="$1"
local key="$2"
if [[ -r "/proc/$pid/environ" ]]; then
tr '\0' '\n' <"/proc/$pid/environ" | sed -n "s/^${key}=//p" | head -n1
fi
}
recover_local() {
local uid display xauth kwin_pid
uid="$(id -u)"
export XDG_RUNTIME_DIR="/run/user/$uid"
if [[ ! -d "$XDG_RUNTIME_DIR" ]]; then
log "ERROR: $XDG_RUNTIME_DIR is missing (no user runtime dir)."
return 2
fi
log "Restarting plasma-kwin_x11.service"
if ! systemctl --user restart plasma-kwin_x11.service; then
log "ERROR: failed to restart plasma-kwin_x11.service"
return 3
fi
if [[ "$KWIN_ONLY" -eq 1 ]]; then
log "Done (kwin-only mode)."
return 0
fi
kwin_pid="$(pgrep -u "$uid" -n -x kwin_x11 || true)"
display="$FORCE_DISPLAY"
if [[ -z "$display" && -n "$kwin_pid" ]]; then
display="$(detect_proc_env "$kwin_pid" DISPLAY || true)"
fi
if [[ -z "$display" ]]; then
display=":0"
fi
xauth="$FORCE_XAUTH"
if [[ -z "$xauth" && -n "$kwin_pid" ]]; then
xauth="$(detect_proc_env "$kwin_pid" XAUTHORITY || true)"
fi
if [[ -z "$xauth" ]]; then
xauth="$HOME/.Xauthority"
fi
export DISPLAY="$display"
export XAUTHORITY="$xauth"
log "Using DISPLAY=$DISPLAY XAUTHORITY=$XAUTHORITY"
log "Restarting plasmashell"
kquitapp5 plasmashell >/dev/null 2>&1 || true
sleep 1
kstart5 plasmashell >/dev/null 2>&1 || true
if command -v qdbus >/dev/null 2>&1; then
qdbus org.kde.KWin /Compositor suspend >/dev/null 2>&1 || true
sleep 1
qdbus org.kde.KWin /Compositor resume >/dev/null 2>&1 || true
fi
log "KDE recovery commands completed."
}
while [[ $# -gt 0 ]]; do
case "$1" in
--target)
TARGET="$2"
shift 2
;;
--local)
MODE="local"
shift
;;
--kwin-only)
KWIN_ONLY=1
shift
;;
--display)
FORCE_DISPLAY="$2"
shift 2
;;
--xauthority)
FORCE_XAUTH="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown argument: $1" >&2
usage
exit 1
;;
esac
done
if [[ "$MODE" == "remote" ]]; then
log "Connecting to $TARGET"
remote_cmd="$(printf "KWIN_ONLY=%q FORCE_DISPLAY=%q FORCE_XAUTH=%q bash -s -- --local" "$KWIN_ONLY" "$FORCE_DISPLAY" "$FORCE_XAUTH")"
exec ssh -o BatchMode=yes -o ConnectTimeout="$SSH_CONNECT_TIMEOUT" "$TARGET" "$remote_cmd" <"$0"
fi
recover_local
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment