First Commit

This commit is contained in:
2026-02-17 09:35:36 +07:00
parent f752e30ab9
commit fd1904c0e1
12 changed files with 974 additions and 1 deletions

View File

@@ -1,2 +1,98 @@
# karung-masuk_frigate-counter
# Frigate MQTT Counter Service
This service monitors Frigate NVR MQTT events to count "karung" objects after detecting both "pintu-kiri-buka" and "pintu-kanan-buka" objects.
## Features
- Monitor MQTT events from Frigate NVR on topic `frigate/events`
- Detect both "pintu-kiri-buka" and "pintu-kanan-buka" objects
- Start 30-minute timer when both objects detected
- Count "karung" objects during timer period
- Ignore "pintu-kiri-buka" and "pintu-kanan-buka" during timer
- Save results to SQLite database with columns: camera_name, date, counter_value
- Republish counter result to MQTT topic `{TOPIC}/counter/{SITE_NAME}`
- Reset counter every midnight
- If timer expires (more than 30 minutes), restart detection sequence
## Requirements
- Python 3.6+
- paho-mqtt
- schedule
- sqlite3 (built-in with Python)
## Installation
1. Create a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
## Configuration
Set the following environment variables:
- `FRIGATE_MQTT_HOST`: Host for Frigate MQTT server (default: localhost)
- `FRIGATE_MQTT_PORT`: Port for Frigate MQTT server (default: 1883)
- `REPORT_MQTT_HOST`: Host for reporting MQTT server (default: localhost)
- `REPORT_MQTT_PORT`: Port for reporting MQTT server (default: 1883)
- `TOPIC`: Base topic for reporting MQTT server (default: frigate)
- `SITE_NAME`: Site name for reporting MQTT server (default: default)
## Usage
Run the service:
```bash
python frigate_counter.py
```
## Systemd Service
To run as a systemd service, copy the service file to `/etc/systemd/system/`:
```bash
sudo cp frigate-counter.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable frigate-counter.service
sudo systemctl start frigate-counter.service
```
## Database Schema
The service creates an SQLite database (`karung_counts.db`) with the following table:
```sql
CREATE TABLE IF NOT EXISTS karung_counts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_name TEXT NOT NULL,
date DATE NOT NULL,
counter_value INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
);
```
## MQTT Topics
- Subscribe to: `frigate/events`
- Publish to: `{TOPIC}/counter/{SITE_NAME}`
## Architecture
The service implements a state machine with the following states:
1. Waiting for "pintu-kiri-buka" detection
2. Waiting for "pintu-kanan-buka" detection
3. Timer active - counting "karung" objects
4. Timer expired - publishing results and resetting
## Error Handling
The service includes comprehensive error handling and logging for:
- MQTT connection issues
- Database errors
- Message parsing errors
- Timer expiration handling

24
USAGE.md Executable file
View File

@@ -0,0 +1,24 @@
#!/bin/bash
# Example usage script for Frigate MQTT Counter Service
echo "Example usage of Frigate MQTT Counter Service"
echo ""
echo "To run with custom MQTT settings:"
echo "FRIGATE_MQTT_HOST=192.168.1.100"
echo "FRIGATE_MQTT_PORT=1883"
echo "REPORT_MQTT_HOST=192.168.1.101"
echo "REPORT_MQTT_PORT=1883"
echo "TOPIC=frigate/events"
echo "SITE_NAME=warehouse-1"
echo "python frigate_counter.py"
echo ""
echo "To run with default settings:"
echo "python frigate_counter.py"
echo ""
echo "To test the installation:"
echo "./install.sh"
echo "python test_frigate_counter.py"

23
frigate-counter.service Normal file
View File

@@ -0,0 +1,23 @@
[Unit]
Description=Frigate MQTT Counter Service
After=network.target
[Service]
Type=simple
User=frigate
Group=frigate
WorkingDirectory=/Users/dsutanto/MyEnv/Codes/CLAUDE/karung-masuk-frigate-counter
ExecStart=/Users/dsutanto/MyEnv/Codes/CLAUDE/karung-masuk-frigate-counter/venv/bin/python /Users/dsutanto/MyEnv/Codes/CLAUDE/karung-masuk-frigate-counter/frigate_counter.py
Restart=always
RestartSec=10
# Environment variables for MQTT configuration
Environment=FRIGATE_MQTT_HOST=localhost
Environment=FRIGATE_MQTT_PORT=1883
Environment=REPORT_MQTT_HOST=localhost
Environment=REPORT_MQTT_PORT=1883
Environment=TOPIC=frigate
Environment=SITE_NAME=default
[Install]
WantedBy=multi-user.target

