Files
karung-masuk_frigate-counter/frigate_counter.py.backup

434 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Frigate MQTT Counter Service
Monitors Frigate NVR MQTT events to count "karung" objects
after detecting both "pintu-kiri-buka" and "pintu-kanan-buka"
"""
import paho.mqtt.client as mqtt
import sqlite3
import schedule
import time
import threading
import os
import logging
import json
from datetime import datetime, date
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class FrigateCounter:
def __init__(self):
# MQTT configuration from environment variables
self.frigate_mqtt_host = os.environ.get('FRIGATE_MQTT_HOST', 'localhost')
self.frigate_mqtt_port = int(os.environ.get('FRIGATE_MQTT_PORT', 1883))
#self.report_mqtt_host = os.environ.get('REPORT_MQTT_HOST', 'localhost')
self.report_mqtt_host = os.environ.get('REPORT_MQTT_HOST', 'mqtt.backone.cloud')
self.report_mqtt_port = int(os.environ.get('REPORT_MQTT_PORT', 1883))
self.top_topic = os.environ.get("TOP_TOPIC", "cpsp")
self.site_name = os.environ.get('SITE_NAME', 'sukawarna')
self.topic = os.environ.get('TOPIC', f"{self.top_topic}/counter/{self.site_name}")
self.camera_name = os.environ.get('CAMERA_NAME', 'kandang_1_karung_masuk')
self.pintu_tutup_zone_name = os.environ.get('PINTU_TUTUP_ZONE_NAME', 'pintu_tutup')
self.pintu_kiri_buka_zone_name = os.environ.get('PINTU_KIRI_BUKA_ZONE_NAME', 'pintu_kiri_buka')
self.pintu_kanan_buka_zone_name = os.environ.get('PINTU_KANAN_BUKA_ZONE_NAME', 'pintu_kanan_buka')
logger.info(f"FRIGATE_MQTT_HOST: {self.frigate_mqtt_host}:{self.frigate_mqtt_port}")
logger.info(f"REPORT_MQTT_HOST: {self.report_mqtt_host}:{self.report_mqtt_port}")
logger.info(f"TOPIC: {self.topic}")
logger.info(f"CAMERA_NAME: {self.camera_name}")
# Database setup
self.db_path = '/etc/frigate-counter/karung-masuk/karung_masuk.db'
self.init_database()
# JSON storage for temporary persistent counter values
self.json_storage_path = '/etc/frigate-counter/karung-masuk/karung_masuk.json'
self.init_json_storage()
# State tracking
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
self.counter = 0
self.counter_lock = threading.Lock()
self.seen_objects = {}
# State pintu tracking
self.timer_active_pintu = False
self.timer_start_time_pintu = None
self.pintu_buka_timer = False
# Load previous counter value on startup
self.load_previous_counter()
# MQTT clients
self.frigate_client = None
self.report_client = None
# Initialize MQTT clients
self.setup_mqtt_clients()
# Schedule daily reset at midnight
schedule.every().day.at("23:59").do(self.reset_counter)
def init_database(self):
"""Initialize SQLite database with required table"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS karung_counts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_name TEXT NOT NULL,
date DATE NOT NULL,
counter_value INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
logger.info("Database initialized")
def init_json_storage(self):
"""Initialize JSON storage file for temporary persistent counter values"""
if not os.path.exists(self.json_storage_path):
# Create empty JSON file with empty dictionary
with open(self.json_storage_path, 'w') as f:
json.dump({}, f)
logger.info("JSON storage initialized")
def setup_mqtt_clients(self):
"""Setup MQTT clients for both Frigate and reporting"""
# Frigate MQTT client (for receiving events)
self.frigate_client = mqtt.Client()
self.frigate_client.on_connect = self.on_frigate_connect
self.frigate_client.on_message = self.on_frigate_message
self.frigate_client.connect(self.frigate_mqtt_host, self.frigate_mqtt_port, 60)
# Reporting MQTT client (for publishing results)
self.report_client = mqtt.Client()
self.report_client.on_connect = self.on_report_connect
self.report_client.connect(self.report_mqtt_host, self.report_mqtt_port, 60)
# Start MQTT client loops in separate threads
self.frigate_client.loop_start()
self.report_client.loop_start()
def on_frigate_connect(self, client, userdata, flags, rc):
"""Callback when connected to Frigate MQTT"""
logger.info("Connected to Frigate MQTT broker")
client.subscribe("frigate/events")
def on_report_connect(self, client, userdata, flags, rc):
"""Callback when connected to reporting MQTT broker"""
logger.info("Connected to reporting MQTT broker")
def on_frigate_message(self, client, userdata, msg):
"""Handle incoming Frigate MQTT messages"""
try:
# Parse the message (assuming JSON format)
#import json
payload = json.loads(msg.payload.decode())
if "after" not in payload:
return
#event_after = payload["after"]
event_after = payload.get("after", {})
event_before = payload.get("before", {})
camera_name = event_after.get("camera", "unknown")
if camera_name != self.camera_name:
return
event_type = event_after.get('type', '')
label = event_after.get("label", "")
track_id = event_after.get("id")
zones_after = event_after.get("entered_zones", [])
zones_before = event_before.get("entered_zones", [])
# Dont detect stationary
stationary = event_after.get("stationary")
if stationary:
return
new_zones = [z for z in zones_after if z not in zones_before]
# debounce per track_id
if camera_name not in self.seen_objects:
self.seen_objects[camera_name] = {}
if track_id in self.seen_objects[camera_name]:
return # sudah dihitung
# Extract object type and camera name
#camera_name = data.get('camera', 'unknown')
#event_type = data.get('type', '')
#label = data.get('label', '')
logger.debug(f"Received message: camera={camera_name}, type={event_type}, label={label}")
# Handle different object types
if label == "pintu-kiri-buka" and not self.timer_active and not self.pintu_kiri_buka_detected:
self.handle_pintu_kiri_buka(camera_name)
elif label == "pintu-kanan-buka" and not self.timer_active and not self.pintu_kanan_buka_detected:
self.handle_pintu_kanan_buka(camera_name)
elif label == "karung" and self.timer_active:
self.handle_karung(camera_name, track_id)
elif label == "pintu-tutup" and self.timer_active and self.pintu_tutup_zone_name in zones_after and not self.pintu_buka_timer:
self.handle_pintu_tutup(camera_name)
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
def handle_pintu_tutup(self, camera_name):
"""Handle detection of pintu-tutup"""
logger.info(f"Detected pintu-tutup {camera_name}")
if self.timer_active:
logger.info("Stop Counting Karung")
self.timer_active = False
self.seen_objects = {}
def handle_pintu_kiri_buka(self, camera_name):
"""Handle detection of pintu-kiri-buka"""
logger.info(f"Detected pintu-kiri-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kiri_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kiri-buka during timer period")
def handle_pintu_kanan_buka(self, camera_name):
"""Handle detection of pintu-kanan-buka"""
logger.info(f"Detected pintu-kanan-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kanan_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kanan-buka during timer period")
def handle_karung(self, camera_name, track_id):
"""Handle detection of karung object"""
logger.info(f"Detected karung on {camera_name}")
# Only count if timer is active
if self.timer_active:
with self.counter_lock:
self.counter += 1
self.seen_objects[camera_name][track_id] = datetime.now()
self.save_to_json(camera_name)
self.publish_result()
logger.info(f"Counter incremented to {self.counter} and republish to MQTT")
else:
logger.debug("Ignoring karung outside timer period")
def check_detection_sequence(self):
"""Check if both pintu-kiri-buka and pintu-kanan-buka have been detected"""
if self.pintu_kiri_buka_detected and self.pintu_kanan_buka_detected:
self.start_timer()
return
if not self.timer_active_pintu:
self.start_timer_pintu()
def start_timer_pintu(self):
"""Start the 5-minute timer Pintu"""
logger.info("Starting 5-minute timer Pintu")
self.timer_active_pintu = True
self.timer_start_time_pintu = datetime.now()
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration_pintu)
timer_thread.daemon = True
timer_thread.start()
def start_timer(self):
"""Start Counting"""
#logger.info("Starting 60-minute timer")
logger.info("Start Counting and Timer pintu-buka 5 minutes")
self.timer_active = True
self.timer_start_time = datetime.now()
self.pintu_buka_timer = True
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration_pintu_buka)
timer_thread.daemon = True
timer_thread.start()
# Reset detection flags
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
def check_timer_expiration_pintu(self):
"""Check if timer has expired (5 minutes)"""
time.sleep(5 * 60) # Wait 5 minutes
if self.timer_active_pintu:
logger.info("Timer Pintu expired (5 minutes)")
self.timer_active_pintu = False
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
def check_timer_expiration_pintu_buka(self):
"""Check if timer has expired (5 minutes)"""
time.sleep(5 * 60) # Wait 5 minutes
if self.pintu_buka_timer:
logger.info("Timer Pintu Buka expired (5 minutes)")
self.pintu_buka_timer = False
def publish_result(self):
"""Publish counter result to MQTT topic"""
if self.counter > 0:
topic = f"{self.topic}/{self.camera_name}/karung"
#TESTING
#topic = f"{self.topic}/{self.camera_name}-check_pintu/karung"
message = str(self.counter)
try:
self.report_client.publish(topic, message)
logger.info(f"Published counter result to {topic}: {message}")
# Save to database
#self.save_to_database()
# Save to JSON for temporary persistent storage
#self.save_to_json('frigate_camera')
except Exception as e:
logger.error(f"Error publishing result: {e}")
else:
logger.info("Counter is zero, not publishing result")
def save_to_database(self):
"""Save counter result to SQLite database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO karung_counts (camera_name, date, counter_value)
VALUES (?, ?, ?)
''', (self.camera_name, date.today(), self.counter))
conn.commit()
conn.close()
logger.info(f"Saved counter result to database: {self.counter}")
except Exception as e:
logger.error(f"Error saving to database: {e}")
# Ensure we don't lose data due to database errors
# We should still try to save to JSON as backup
try:
self.save_to_json(self.camera_name)
logger.info("Fallback save to JSON successful")
except Exception as e2:
logger.error(f"Fallback save to JSON also failed: {e2}")
def save_to_json(self, camera_name):
"""Save counter result to JSON file for temporary persistent storage"""
try:
# Read existing data
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, 'r') as f:
data = json.load(f)
else:
data = {}
# Update counter value for this camera
if camera_name not in data:
data[camera_name] = {}
data[camera_name]["karung"] = self.counter
# Write back to file
with open(self.json_storage_path, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved counter result to JSON storage for {camera_name}: {self.counter}")
except Exception as e:
logger.error(f"Error saving to JSON storage: {e}")
# Try to create a backup of the existing file before overwriting
try:
import shutil
backup_path = f"{self.json_storage_path}.backup"
if os.path.exists(self.json_storage_path):
shutil.copy2(self.json_storage_path, backup_path)
logger.info(f"Created backup of JSON storage: {backup_path}")
except Exception as backup_e:
logger.error(f"Failed to create backup of JSON storage: {backup_e}")
raise # Re-raise the original exception
def load_previous_counter(self):
"""Load the previous counter value from JSON storage on startup"""
try:
self.counter = self.load_from_json(self.camera_name)
logger.info(f"Loaded previous counter value: {self.counter}")
except Exception as e:
logger.error(f"Error loading previous counter value: {e}")
# If loading fails, start with 0 counter
self.counter = 0
def load_from_json(self, camera_name):
"""Load counter result from JSON file"""
try:
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, 'r') as f:
data = json.load(f)
if camera_name in data and "karung" in data[camera_name]:
return data[camera_name]["karung"]
return 0
except Exception as e:
logger.error(f"Error loading from JSON storage: {e}")
# Return 0 in case of error for safety
return 0
def reset_counter(self):
"""Reset counter and clear detection flags"""
logger.info("Resetting counter at midnight")
#with self.counter_lock:
# self.counter = 0
# self.save_to_database()
# self.save_to_json(self.camera_name)
# Save current counter value before resetting
self.publish_result()
self.save_to_database()
self.counter = 0
self.save_to_json(self.camera_name)
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
self.seen_objects = {}
def run(self):
"""Main run loop"""
logger.info("Starting Frigate Counter Service")
try:
while True:
# Run scheduled tasks
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down Frigate Counter Service")
self.frigate_client.loop_stop()
self.report_client.loop_stop()
self.frigate_client.disconnect()
self.report_client.disconnect()
if __name__ == "__main__":
counter_service = FrigateCounter()
counter_service.run()