fetch_einsaetze.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. #!/usr/bin/env python3
  2. """
  3. FF-Agent WebAPI client: fetch all Einsätze and write them to a local JSON file.
  4. Config via config.yml (baseUrl, webApiToken, webApiKey, optional certs).
  5. Endpoint path and HMAC rule are configurable for when ff-agent provides the list-Einsätze spec.
  6. """
  7. import argparse
  8. import hashlib
  9. import hmac as hmac_lib
  10. import json
  11. import logging
  12. import sys
  13. from pathlib import Path
  14. from datetime import datetime, timezone
  15. import requests
  16. import yaml
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format="%(asctime)s %(levelname)s %(message)s",
  20. datefmt="%Y-%m-%d %H:%M:%S",
  21. )
  22. logger = logging.getLogger(__name__)
  23. DEFAULT_CONFIG_PATH = "config.yml"
  24. DEFAULT_OUTPUT_PATH = "data/einsaetze.json"
  25. def load_config(path: str) -> dict:
  26. """Load YAML config. Raises on missing/invalid file."""
  27. p = Path(path)
  28. if not p.is_file():
  29. raise FileNotFoundError(f"Config file not found: {path}")
  30. with open(p, "r", encoding="utf-8") as f:
  31. data = yaml.safe_load(f)
  32. if not data:
  33. raise ValueError("Config file is empty")
  34. return data
  35. def compute_hmac(web_api_key: str, message: str) -> str:
  36. """Compute HMAC-SHA256 hex digest over message using webApiKey."""
  37. key = web_api_key.encode("utf-8")
  38. message_bytes = message.encode("utf-8")
  39. sig = hmac_lib.new(key, message_bytes, hashlib.sha256)
  40. return sig.hexdigest()
  41. def build_headers(cfg: dict) -> dict:
  42. """
  43. Build request headers including HMAC.
  44. For GET list endpoint we use [webApiToken] as sign string until ff-agent specifies otherwise.
  45. """
  46. token = cfg["webApiToken"]
  47. api_key = cfg["webApiKey"]
  48. # HMAC rule for GET (no body): sign string = webApiToken (confirm with ff-agent)
  49. sign_string = token
  50. signature = compute_hmac(api_key, sign_string)
  51. return {
  52. "Content-Type": "application/json",
  53. "Accept": "application/json",
  54. "webApiToken": token,
  55. "hmac": signature,
  56. }
  57. def fetch_einsaetze(cfg: dict) -> list:
  58. """
  59. GET list-Einsätze from FF-Agent WebAPI.
  60. Returns list of Einsatz objects. Raises on HTTP or JSON errors.
  61. """
  62. base_url = cfg["baseUrl"].rstrip("/")
  63. path = cfg.get("listEinsaetzePath", "einsaetze").strip("/")
  64. url = f"{base_url}/{path}"
  65. headers = build_headers(cfg)
  66. verify = cfg.get("serverCertFile") or True
  67. cert = None
  68. if cfg.get("clientCertFile") and cfg.get("clientKeyFile"):
  69. cert = (cfg["clientCertFile"], cfg["clientKeyFile"])
  70. elif cfg.get("clientCertFile"):
  71. # Single file: assume PEM with cert+key or cert only (no p12 support without extra dep)
  72. cert = cfg["clientCertFile"]
  73. logger.info("Requesting %s", url)
  74. resp = requests.get(url, headers=headers, cert=cert, verify=verify, timeout=30)
  75. resp.raise_for_status()
  76. data = resp.json()
  77. # Normalize: API might return {"einsaetze": [...]} or direct array
  78. if isinstance(data, list):
  79. return data
  80. if isinstance(data, dict) and "einsaetze" in data:
  81. return data["einsaetze"]
  82. if isinstance(data, dict):
  83. # Some APIs return {"data": [...]} or similar
  84. for key in ("data", "items", "operations"):
  85. if isinstance(data.get(key), list):
  86. return data[key]
  87. return data if isinstance(data, list) else [data]
  88. def write_output(einsaetze: list, output_path: str) -> None:
  89. """Write einsaetze to JSON file with updated timestamp. Creates parent dir if needed."""
  90. out = Path(output_path)
  91. out.parent.mkdir(parents=True, exist_ok=True)
  92. payload = {
  93. "einsaetze": einsaetze,
  94. "updated": datetime.now(timezone.utc).isoformat(),
  95. }
  96. with open(out, "w", encoding="utf-8") as f:
  97. json.dump(payload, f, ensure_ascii=False, indent=2)
  98. logger.info("Wrote %d Einsätze to %s", len(einsaetze), output_path)
  99. def main() -> int:
  100. parser = argparse.ArgumentParser(
  101. description="Fetch FF-Agent Einsätze and write to JSON for ticker backend."
  102. )
  103. parser.add_argument(
  104. "--config",
  105. default=DEFAULT_CONFIG_PATH,
  106. help="Path to YAML config (default: config.yml)",
  107. )
  108. parser.add_argument(
  109. "--output",
  110. default=None,
  111. help="Output JSON path (overrides config outputPath)",
  112. )
  113. args = parser.parse_args()
  114. try:
  115. cfg = load_config(args.config)
  116. except (FileNotFoundError, ValueError) as e:
  117. logger.error("%s", e)
  118. return 1
  119. output_path = args.output or cfg.get("outputPath") or DEFAULT_OUTPUT_PATH
  120. try:
  121. einsaetze = fetch_einsaetze(cfg)
  122. write_output(einsaetze, output_path)
  123. return 0
  124. except requests.RequestException as e:
  125. logger.error("Request failed: %s", e)
  126. if hasattr(e, "response") and e.response is not None and e.response.text:
  127. logger.debug("Response: %s", e.response.text[:500])
  128. return 1
  129. except json.JSONDecodeError as e:
  130. logger.error("Invalid JSON response: %s", e)
  131. return 1
  132. if __name__ == "__main__":
  133. sys.exit(main())