#!/usr/bin/env python3
"""
FF-Agent WebAPI client: fetch all Einsätze and write them to a local JSON file.

Config via config.yml (baseUrl, webApiToken, webApiKey, optional certs).
The endpoint path is configurable (listEinsaetzePath). The HMAC rule is
isolated in build_headers() so it can be adjusted once ff-agent provides the
list-Einsätze spec.
"""

import argparse
import hashlib
import hmac as hmac_lib
import json
import logging
import sys
from pathlib import Path
from datetime import datetime, timezone

import requests
import yaml

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

DEFAULT_CONFIG_PATH = "config.yml"
DEFAULT_OUTPUT_PATH = "data/einsaetze.json"

# Config keys that are read unconditionally when building the request.
REQUIRED_CONFIG_KEYS = ("baseUrl", "webApiToken", "webApiKey")

# Timeout (seconds) passed to requests.get for the list request.
REQUEST_TIMEOUT_SECONDS = 30

# Alternative wrapper keys probed when the response is an object without an
# "einsaetze" key.
_WRAPPER_KEYS = ("data", "items", "operations")


def load_config(path: str) -> dict:
    """Load the YAML config file at *path* and return its top-level mapping.

    Raises:
        FileNotFoundError: if *path* is not an existing regular file.
        ValueError: if the file is empty or its top level is not a mapping.
        yaml.YAMLError: if the content is not valid YAML (propagated from
            ``yaml.safe_load``).
    """
    p = Path(path)
    if not p.is_file():
        raise FileNotFoundError(f"Config file not found: {path}")
    with open(p, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not data:
        raise ValueError("Config file is empty")
    if not isinstance(data, dict):
        # A top-level list or scalar would otherwise fail much later with an
        # unhelpful TypeError on the first key lookup.
        raise ValueError(
            f"Config file must contain a mapping at top level, "
            f"got {type(data).__name__}"
        )
    return data


def compute_hmac(web_api_key: str, message: str) -> str:
    """Return the HMAC-SHA256 hex digest of *message* keyed with *web_api_key*.

    Both arguments are encoded as UTF-8 before signing.
    """
    key = web_api_key.encode("utf-8")
    message_bytes = message.encode("utf-8")
    return hmac_lib.new(key, message_bytes, hashlib.sha256).hexdigest()


def build_headers(cfg: dict) -> dict:
    """Build the request headers, including the HMAC signature.

    For the GET list endpoint the sign string is the webApiToken itself,
    until ff-agent specifies otherwise.

    Raises:
        KeyError: if ``webApiToken`` or ``webApiKey`` is missing from *cfg*.
    """
    token = cfg["webApiToken"]
    api_key = cfg["webApiKey"]
    # HMAC rule for GET (no body): sign string = webApiToken (confirm with ff-agent)
    sign_string = token
    signature = compute_hmac(api_key, sign_string)
    return {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "webApiToken": token,
        "hmac": signature,
    }


def _extract_einsaetze(data):
    """Normalize a decoded JSON response into the list of Einsatz objects.

    Accepted shapes:
      * a JSON array                      -> returned as is
      * ``{"einsaetze": ...}``            -> that value (``null`` becomes ``[]``)
      * ``{"data"|"items"|"operations": [...]}`` -> that list
      * ``null``                          -> ``[]``
      * anything else                     -> wrapped in a one-element list
    """
    if data is None:
        return []
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        if "einsaetze" in data:
            einsaetze = data["einsaetze"]
            # A null value means "no entries"; returning None here would
            # break len() in write_output after the file was already written.
            return [] if einsaetze is None else einsaetze
        # Some APIs return {"data": [...]} or similar
        for key in _WRAPPER_KEYS:
            if isinstance(data.get(key), list):
                return data[key]
    return [data]


def fetch_einsaetze(cfg: dict) -> list:
    """GET the list of Einsätze from the FF-Agent WebAPI.

    The URL is ``baseUrl`` joined with ``listEinsaetzePath`` (default
    ``einsaetze``). Optional config keys ``serverCertFile``,
    ``clientCertFile`` and ``clientKeyFile`` are forwarded to requests as
    ``verify`` / ``cert``.

    Returns:
        The Einsatz objects extracted from the response
        (see ``_extract_einsaetze`` for the accepted response shapes).

    Raises:
        KeyError: if a required config key is missing.
        requests.RequestException: on connection errors, timeouts, non-2xx
            status codes, or (with current requests versions) an undecodable
            JSON body.
    """
    base_url = cfg["baseUrl"].rstrip("/")
    path = cfg.get("listEinsaetzePath", "einsaetze").strip("/")
    url = f"{base_url}/{path}"
    headers = build_headers(cfg)

    # A configured server certificate path replaces default verification.
    verify = cfg.get("serverCertFile") or True

    cert = None
    if cfg.get("clientCertFile") and cfg.get("clientKeyFile"):
        cert = (cfg["clientCertFile"], cfg["clientKeyFile"])
    elif cfg.get("clientCertFile"):
        # Single file: assume PEM with cert+key or cert only
        # (no p12 support without extra dep)
        cert = cfg["clientCertFile"]

    logger.info("Requesting %s", url)
    resp = requests.get(
        url,
        headers=headers,
        cert=cert,
        verify=verify,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    return _extract_einsaetze(resp.json())


def write_output(einsaetze: list, output_path: str) -> None:
    """Write *einsaetze* plus an ``updated`` UTC timestamp to *output_path*.

    The parent directory is created if needed. The file is written as
    UTF-8 JSON with non-ASCII characters kept verbatim.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "einsaetze": einsaetze,
        "updated": datetime.now(timezone.utc).isoformat(),
    }
    with open(out, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    logger.info("Wrote %d Einsätze to %s", len(einsaetze), output_path)


def main() -> int:
    """Command-line entry point.

    Loads the config, fetches the Einsätze and writes them to the output
    file. The output path is taken from ``--output``, then the config key
    ``outputPath``, then ``DEFAULT_OUTPUT_PATH``.

    Returns:
        0 on success, 1 on any handled failure (which is logged).
    """
    parser = argparse.ArgumentParser(
        description="Fetch FF-Agent Einsätze and write to JSON for ticker backend."
    )
    parser.add_argument(
        "--config",
        default=DEFAULT_CONFIG_PATH,
        help="Path to YAML config (default: config.yml)",
    )
    parser.add_argument(
        "--output",
        default=None,
        help="Output JSON path (overrides config outputPath)",
    )
    args = parser.parse_args()

    try:
        cfg = load_config(args.config)
    except (OSError, ValueError, yaml.YAMLError) as e:
        # OSError covers FileNotFoundError and e.g. permission problems;
        # yaml.YAMLError covers syntactically invalid YAML.
        logger.error("%s", e)
        return 1

    # Fail early with a clear message instead of a KeyError traceback.
    missing = [key for key in REQUIRED_CONFIG_KEYS if not cfg.get(key)]
    if missing:
        logger.error("Missing required config key(s): %s", ", ".join(missing))
        return 1

    output_path = args.output or cfg.get("outputPath") or DEFAULT_OUTPUT_PATH

    try:
        einsaetze = fetch_einsaetze(cfg)
    except json.JSONDecodeError as e:
        # Must precede RequestException: the decode error raised by current
        # requests versions inherits from both, and the first matching
        # handler wins.
        logger.error("Invalid JSON response: %s", e)
        return 1
    except requests.RequestException as e:
        logger.error("Request failed: %s", e)
        response = getattr(e, "response", None)
        if response is not None and response.text:
            logger.debug("Response: %s", response.text[:500])
        return 1

    try:
        write_output(einsaetze, output_path)
    except OSError as e:
        logger.error("Could not write %s: %s", output_path, e)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())