Files
karung-masuk_frigate-counter/frigate_counter.py
T
2026-05-08 16:24:26 +07:00

654 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Frigate MQTT Counter Service
Monitors Frigate NVR MQTT events to count "karung" objects
after detecting both "pintu-kiri-buka" and "pintu-kanan-buka"
"""
import paho.mqtt.client as mqtt
import sqlite3
import schedule
import time
import threading
import os
import logging
import json
import requests
from datetime import datetime, date
from typing import Optional, Dict, Any, List
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class FrigateCounter:
def __init__(self):
# MQTT configuration from environment variables
self.frigate_mqtt_host = os.environ.get("FRIGATE_MQTT_HOST", "localhost")
self.frigate_mqtt_port = int(os.environ.get("FRIGATE_MQTT_PORT", 1883))
#self.report_mqtt_host = os.environ.get('REPORT_MQTT_HOST', 'localhost')
self.report_mqtt_host = os.environ.get("REPORT_MQTT_HOST", "mqtt.backone.cloud")
self.report_mqtt_port = int(os.environ.get("REPORT_MQTT_PORT", 1883))
self.top_topic = os.environ.get("TOP_TOPIC", "cpsp")
self.site_name = os.environ.get("SITE_NAME", "sukawarna")
self.topic = os.environ.get(
"TOPIC", f"{self.top_topic}/counter/{self.site_name}"
)
self.camera_name = os.environ.get("CAMERA_NAME", "kandang_1_karung_masuk")
self.pintu_tutup_zone_name = os.environ.get(
"PINTU_TUTUP_ZONE_NAME", "pintu_tutup"
)
self.pintu_kiri_buka_zone_name = os.environ.get(
"PINTU_KIRI_BUKA_ZONE_NAME", "pintu_kiri_buka"
)
self.pintu_kanan_buka_zone_name = os.environ.get(
"PINTU_KANAN_BUKA_ZONE_NAME", "pintu_kanan_buka"
)
logger.info(
f"FRIGATE_MQTT_HOST: {self.frigate_mqtt_host}:{self.frigate_mqtt_port}"
)
logger.info(
f"REPORT_MQTT_HOST: {self.report_mqtt_host}:{self.report_mqtt_port}"
)
logger.info(f"TOPIC: {self.topic}")
logger.info(f"CAMERA_NAME: {self.camera_name}")
# Webcall to RPi
self.relay_on = os.environ.get(
"RELAY_ON_URI", "http://192.168.192.26:5000/relay_on"
)
self.relay_off = os.environ.get(
"RELAY_OFF_URI", "http://192.168.192.26:5000/relay_off"
)
# Database setup
self.db_path = "/etc/frigate-counter/karung-masuk/karung_masuk.db"
#self.db_path = "/tmp/karung_masuk.db"
self.init_database()
# JSON storage for temporary persistent counter values
self.json_storage_path = "/etc/frigate-counter/karung-masuk/karung_masuk.json"
#self.json_storage_path = "/tmp/karung_masuk.json"
self.init_json_storage()
# State tracking
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
# Counter IN/DOWN
self.counter = 0
self.counter_lock = threading.Lock()
# Counter OUT/UP
self.counter_out = 0
self.seen_objects = {}
# State pintu tracking
self.timer_active_pintu = False
self.timer_start_time_pintu = None
self.timer_active_pintu_tutup = False
self.timer_start_time_pintu_tutup = None
self.pintu_buka_timer = False
# Load previous counter value on startup
self.load_previous_counter()
# MQTT clients
self.frigate_client = None
self.report_client = None
# Initialize MQTT clients
self.setup_mqtt_clients()
# Direction detection
self.previous_positions: Dict[str, float] = {}
# Movement threshold (pixels)
self.movement_threshold = float(os.environ.get("MOVEMENT_THRESHOLD", "2.0"))
# Schedule daily reset at midnight
schedule.every().day.at("23:59").do(self.reset_counter)
# ==================== START - DIRECTION ====================
def _get_object_key(self, camera: str, label: str, obj_id: str) -> str:
return f"{camera}:{label}:{obj_id}"
def _get_box_center_y(self, box: list) -> float:
"""Calculate center Y of bounding box [x_min, y_min, x_max, y_max]."""
if not box or len(box) != 4:
return 0.0
return (box[1] + box[3]) / 2.0
def calculate_direction(
self, camera: str, obj_id: str, current_box: list
) -> Optional[str]:
key = self._get_object_key(camera, "karung", obj_id)
current_y = self._get_box_center_y(current_box)
if current_y == 0.0:
return None
if key not in self.previous_positions:
self.previous_positions[key] = current_y
return None
previous_y = self.previous_positions[key]
self.previous_positions[key] = current_y
delta_y = current_y - previous_y
if abs(delta_y) < self.movement_threshold:
return None
# Y increases downward in image coordinates
if delta_y < 0:
return "UP"
else:
return "DOWN"
def remove_object(self, camera: str, label: str, obj_id: str):
key = self._get_object_key(camera, label, obj_id)
if key in self.previous_positions:
del self.previous_positions[key]
# ==================== END - DIRECTION ====================
# ==================== START - RELAY LAMPU ====================
def action_relay_on(self):
try:
requests.get(self.relay_on, timeout=1)
logger.info("Relay ON")
except requests.exceptions.RequestException as e:
logger.info(e)
def action_relay_off(self):
try:
requests.get(self.relay_off, timeout=2)
logger.info("Relay OFF")
except requests.exceptions.RequestException as e:
logger.info(e)
# ==================== END - RELAY LAMPU ====================
#
def init_database(self):
"""Initialize SQLite database with required table"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS karung_counts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_name TEXT NOT NULL,
date DATE NOT NULL,
counter_value INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
logger.info("Database initialized")
def init_json_storage(self):
"""Initialize JSON storage file for temporary persistent counter values"""
if not os.path.exists(self.json_storage_path):
# Create empty JSON file with empty dictionary
with open(self.json_storage_path, "w") as f:
json.dump({}, f)
logger.info("JSON storage initialized")
def setup_mqtt_clients(self):
"""Setup MQTT clients for both Frigate and reporting"""
# Frigate MQTT client (for receiving events)
self.frigate_client = mqtt.Client()
self.frigate_client.on_connect = self.on_frigate_connect
self.frigate_client.on_message = self.on_frigate_message
self.frigate_client.connect(self.frigate_mqtt_host, self.frigate_mqtt_port, 60)
# Reporting MQTT client (for publishing results)
self.report_client = mqtt.Client()
self.report_client.on_connect = self.on_report_connect
self.report_client.connect(self.report_mqtt_host, self.report_mqtt_port, 60)
# Start MQTT client loops in separate threads
self.frigate_client.loop_start()
self.report_client.loop_start()
def on_frigate_connect(self, client, userdata, flags, rc):
"""Callback when connected to Frigate MQTT"""
logger.info("Connected to Frigate MQTT broker")
client.subscribe("frigate/events")
def on_report_connect(self, client, userdata, flags, rc):
"""Callback when connected to reporting MQTT broker"""
logger.info("Connected to reporting MQTT broker")
def on_frigate_message(self, client, userdata, msg):
"""Handle incoming Frigate MQTT messages"""
try:
# Parse the message (assuming JSON format)
# import json
payload = json.loads(msg.payload.decode())
if "after" not in payload:
return
# event_after = payload["after"]
event_after = payload.get("after", {})
event_before = payload.get("before", {})
camera_name = event_after.get("camera", "unknown")
if camera_name != self.camera_name:
return
event_type = event_after.get("type", "")
label = event_after.get("label", "")
track_id = event_after.get("id")
zones_after = event_after.get("entered_zones", [])
zones_before = event_before.get("entered_zones", [])
# Dont detect stationary
stationary = event_after.get("stationary")
# if stationary and:
if stationary and label == "karung":
return
new_zones = [z for z in zones_after if z not in zones_before]
# debounce per track_id
if camera_name not in self.seen_objects:
self.seen_objects[camera_name] = {}
if track_id in self.seen_objects[camera_name]:
return # sudah dihitung
# Extract object type and camera name
# camera_name = data.get('camera', 'unknown')
# event_type = data.get('type', '')
# label = data.get('label', '')
logger.debug(
f"Received message: camera={camera_name}, type={event_type}, label={label}, zones_after={zones_after}, zones_before={zones_before}, timer_active={self.timer_active}, pintu_buka_timer={self.pintu_buka_timer}"
)
# Use after data primarily, fallback to before
data = event_after if event_after else event_before
# Handle different object types
if (
label == "pintu-kiri-buka"
and not self.timer_active
and not self.pintu_kiri_buka_detected
and not self.timer_active_pintu_tutup
):
self.handle_pintu_kiri_buka(camera_name)
elif (
label == "pintu-kanan-buka"
and not self.timer_active
and not self.pintu_kanan_buka_detected
and not self.timer_active_pintu_tutup
):
self.handle_pintu_kanan_buka(camera_name)
elif label == "karung" and self.timer_active:
if event_type == "end":
self.remove_object(camera_name, "karung", track_id)
logger.info(
f"[END] Object ended - Camera: {camera}, Label: {label}, ID: {obj_id}"
)
box = data.get("box")
self.handle_karung(camera_name, track_id, box)
# elif label == "pintu-tutup" and self.timer_active and self.pintu_tutup_zone_name in zones_after and not self.pintu_buka_timer:
elif (
label == "pintu-tutup"
and self.timer_active
and not self.pintu_buka_timer
):
self.handle_pintu_tutup(camera_name)
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
def handle_pintu_tutup(self, camera_name):
"""Handle detection of pintu-tutup"""
logger.info(f"Detected pintu-tutup {camera_name}")
if self.timer_active:
logger.info("Stop Counting Karung")
self.timer_active = False
self.seen_objects = {}
if not self.timer_active_pintu_tutup:
self.start_timer_pintu_tutup()
# Call RPi
self.action_relay_off()
def handle_pintu_kiri_buka(self, camera_name):
"""Handle detection of pintu-kiri-buka"""
logger.info(f"Detected pintu-kiri-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kiri_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kiri-buka during timer period")
def handle_pintu_kanan_buka(self, camera_name):
"""Handle detection of pintu-kanan-buka"""
logger.info(f"Detected pintu-kanan-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kanan_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kanan-buka during timer period")
def handle_karung(self, camera_name, track_id, box):
"""Handle detection of karung object"""
logger.info(f"Detected karung on {camera_name}")
# ======================= CALCULATE DIRECTION ==========================
direction = None
if box:
direction = self.calculate_direction(camera_name, track_id, box)
# Only count if timer is active
if self.timer_active:
with self.counter_lock:
if direction:
if direction == "DOWN":
self.counter += 1
elif direction == "UP":
self.counter_out += 1
self.seen_objects[camera_name][track_id] = datetime.now()
self.save_to_json(camera_name)
self.publish_result()
logger.info(
f"Direction={direction}, Counter_IN={self.counter}, Counter_OUT={self.counter_out} and republish to MQTT"
)
else:
logger.debug("Ignoring karung outside timer period")
def check_detection_sequence(self):
"""Check if both pintu-kiri-buka and pintu-kanan-buka have been detected"""
if self.pintu_kiri_buka_detected and self.pintu_kanan_buka_detected:
self.start_timer()
return
if not self.timer_active_pintu:
self.start_timer_pintu()
def start_timer_pintu_tutup(self):
"""Start the 30-seconds timer Pintu"""
logger.info("Starting 30-seconds timer Pintu Tutup")
self.timer_active_pintu_tutup = True
self.timer_start_time_pintu_tutup = datetime.now()
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration_pintu_tutup)
timer_thread.daemon = True
timer_thread.start()
def start_timer_pintu(self):
"""Start the 5-minute timer Pintu"""
logger.info("Starting 5-minute timer Pintu")
self.timer_active_pintu = True
self.timer_start_time_pintu = datetime.now()
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration_pintu)
timer_thread.daemon = True
timer_thread.start()
def start_timer(self):
"""Start Counting"""
# logger.info("Starting 60-minute timer")
logger.info("Start Counting and Timer pintu-buka 5 minutes")
# logger.info("Start Counting, Timer Counting 30-minutes and Timer pintu-buka 5 minutes")
self.timer_active = True
self.timer_start_time = datetime.now()
self.pintu_buka_timer = True
# Schedule timer expiration check
# timer_thread = threading.Thread(target=self.check_timer_expiration_counting)
# timer_thread.daemon = True
# timer_thread.start()
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration_pintu_buka)
timer_thread.daemon = True
timer_thread.start()
# Reset detection flags
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
# Call to RPI
self.action_relay_on()
def check_timer_expiration_counting(self):
"""Check if timer has expired (30 minutes)"""
time.sleep(30 * 60) # Wait 5 minutes
if self.timer_active:
logger.info("Stop Counting. Timer Counting Expired (30 minutes)")
self.timer_active = False
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.seen_objects = {}
def check_timer_expiration_pintu(self):
"""Check if timer has expired (5 minutes)"""
time.sleep(5 * 60) # Wait 5 minutes
if self.timer_active_pintu:
logger.info("Timer Pintu expired (5 minutes)")
self.timer_active_pintu = False
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
# Call RPi
# if not self.timer_active:
# self.action_relay_off()
def check_timer_expiration_pintu_buka(self):
"""Check if timer has expired (5 minutes)"""
time.sleep(5 * 60) # Wait 5 minutes
if self.pintu_buka_timer:
logger.info("Timer Pintu Buka expired (5 minutes)")
self.pintu_buka_timer = False
# Call RPi
# if not self.timer_active:
# self.action_relay_off()
def check_timer_expiration_pintu_tutup(self):
"""Check if timer has expired (30 seconds)"""
time.sleep(30) # Wait 5 minutes
if self.timer_active_pintu_tutup:
logger.info("Timer Pintu Tutup expired (30 seconds)")
self.timer_active_pintu_tutup = False
def publish_result(self):
"""Publish counter result to MQTT topic"""
if self.counter > 0:
topic = f"{self.topic}/{self.camera_name}/karung"
topic_out = f"{self.topic}/{self.camera_name}_out/karung"
# TESTING
# topic = f"{self.topic}/{self.camera_name}-check_pintu/karung"
message = str(self.counter)
message_out = str(self.counter_out)
try:
self.report_client.publish(topic, message)
logger.info(f"Published counter result to {topic}: {message}")
self.report_client.publish(topic_out, message_out)
logger.info(f"Published counter result to {topic_out}: {message_out}")
# Save to database
# self.save_to_database()
# Save to JSON for temporary persistent storage
# self.save_to_json('frigate_camera')
except Exception as e:
logger.error(f"Error publishing result: {e}")
else:
logger.info("Counter is zero, not publishing result")
def save_to_database(self):
"""Save counter result to SQLite database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO karung_counts (camera_name, date, counter_value)
VALUES (?, ?, ?)
""",
(self.camera_name, date.today(), self.counter),
)
camera_name_out = f"{self.camera_name}_out"
cursor.execute(
"""
INSERT INTO karung_counts (camera_name, date, counter_value)
VALUES (?, ?, ?)
""",
(camera_name_out, date.today(), self.counter_out),
)
conn.commit()
conn.close()
logger.info(f"Saved counter result to database: {self.counter}")
except Exception as e:
logger.error(f"Error saving to database: {e}")
# Ensure we don't lose data due to database errors
# We should still try to save to JSON as backup
try:
self.save_to_json(self.camera_name)
logger.info("Fallback save to JSON successful")
except Exception as e2:
logger.error(f"Fallback save to JSON also failed: {e2}")
def save_to_json(self, camera_name):
"""Save counter result to JSON file for temporary persistent storage"""
try:
# Read existing data
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, "r") as f:
data = json.load(f)
else:
data = {}
# Update counter value for this camera
if camera_name not in data:
data[camera_name] = {}
data[camera_name]["karung"] = self.counter
# COunter Out
camera_name_out = f"{camera_name}_out"
if camera_name_out not in data:
data[camera_name_out] = {}
# Write back to file
with open(self.json_storage_path, "w") as f:
json.dump(data, f, indent=2)
logger.info(
f"Saved counter result to JSON storage for {camera_name}: {self.counter}"
)
except Exception as e:
logger.error(f"Error saving to JSON storage: {e}")
# Try to create a backup of the existing file before overwriting
try:
import shutil
backup_path = f"{self.json_storage_path}.backup"
if os.path.exists(self.json_storage_path):
shutil.copy2(self.json_storage_path, backup_path)
logger.info(f"Created backup of JSON storage: {backup_path}")
except Exception as backup_e:
logger.error(f"Failed to create backup of JSON storage: {backup_e}")
raise # Re-raise the original exception
def load_previous_counter(self):
"""Load the previous counter value from JSON storage on startup"""
try:
self.counter = self.load_from_json(self.camera_name)
self.counter_out = self.load_from_json(f"{self.camera_name}_out")
logger.info(
f"Loaded previous counter_IN: {self.counter}, counter_out: {self.counter_out}"
)
except Exception as e:
logger.error(f"Error loading previous counter value: {e}")
# If loading fails, start with 0 counter
self.counter = 0
def load_from_json(self, camera_name):
"""Load counter result from JSON file"""
try:
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, "r") as f:
data = json.load(f)
if camera_name in data and "karung" in data[camera_name]:
return data[camera_name]["karung"]
return 0
except Exception as e:
logger.error(f"Error loading from JSON storage: {e}")
# Return 0 in case of error for safety
return 0
def reset_counter(self):
"""Reset counter and clear detection flags"""
logger.info("Resetting counter at midnight")
# with self.counter_lock:
# self.counter = 0
# self.save_to_database()
# self.save_to_json(self.camera_name)
# Save current counter value before resetting
self.publish_result()
self.save_to_database()
self.counter = 0
self.counter_out = 0
self.save_to_json(self.camera_name)
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
self.seen_objects = {}
self.previous_positions = {}
def run(self):
"""Main run loop"""
logger.info("Starting Frigate Counter Service")
try:
while True:
# Run scheduled tasks
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down Frigate Counter Service")
self.frigate_client.loop_stop()
self.report_client.loop_stop()
self.frigate_client.disconnect()
self.report_client.disconnect()
if __name__ == "__main__":
counter_service = FrigateCounter()
counter_service.run()