| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- #!/usr/bin/env python3
- """
- FF-Agent WebAPI client: fetch all Einsätze and write them to a local JSON file.
- Config via config.yml (baseUrl, webApiToken, webApiKey, optional certs).
- Endpoint path and HMAC rule are configurable for when ff-agent provides the list-Einsätze spec.
- """
- import argparse
- import hashlib
- import hmac as hmac_lib
- import json
- import logging
- import sys
- from pathlib import Path
- from datetime import datetime, timezone
- import requests
- import yaml
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)s %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- logger = logging.getLogger(__name__)
- DEFAULT_CONFIG_PATH = "config.yml"
- DEFAULT_OUTPUT_PATH = "data/einsaetze.json"
- def load_config(path: str) -> dict:
- """Load YAML config. Raises on missing/invalid file."""
- p = Path(path)
- if not p.is_file():
- raise FileNotFoundError(f"Config file not found: {path}")
- with open(p, "r", encoding="utf-8") as f:
- data = yaml.safe_load(f)
- if not data:
- raise ValueError("Config file is empty")
- return data
- def compute_hmac(web_api_key: str, message: str) -> str:
- """Compute HMAC-SHA256 hex digest over message using webApiKey."""
- key = web_api_key.encode("utf-8")
- message_bytes = message.encode("utf-8")
- sig = hmac_lib.new(key, message_bytes, hashlib.sha256)
- return sig.hexdigest()
- def build_headers(cfg: dict) -> dict:
- """
- Build request headers including HMAC.
- For GET list endpoint we use [webApiToken] as sign string until ff-agent specifies otherwise.
- """
- token = cfg["webApiToken"]
- api_key = cfg["webApiKey"]
- # HMAC rule for GET (no body): sign string = webApiToken (confirm with ff-agent)
- sign_string = token
- signature = compute_hmac(api_key, sign_string)
- return {
- "Content-Type": "application/json",
- "Accept": "application/json",
- "webApiToken": token,
- "hmac": signature,
- }
- def fetch_einsaetze(cfg: dict) -> list:
- """
- GET list-Einsätze from FF-Agent WebAPI.
- Returns list of Einsatz objects. Raises on HTTP or JSON errors.
- """
- base_url = cfg["baseUrl"].rstrip("/")
- path = cfg.get("listEinsaetzePath", "einsaetze").strip("/")
- url = f"{base_url}/{path}"
- headers = build_headers(cfg)
- verify = cfg.get("serverCertFile") or True
- cert = None
- if cfg.get("clientCertFile") and cfg.get("clientKeyFile"):
- cert = (cfg["clientCertFile"], cfg["clientKeyFile"])
- elif cfg.get("clientCertFile"):
- # Single file: assume PEM with cert+key or cert only (no p12 support without extra dep)
- cert = cfg["clientCertFile"]
- logger.info("Requesting %s", url)
- resp = requests.get(url, headers=headers, cert=cert, verify=verify, timeout=30)
- resp.raise_for_status()
- data = resp.json()
- # Normalize: API might return {"einsaetze": [...]} or direct array
- if isinstance(data, list):
- return data
- if isinstance(data, dict) and "einsaetze" in data:
- return data["einsaetze"]
- if isinstance(data, dict):
- # Some APIs return {"data": [...]} or similar
- for key in ("data", "items", "operations"):
- if isinstance(data.get(key), list):
- return data[key]
- return data if isinstance(data, list) else [data]
- def write_output(einsaetze: list, output_path: str) -> None:
- """Write einsaetze to JSON file with updated timestamp. Creates parent dir if needed."""
- out = Path(output_path)
- out.parent.mkdir(parents=True, exist_ok=True)
- payload = {
- "einsaetze": einsaetze,
- "updated": datetime.now(timezone.utc).isoformat(),
- }
- with open(out, "w", encoding="utf-8") as f:
- json.dump(payload, f, ensure_ascii=False, indent=2)
- logger.info("Wrote %d Einsätze to %s", len(einsaetze), output_path)
- def main() -> int:
- parser = argparse.ArgumentParser(
- description="Fetch FF-Agent Einsätze and write to JSON for ticker backend."
- )
- parser.add_argument(
- "--config",
- default=DEFAULT_CONFIG_PATH,
- help="Path to YAML config (default: config.yml)",
- )
- parser.add_argument(
- "--output",
- default=None,
- help="Output JSON path (overrides config outputPath)",
- )
- args = parser.parse_args()
- try:
- cfg = load_config(args.config)
- except (FileNotFoundError, ValueError) as e:
- logger.error("%s", e)
- return 1
- output_path = args.output or cfg.get("outputPath") or DEFAULT_OUTPUT_PATH
- try:
- einsaetze = fetch_einsaetze(cfg)
- write_output(einsaetze, output_path)
- return 0
- except requests.RequestException as e:
- logger.error("Request failed: %s", e)
- if hasattr(e, "response") and e.response is not None and e.response.text:
- logger.debug("Response: %s", e.response.text[:500])
- return 1
- except json.JSONDecodeError as e:
- logger.error("Invalid JSON response: %s", e)
- return 1
- if __name__ == "__main__":
- sys.exit(main())
|