336
frigate_counter.py Normal file
View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
"""
Frigate MQTT Counter Service
Monitors Frigate NVR MQTT events to count "karung" objects
after detecting both "pintu-kiri-buka" and "pintu-kanan-buka"
"""
import paho.mqtt.client as mqtt
import sqlite3
import schedule
import time
import threading
import os
import logging
import json
from datetime import datetime, date
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class FrigateCounter:
def __init__(self):
# MQTT configuration from environment variables
self.frigate_mqtt_host = os.environ.get('FRIGATE_MQTT_HOST', 'localhost')
self.frigate_mqtt_port = int(os.environ.get('FRIGATE_MQTT_PORT', 1883))
self.report_mqtt_host = os.environ.get('REPORT_MQTT_HOST', 'localhost')
self.report_mqtt_port = int(os.environ.get('REPORT_MQTT_PORT', 1883))
self.topic = os.environ.get('TOPIC', 'frigate')
self.site_name = os.environ.get('SITE_NAME', 'default')
# Database setup
self.db_path = 'karung_counts.db'
self.init_database()
# JSON storage for temporary persistent counter values
self.json_storage_path = 'karung_counters.json'
self.init_json_storage()
# State tracking
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
self.counter = 0
self.counter_lock = threading.Lock()
# Load previous counter value on startup
self.load_previous_counter()
# MQTT clients
self.frigate_client = None
self.report_client = None
# Initialize MQTT clients
self.setup_mqtt_clients()
# Schedule daily reset at midnight
schedule.every().day.at("00:00").do(self.reset_counter)
def init_database(self):
"""Initialize SQLite database with required table"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS karung_counts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_name TEXT NOT NULL,
date DATE NOT NULL,
counter_value INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
logger.info("Database initialized")
def init_json_storage(self):
"""Initialize JSON storage file for temporary persistent counter values"""
if not os.path.exists(self.json_storage_path):
# Create empty JSON file with empty dictionary
with open(self.json_storage_path, 'w') as f:
json.dump({}, f)
logger.info("JSON storage initialized")
def setup_mqtt_clients(self):
"""Setup MQTT clients for both Frigate and reporting"""
# Frigate MQTT client (for receiving events)
self.frigate_client = mqtt.Client()
self.frigate_client.on_connect = self.on_frigate_connect
self.frigate_client.on_message = self.on_frigate_message
self.frigate_client.connect(self.frigate_mqtt_host, self.frigate_mqtt_port, 60)
# Reporting MQTT client (for publishing results)
self.report_client = mqtt.Client()
self.report_client.on_connect = self.on_report_connect
self.report_client.connect(self.report_mqtt_host, self.report_mqtt_port, 60)
# Start MQTT client loops in separate threads
self.frigate_client.loop_start()
self.report_client.loop_start()
def on_frigate_connect(self, client, userdata, flags, rc):
"""Callback when connected to Frigate MQTT"""
logger.info("Connected to Frigate MQTT broker")
client.subscribe("frigate/events")
def on_report_connect(self, client, userdata, flags, rc):
"""Callback when connected to reporting MQTT broker"""
logger.info("Connected to reporting MQTT broker")
def on_frigate_message(self, client, userdata, msg):
"""Handle incoming Frigate MQTT messages"""
try:
# Parse the message (assuming JSON format)
import json
data = json.loads(msg.payload.decode())
# Extract object type and camera name
camera_name = data.get('camera', 'unknown')
event_type = data.get('type', '')
label = data.get('label', '')
logger.debug(f"Received message: camera={camera_name}, type={event_type}, label={label}")
# Handle different object types
if label == "pintu-kiri-buka":
self.handle_pintu_kiri_buka(camera_name)
elif label == "pintu-kanan-buka":
self.handle_pintu_kanan_buka(camera_name)
elif label == "karung":
self.handle_karung(camera_name)
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
def handle_pintu_kiri_buka(self, camera_name):
"""Handle detection of pintu-kiri-buka"""
logger.info(f"Detected pintu-kiri-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kiri_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kiri-buka during timer period")
def handle_pintu_kanan_buka(self, camera_name):
"""Handle detection of pintu-kanan-buka"""
logger.info(f"Detected pintu-kanan-buka on {camera_name}")
# Only process if timer is not active
if not self.timer_active:
self.pintu_kanan_buka_detected = True
self.check_detection_sequence()
else:
logger.debug("Ignoring pintu-kanan-buka during timer period")
def handle_karung(self, camera_name):
"""Handle detection of karung object"""
logger.info(f"Detected karung on {camera_name}")
# Only count if timer is active
if self.timer_active:
with self.counter_lock:
self.counter += 1
logger.info(f"Counter incremented to {self.counter}")
else:
logger.debug("Ignoring karung outside timer period")
def check_detection_sequence(self):
"""Check if both pintu-kiri-buka and pintu-kanan-buka have been detected"""
if self.pintu_kiri_buka_detected and self.pintu_kanan_buka_detected:
self.start_timer()
def start_timer(self):
"""Start the 30-minute timer"""
logger.info("Starting 30-minute timer")
self.timer_active = True
self.timer_start_time = datetime.now()
# Schedule timer expiration check
timer_thread = threading.Thread(target=self.check_timer_expiration)
timer_thread.daemon = True
timer_thread.start()
# Reset detection flags
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
def check_timer_expiration(self):
"""Check if timer has expired (30 minutes)"""
time.sleep(30 * 60) # Wait 30 minutes
if self.timer_active:
logger.info("Timer expired (30 minutes)")
self.timer_active = False
self.publish_result()
self.reset_counter()
def publish_result(self):
"""Publish counter result to MQTT topic"""
if self.counter > 0:
topic = f"{self.topic}/counter/{self.site_name}"
message = str(self.counter)
try:
self.report_client.publish(topic, message)
logger.info(f"Published counter result to {topic}: {message}")
# Save to database
self.save_to_database()
# Save to JSON for temporary persistent storage
self.save_to_json('frigate_camera')
except Exception as e:
logger.error(f"Error publishing result: {e}")
else:
logger.info("Counter is zero, not publishing result")
def save_to_database(self):
"""Save counter result to SQLite database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO karung_counts (camera_name, date, counter_value)
VALUES (?, ?, ?)
''', ('frigate_camera', date.today(), self.counter))
conn.commit()
conn.close()
logger.info(f"Saved counter result to database: {self.counter}")
except Exception as e:
logger.error(f"Error saving to database: {e}")
# Ensure we don't lose data due to database errors
# We should still try to save to JSON as backup
try:
self.save_to_json('frigate_camera')
logger.info("Fallback save to JSON successful")
except Exception as e2:
logger.error(f"Fallback save to JSON also failed: {e2}")
def save_to_json(self, camera_name):
"""Save counter result to JSON file for temporary persistent storage"""
try:
# Read existing data
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, 'r') as f:
data = json.load(f)
else:
data = {}
# Update counter value for this camera
if camera_name not in data:
data[camera_name] = {}
data[camera_name]["karung"] = self.counter
# Write back to file
with open(self.json_storage_path, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved counter result to JSON storage for {camera_name}: {self.counter}")
except Exception as e:
logger.error(f"Error saving to JSON storage: {e}")
# Try to create a backup of the existing file before overwriting
try:
import shutil
backup_path = f"{self.json_storage_path}.backup"
if os.path.exists(self.json_storage_path):
shutil.copy2(self.json_storage_path, backup_path)
logger.info(f"Created backup of JSON storage: {backup_path}")
except Exception as backup_e:
logger.error(f"Failed to create backup of JSON storage: {backup_e}")
raise # Re-raise the original exception
def load_previous_counter(self):
"""Load the previous counter value from JSON storage on startup"""
try:
self.counter = self.load_from_json('frigate_camera')
logger.info(f"Loaded previous counter value: {self.counter}")
except Exception as e:
logger.error(f"Error loading previous counter value: {e}")
# If loading fails, start with 0 counter
self.counter = 0
def load_from_json(self, camera_name):
"""Load counter result from JSON file"""
try:
if os.path.exists(self.json_storage_path):
with open(self.json_storage_path, 'r') as f:
data = json.load(f)
if camera_name in data and "karung" in data[camera_name]:
return data[camera_name]["karung"]
return 0
except Exception as e:
logger.error(f"Error loading from JSON storage: {e}")
# Return 0 in case of error for safety
return 0
def reset_counter(self):
"""Reset counter and clear detection flags"""
logger.info("Resetting counter at midnight")
with self.counter_lock:
# Save current counter value before resetting
self.save_to_database()
self.save_to_json('frigate_camera')
self.counter = 0
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
def run(self):
"""Main run loop"""
logger.info("Starting Frigate Counter Service")
try:
while True:
# Run scheduled tasks
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down Frigate Counter Service")
self.frigate_client.loop_stop()
self.report_client.loop_stop()
self.frigate_client.disconnect()
self.report_client.disconnect()
if __name__ == "__main__":
counter_service = FrigateCounter()
counter_service.run()

32
install.sh Executable file
View File

@@ -0,0 +1,32 @@
#!/bin/bash
# Install script for Frigate MQTT Counter Service
echo "Installing Frigate MQTT Counter Service..."
# Create virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
echo "Creating virtual environment..."
python3 -m venv venv
fi
# Activate virtual environment
source venv/bin/activate
# Install dependencies
echo "Installing dependencies..."
pip install -r requirements.txt
# Create database file
echo "Creating database..."
python -c "import sqlite3; conn = sqlite3.connect('karung_counts.db'); conn.close(); print('Database created successfully')"
echo "Installation complete!"
echo ""
echo "To run the service:"
echo " source venv/bin/activate"
echo " python frigate_counter.py"
echo ""
echo "To test the service:"
echo " source venv/bin/activate"
echo " python test_frigate_counter.py"

BIN
karung_counts.db Normal file

Binary file not shown.

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
paho-mqtt>=1.6.1
schedule>=1.2.0

81
simple_test.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
Simple test script to verify persistence improvements without MQTT connections
"""
import os
import sys
import tempfile
import json
import sqlite3
from datetime import date
# Add the current directory to Python path to import our module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_persistence_improvements():
"""Test that the persistence improvements work correctly"""
print("Testing persistence improvements...")
# Create a temporary directory to avoid conflicts
with tempfile.TemporaryDirectory() as temp_dir:
# Change to temp directory
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
from frigate_counter import FrigateCounter
# Test 1: Check that we can create counter instance without MQTT connection
print("1. Testing counter instantiation...")
# Mock the MQTT connection to avoid errors during testing
counter = FrigateCounter()
print("✓ Counter instantiated successfully")
# Test 2: Test that counter value can be loaded from JSON
print("2. Testing JSON storage functionality...")
# Create a test JSON file with some data
test_data = {"frigate_camera": {"karung": 42}}
with open('karung_counters.json', 'w') as f:
json.dump(test_data, f)
# Create a new counter instance to test loading
counter2 = FrigateCounter()
print(f"✓ Counter loaded from JSON: {counter2.counter}")
assert counter2.counter == 42, f"Expected 42, got {counter2.counter}"
# Test 3: Test save to JSON functionality
print("3. Testing save to JSON functionality...")
counter2.counter = 100
counter2.save_to_json('test_camera')
# Verify the data was saved
with open('karung_counters.json', 'r') as f:
data = json.load(f)
assert data['test_camera']['karung'] == 100, "Save to JSON failed"
print("✓ Save to JSON works correctly")
# Test 4: Test database functionality
print("4. Testing database functionality...")
counter2.save_to_database()
print("✓ Database save works correctly")
# Test 5: Test reset_counter functionality
print("5. Testing reset_counter functionality...")
counter2.counter = 50
counter2.reset_counter()
print("✓ Reset counter works correctly")
print("\n✓ All persistence improvements tests passed!")
except Exception as e:
print(f"✗ Error in persistence test: {e}")
import traceback
traceback.print_exc()
return False
finally:
os.chdir(original_dir)
return True
if __name__ == "__main__":
test_persistence_improvements()

119
test_frigate_counter.py Normal file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Test script for Frigate MQTT Counter Service
"""
import os
import sys
import tempfile
import sqlite3
from datetime import datetime
import threading
import time
# Add the current directory to Python path to import our module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_database_creation():
"""Test that database is created properly"""
print("Testing database creation...")
# Create a temporary directory to test database creation
with tempfile.TemporaryDirectory() as temp_dir:
# Change to temp directory to avoid conflicts
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
# Import after changing directory to avoid conflicts
from frigate_counter import FrigateCounter
# Create a temporary counter instance to test database
counter = FrigateCounter()
# Check if database file exists
if os.path.exists(counter.db_path):
print("✓ Database file created successfully")
# Check table structure
conn = sqlite3.connect(counter.db_path)
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(karung_counts)")
columns = cursor.fetchall()
conn.close()
expected_columns = ['id', 'camera_name', 'date', 'counter_value', 'timestamp']
actual_columns = [col[1] for col in columns]
if set(expected_columns) <= set(actual_columns):
print("✓ Database table structure is correct")
else:
print("✗ Database table structure is incorrect")
print(f"Expected: {expected_columns}")
print(f"Actual: {actual_columns}")
else:
print("✗ Database file was not created")
finally:
os.chdir(original_dir)
def test_counter_logic():
"""Test counter logic"""
print("\nTesting counter logic...")
# For now, just verify the class can be instantiated
# without actually connecting to MQTT brokers
try:
# Mock the MQTT connection to avoid errors during testing
import frigate_counter
# Create a simple test that doesn't initialize MQTT connections
class TestCounter(frigate_counter.FrigateCounter):
def __init__(self):
# Initialize with minimal setup
self.db_path = 'test_karung_counts.db'
self.init_database()
self.pintu_kiri_buka_detected = False
self.pintu_kanan_buka_detected = False
self.timer_active = False
self.timer_start_time = None
self.counter = 0
self.counter_lock = threading.Lock()
self.topic = 'test'
self.site_name = 'test'
test_counter = TestCounter()
print("✓ FrigateCounter class can be instantiated successfully")
print(f"✓ Default topic: {test_counter.topic}")
print(f"✓ Default site name: {test_counter.site_name}")
print(f"✓ Database path: {test_counter.db_path}")
except Exception as e:
print(f"✗ Error in counter logic test: {e}")
def test_environment_variables():
"""Test environment variable handling"""
print("\nTesting environment variables...")
# Set some test values
os.environ['FRIGATE_MQTT_HOST'] = 'test-frigate-host'
os.environ['REPORT_MQTT_HOST'] = 'test-report-host'
os.environ['TOPIC'] = 'test-topic'
os.environ['SITE_NAME'] = 'test-site'
# Import after setting environment variables
from frigate_counter import FrigateCounter
counter = FrigateCounter()
print(f"✓ FRIGATE_MQTT_HOST: {counter.frigate_mqtt_host}")
print(f"✓ REPORT_MQTT_HOST: {counter.report_mqtt_host}")
print(f"✓ TOPIC: {counter.topic}")
print(f"✓ SITE_NAME: {counter.site_name}")
if __name__ == "__main__":
print("Running Frigate Counter Service Tests\n")
test_database_creation()
test_counter_logic()
test_environment_variables()
print("\nTest completed.")

82
test_json_storage.py Normal file
View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Test script for JSON storage functionality in Frigate MQTT Counter Service
"""
import os
import sys
import tempfile
import json
from datetime import datetime
import threading
# Add the current directory to Python path to import our module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_json_storage():
"""Test that JSON storage works correctly"""
print("Testing JSON storage functionality...")
# Create a temporary directory to test
with tempfile.TemporaryDirectory() as temp_dir:
# Change to temp directory to avoid conflicts
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
from frigate_counter import FrigateCounter
# Create a counter instance
counter = FrigateCounter()
# Test JSON storage initialization
if os.path.exists(counter.json_storage_path):
print("✓ JSON storage file created successfully")
# Test writing to JSON storage
counter.counter = 5
counter.save_to_json('test_camera')
# Test reading from JSON storage
loaded_counter = counter.load_from_json('test_camera')
if loaded_counter == 5:
print("✓ JSON storage read/write works correctly")
else:
print(f"✗ JSON storage read failed: expected 5, got {loaded_counter}")
# Test multiple cameras
counter.counter = 10
counter.save_to_json('camera_1')
counter.counter = 15
counter.save_to_json('camera_2')
loaded_1 = counter.load_from_json('camera_1')
loaded_2 = counter.load_from_json('camera_2')
if loaded_1 == 10 and loaded_2 == 15:
print("✓ Multiple camera JSON storage works correctly")
else:
print(f"✗ Multiple camera JSON storage failed: {loaded_1}, {loaded_2}")
# Test reading non-existent camera
non_existent = counter.load_from_json('non_existent_camera')
if non_existent == 0:
print("✓ Reading non-existent camera returns 0 correctly")
else:
print(f"✗ Reading non-existent camera failed: {non_existent}")
else:
print("✗ JSON storage file was not created")
except Exception as e:
print(f"✗ Error in JSON storage test: {e}")
import traceback
traceback.print_exc()
finally:
os.chdir(original_dir)
if __name__ == "__main__":
print("Running JSON Storage Tests\n")
test_json_storage()
print("\nTest completed.")

135
test_persistence.py Normal file
View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
Comprehensive test for persistence improvements
"""
import os
import sys
import tempfile
import json
import sqlite3
from datetime import date
# Add the current directory to Python path to import our module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_startup_loading():
"""Test that counter values are loaded on startup"""
print("Testing startup loading...")
with tempfile.TemporaryDirectory() as temp_dir:
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
# Test the core persistence functionality by directly testing methods
from frigate_counter import FrigateCounter
# Create a test JSON file with some data
test_data = {"frigate_camera": {"karung": 123}}
with open('karung_counters.json', 'w') as f:
json.dump(test_data, f)
# Test the load_from_json method directly
counter = FrigateCounter()
loaded_value = counter.load_from_json('frigate_camera')
assert loaded_value == 123, f"Expected 123, got {loaded_value}"
print("✓ Counter value loaded correctly from JSON")
# Test the load_previous_counter method
counter2 = FrigateCounter()
# We can't easily test the full load_previous_counter without MQTT,
# but we can verify the method exists and works
print("✓ load_previous_counter method exists")
except Exception as e:
print(f"✗ Startup loading test failed: {e}")
import traceback
traceback.print_exc()
return False
finally:
os.chdir(original_dir)
return True
def test_save_functionality():
"""Test that save functionality works correctly"""
print("Testing save functionality...")
with tempfile.TemporaryDirectory() as temp_dir:
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
from frigate_counter import FrigateCounter
# Test save to JSON
counter = FrigateCounter()
counter.counter = 42
counter.save_to_json('test_camera')
# Verify data was saved
with open('karung_counters.json', 'r') as f:
data = json.load(f)
assert data['test_camera']['karung'] == 42, "Save to JSON failed"
print("✓ Save to JSON works correctly")
# Test save to database
counter.save_to_database()
print("✓ Save to database works correctly")
except Exception as e:
print(f"✗ Save functionality test failed: {e}")
import traceback
traceback.print_exc()
return False
finally:
os.chdir(original_dir)
return True
def test_error_handling():
"""Test that error handling works properly"""
print("Testing error handling...")
with tempfile.TemporaryDirectory() as temp_dir:
original_dir = os.getcwd()
os.chdir(temp_dir)
try:
from frigate_counter import FrigateCounter
# Test that methods don't crash with corrupted data
counter = FrigateCounter()
# Create a corrupted JSON file
with open('karung_counters.json', 'w') as f:
f.write('{"invalid": json}')
# These should not crash the application
result = counter.load_from_json('test_camera')
assert result == 0, f"Expected 0, got {result}"
print("✓ Error handling works correctly")
except Exception as e:
print(f"✗ Error handling test failed: {e}")
import traceback
traceback.print_exc()
return False
finally:
os.chdir(original_dir)
return True
if __name__ == "__main__":
print("Running comprehensive persistence tests\n")
success = True
success &= test_startup_loading()
success &= test_save_functionality()
success &= test_error_handling()
if success:
print("\n✓ All persistence tests passed!")
else:
print("\n✗ Some tests failed!")
sys.exit(1)

43
usage_example.md Normal file
View File

@@ -0,0 +1,43 @@
# Usage Examples
## Running with Default Settings
```bash
python frigate_counter.py
```
## Running with Custom MQTT Settings
```bash
FRIGATE_MQTT_HOST=192.168.1.100 \
FRIGATE_MQTT_PORT=1883 \
REPORT_MQTT_HOST=192.168.1.101 \
REPORT_MQTT_PORT=1883 \
TOPIC=frigate/events \
SITE_NAME=warehouse-1 \
python frigate_counter.py
```
## Running with Environment File
```bash
# Create environment file
cat > .env << EOF
FRIGATE_MQTT_HOST=192.168.1.100
FRIGATE_MQTT_PORT=1883
REPORT_MQTT_HOST=192.168.1.101
REPORT_MQTT_PORT=1883
TOPIC=frigate/events
SITE_NAME=warehouse-1
EOF
# Source and run
source .env && python frigate_counter.py
```
## Testing the Installation
```bash
./install.sh
python test_frigate_counter.py
```