#!/usr/bin/env python3
"""
Frigate MQTT Counter Service

Monitors Frigate NVR MQTT events to count "karung" objects after detecting
both "pintu-kiri-buka" and "pintu-kanan-buka".
"""

import json
import logging
import os
import shutil
import sqlite3
import threading
import time
from contextlib import closing
from datetime import datetime, date
from typing import Optional

import paho.mqtt.client as mqtt
import schedule

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Duration, in seconds, of both door-related timers (5 minutes).
DOOR_TIMER_SECONDS = 5 * 60


class FrigateCounter:
    """Count "karung" detections published by Frigate while the doors are open.

    State machine implemented by this class:

    * A "pintu-kiri-buka" or "pintu-kanan-buka" event sets the matching flag
      and, if the other door has not been seen yet, starts a 5-minute window
      after which both flags are cleared again.
    * Once both flags are set, counting starts (``timer_active``) and a
      second 5-minute window (``pintu_buka_timer``) begins during which
      "pintu-tutup" events are ignored.
    * While counting, every "karung" event with a track id not seen before
      increments the counter, persists it to the JSON file and publishes it.
    * A "pintu-tutup" event inside the configured zone stops counting.
    * A scheduled daily job publishes the counter, stores it in SQLite and
      resets it to zero.
    """

    def __init__(self):
        """Read configuration, restore the counter and connect to MQTT.

        Raises whatever the storage initialisation or the MQTT ``connect``
        calls raise (e.g. when a broker is unreachable at startup).
        """
        # MQTT configuration from environment variables
        self.frigate_mqtt_host = os.environ.get('FRIGATE_MQTT_HOST', 'localhost')
        self.frigate_mqtt_port = int(os.environ.get('FRIGATE_MQTT_PORT', 1883))
        self.report_mqtt_host = os.environ.get('REPORT_MQTT_HOST', 'mqtt.backone.cloud')
        self.report_mqtt_port = int(os.environ.get('REPORT_MQTT_PORT', 1883))
        self.top_topic = os.environ.get("TOP_TOPIC", "cpsp")
        self.site_name = os.environ.get('SITE_NAME', 'sukawarna')
        self.topic = os.environ.get('TOPIC', f"{self.top_topic}/counter/{self.site_name}")
        self.camera_name = os.environ.get('CAMERA_NAME', 'kandang_1_karung_masuk')
        self.pintu_tutup_zone_name = os.environ.get('PINTU_TUTUP_ZONE_NAME', 'pintu_tutup')
        self.pintu_kiri_buka_zone_name = os.environ.get('PINTU_KIRI_BUKA_ZONE_NAME', 'pintu_kiri_buka')
        self.pintu_kanan_buka_zone_name = os.environ.get('PINTU_KANAN_BUKA_ZONE_NAME', 'pintu_kanan_buka')

        logger.info(f"FRIGATE_MQTT_HOST: {self.frigate_mqtt_host}:{self.frigate_mqtt_port}")
        logger.info(f"REPORT_MQTT_HOST: {self.report_mqtt_host}:{self.report_mqtt_port}")
        logger.info(f"TOPIC: {self.topic}")
        logger.info(f"CAMERA_NAME: {self.camera_name}")

        # Database setup
        self.db_path = '/etc/frigate-counter/karung-masuk/karung_masuk.db'
        self.init_database()

        # JSON storage for temporary persistent counter values
        self.json_storage_path = '/etc/frigate-counter/karung-masuk/karung_masuk.json'
        self.init_json_storage()

        # Counting state
        self.pintu_kiri_buka_detected = False
        self.pintu_kanan_buka_detected = False
        self.timer_active = False
        self.timer_start_time: Optional[datetime] = None
        self.counter = 0
        self.counter_lock = threading.Lock()
        # camera name -> {track id -> time the track was counted}
        self.seen_objects = {}

        # Door timer state
        self.timer_active_pintu = False
        self.timer_start_time_pintu: Optional[datetime] = None
        self.pintu_buka_timer = False

        # Load previous counter value on startup
        self.load_previous_counter()

        # MQTT clients
        self.frigate_client = None
        self.report_client = None
        self.setup_mqtt_clients()

        # Schedule the daily reset one minute before midnight
        schedule.every().day.at("23:59").do(self.reset_counter)

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory that will contain *path* if it is missing."""
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def init_database(self):
        """Initialize SQLite database with required table."""
        self._ensure_parent_dir(self.db_path)
        # closing() guarantees the connection is released even if the
        # statement fails; sqlite3's own context manager only ends the
        # transaction.
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS karung_counts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    camera_name TEXT NOT NULL,
                    date DATE NOT NULL,
                    counter_value INTEGER NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        logger.info("Database initialized")

    def init_json_storage(self):
        """Initialize JSON storage file for temporary persistent counter values."""
        self._ensure_parent_dir(self.json_storage_path)
        if not os.path.exists(self.json_storage_path):
            # Create the file holding an empty dictionary
            with open(self.json_storage_path, 'w') as f:
                json.dump({}, f)
        logger.info("JSON storage initialized")

    @staticmethod
    def _new_mqtt_client():
        """Build an MQTT client that uses the v1 callback signatures.

        paho-mqtt 2.x introduced ``CallbackAPIVersion``; the callbacks of
        this class use the ``(client, userdata, flags, rc)`` form, so
        VERSION1 is requested explicitly when the enum exists. Older
        releases have no such argument.
        """
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is None:
            return mqtt.Client()
        return mqtt.Client(callback_api_version=api_version.VERSION1)

    def setup_mqtt_clients(self):
        """Setup MQTT clients for both Frigate and reporting."""
        # Frigate MQTT client (for receiving events)
        self.frigate_client = self._new_mqtt_client()
        self.frigate_client.on_connect = self.on_frigate_connect
        self.frigate_client.on_message = self.on_frigate_message
        self.frigate_client.connect(self.frigate_mqtt_host, self.frigate_mqtt_port, 60)

        # Reporting MQTT client (for publishing results)
        self.report_client = self._new_mqtt_client()
        self.report_client.on_connect = self.on_report_connect
        self.report_client.connect(self.report_mqtt_host, self.report_mqtt_port, 60)

        # Start MQTT client loops in separate threads
        self.frigate_client.loop_start()
        self.report_client.loop_start()

    def on_frigate_connect(self, client, userdata, flags, rc):
        """Callback when connected to Frigate MQTT; subscribes to the event topic."""
        if rc != 0:
            logger.error(f"Frigate MQTT broker refused the connection (rc={rc})")
            return
        logger.info("Connected to Frigate MQTT broker")
        client.subscribe("frigate/events")

    def on_report_connect(self, client, userdata, flags, rc):
        """Callback when connected to reporting MQTT broker."""
        if rc != 0:
            logger.error(f"Reporting MQTT broker refused the connection (rc={rc})")
            return
        logger.info("Connected to reporting MQTT broker")

    def on_frigate_message(self, client, userdata, msg):
        """Handle incoming Frigate MQTT messages.

        The payload is parsed as JSON; only its ``after`` object is used.
        Events for other cameras, stationary objects and track ids that were
        already counted are ignored. Any exception is logged, never raised.
        """
        try:
            payload = json.loads(msg.payload.decode())
            if "after" not in payload:
                return

            # "after" may be null in the payload; treat that as empty.
            event_after = payload.get("after") or {}

            camera_name = event_after.get("camera", "unknown")
            if camera_name != self.camera_name:
                return

            event_type = event_after.get('type', '')
            label = event_after.get("label", "")
            track_id = event_after.get("id")
            zones_after = event_after.get("entered_zones") or []

            # Don't count stationary objects
            if event_after.get("stationary"):
                return

            # Debounce per track id
            seen_for_camera = self.seen_objects.setdefault(camera_name, {})
            if track_id in seen_for_camera:
                return  # already counted

            logger.debug(f"Received message: camera={camera_name}, type={event_type}, label={label}")

            # Handle different object types
            if label == "pintu-kiri-buka" and not self.timer_active and not self.pintu_kiri_buka_detected:
                self.handle_pintu_kiri_buka(camera_name)
            elif label == "pintu-kanan-buka" and not self.timer_active and not self.pintu_kanan_buka_detected:
                self.handle_pintu_kanan_buka(camera_name)
            elif label == "karung" and self.timer_active:
                self.handle_karung(camera_name, track_id)
            elif (label == "pintu-tutup" and self.timer_active
                    and self.pintu_tutup_zone_name in zones_after
                    and not self.pintu_buka_timer):
                self.handle_pintu_tutup(camera_name)
        except Exception as e:
            logger.error(f"Error processing MQTT message: {e}")

    def handle_pintu_tutup(self, camera_name):
        """Handle detection of pintu-tutup: stop counting and forget seen tracks."""
        logger.info(f"Detected pintu-tutup {camera_name}")
        if self.timer_active:
            logger.info("Stop Counting Karung")
            self.timer_active = False
            self.seen_objects = {}

    def handle_pintu_kiri_buka(self, camera_name):
        """Handle detection of pintu-kiri-buka."""
        logger.info(f"Detected pintu-kiri-buka on {camera_name}")
        # Only process if counting is not active
        if self.timer_active:
            logger.debug("Ignoring pintu-kiri-buka during timer period")
            return
        self.pintu_kiri_buka_detected = True
        self.check_detection_sequence()

    def handle_pintu_kanan_buka(self, camera_name):
        """Handle detection of pintu-kanan-buka."""
        logger.info(f"Detected pintu-kanan-buka on {camera_name}")
        # Only process if counting is not active
        if self.timer_active:
            logger.debug("Ignoring pintu-kanan-buka during timer period")
            return
        self.pintu_kanan_buka_detected = True
        self.check_detection_sequence()

    def handle_karung(self, camera_name, track_id):
        """Handle detection of karung object.

        While counting is active the counter is incremented, the track id is
        remembered, and the new value is persisted and published.
        """
        logger.info(f"Detected karung on {camera_name}")
        if not self.timer_active:
            logger.debug("Ignoring karung outside timer period")
            return

        with self.counter_lock:
            self.counter += 1
            # setdefault: seen_objects may have been cleared by another
            # thread (daily reset) since the message handler populated it.
            self.seen_objects.setdefault(camera_name, {})[track_id] = datetime.now()
            try:
                self.save_to_json(camera_name)
            except Exception:
                # save_to_json already logged the cause; a persistence
                # failure must not prevent the count from being reported.
                logger.warning("Counter could not be persisted; publishing anyway")
            self.publish_result()
            logger.info(f"Counter incremented to {self.counter} and republish to MQTT")

    def check_detection_sequence(self):
        """Check if both pintu-kiri-buka and pintu-kanan-buka have been detected."""
        if self.pintu_kiri_buka_detected and self.pintu_kanan_buka_detected:
            self.start_timer()
            return
        if not self.timer_active_pintu:
            self.start_timer_pintu()

    def start_timer_pintu(self):
        """Start the 5-minute timer Pintu (window for seeing the second door)."""
        logger.info("Starting 5-minute timer Pintu")
        self.timer_active_pintu = True
        self.timer_start_time_pintu = datetime.now()
        # Expiration is checked on a daemon thread
        threading.Thread(target=self.check_timer_expiration_pintu, daemon=True).start()

    def start_timer(self):
        """Start counting and the 5-minute pintu-buka window."""
        logger.info("Start Counting and Timer pintu-buka 5 minutes")
        self.timer_active = True
        self.timer_start_time = datetime.now()
        self.pintu_buka_timer = True
        # The door sequence completed, so the pending door window is
        # obsolete; clearing the flag turns its expiry into a no-op.
        self.timer_active_pintu = False

        # Expiration is checked on a daemon thread
        threading.Thread(target=self.check_timer_expiration_pintu_buka, daemon=True).start()

        # Reset detection flags
        self.pintu_kiri_buka_detected = False
        self.pintu_kanan_buka_detected = False

    def check_timer_expiration_pintu(self):
        """Sleep 5 minutes, then clear the door flags if this timer is still current."""
        # The start time object identifies the timer this thread serves, so
        # a thread left over from an earlier cycle cannot clear newer state.
        started_at = self.timer_start_time_pintu
        time.sleep(DOOR_TIMER_SECONDS)
        if self.timer_active_pintu and self.timer_start_time_pintu is started_at:
            logger.info("Timer Pintu expired (5 minutes)")
            self.timer_active_pintu = False
            self.pintu_kiri_buka_detected = False
            self.pintu_kanan_buka_detected = False

    def check_timer_expiration_pintu_buka(self):
        """Sleep 5 minutes, then end the pintu-buka window if it is still current."""
        started_at = self.timer_start_time
        time.sleep(DOOR_TIMER_SECONDS)
        if self.pintu_buka_timer and self.timer_start_time is started_at:
            logger.info("Timer Pintu Buka expired (5 minutes)")
            self.pintu_buka_timer = False

    def publish_result(self):
        """Publish counter result to MQTT topic (skipped while the counter is zero)."""
        if self.counter <= 0:
            logger.info("Counter is zero, not publishing result")
            return

        topic = f"{self.topic}/{self.camera_name}/karung"
        message = str(self.counter)
        try:
            self.report_client.publish(topic, message)
            logger.info(f"Published counter result to {topic}: {message}")
        except Exception as e:
            logger.error(f"Error publishing result: {e}")

    def save_to_database(self):
        """Save counter result to SQLite database.

        Never raises: on a database error the counter is written to the JSON
        storage instead, and a failure of that fallback is only logged.
        """
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                conn.execute(
                    '''
                    INSERT INTO karung_counts (camera_name, date, counter_value)
                    VALUES (?, ?, ?)
                    ''',
                    # ISO string instead of a date object: the implicit date
                    # adapter is deprecated since Python 3.12 and produced
                    # the same text.
                    (self.camera_name, date.today().isoformat(), self.counter),
                )
                conn.commit()
            logger.info(f"Saved counter result to database: {self.counter}")
        except Exception as e:
            logger.error(f"Error saving to database: {e}")
            # Ensure we don't lose data due to database errors
            try:
                self.save_to_json(self.camera_name)
                logger.info("Fallback save to JSON successful")
            except Exception as e2:
                logger.error(f"Fallback save to JSON also failed: {e2}")

    def _backup_json_storage(self):
        """Copy the JSON storage file to ``<path>.backup``; failures are logged."""
        backup_path = f"{self.json_storage_path}.backup"
        try:
            if os.path.exists(self.json_storage_path):
                shutil.copy2(self.json_storage_path, backup_path)
                logger.info(f"Created backup of JSON storage: {backup_path}")
        except Exception as backup_e:
            logger.error(f"Failed to create backup of JSON storage: {backup_e}")

    def _read_json_storage(self):
        """Return the stored dictionary, or ``{}`` if it is missing or unusable.

        An unparsable or non-dictionary file is backed up first so that one
        corrupt file cannot block every later save.
        """
        try:
            with open(self.json_storage_path, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError
            logger.error(f"JSON storage is corrupt, starting from empty data: {e}")
            self._backup_json_storage()
            return {}

        if not isinstance(data, dict):
            logger.error("JSON storage does not contain an object, starting from empty data")
            self._backup_json_storage()
            return {}
        return data

    def save_to_json(self, camera_name):
        """Save counter result to JSON file for temporary persistent storage.

        The file is written to a temporary sibling and then moved into place,
        so an interrupted write leaves the previous content intact.

        Raises:
            Exception: the original error, after logging, when the data
                cannot be written.
        """
        tmp_path = f"{self.json_storage_path}.tmp"
        try:
            data = self._read_json_storage()

            # Update counter value for this camera
            entry = data.get(camera_name)
            if not isinstance(entry, dict):
                entry = {}
                data[camera_name] = entry
            entry["karung"] = self.counter

            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.json_storage_path)
            logger.info(f"Saved counter result to JSON storage for {camera_name}: {self.counter}")
        except Exception as e:
            logger.error(f"Error saving to JSON storage: {e}")
            raise  # Re-raise the original exception

    def load_previous_counter(self):
        """Load the previous counter value from JSON storage on startup."""
        try:
            self.counter = self.load_from_json(self.camera_name)
            logger.info(f"Loaded previous counter value: {self.counter}")
        except Exception as e:
            logger.error(f"Error loading previous counter value: {e}")
            # If loading fails, start with 0 counter
            self.counter = 0

    def load_from_json(self, camera_name):
        """Load counter result from JSON file.

        Returns the stored integer for *camera_name*, or 0 when the file,
        the camera entry or the value is missing, unreadable or not an int.
        """
        try:
            if not os.path.exists(self.json_storage_path):
                return 0
            with open(self.json_storage_path, 'r') as f:
                data = json.load(f)
            if camera_name in data and "karung" in data[camera_name]:
                value = data[camera_name]["karung"]
                # bool is an int subclass but is not a valid count
                if isinstance(value, bool) or not isinstance(value, int):
                    logger.error(f"Error loading from JSON storage: invalid counter value {value!r}")
                    return 0
                return value
            return 0
        except Exception as e:
            logger.error(f"Error loading from JSON storage: {e}")
            # Return 0 in case of error for safety
            return 0

    def reset_counter(self):
        """Publish and store the current count, then reset counter and state.

        Runs from the scheduler in the main loop, so it must not raise; the
        counter lock keeps it from interleaving with ``handle_karung`` on the
        MQTT thread.
        """
        logger.info("Resetting counter at midnight")

        with self.counter_lock:
            # Save current counter value before resetting
            self.publish_result()
            self.save_to_database()

            self.counter = 0
            try:
                self.save_to_json(self.camera_name)
            except Exception:
                # Already logged by save_to_json; keep the scheduler alive.
                logger.warning("Reset counter could not be persisted to JSON storage")

            self.pintu_kiri_buka_detected = False
            self.pintu_kanan_buka_detected = False
            self.timer_active = False
            self.timer_start_time = None
            # Also drop the door timers so that threads still sleeping from
            # before the reset find nothing to expire.
            self.timer_active_pintu = False
            self.timer_start_time_pintu = None
            self.pintu_buka_timer = False
            self.seen_objects = {}

    def run(self):
        """Main run loop: execute scheduled jobs until interrupted."""
        logger.info("Starting Frigate Counter Service")
        try:
            while True:
                # Run scheduled tasks
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down Frigate Counter Service")
        finally:
            # Stop the network loops on every exit path, not only on Ctrl-C
            self.frigate_client.loop_stop()
            self.report_client.loop_stop()
            self.frigate_client.disconnect()
            self.report_client.disconnect()


if __name__ == "__main__":
    counter_service = FrigateCounter()
    counter_service.run()