First Commit

This commit is contained in:
2026-05-19 14:32:46 +07:00
parent 73a8a01048
commit b3d162450d
20 changed files with 6543 additions and 2 deletions
+505
View File
@@ -0,0 +1,505 @@
#!/usr/bin/env python3
"""
Frigate Object Batch Counter Service
Listens to Frigate MQTT events, counts unique objects per batch,
and persists results to SQLite when a batch ends.
"""
import os
import sys
import json
import sqlite3
import threading
import time
import logging
import signal
from datetime import datetime, timedelta
from pathlib import Path
import paho.mqtt.client as mqtt
class FrigateCounterService:
def __init__(self):
self.setup_logging()
self.load_config()
self.init_db()
self.state_lock = threading.Lock()
self.batch_timer = None
self.shutdown_event = threading.Event()
self.current_state = self.load_state()
# Telenan Batch Label
self.ignore_batch_label = False
self.ignore_batch_label_timer = None
# ------------------------------------------------------------------ #
# Setup & Config
# ------------------------------------------------------------------ #
def setup_logging(self):
level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
self.logger = logging.getLogger(__name__)
def load_config(self):
self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost")
self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883"))
self.mqtt_user = os.getenv("FRIGATE_MQTT_USER")
self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS")
self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events")
self.camera_name = os.getenv("CAMERA_NAME")
if not self.camera_name:
raise ValueError("Environment variable CAMERA_NAME is required")
self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong")
self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign
self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30"))
self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "300"))
self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00")
# Validate cutoff format HH:MM
datetime.strptime(self.cutoff_time_str, "%H:%M")
self.db_path = os.getenv("DB_PATH", "frigate_counter.db")
self.state_file = os.getenv("STATE_FILE", "current_batch.json")
self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60"))
# ------------------------------------------------------------------ #
# Database
# ------------------------------------------------------------------ #
def init_db(self):
self.db = sqlite3.connect(self.db_path, check_same_thread=False)
cur = self.db.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS batches (
id INTEGER PRIMARY KEY AUTOINCREMENT,
counting_date TEXT NOT NULL,
batch_number INTEGER NOT NULL,
camera_name TEXT NOT NULL,
object_label TEXT NOT NULL,
count INTEGER NOT NULL,
start_time TEXT NOT NULL,
end_time TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(counting_date, batch_number, camera_name, object_label)
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS daily_summaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
counting_date TEXT NOT NULL,
camera_name TEXT NOT NULL,
object_label TEXT NOT NULL,
total_count INTEGER NOT NULL DEFAULT 0,
total_batches INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(counting_date, camera_name, object_label)
)
"""
)
self.db.commit()
# ------------------------------------------------------------------ #
# Counting-date logic (day ends at cutoff, e.g. 17:00)
# ------------------------------------------------------------------ #
def get_counting_date(self, dt=None):
"""Return the business-day string that ends at cutoff_time."""
if dt is None:
dt = datetime.now()
cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time()
# e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow
if dt.time() < cutoff:
#if dt.time() >= cutoff:
return dt.date().isoformat()
return (dt.date() + timedelta(days=1)).isoformat()
#return (dt.date() - timedelta(days=1)).isoformat()
# ------------------------------------------------------------------ #
# State persistence (JSON) survives restarts
# ------------------------------------------------------------------ #
def load_state(self):
if not Path(self.state_file).exists():
return None
try:
with open(self.state_file, "r", encoding="utf-8") as f:
state = json.load(f)
current_date = self.get_counting_date()
if state.get("counting_date") != current_date:
self.logger.warning(
"State file belongs to previous counting day (%s). "
"Finalizing it before starting fresh.",
state.get("counting_date"),
)
self._insert_batch(
state["counting_date"],
state["batch_number"],
state["count"],
state["start_time"],
datetime.now().isoformat(),
)
Path(self.state_file).unlink(missing_ok=True)
return None
self.logger.info(
"Resumed batch #%s from %s with count=%s",
state["batch_number"],
state["start_time"],
state["count"],
)
# Restart the inactivity timer
#self._reset_batch_timer()
return state
except Exception as exc:
self.logger.error("Failed to load state file: %s", exc)
return None
def save_state(self):
if self.current_state is None:
Path(self.state_file).unlink(missing_ok=True)
return
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(self.current_state, f, indent=2, ensure_ascii=False)
# ------------------------------------------------------------------ #
# Batch lifecycle
# ------------------------------------------------------------------ #
def get_next_batch_number(self, counting_date):
cur = self.db.cursor()
cur.execute(
"""
SELECT COALESCE(MAX(batch_number), 0)
FROM batches
WHERE counting_date = ? AND camera_name = ? AND object_label = ?
""",
(counting_date, self.camera_name, self.object_label),
)
return cur.fetchone()[0] + 1
def start_new_batch(self, counting_date):
batch_number = self.get_next_batch_number(counting_date)
now = datetime.now().isoformat()
self.current_state = {
"counting_date": counting_date,
"batch_number": batch_number,
"count": 0,
"start_time": now,
"last_detection_time": now,
"counted_event_ids": [],
}
self.save_state()
self.logger.info(
"Started batch #%s for %s (%s)",
batch_number,
counting_date,
self.object_label,
)
def process_detection(self, event_id, label):
"""
Called for every matching Frigate event (new/update/end).
Deduplicates by event_id and resets the 5-minute batch timer.
"""
should_reset_timer = False
if label == self.object_label:
with self.state_lock:
counting_date = self.get_counting_date()
# 1. No active batch -> start one
if self.current_state is None:
self.start_new_batch(counting_date)
should_reset_timer = True
# 2. Cutoff crossed since batch started -> finalize old, start new
elif self.current_state["counting_date"] != counting_date:
self._end_batch_locked()
self.start_new_batch(counting_date)
should_reset_timer = True
#self.current_state["count"] += 1
# 3. Deduplicate event ID
if event_id not in self.current_state["counted_event_ids"]:
self.current_state["count"] += 1
self.current_state["counted_event_ids"].append(event_id)
self.logger.info(
"Counted %s (event %s) | batch #%s total: %s",
self.object_label,
event_id,
self.current_state["batch_number"],
self.current_state["count"],
)
# Always refresh last_detection_time so the batch stays alive
self.current_state["last_detection_time"] = datetime.now().isoformat()
self.save_state()
should_reset_timer = True
elif label == self.batch_label:
self._ignore_batch_label()
self.end_batch()
"""
if should_reset_timer:
self._reset_batch_timer()
"""
def _ignore_batch_label(self):
if not self.ignore_batch_label_timer:
self.ignore_batch_label = True
self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout)
self.ignore_batch_label_timer.daemon = True
self.ignore_batch_label_timer.start()
self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = %s", self.ignore_batch_label_timeout, self.ignore_batch_label)
def _reset_batch_timer(self):
if self.batch_timer:
self.batch_timer.cancel()
self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout)
self.batch_timer.daemon = True
self.batch_timer.start()
def _on_ignore_batch_label_timeout(self):
self.ignore_batch_label = False
self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label)
self.end_batch()
def _on_batch_timeout(self):
self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout)
self.end_batch()
def end_batch(self):
with self.state_lock:
self._end_batch_locked()
def _end_batch_locked(self):
if self.current_state is None:
return
state = self.current_state
""" Check Duration """
start_time_obj = datetime.fromisoformat(state["start_time"])
end_time_obj = datetime.now()
duration = end_time_obj - start_time_obj
duration_seconds = duration.total_seconds()
if state["count"] == 0 or duration_seconds < self.min_duration_per_batch:
# Nothing to persist
self.current_state = None
self.save_state()
"""
if self.batch_timer:
self.batch_timer.cancel()
self.batch_timer = None
"""
return
end_time = datetime.now().isoformat()
count_per_second = state["count"] / duration_seconds
try:
self._insert_batch(
state["counting_date"],
state["batch_number"],
state["count"],
state["start_time"],
end_time,
)
self.logger.info(
"Batch #%s ended | count=%s | duration=%s | cps=%s",
state["batch_number"],
state["count"],
duration,
count_per_second
)
except Exception as exc:
self.logger.error("Failed to persist batch: %s", exc)
# Leave state intact so we can retry on next timeout
return
self.current_state = None
self.save_state()
"""
if self.batch_timer:
self.batch_timer.cancel()
self.batch_timer = None
"""
def _insert_batch(self, counting_date, batch_number, count, start_time, end_time):
"""Atomic insert into batches + upsert daily summary."""
cur = self.db.cursor()
cur.execute(
"""
INSERT INTO batches
(counting_date, batch_number, camera_name, object_label, count, start_time, end_time)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
counting_date,
batch_number,
self.camera_name,
self.object_label,
count,
start_time,
end_time,
),
)
cur.execute(
"""
INSERT INTO daily_summaries
(counting_date, camera_name, object_label, total_count, total_batches)
VALUES (?, ?, ?, ?, 1)
ON CONFLICT(counting_date, camera_name, object_label)
DO UPDATE SET
total_count = total_count + excluded.total_count,
total_batches = total_batches + excluded.total_batches,
updated_at = CURRENT_TIMESTAMP
""",
(counting_date, self.camera_name, self.object_label, count),
)
self.db.commit()
# Log running totals for the day
cur.execute(
"""
SELECT total_count, total_batches
FROM daily_summaries
WHERE counting_date = ? AND camera_name = ? AND object_label = ?
""",
(counting_date, self.camera_name, self.object_label),
)
row = cur.fetchone()
if row:
self.logger.info(
"Daily totals for %s: %s objects across %s batch(es)",
counting_date,
row[0],
row[1],
)
# ------------------------------------------------------------------ #
# Cutoff watcher (forces batch end at 17:00 etc.)
# ------------------------------------------------------------------ #
def cutoff_watcher(self):
"""Runs every minute to force-close a batch when the business day rolls over."""
while not self.shutdown_event.is_set():
time.sleep(60)
with self.state_lock:
if self.current_state is None:
continue
if self.current_state["counting_date"] != self.get_counting_date():
self.logger.info("Daily cutoff reached finalizing batch")
self._end_batch_locked()
# ------------------------------------------------------------------ #
# MQTT callbacks
# ------------------------------------------------------------------ #
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port)
client.subscribe(self.mqtt_topic)
self.logger.info("Subscribed to %s", self.mqtt_topic)
else:
self.logger.error("MQTT connection failed, code=%s", rc)
def on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode("utf-8"))
after = payload.get("after", {})
""" 20260514 For Telenan """
label = after.get("label")
camera = after.get("camera")
if camera != self.camera_name:
return
#if after.get("label") != self.object_label:
if label != self.object_label:
if label == self.batch_label and self.ignore_batch_label:
return
#if label != self.batch_label and self.ignore_batch_label:
# return
event_id = after.get("id")
if not event_id:
return
self.process_detection(event_id, label)
except json.JSONDecodeError:
self.logger.warning("Received non-JSON payload on %s", msg.topic)
except Exception as exc:
self.logger.exception("Error handling MQTT message: %s", exc)
# ------------------------------------------------------------------ #
# Run / Shutdown
# ------------------------------------------------------------------ #
def run(self):
# Graceful shutdown on SIGINT / SIGTERM
signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown())
signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown())
# Support both paho-mqtt v1 and v2
try:
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
except (AttributeError, TypeError):
self.client = mqtt.Client()
if self.mqtt_user and self.mqtt_pass:
self.client.username_pw_set(self.mqtt_user, self.mqtt_pass)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# Start background cutoff watcher
watcher = threading.Thread(target=self.cutoff_watcher, daemon=True)
watcher.start()
try:
self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60)
self.client.loop_forever()
except Exception as exc:
self.logger.error("MQTT loop error: %s", exc)
finally:
self.shutdown()
def shutdown(self):
if self.shutdown_event.is_set():
return
self.logger.info("Shutting down...")
self.shutdown_event.set()
try:
self.client.disconnect()
except Exception:
pass
self.end_batch()
self.db.close()
self.logger.info("Shutdown complete")
if __name__ == "__main__":
service = FrigateCounterService()
service.run()