#!/usr/bin/env python3
"""
Frigate Object Batch Counter Service

Listens to Frigate MQTT events, counts unique objects per batch,
and persists results to SQLite when a batch ends.
"""

import json
import logging
import os
import signal
import sqlite3
import sys
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path

try:
    import paho.mqtt.client as mqtt
except ImportError:
    # Reported by FrigateCounterService.run(); deferring the failure keeps the
    # counting/persistence logic importable where paho-mqtt is not installed.
    mqtt = None


class FrigateCounterService:
    """Count unique Frigate detections per batch and persist them to SQLite.

    A batch starts with the first matching detection and ends after
    ``batch_timeout`` seconds without detections, when the business day rolls
    over at the daily cutoff, or on shutdown.

    Threading model: ``state_lock`` guards ``current_state``, ``batch_timer``
    and every use of the SQLite connection. Methods whose name ends in
    ``_locked`` (and ``_reset_batch_timer``) expect the caller to hold it.
    """

    def __init__(self):
        self.setup_logging()
        self.load_config()
        self.init_db()

        self.state_lock = threading.Lock()
        self.batch_timer = None
        self.shutdown_event = threading.Event()
        self.client = None
        self._closed = False
        # Must exist before load_state(), which may arm the inactivity timer.
        self.current_state = None
        self.current_state = self.load_state()

    # ------------------------------------------------------------------ #
    # Setup & Config
    # ------------------------------------------------------------------ #
    def setup_logging(self):
        """Configure stdout logging; the level comes from ``LOG_LEVEL``."""
        level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)
        logging.basicConfig(
            level=level,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[logging.StreamHandler(sys.stdout)],
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Read the service configuration from environment variables.

        Raises:
            ValueError: if ``CAMERA_NAME`` is missing, ``DAILY_CUTOFF_TIME``
                is not ``HH:MM``, or a numeric variable cannot be parsed.
        """
        self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost")
        self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883"))
        self.mqtt_user = os.getenv("FRIGATE_MQTT_USER")
        self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS")
        self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events")

        self.camera_name = os.getenv("CAMERA_NAME")
        if not self.camera_name:
            raise ValueError("Environment variable CAMERA_NAME is required")

        self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong")
        self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "5"))

        self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00")
        try:
            # Parsed once here so get_counting_date() does not re-parse the
            # string on every MQTT message.
            self.cutoff_time = datetime.strptime(self.cutoff_time_str, "%H:%M").time()
        except ValueError as exc:
            raise ValueError(
                f"DAILY_CUTOFF_TIME must be HH:MM, got {self.cutoff_time_str!r}"
            ) from exc

        self.db_path = os.getenv("DB_PATH", "frigate_counter.db")
        self.state_file = os.getenv("STATE_FILE", "current_batch.json")
        self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60"))

    # ------------------------------------------------------------------ #
    # Database
    # ------------------------------------------------------------------ #
    def init_db(self):
        """Open the SQLite database and create the tables if needed."""
        # check_same_thread=False: the connection is shared between the MQTT,
        # timer and watcher threads; access is serialized by state_lock.
        self.db = sqlite3.connect(self.db_path, check_same_thread=False)
        cur = self.db.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS batches (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                counting_date TEXT NOT NULL,
                batch_number INTEGER NOT NULL,
                camera_name TEXT NOT NULL,
                object_label TEXT NOT NULL,
                count INTEGER NOT NULL,
                start_time TEXT NOT NULL,
                end_time TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(counting_date, batch_number, camera_name, object_label)
            )
            """
        )
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS daily_summaries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                counting_date TEXT NOT NULL,
                camera_name TEXT NOT NULL,
                object_label TEXT NOT NULL,
                total_count INTEGER NOT NULL DEFAULT 0,
                total_batches INTEGER NOT NULL DEFAULT 0,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(counting_date, camera_name, object_label)
            )
            """
        )
        self.db.commit()

    # ------------------------------------------------------------------ #
    # Counting-date logic (day ends at cutoff, e.g. 17:00)
    # ------------------------------------------------------------------ #
    def get_counting_date(self, dt=None):
        """Return the ISO date of the business day that ``dt`` belongs to.

        The business day ends at the configured cutoff: with a 17:00 cutoff,
        16:59 belongs to today and 17:00 already belongs to tomorrow.

        Args:
            dt: naive local ``datetime`` to classify; defaults to now.
        """
        if dt is None:
            dt = datetime.now()
        if dt.time() < self.cutoff_time:
            return dt.date().isoformat()
        return (dt.date() + timedelta(days=1)).isoformat()

    # ------------------------------------------------------------------ #
    # State persistence (JSON) - survives restarts
    # ------------------------------------------------------------------ #
    def load_state(self):
        """Load the persisted in-progress batch, if any.

        A state file from an earlier counting day is finalized into the
        database and removed. A state file from the current counting day is
        resumed: it becomes ``current_state`` and the inactivity timer is
        re-armed.

        Returns:
            The resumed state dict, or ``None`` when there is nothing to
            resume (no file, stale file, or unreadable file).
        """
        state_path = Path(self.state_file)
        if not state_path.exists():
            return None

        try:
            with open(state_path, "r", encoding="utf-8") as f:
                state = json.load(f)

            with self.state_lock:
                if state.get("counting_date") != self.get_counting_date():
                    self.logger.warning(
                        "State file belongs to previous counting day (%s). "
                        "Finalizing it before starting fresh.",
                        state.get("counting_date"),
                    )
                    if state["count"] > 0:
                        # The batch really ended at its last detection, not at
                        # whatever time the service happens to be restarted.
                        end_time = (
                            state.get("last_detection_time")
                            or datetime.now().isoformat()
                        )
                        self._insert_batch(
                            state["counting_date"],
                            state["batch_number"],
                            state["count"],
                            state["start_time"],
                            end_time,
                        )
                    state_path.unlink(missing_ok=True)
                    return None

                self.logger.info(
                    "Resumed batch #%s from %s with count=%s",
                    state["batch_number"],
                    state["start_time"],
                    state["count"],
                )
                state.setdefault("counted_event_ids", [])
                # Publish the state before arming the timer so a timeout can
                # never observe a half-initialized service.
                self.current_state = state
                self._reset_batch_timer()
                return state
        except Exception as exc:
            # Best effort at startup: a broken state file must not prevent the
            # service from counting new batches.
            self.logger.error("Failed to load state file: %s", exc)
            return None

    def save_state(self):
        """Persist ``current_state`` to the state file, or remove the file.

        The file is written to a temporary sibling and then renamed over the
        target, so a crash mid-write cannot leave a truncated state file.
        Callers hold ``state_lock``, which makes the fixed temp name safe.
        """
        state_path = Path(self.state_file)
        if self.current_state is None:
            state_path.unlink(missing_ok=True)
            return

        tmp_path = state_path.with_name(state_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.current_state, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, state_path)

    # ------------------------------------------------------------------ #
    # Batch lifecycle
    # ------------------------------------------------------------------ #
    def get_next_batch_number(self, counting_date):
        """Return 1 + the highest persisted batch number for ``counting_date``."""
        cur = self.db.cursor()
        cur.execute(
            """
            SELECT COALESCE(MAX(batch_number), 0)
            FROM batches
            WHERE counting_date = ? AND camera_name = ? AND object_label = ?
            """,
            (counting_date, self.camera_name, self.object_label),
        )
        return cur.fetchone()[0] + 1

    def start_new_batch(self, counting_date):
        """Create and persist an empty batch for ``counting_date``.

        The caller must hold ``state_lock``.
        """
        batch_number = self.get_next_batch_number(counting_date)
        now = datetime.now().isoformat()
        self.current_state = {
            "counting_date": counting_date,
            "batch_number": batch_number,
            "count": 0,
            "start_time": now,
            "last_detection_time": now,
            "counted_event_ids": [],
        }
        self.save_state()
        self.logger.info(
            "Started batch #%s for %s (%s)",
            batch_number,
            counting_date,
            self.object_label,
        )

    def process_detection(self, event_id):
        """Record one matching Frigate event (new/update/end).

        Starts a batch when none is active, rolls the batch over when the
        daily cutoff was crossed, counts each ``event_id`` at most once per
        batch, and restarts the inactivity timer (``batch_timeout`` seconds).
        """
        with self.state_lock:
            counting_date = self.get_counting_date()

            if self.current_state is None:
                # 1. No active batch -> start one
                self.start_new_batch(counting_date)
            elif self.current_state["counting_date"] != counting_date:
                # 2. Cutoff crossed since batch started -> finalize old, start new
                self._end_batch_locked()
                self.start_new_batch(counting_date)

            state = self.current_state

            # 3. Deduplicate event ID
            if event_id not in state["counted_event_ids"]:
                state["count"] += 1
                state["counted_event_ids"].append(event_id)
                self.logger.info(
                    "Counted %s (event %s) | batch #%s total: %s",
                    self.object_label,
                    event_id,
                    state["batch_number"],
                    state["count"],
                )

            # Always refresh last_detection_time so the batch stays alive
            state["last_detection_time"] = datetime.now().isoformat()
            self.save_state()

            # Re-armed under the lock so it cannot interleave with a timer
            # thread that is tearing the batch down.
            self._reset_batch_timer()

    def _reset_batch_timer(self):
        """(Re)start the inactivity timer. The caller must hold ``state_lock``."""
        if self.batch_timer:
            self.batch_timer.cancel()
        self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout)
        self.batch_timer.daemon = True
        self.batch_timer.start()

    def _on_batch_timeout(self):
        """Timer callback: end the batch unless this timer was superseded."""
        with self.state_lock:
            # A detection may have replaced this timer while it was waiting
            # for the lock; cancel() cannot stop a timer that already fired,
            # so a stale timer has to recognize itself and back off.
            if self.batch_timer is not threading.current_thread():
                return
            self.logger.info(
                "Batch inactivity timeout (%ss) reached", self.batch_timeout
            )
            self._end_batch_locked()

    def end_batch(self):
        """Finalize the active batch, if any (thread-safe entry point)."""
        with self.state_lock:
            self._end_batch_locked()

    def _end_batch_locked(self):
        """Persist or discard the active batch. The caller must hold ``state_lock``.

        Empty batches and batches shorter than ``min_duration_per_batch``
        seconds are discarded. If persisting fails the state is kept and the
        inactivity timer is re-armed so the write is retried.
        """
        state = self.current_state
        if state is None:
            return

        started_at = datetime.fromisoformat(state["start_time"])
        ended_at = datetime.now()
        duration = ended_at - started_at
        duration_seconds = duration.total_seconds()

        if state["count"] == 0 or duration_seconds < self.min_duration_per_batch:
            # Nothing worth persisting
            self.logger.info(
                "Discarding batch #%s (count=%s, duration=%s)",
                state["batch_number"],
                state["count"],
                duration,
            )
            self._clear_state_locked()
            return

        # A zero duration is possible when min_duration_per_batch is 0.
        count_per_second = (
            state["count"] / duration_seconds if duration_seconds > 0 else 0.0
        )

        try:
            self._insert_batch(
                state["counting_date"],
                state["batch_number"],
                state["count"],
                state["start_time"],
                ended_at.isoformat(),
            )
        except Exception as exc:
            self.logger.error("Failed to persist batch: %s", exc)
            # Leave state intact and make sure there actually is a next
            # timeout to retry on (not during shutdown: the DB is closing).
            if not self.shutdown_event.is_set():
                self._reset_batch_timer()
            return

        self.logger.info(
            "Batch #%s ended | count=%s | duration=%s | cps=%s",
            state["batch_number"],
            state["count"],
            duration,
            count_per_second,
        )
        self._clear_state_locked()

    def _clear_state_locked(self):
        """Drop the active batch, its state file and its timer (lock held)."""
        self.current_state = None
        self.save_state()
        if self.batch_timer:
            self.batch_timer.cancel()
            self.batch_timer = None

    def _insert_batch(self, counting_date, batch_number, count, start_time, end_time):
        """Atomically insert a batch row and upsert the daily summary.

        Both statements run in one transaction that is rolled back if either
        fails, so the summary can never drift from the batch rows.

        Raises:
            sqlite3.Error: if the batch cannot be stored (for example a
                duplicate batch number for the same day/camera/label).
        """
        with self.db:  # commit on success, rollback on any exception
            self.db.execute(
                """
                INSERT INTO batches
                    (counting_date, batch_number, camera_name, object_label,
                     count, start_time, end_time)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    counting_date,
                    batch_number,
                    self.camera_name,
                    self.object_label,
                    count,
                    start_time,
                    end_time,
                ),
            )
            self.db.execute(
                """
                INSERT INTO daily_summaries
                    (counting_date, camera_name, object_label,
                     total_count, total_batches)
                VALUES (?, ?, ?, ?, 1)
                ON CONFLICT(counting_date, camera_name, object_label)
                DO UPDATE SET
                    total_count = total_count + excluded.total_count,
                    total_batches = total_batches + excluded.total_batches,
                    updated_at = CURRENT_TIMESTAMP
                """,
                (counting_date, self.camera_name, self.object_label, count),
            )

        # Log running totals for the day
        row = self.db.execute(
            """
            SELECT total_count, total_batches
            FROM daily_summaries
            WHERE counting_date = ? AND camera_name = ? AND object_label = ?
            """,
            (counting_date, self.camera_name, self.object_label),
        ).fetchone()
        if row:
            self.logger.info(
                "Daily totals for %s: %s objects across %s batch(es)",
                counting_date,
                row[0],
                row[1],
            )

    # ------------------------------------------------------------------ #
    # Cutoff watcher (forces batch end at 17:00 etc.)
    # ------------------------------------------------------------------ #
    def cutoff_watcher(self):
        """Check once a minute whether the business day rolled over.

        Waits on ``shutdown_event`` instead of sleeping so the thread stops
        promptly when the service shuts down.
        """
        while not self.shutdown_event.wait(60):
            with self.state_lock:
                if self.current_state is None:
                    continue
                if self.current_state["counting_date"] != self.get_counting_date():
                    self.logger.info("Daily cutoff reached – finalizing batch")
                    self._end_batch_locked()

    # ------------------------------------------------------------------ #
    # MQTT callbacks
    # ------------------------------------------------------------------ #
    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the Frigate events topic once the broker accepts us."""
        if rc == 0:
            self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port)
            client.subscribe(self.mqtt_topic)
            self.logger.info("Subscribed to %s", self.mqtt_topic)
        else:
            self.logger.error("MQTT connection failed, code=%s", rc)

    def on_message(self, client, userdata, msg):
        """Filter a Frigate event by camera/label and count it.

        Never raises: a malformed message is logged and dropped so one bad
        payload cannot kill the MQTT loop.
        """
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            self.logger.warning("Received non-JSON payload on %s", msg.topic)
            return

        try:
            if not isinstance(payload, dict):
                return
            # "after" may be absent or null; treat both as "no object data".
            after = payload.get("after") or {}
            if not isinstance(after, dict):
                return

            if after.get("camera") != self.camera_name:
                return
            if after.get("label") != self.object_label:
                return

            event_id = after.get("id")
            if not event_id:
                return

            self.process_detection(event_id)
        except Exception as exc:
            self.logger.exception("Error handling MQTT message: %s", exc)

    # ------------------------------------------------------------------ #
    # Run / Shutdown
    # ------------------------------------------------------------------ #
    def _handle_stop_signal(self, signum, frame):
        """SIGINT/SIGTERM handler: request a stop without doing the cleanup.

        The handler runs on the main thread, which may be inside
        ``process_detection`` holding the non-reentrant ``state_lock``.
        Finalizing the batch here could therefore deadlock, so the handler
        only asks the MQTT loop to exit; ``run()`` performs the shutdown.
        """
        self.shutdown_event.set()
        client = self.client
        if client is not None:
            try:
                client.disconnect()
            except Exception:
                # Deliberately silent: logging from a signal handler can hit
                # a re-entrant stream write, and shutdown() retries anyway.
                pass

    def run(self):
        """Connect to the broker and process events until told to stop.

        Raises:
            ImportError: if the paho-mqtt package is not installed.
        """
        if mqtt is None:
            raise ImportError(
                "paho-mqtt is required to run the service (pip install paho-mqtt)"
            )

        # Graceful shutdown on SIGINT / SIGTERM
        signal.signal(signal.SIGINT, self._handle_stop_signal)
        signal.signal(signal.SIGTERM, self._handle_stop_signal)

        # Support both paho-mqtt v1 and v2
        try:
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        except (AttributeError, TypeError):
            self.client = mqtt.Client()

        if self.mqtt_user and self.mqtt_pass:
            self.client.username_pw_set(self.mqtt_user, self.mqtt_pass)

        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        # Start background cutoff watcher
        watcher = threading.Thread(target=self.cutoff_watcher, daemon=True)
        watcher.start()

        try:
            self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60)
            # A stop signal may have arrived while connect() was blocking.
            if not self.shutdown_event.is_set():
                self.client.loop_forever()
        except Exception as exc:
            self.logger.error("MQTT loop error: %s", exc)
        finally:
            self.shutdown()

    def shutdown(self):
        """Disconnect, finalize the active batch and close the database.

        Idempotent: only the first call does any work.
        """
        if self._closed:
            return
        self._closed = True

        self.logger.info("Shutting down...")
        self.shutdown_event.set()

        if self.client is not None:
            try:
                self.client.disconnect()
            except Exception as exc:
                self.logger.debug("MQTT disconnect failed: %s", exc)

        self.end_batch()

        with self.state_lock:
            if self.batch_timer:
                self.batch_timer.cancel()
                self.batch_timer = None
            self.db.close()

        self.logger.info("Shutdown complete")


if __name__ == "__main__":
    service = FrigateCounterService()
    service.run()