| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- #!/usr/bin/env python3
- """
- Import Einsätze from FF-Agent CSV export and write JSON in the same format
- as fetch_einsaetze.py (for use by the ticker backend).
- CSV format: semicolon-delimited, quoted fields, header row.
- """
- import argparse
- import csv
- import json
- import logging
- import sys
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import List
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)s %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- logger = logging.getLogger(__name__)
- ENCODINGS = ("utf-8-sig", "utf-8", "cp1252")
- def detect_encoding(path: Path) -> str:
- """Try to read first bytes and return encoding that works for the header."""
- raw = path.read_bytes()
- for enc in ENCODINGS:
- try:
- raw.decode(enc)
- return enc
- except UnicodeDecodeError:
- continue
- return "utf-8"
- def parse_csv(path: str) -> List[dict]:
- """
- Parse semicolon-delimited CSV with quoted fields.
- Returns list of row dicts; keys are header names (quotes stripped).
- """
- p = Path(path)
- if not p.is_file():
- raise FileNotFoundError(f"CSV file not found: {path}")
- encoding = detect_encoding(p)
- logger.info("Using encoding: %s", encoding)
- with open(p, "r", encoding=encoding, newline="") as f:
- reader = csv.reader(f, delimiter=";", quotechar='"')
- rows = list(reader)
- if not rows:
- raise ValueError("CSV file is empty")
- header = [cell.strip('"') for cell in rows[0]]
- einsaetze = []
- for row in rows[1:]:
- # Pad row to header length so we don't lose columns
- while len(row) < len(header):
- row.append("")
- record = {}
- for i, key in enumerate(header):
- value = row[i].strip('"') if i < len(row) else ""
- record[key] = value
- # Skip completely empty rows
- if any(v for v in record.values()):
- einsaetze.append(record)
- return einsaetze
- def write_json(einsaetze: List[dict], output_path: str) -> None:
- """Write einsaetze to JSON with updated timestamp. Creates parent dir if needed."""
- out = Path(output_path)
- out.parent.mkdir(parents=True, exist_ok=True)
- payload = {
- "einsaetze": einsaetze,
- "updated": datetime.now(timezone.utc).isoformat(),
- }
- with open(out, "w", encoding="utf-8") as f:
- json.dump(payload, f, ensure_ascii=False, indent=2)
- logger.info("Wrote %d Einsätze to %s", len(einsaetze), output_path)
- def main() -> int:
- parser = argparse.ArgumentParser(
- description="Import FF-Agent CSV export and output JSON for ticker backend."
- )
- parser.add_argument(
- "input",
- nargs="?",
- default=None,
- help="Input CSV file path",
- )
- parser.add_argument(
- "--output",
- "-o",
- default="data/einsaetze.json",
- help="Output JSON path (default: data/einsaetze.json)",
- )
- args = parser.parse_args()
- if not args.input:
- parser.error("Input CSV path required (or pass as positional argument)")
- return 1
- try:
- einsaetze = parse_csv(args.input)
- write_json(einsaetze, args.output)
- return 0
- except (FileNotFoundError, ValueError) as e:
- logger.error("%s", e)
- return 1
- if __name__ == "__main__":
- sys.exit(main())
|