diff --git a/README.md b/README.md index 8dd89b9..c8f239a 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,4 @@ -# zenai-apc-salatiga - +pip install -r requirements.txt +export CAMERA_NAME=cam_gudang_utara +export FRIGATE_MQTT_HOST=192.168.1.10 +python counter_service.py diff --git a/counter_dashboard.py b/counter_dashboard.py new file mode 100644 index 0000000..9cd60b6 --- /dev/null +++ b/counter_dashboard.py @@ -0,0 +1,462 @@ +#!/usr/bin/env python3 +""" +Frigate Counter Dashboard +A beautiful, interactive Flask web app for viewing daily batch counting data. +""" + +import json +import os +import sqlite3 +import csv +from io import StringIO +from datetime import datetime, timedelta + +from flask import Flask, render_template, jsonify, request, Response +from werkzeug.serving import WSGIRequestHandler + + +app = Flask(__name__, template_folder="templates") +app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", "frigate-counter-dashboard") + +DB_PATH = os.getenv("DB_PATH", "frigate_counter.db") +CURRENT_BATCH_PATH = os.getenv("CURRENT_BATCH_PATH", "current_batch.json") +CUTOFF_TIME = os.getenv("CUTOFF_TIME", "17:00") + + +# ------------------------------------------------------------------ # +# Database helpers +# ------------------------------------------------------------------ # +def get_db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +# def get_counting_date(dt=None, cutoff_str="17:00"): +def get_counting_date(dt=None, cutoff_str=CUTOFF_TIME): + """Replicate the service logic for determining the counting date.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(cutoff_str, "%H:%M").time() + if dt.time() < cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + + +# ------------------------------------------------------------------ # +# Routes +# ------------------------------------------------------------------ # +@app.route("/") +def index(): + """Main dashboard page.""" + return render_template("dashboard.html") + + +@app.route("/api/current-batch") +def api_current_batch(): + """Get real-time data from the current active batch file.""" + try: + with open(CURRENT_BATCH_PATH, "r") as f: + data = json.load(f) + return jsonify( + { + "success": True, + "counting_date": data.get("counting_date"), + "batch_number": data.get("batch_number"), + "count": data.get("count", 0), + "start_time": data.get("start_time"), + "last_detection_time": data.get("last_detection_time"), + } + ) + except FileNotFoundError: + return jsonify( + { + "success": False, + "error": "No active batch", + "count": 0, + "batch_number": None, + "counting_date": None, + } + ), 404 + except Exception as e: + return jsonify( + { + "success": False, + "error": str(e), + "count": 0, + "batch_number": None, + "counting_date": None, + } + ), 500 + + +@app.route("/api/summary") +def api_summary(): + """Get summary statistics for the dashboard cards.""" + conn = get_db() + cur = conn.cursor() + + # Today's counting date (based on cutoff) + today = get_counting_date() + + # Today's stats + cur.execute( + """ + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, + (today,), + ) + today_row = cur.fetchone() + + # Yesterday's stats + yesterday = ( + datetime.strptime(today, "%Y-%m-%d").date() - timedelta(days=1) + ).isoformat() + cur.execute( + """ + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, + (yesterday,), + ) + yesterday_row = cur.fetchone() + + # All-time totals + cur.execute(""" + SELECT COALESCE(SUM(total_count), 0) as grand_total, + COALESCE(SUM(total_batches), 0) as grand_batches, + COUNT(DISTINCT counting_date) as total_days + FROM daily_summaries + """) + all_time = cur.fetchone() + + # Average per day + cur.execute(""" + SELECT ROUND(AVG(total_count), 1) as avg_per_day + FROM daily_summaries + """) + avg = cur.fetchone() + + # Best day + cur.execute(""" + SELECT counting_date, total_count + FROM daily_summaries + ORDER BY total_count DESC + LIMIT 1 + """) + best = cur.fetchone() + + conn.close() + + return jsonify( + { + "today": { + "date": today, + "total_count": today_row["total_count"] if today_row else 0, + "total_batches": today_row["total_batches"] if today_row else 0, + }, + "yesterday": { + "date": yesterday, + "total_count": yesterday_row["total_count"] if yesterday_row else 0, + "total_batches": yesterday_row["total_batches"] if yesterday_row else 0, + }, + "all_time": { + "grand_total": all_time["grand_total"], + "grand_batches": all_time["grand_batches"], + "total_days": all_time["total_days"], + }, + "average_per_day": avg["avg_per_day"] or 0, + "best_day": { + "date": best["counting_date"] if best else None, + "count": best["total_count"] if best else 0, + }, + } + ) + + +@app.route("/api/daily-data") +def api_daily_data(): + """Get daily data for charts and table.""" + days = request.args.get("days", 30, type=int) + date_from = (datetime.now() - timedelta(days=days)).date().isoformat() + + conn = get_db() + cur = conn.cursor() + + # Daily summaries for chart + cur.execute( + """ + SELECT counting_date, total_count, total_batches, + ROUND(CAST(total_count AS FLOAT) / total_batches, 1) as avg_per_batch + FROM daily_summaries + WHERE counting_date >= ? + ORDER BY counting_date ASC + """, + (date_from,), + ) + + daily_data = [] + for row in cur.fetchall(): + daily_data.append( + { + "date": row["counting_date"], + "total_count": row["total_count"], + "total_batches": row["total_batches"], + "avg_per_batch": row["avg_per_batch"] or 0, + } + ) + + conn.close() + return jsonify(daily_data) + + +@app.route("/api/day-detail/") +def api_day_detail(date): + """Get detailed batch information for a specific day.""" + conn = get_db() + cur = conn.cursor() + + # Batches for this day + cur.execute( + """ + SELECT batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + WHERE counting_date = ? + ORDER BY batch_number ASC + """, + (date,), + ) + + batches = [] + total_duration = 0 + for row in cur.fetchall(): + duration = row["duration_minutes"] or 0 + total_duration += duration + batches.append( + { + "batch_number": row["batch_number"], + "count": row["count"], + "start_time": row["start_time"], + "end_time": row["end_time"], + "duration_minutes": duration, + } + ) + + # Summary for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? + """, + (date,), + ) + summary = cur.fetchone() + + conn.close() + + return jsonify( + { + "date": date, + "total_count": summary["total_count"] if summary else 0, + "total_batches": summary["total_batches"] if summary else 0, + "total_duration_minutes": round(total_duration, 1), + "avg_duration_minutes": round(total_duration / len(batches), 1) + if batches + else 0, + "batches": batches, + } + ) + + +@app.route("/api/recent-batches") +def api_recent_batches(): + """Get the most recent batches across all days.""" + limit = request.args.get("limit", 10, type=int) + + conn = get_db() + cur = conn.cursor() + + cur.execute( + """ + SELECT counting_date, batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + ORDER BY end_time DESC + LIMIT ? + """, + (limit,), + ) + + batches = [] + for row in cur.fetchall(): + batches.append( + { + "date": row["counting_date"], + "batch_number": row["batch_number"], + "count": row["count"], + "start_time": row["start_time"], + "end_time": row["end_time"], + "duration_minutes": row["duration_minutes"] or 0, + } + ) + + conn.close() + return jsonify(batches) + + +@app.route("/api/available-dates") +def api_available_dates(): + """Get list of all available counting dates.""" + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, total_count, total_batches + FROM daily_summaries + ORDER BY counting_date DESC + """) + + dates = [] + for row in cur.fetchall(): + dates.append( + { + "date": row["counting_date"], + "total_count": row["total_count"], + "total_batches": row["total_batches"], + } + ) + + conn.close() + return jsonify(dates) + + +# ------------------------------------------------------------------ # +# CSV Export Routes +# ------------------------------------------------------------------ # +@app.route("/api/export-daily-csv") +def export_daily_csv(): + """Export daily summary records as CSV.""" + days = request.args.get("days", 30, type=int) + date_from = (datetime.now() - timedelta(days=days)).date().isoformat() + + conn = get_db() + cur = conn.cursor() + + cur.execute( + """ + SELECT counting_date, total_count, total_batches, + ROUND(CAST(total_count AS FLOAT) / NULLIF(total_batches, 0), 1) as avg_per_batch + FROM daily_summaries + WHERE counting_date >= ? + ORDER BY counting_date ASC + """, + (date_from,), + ) + + output = StringIO() + writer = csv.writer(output) + writer.writerow(["Date", "Total Count", "Total Batches", "Avg per Batch"]) + + for row in cur.fetchall(): + writer.writerow( + [ + row["counting_date"], + row["total_count"], + row["total_batches"], + row["avg_per_batch"] or 0, + ] + ) + + conn.close() + + csv_data = output.getvalue() + output.close() + + filename = f"daily_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + return Response( + csv_data, + mimetype="text/csv", + headers={ + "Content-Disposition": f"attachment; filename={filename}", + "Content-Type": "text/csv; charset=utf-8", + }, + ) + + +@app.route("/api/export-day-csv/") +def export_day_csv(date): + """Export batch details for a specific day as CSV.""" + conn = get_db() + cur = conn.cursor() + + # Batches for this day + cur.execute( + """ + SELECT batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + WHERE counting_date = ? + ORDER BY batch_number ASC + """, + (date,), + ) + + output = StringIO() + writer = csv.writer(output) + writer.writerow( + ["Batch Number", "Count", "Start Time", "End Time", "Duration (min)"] + ) + + for row in cur.fetchall(): + writer.writerow( + [ + row["batch_number"], + row["count"], + row["start_time"], + row["end_time"], + row["duration_minutes"] or 0, + ] + ) + + conn.close() + + csv_data = output.getvalue() + output.close() + + filename = f"day_detail_{date}.csv" + return Response( + csv_data, + mimetype="text/csv", + headers={ + "Content-Disposition": f"attachment; filename={filename}", + "Content-Type": "text/csv; charset=utf-8", + }, + ) + + +# ------------------------------------------------------------------ # +# Run +# ------------------------------------------------------------------ # +if __name__ == "__main__": + # Suppress Flask's default request log spam + WSGIRequestHandler.protocol_version = "HTTP/1.1" + + port = int(os.getenv("DASHBOARD_PORT", 80)) + host = os.getenv("DASHBOARD_HOST", "0.0.0.0") + debug = os.getenv("FLASK_DEBUG", "false").lower() == "true" + + print(f"🚀 Dashboard running at http://{host}:{port}") + app.run(host=host, port=port, debug=debug) diff --git a/counter_dashboard.py.20260506 b/counter_dashboard.py.20260506 new file mode 100644 index 0000000..6808e78 --- /dev/null +++ b/counter_dashboard.py.20260506 @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +""" +Frigate Counter Dashboard +A beautiful, interactive Flask web app for viewing daily batch counting data. +""" +import os +import sqlite3 +from datetime import datetime, timedelta + +from flask import Flask, render_template, jsonify, request +from werkzeug.serving import WSGIRequestHandler + + +app = Flask(__name__, template_folder='templates') +app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'frigate-counter-dashboard') + +DB_PATH = os.getenv('DB_PATH', 'frigate_counter.db') + + +# ------------------------------------------------------------------ # +# Database helpers +# ------------------------------------------------------------------ # +def get_db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def get_counting_date(dt=None, cutoff_str="08:00"): + """Replicate the service logic for determining the counting date.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(cutoff_str, "%H:%M").time() + print(f"DT: {dt.time()}, CO: {cutoff}") + if dt.time() >= cutoff: + print(f"Return Current day: {dt.date()}") + return dt.date().isoformat() + print(f"Return -1 day: {dt.date() - timedelta(days=1)}") + return (dt.date() - timedelta(days=1)).isoformat() + + +# ------------------------------------------------------------------ # +# Routes +# ------------------------------------------------------------------ # +@app.route('/') +def index(): + """Main dashboard page.""" + return render_template('dashboard.html') + + +@app.route('/api/summary') +def api_summary(): + """Get summary statistics for the dashboard cards.""" + conn = get_db() + cur = conn.cursor() + + # Today's counting date (based on cutoff) + today = get_counting_date() + + # Today's stats + cur.execute(""" + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (today,)) + today_row = cur.fetchone() + + # Yesterday's stats + yesterday = (datetime.strptime(today, "%Y-%m-%d").date() - timedelta(days=1)).isoformat() + cur.execute(""" + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (yesterday,)) + yesterday_row = cur.fetchone() + + # All-time totals + cur.execute(""" + SELECT COALESCE(SUM(total_count), 0) as grand_total, + COALESCE(SUM(total_batches), 0) as grand_batches, + COUNT(DISTINCT counting_date) as total_days + FROM daily_summaries + """) + all_time = cur.fetchone() + + # Average per day + cur.execute(""" + SELECT ROUND(AVG(total_count), 1) as avg_per_day + FROM daily_summaries + """) + avg = cur.fetchone() + + # Best day + cur.execute(""" + SELECT counting_date, total_count + FROM daily_summaries + ORDER BY total_count DESC + LIMIT 1 + """) + best = cur.fetchone() + + conn.close() + + return jsonify({ + 'today': { + 'date': today, + 'total_count': today_row['total_count'] if today_row else 0, + 'total_batches': today_row['total_batches'] if today_row else 0, + }, + 'yesterday': { + 'date': yesterday, + 'total_count': yesterday_row['total_count'] if yesterday_row else 0, + 'total_batches': yesterday_row['total_batches'] if yesterday_row else 0, + }, + 'all_time': { + 'grand_total': all_time['grand_total'], + 'grand_batches': all_time['grand_batches'], + 'total_days': all_time['total_days'], + }, + 'average_per_day': avg['avg_per_day'] or 0, + 'best_day': { + 'date': best['counting_date'] if best else None, + 'count': best['total_count'] if best else 0, + } + }) + + +@app.route('/api/daily-data') +def api_daily_data(): + """Get daily data for charts and table.""" + days = request.args.get('days', 30, type=int) + date_from = (datetime.now() - timedelta(days=days)).date().isoformat() + + conn = get_db() + cur = conn.cursor() + + # Daily summaries for chart + cur.execute(""" + SELECT counting_date, total_count, total_batches, + ROUND(CAST(total_count AS FLOAT) / total_batches, 1) as avg_per_batch + FROM daily_summaries + WHERE counting_date >= ? + ORDER BY counting_date ASC + """, (date_from,)) + + daily_data = [] + for row in cur.fetchall(): + daily_data.append({ + 'date': row['counting_date'], + 'total_count': row['total_count'], + 'total_batches': row['total_batches'], + 'avg_per_batch': row['avg_per_batch'] or 0, + }) + + conn.close() + return jsonify(daily_data) + + +@app.route('/api/day-detail/') +def api_day_detail(date): + """Get detailed batch information for a specific day.""" + conn = get_db() + cur = conn.cursor() + + # Batches for this day + cur.execute(""" + SELECT batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + WHERE counting_date = ? + ORDER BY batch_number ASC + """, (date,)) + + batches = [] + total_duration = 0 + for row in cur.fetchall(): + duration = row['duration_minutes'] or 0 + total_duration += duration + batches.append({ + 'batch_number': row['batch_number'], + 'count': row['count'], + 'start_time': row['start_time'], + 'end_time': row['end_time'], + 'duration_minutes': duration, + }) + + # Summary for the day + cur.execute(""" + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (date,)) + summary = cur.fetchone() + + conn.close() + + return jsonify({ + 'date': date, + 'total_count': summary['total_count'] if summary else 0, + 'total_batches': summary['total_batches'] if summary else 0, + 'total_duration_minutes': round(total_duration, 1), + 'avg_duration_minutes': round(total_duration / len(batches), 1) if batches else 0, + 'batches': batches, + }) + + +@app.route('/api/recent-batches') +def api_recent_batches(): + """Get the most recent batches across all days.""" + limit = request.args.get('limit', 10, type=int) + + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + ORDER BY end_time DESC + LIMIT ? + """, (limit,)) + + batches = [] + for row in cur.fetchall(): + batches.append({ + 'date': row['counting_date'], + 'batch_number': row['batch_number'], + 'count': row['count'], + 'start_time': row['start_time'], + 'end_time': row['end_time'], + 'duration_minutes': row['duration_minutes'] or 0, + }) + + conn.close() + return jsonify(batches) + + +@app.route('/api/available-dates') +def api_available_dates(): + """Get list of all available counting dates.""" + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, total_count, total_batches + FROM daily_summaries + ORDER BY counting_date DESC + """) + + dates = [] + for row in cur.fetchall(): + dates.append({ + 'date': row['counting_date'], + 'total_count': row['total_count'], + 'total_batches': row['total_batches'], + }) + + conn.close() + return jsonify(dates) + + +# ------------------------------------------------------------------ # +# Run +# ------------------------------------------------------------------ # +if __name__ == '__main__': + # Suppress Flask's default request log spam + WSGIRequestHandler.protocol_version = "HTTP/1.1" + + port = int(os.getenv('DASHBOARD_PORT', 80)) + host = os.getenv('DASHBOARD_HOST', '0.0.0.0') + debug = os.getenv('FLASK_DEBUG', 'false').lower() == 'true' + + print(f"🚀 Dashboard running at http://{host}:{port}") + app.run(host=host, port=port, debug=debug) diff --git a/counter_dashboard.py.20260517 b/counter_dashboard.py.20260517 new file mode 100644 index 0000000..d6c4940 --- /dev/null +++ b/counter_dashboard.py.20260517 @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +""" +Frigate Counter Dashboard +A beautiful, interactive Flask web app for viewing daily batch counting data. +""" +import os +import sqlite3 +import csv +from io import StringIO +from datetime import datetime, timedelta + +from flask import Flask, render_template, jsonify, request, Response +from werkzeug.serving import WSGIRequestHandler + + +app = Flask(__name__, template_folder='templates') +app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'frigate-counter-dashboard') + +DB_PATH = os.getenv('DB_PATH', 'frigate_counter.db') + + +# ------------------------------------------------------------------ # +# Database helpers +# ------------------------------------------------------------------ # +def get_db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def get_counting_date(dt=None, cutoff_str="20:00"): + """Replicate the service logic for determining the counting date.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(cutoff_str, "%H:%M").time() + #if dt.time() >= cutoff: + if dt.time() < cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + +# ------------------------------------------------------------------ # +# Routes +# ------------------------------------------------------------------ # +@app.route('/') +def index(): + """Main dashboard page.""" + return render_template('dashboard.html') + + +@app.route('/api/summary') +def api_summary(): + """Get summary statistics for the dashboard cards.""" + conn = get_db() + cur = conn.cursor() + + # Today's counting date (based on cutoff) + today = get_counting_date() + + # Today's stats + cur.execute(""" + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (today,)) + today_row = cur.fetchone() + + # Yesterday's stats + yesterday = (datetime.strptime(today, "%Y-%m-%d").date() - timedelta(days=1)).isoformat() + cur.execute(""" + SELECT COALESCE(total_count, 0) as total_count, + COALESCE(total_batches, 0) as total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (yesterday,)) + yesterday_row = cur.fetchone() + + # All-time totals + cur.execute(""" + SELECT COALESCE(SUM(total_count), 0) as grand_total, + COALESCE(SUM(total_batches), 0) as grand_batches, + COUNT(DISTINCT counting_date) as total_days + FROM daily_summaries + """) + all_time = cur.fetchone() + + # Average per day + cur.execute(""" + SELECT ROUND(AVG(total_count), 1) as avg_per_day + FROM daily_summaries + """) + avg = cur.fetchone() + + # Best day + cur.execute(""" + SELECT counting_date, total_count + FROM daily_summaries + ORDER BY total_count DESC + LIMIT 1 + """) + best = cur.fetchone() + + conn.close() + + return jsonify({ + 'today': { + 'date': today, + 'total_count': today_row['total_count'] if today_row else 0, + 'total_batches': today_row['total_batches'] if today_row else 0, + }, + 'yesterday': { + 'date': yesterday, + 'total_count': yesterday_row['total_count'] if yesterday_row else 0, + 'total_batches': yesterday_row['total_batches'] if yesterday_row else 0, + }, + 'all_time': { + 'grand_total': all_time['grand_total'], + 'grand_batches': all_time['grand_batches'], + 'total_days': all_time['total_days'], + }, + 'average_per_day': avg['avg_per_day'] or 0, + 'best_day': { + 'date': best['counting_date'] if best else None, + 'count': best['total_count'] if best else 0, + } + }) + + +@app.route('/api/daily-data') +def api_daily_data(): + """Get daily data for charts and table.""" + days = request.args.get('days', 30, type=int) + date_from = (datetime.now() - timedelta(days=days)).date().isoformat() + + conn = get_db() + cur = conn.cursor() + + # Daily summaries for chart + cur.execute(""" + SELECT counting_date, total_count, total_batches, + ROUND(CAST(total_count AS FLOAT) / total_batches, 1) as avg_per_batch + FROM daily_summaries + WHERE counting_date >= ? + ORDER BY counting_date ASC + """, (date_from,)) + + daily_data = [] + for row in cur.fetchall(): + daily_data.append({ + 'date': row['counting_date'], + 'total_count': row['total_count'], + 'total_batches': row['total_batches'], + 'avg_per_batch': row['avg_per_batch'] or 0, + }) + + conn.close() + return jsonify(daily_data) + + +@app.route('/api/day-detail/') +def api_day_detail(date): + """Get detailed batch information for a specific day.""" + conn = get_db() + cur = conn.cursor() + + # Batches for this day + cur.execute(""" + SELECT batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + WHERE counting_date = ? + ORDER BY batch_number ASC + """, (date,)) + + batches = [] + total_duration = 0 + for row in cur.fetchall(): + duration = row['duration_minutes'] or 0 + total_duration += duration + batches.append({ + 'batch_number': row['batch_number'], + 'count': row['count'], + 'start_time': row['start_time'], + 'end_time': row['end_time'], + 'duration_minutes': duration, + }) + + # Summary for the day + cur.execute(""" + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? + """, (date,)) + summary = cur.fetchone() + + conn.close() + + return jsonify({ + 'date': date, + 'total_count': summary['total_count'] if summary else 0, + 'total_batches': summary['total_batches'] if summary else 0, + 'total_duration_minutes': round(total_duration, 1), + 'avg_duration_minutes': round(total_duration / len(batches), 1) if batches else 0, + 'batches': batches, + }) + + +@app.route('/api/recent-batches') +def api_recent_batches(): + """Get the most recent batches across all days.""" + limit = request.args.get('limit', 10, type=int) + + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + ORDER BY end_time DESC + LIMIT ? + """, (limit,)) + + batches = [] + for row in cur.fetchall(): + batches.append({ + 'date': row['counting_date'], + 'batch_number': row['batch_number'], + 'count': row['count'], + 'start_time': row['start_time'], + 'end_time': row['end_time'], + 'duration_minutes': row['duration_minutes'] or 0, + }) + + conn.close() + return jsonify(batches) + + +@app.route('/api/available-dates') +def api_available_dates(): + """Get list of all available counting dates.""" + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, total_count, total_batches + FROM daily_summaries + ORDER BY counting_date DESC + """) + + dates = [] + for row in cur.fetchall(): + dates.append({ + 'date': row['counting_date'], + 'total_count': row['total_count'], + 'total_batches': row['total_batches'], + }) + + conn.close() + return jsonify(dates) + + +# ------------------------------------------------------------------ # +# CSV Export Routes +# ------------------------------------------------------------------ # +@app.route('/api/export-daily-csv') +def export_daily_csv(): + """Export daily summary records as CSV.""" + days = request.args.get('days', 30, type=int) + date_from = (datetime.now() - timedelta(days=days)).date().isoformat() + + conn = get_db() + cur = conn.cursor() + + cur.execute(""" + SELECT counting_date, total_count, total_batches, + ROUND(CAST(total_count AS FLOAT) / NULLIF(total_batches, 0), 1) as avg_per_batch + FROM daily_summaries + WHERE counting_date >= ? + ORDER BY counting_date ASC + """, (date_from,)) + + output = StringIO() + writer = csv.writer(output) + writer.writerow(['Date', 'Total Count', 'Total Batches', 'Avg per Batch']) + + for row in cur.fetchall(): + writer.writerow([ + row['counting_date'], + row['total_count'], + row['total_batches'], + row['avg_per_batch'] or 0 + ]) + + conn.close() + + csv_data = output.getvalue() + output.close() + + filename = f"daily_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + return Response( + csv_data, + mimetype='text/csv', + headers={ + 'Content-Disposition': f'attachment; filename={filename}', + 'Content-Type': 'text/csv; charset=utf-8' + } + ) + + +@app.route('/api/export-day-csv/') +def export_day_csv(date): + """Export batch details for a specific day as CSV.""" + conn = get_db() + cur = conn.cursor() + + # Batches for this day + cur.execute(""" + SELECT batch_number, count, start_time, end_time, + ROUND( + (julianday(end_time) - julianday(start_time)) * 24 * 60, 1 + ) as duration_minutes + FROM batches + WHERE counting_date = ? + ORDER BY batch_number ASC + """, (date,)) + + output = StringIO() + writer = csv.writer(output) + writer.writerow([ + 'Batch Number', 'Count', 'Start Time', 'End Time', 'Duration (min)' + ]) + + for row in cur.fetchall(): + writer.writerow([ + row['batch_number'], + row['count'], + row['start_time'], + row['end_time'], + row['duration_minutes'] or 0 + ]) + + conn.close() + + csv_data = output.getvalue() + output.close() + + filename = f"day_detail_{date}.csv" + return Response( + csv_data, + mimetype='text/csv', + headers={ + 'Content-Disposition': f'attachment; filename={filename}', + 'Content-Type': 'text/csv; charset=utf-8' + } + ) + + +# ------------------------------------------------------------------ # +# Run +# ------------------------------------------------------------------ # +if __name__ == '__main__': + # Suppress Flask's default request log spam + WSGIRequestHandler.protocol_version = "HTTP/1.1" + + port = int(os.getenv('DASHBOARD_PORT', 80)) + host = os.getenv('DASHBOARD_HOST', '0.0.0.0') + debug = os.getenv('FLASK_DEBUG', 'false').lower() == 'true' + + print(f"🚀 Dashboard running at http://{host}:{port}") + app.run(host=host, port=port, debug=debug) diff --git a/counter_service-test.py b/counter_service-test.py new file mode 100644 index 0000000..bede965 --- /dev/null +++ b/counter_service-test.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # Telenan Batch Label + self.ignore_batch_label = False + self.ignore_batch_label_timer = None + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign + self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30")) + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "5")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "/tmp/frigate_counter-test.db") + self.state_file = os.getenv("STATE_FILE", "/tmp/current_batch-test.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + #self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id, label): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + if label == self.object_label: + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + #self.current_state["count"] += 1 + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + elif label == self.batch_label: + self._ignore_batch_label() + self.end_batch() + + """ + if should_reset_timer: + self._reset_batch_timer() + """ + + def _ignore_batch_label(self): + if not self.ignore_batch_label_timer: + self.ignore_batch_label = True + self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout) + self.ignore_batch_label_timer.daemon = True + self.ignore_batch_label_timer.start() + self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = $s", self.ignore_batch_label_timeout, self.ignore_batch_label) + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_ignore_batch_label_timeout(self): + self.ignore_batch_label = False + self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label) + self.end_batch() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + """ 20260514 For Telenan """ + label = after.get("label") + + if after.get("camera") != self.camera_name: + return + + #if after.get("label") != self.object_label: + if label != self.object_label: + return + + if label != self.batch_label and self.ignore_batch_label: + return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id, label) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/counter_service.py b/counter_service.py new file mode 100644 index 0000000..6ea13ab --- /dev/null +++ b/counter_service.py @@ -0,0 +1,519 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # Telenan Batch Label + self.ignore_batch_label = False + self.ignore_batch_label_timer = None + self.sleep_after_batch_label_detected = 10 + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign + self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30")) + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "300")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "frigate_counter.db") + self.state_file = os.getenv("STATE_FILE", "current_batch.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + self.min_object_per_batch = int(os.getenv("MIN_OBJECT_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + """ + Comment this to remove reset timer + """ + self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id, label): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + if label == self.object_label: + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + elif label == self.batch_label: + self._ignore_batch_label() + self.end_batch() + + self.logger.info("Batch Label detected. Sleep for %s seconds.", self.sleep_after_batch_label_detected) + time.sleep(self.sleep_after_batch_label_detected) + + should_reset_timer = True + + """ + Comment this to remove reset timer + """ + if should_reset_timer: + self._reset_batch_timer() + + def _ignore_batch_label(self): + if not self.ignore_batch_label_timer: + self.ignore_batch_label = True + self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout) + self.ignore_batch_label_timer.daemon = True + self.ignore_batch_label_timer.start() + self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = %s", self.ignore_batch_label_timeout, self.ignore_batch_label) + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_ignore_batch_label_timeout(self): + self.ignore_batch_label_timer = None + self.ignore_batch_label = False + self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label) + #self.end_batch() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + # Minimal conditions per batch + #if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + if state["count"] < self.min_object_per_batch or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + + """ + Related to _reset_batch_timer + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + """ + Related to _reset_batch_timer + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + """ 20260514 For Telenan """ + label = after.get("label") + camera = after.get("camera") + + if camera != self.camera_name: + return + + #if after.get("label") != self.object_label: + if label != self.object_label: + if label == self.batch_label and self.ignore_batch_label: + return + + #if label != self.batch_label and self.ignore_batch_label: + # return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id, label) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/counter_service.py.20260514 b/counter_service.py.20260514 new file mode 100644 index 0000000..e03a336 --- /dev/null +++ b/counter_service.py.20260514 @@ -0,0 +1,464 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "5")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "frigate_counter.db") + self.state_file = os.getenv("STATE_FILE", "current_batch.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + #self.current_state["count"] += 1 + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + if should_reset_timer: + self._reset_batch_timer() + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + if after.get("camera") != self.camera_name: + return + if after.get("label") != self.object_label: + return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/counter_service.py.20260518 b/counter_service.py.20260518 new file mode 100644 index 0000000..ebb20c4 --- /dev/null +++ b/counter_service.py.20260518 @@ -0,0 +1,513 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # Telenan Batch Label + self.ignore_batch_label = False + self.ignore_batch_label_timer = None + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign + self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30")) + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "300")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "frigate_counter.db") + self.state_file = os.getenv("STATE_FILE", "current_batch.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + self.min_object_per_batch = int(os.getenv("MIN_OBJECT_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + """ + Comment this to remove reset timer + """ + self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id, label): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + if label == self.object_label: + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + elif label == self.batch_label: + self._ignore_batch_label() + self.end_batch() + time.sleep(5) + should_reset_timer = True + + """ + Comment this to remove reset timer + """ + if should_reset_timer: + self._reset_batch_timer() + + def _ignore_batch_label(self): + if not self.ignore_batch_label_timer: + self.ignore_batch_label = True + self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout) + self.ignore_batch_label_timer.daemon = True + self.ignore_batch_label_timer.start() + self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = %s", self.ignore_batch_label_timeout, self.ignore_batch_label) + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_ignore_batch_label_timeout(self): + self.ignore_batch_label_timer = None + self.ignore_batch_label = False + self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label) + #self.end_batch() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + #if state["count"] < self.min_object_per_batch or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + + """ + Related to _reset_batch_timer + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + """ 20260514 For Telenan """ + label = after.get("label") + camera = after.get("camera") + + if camera != self.camera_name: + return + + #if after.get("label") != self.object_label: + if label != self.object_label: + if label == self.batch_label and self.ignore_batch_label: + return + + #if label != self.batch_label and self.ignore_batch_label: + # return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id, label) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/counter_service.py.no_reset_timer b/counter_service.py.no_reset_timer new file mode 100644 index 0000000..26a1e78 --- /dev/null +++ b/counter_service.py.no_reset_timer @@ -0,0 +1,505 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # Telenan Batch Label + self.ignore_batch_label = False + self.ignore_batch_label_timer = None + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign + self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30")) + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "300")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "frigate_counter.db") + self.state_file = os.getenv("STATE_FILE", "current_batch.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + #self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id, label): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + if label == self.object_label: + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + #self.current_state["count"] += 1 + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + elif label == self.batch_label: + self._ignore_batch_label() + self.end_batch() + + """ + if should_reset_timer: + self._reset_batch_timer() + """ + + def _ignore_batch_label(self): + if not self.ignore_batch_label_timer: + self.ignore_batch_label = True + self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout) + self.ignore_batch_label_timer.daemon = True + self.ignore_batch_label_timer.start() + self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = %s", self.ignore_batch_label_timeout, self.ignore_batch_label) + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_ignore_batch_label_timeout(self): + self.ignore_batch_label = False + self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label) + self.end_batch() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + """ 20260514 For Telenan """ + label = after.get("label") + camera = after.get("camera") + + if camera != self.camera_name: + return + + #if after.get("label") != self.object_label: + if label != self.object_label: + if label == self.batch_label and self.ignore_batch_label: + return + + #if label != self.batch_label and self.ignore_batch_label: + # return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id, label) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/counter_service.py.workarund b/counter_service.py.workarund new file mode 100644 index 0000000..2d97e9b --- /dev/null +++ b/counter_service.py.workarund @@ -0,0 +1,507 @@ +#!/usr/bin/env python3 +""" +Frigate Object Batch Counter Service + +Listens to Frigate MQTT events, counts unique objects per batch, +and persists results to SQLite when a batch ends. +""" + +import os +import sys +import json +import sqlite3 +import threading +import time +import logging +import signal +from datetime import datetime, timedelta +from pathlib import Path + +import paho.mqtt.client as mqtt + + +class FrigateCounterService: + def __init__(self): + self.setup_logging() + self.load_config() + self.init_db() + self.state_lock = threading.Lock() + self.batch_timer = None + self.shutdown_event = threading.Event() + self.current_state = self.load_state() + + # Telenan Batch Label + self.ignore_batch_label = False + self.ignore_batch_label_timer = None + # ------------------------------------------------------------------ # + # Setup & Config + # ------------------------------------------------------------------ # + def setup_logging(self): + level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def load_config(self): + self.mqtt_host = os.getenv("FRIGATE_MQTT_HOST", "localhost") + self.mqtt_port = int(os.getenv("FRIGATE_MQTT_PORT", "1883")) + self.mqtt_user = os.getenv("FRIGATE_MQTT_USER") + self.mqtt_pass = os.getenv("FRIGATE_MQTT_PASS") + self.mqtt_topic = os.getenv("FRIGATE_MQTT_TOPIC", "frigate/events") + + self.camera_name = os.getenv("CAMERA_NAME") + if not self.camera_name: + raise ValueError("Environment variable CAMERA_NAME is required") + + self.object_label = os.getenv("OBJECT_LABEL", "ayam-potong") + self.batch_label = os.getenv("BATCH_LABEL", "telenan") # 20260514 - Adding Label telenan for new batch sign + self.ignore_batch_label_timeout = float(os.getenv("IGNORE_BATCH_LABEL_TIMEOUT_SECONDS", "30")) + self.batch_timeout = float(os.getenv("BATCH_TIMEOUT_SECONDS", "300")) + self.cutoff_time_str = os.getenv("DAILY_CUTOFF_TIME", "17:00") + + # Validate cutoff format HH:MM + datetime.strptime(self.cutoff_time_str, "%H:%M") + + self.db_path = os.getenv("DB_PATH", "frigate_counter.db") + self.state_file = os.getenv("STATE_FILE", "current_batch.json") + + self.min_duration_per_batch = int(os.getenv("MIN_DURATION_PER_BATCH", "60")) + # ------------------------------------------------------------------ # + # Database + # ------------------------------------------------------------------ # + def init_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False) + cur = self.db.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS batches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + batch_number INTEGER NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + count INTEGER NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, batch_number, camera_name, object_label) + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS daily_summaries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + counting_date TEXT NOT NULL, + camera_name TEXT NOT NULL, + object_label TEXT NOT NULL, + total_count INTEGER NOT NULL DEFAULT 0, + total_batches INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(counting_date, camera_name, object_label) + ) + """ + ) + + self.db.commit() + + # ------------------------------------------------------------------ # + # Counting-date logic (day ends at cutoff, e.g. 17:00) + # ------------------------------------------------------------------ # + def get_counting_date(self, dt=None): + """Return the business-day string that ends at cutoff_time.""" + if dt is None: + dt = datetime.now() + cutoff = datetime.strptime(self.cutoff_time_str, "%H:%M").time() + # e.g. cutoff 17:00 => 16:59 belongs to today, 17:00 belongs to tomorrow + if dt.time() < cutoff: + #if dt.time() >= cutoff: + return dt.date().isoformat() + return (dt.date() + timedelta(days=1)).isoformat() + #return (dt.date() - timedelta(days=1)).isoformat() + + # ------------------------------------------------------------------ # + # State persistence (JSON) – survives restarts + # ------------------------------------------------------------------ # + def load_state(self): + if not Path(self.state_file).exists(): + return None + + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + current_date = self.get_counting_date() + if state.get("counting_date") != current_date: + self.logger.warning( + "State file belongs to previous counting day (%s). " + "Finalizing it before starting fresh.", + state.get("counting_date"), + ) + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + datetime.now().isoformat(), + ) + Path(self.state_file).unlink(missing_ok=True) + return None + + self.logger.info( + "Resumed batch #%s from %s with count=%s", + state["batch_number"], + state["start_time"], + state["count"], + ) + # Restart the inactivity timer + """ + Comment this to remove reset timer + """ + self._reset_batch_timer() + return state + + except Exception as exc: + self.logger.error("Failed to load state file: %s", exc) + return None + + def save_state(self): + if self.current_state is None: + Path(self.state_file).unlink(missing_ok=True) + return + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(self.current_state, f, indent=2, ensure_ascii=False) + + # ------------------------------------------------------------------ # + # Batch lifecycle + # ------------------------------------------------------------------ # + def get_next_batch_number(self, counting_date): + cur = self.db.cursor() + cur.execute( + """ + SELECT COALESCE(MAX(batch_number), 0) + FROM batches + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + return cur.fetchone()[0] + 1 + + def start_new_batch(self, counting_date): + batch_number = self.get_next_batch_number(counting_date) + now = datetime.now().isoformat() + self.current_state = { + "counting_date": counting_date, + "batch_number": batch_number, + "count": 0, + "start_time": now, + "last_detection_time": now, + "counted_event_ids": [], + } + self.save_state() + self.logger.info( + "Started batch #%s for %s (%s)", + batch_number, + counting_date, + self.object_label, + ) + + def process_detection(self, event_id, label): + """ + Called for every matching Frigate event (new/update/end). + Deduplicates by event_id and resets the 5-minute batch timer. + """ + should_reset_timer = False + + if label == self.object_label: + with self.state_lock: + counting_date = self.get_counting_date() + + # 1. No active batch -> start one + if self.current_state is None: + self.start_new_batch(counting_date) + should_reset_timer = True + + # 2. Cutoff crossed since batch started -> finalize old, start new + elif self.current_state["counting_date"] != counting_date: + self._end_batch_locked() + self.start_new_batch(counting_date) + should_reset_timer = True + + # 3. Deduplicate event ID + if event_id not in self.current_state["counted_event_ids"]: + self.current_state["count"] += 1 + self.current_state["counted_event_ids"].append(event_id) + self.logger.info( + "Counted %s (event %s) | batch #%s total: %s", + self.object_label, + event_id, + self.current_state["batch_number"], + self.current_state["count"], + ) + + # Always refresh last_detection_time so the batch stays alive + self.current_state["last_detection_time"] = datetime.now().isoformat() + self.save_state() + should_reset_timer = True + + elif label == self.batch_label: + self._ignore_batch_label() + self.end_batch() + + """ + Comment this to remove reset timer + """ + if should_reset_timer: + self._reset_batch_timer() + + def _ignore_batch_label(self): + if not self.ignore_batch_label_timer: + self.ignore_batch_label = True + self.ignore_batch_label_timer = threading.Timer(self.ignore_batch_label_timeout, self._on_ignore_batch_label_timeout) + self.ignore_batch_label_timer.daemon = True + self.ignore_batch_label_timer.start() + self.logger.info("Ignore Batch Label for %ss. self.ignore_batch_label = %s", self.ignore_batch_label_timeout, self.ignore_batch_label) + + def _reset_batch_timer(self): + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = threading.Timer(self.batch_timeout, self._on_batch_timeout) + self.batch_timer.daemon = True + self.batch_timer.start() + + def _on_ignore_batch_label_timeout(self): + self.ignore_batch_label = False + self.logger.info("Ignore Batch Label is done. self.ignore_batch_label = %s", self.ignore_batch_label) + self.end_batch() + + def _on_batch_timeout(self): + self.logger.info("Batch inactivity timeout (%ss) reached", self.batch_timeout) + self.end_batch() + + def end_batch(self): + with self.state_lock: + self._end_batch_locked() + + def _end_batch_locked(self): + if self.current_state is None: + return + + state = self.current_state + + """ Check Duration """ + start_time_obj = datetime.fromisoformat(state["start_time"]) + end_time_obj = datetime.now() + duration = end_time_obj - start_time_obj + duration_seconds = duration.total_seconds() + + if state["count"] == 0 or duration_seconds < self.min_duration_per_batch: + # Nothing to persist + self.current_state = None + self.save_state() + + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + return + + end_time = datetime.now().isoformat() + + count_per_second = state["count"] / duration_seconds + + try: + self._insert_batch( + state["counting_date"], + state["batch_number"], + state["count"], + state["start_time"], + end_time, + ) + self.logger.info( + "Batch #%s ended | count=%s | duration=%s | cps=%s", + state["batch_number"], + state["count"], + duration, + count_per_second + ) + except Exception as exc: + self.logger.error("Failed to persist batch: %s", exc) + # Leave state intact so we can retry on next timeout + return + + self.current_state = None + self.save_state() + """ + if self.batch_timer: + self.batch_timer.cancel() + self.batch_timer = None + """ + + def _insert_batch(self, counting_date, batch_number, count, start_time, end_time): + """Atomic insert into batches + upsert daily summary.""" + cur = self.db.cursor() + + cur.execute( + """ + INSERT INTO batches + (counting_date, batch_number, camera_name, object_label, count, start_time, end_time) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + counting_date, + batch_number, + self.camera_name, + self.object_label, + count, + start_time, + end_time, + ), + ) + + cur.execute( + """ + INSERT INTO daily_summaries + (counting_date, camera_name, object_label, total_count, total_batches) + VALUES (?, ?, ?, ?, 1) + ON CONFLICT(counting_date, camera_name, object_label) + DO UPDATE SET + total_count = total_count + excluded.total_count, + total_batches = total_batches + excluded.total_batches, + updated_at = CURRENT_TIMESTAMP + """, + (counting_date, self.camera_name, self.object_label, count), + ) + + self.db.commit() + + # Log running totals for the day + cur.execute( + """ + SELECT total_count, total_batches + FROM daily_summaries + WHERE counting_date = ? AND camera_name = ? AND object_label = ? + """, + (counting_date, self.camera_name, self.object_label), + ) + row = cur.fetchone() + if row: + self.logger.info( + "Daily totals for %s: %s objects across %s batch(es)", + counting_date, + row[0], + row[1], + ) + + # ------------------------------------------------------------------ # + # Cutoff watcher (forces batch end at 17:00 etc.) + # ------------------------------------------------------------------ # + def cutoff_watcher(self): + """Runs every minute to force-close a batch when the business day rolls over.""" + while not self.shutdown_event.is_set(): + time.sleep(60) + with self.state_lock: + if self.current_state is None: + continue + if self.current_state["counting_date"] != self.get_counting_date(): + self.logger.info("Daily cutoff reached – finalizing batch") + self._end_batch_locked() + + # ------------------------------------------------------------------ # + # MQTT callbacks + # ------------------------------------------------------------------ # + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT connected to %s:%s", self.mqtt_host, self.mqtt_port) + client.subscribe(self.mqtt_topic) + self.logger.info("Subscribed to %s", self.mqtt_topic) + else: + self.logger.error("MQTT connection failed, code=%s", rc) + + def on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode("utf-8")) + after = payload.get("after", {}) + + """ 20260514 For Telenan """ + label = after.get("label") + camera = after.get("camera") + + if camera != self.camera_name: + return + + #if after.get("label") != self.object_label: + if label != self.object_label: + if label == self.batch_label and self.ignore_batch_label: + return + + #if label != self.batch_label and self.ignore_batch_label: + # return + + event_id = after.get("id") + if not event_id: + return + + self.process_detection(event_id, label) + + except json.JSONDecodeError: + self.logger.warning("Received non-JSON payload on %s", msg.topic) + except Exception as exc: + self.logger.exception("Error handling MQTT message: %s", exc) + + # ------------------------------------------------------------------ # + # Run / Shutdown + # ------------------------------------------------------------------ # + def run(self): + # Graceful shutdown on SIGINT / SIGTERM + signal.signal(signal.SIGINT, lambda _s, _f: self.shutdown()) + signal.signal(signal.SIGTERM, lambda _s, _f: self.shutdown()) + + # Support both paho-mqtt v1 and v2 + try: + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) + except (AttributeError, TypeError): + self.client = mqtt.Client() + + if self.mqtt_user and self.mqtt_pass: + self.client.username_pw_set(self.mqtt_user, self.mqtt_pass) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + # Start background cutoff watcher + watcher = threading.Thread(target=self.cutoff_watcher, daemon=True) + watcher.start() + + try: + self.client.connect(self.mqtt_host, self.mqtt_port, keepalive=60) + self.client.loop_forever() + except Exception as exc: + self.logger.error("MQTT loop error: %s", exc) + finally: + self.shutdown() + + def shutdown(self): + if self.shutdown_event.is_set(): + return + self.logger.info("Shutting down...") + self.shutdown_event.set() + try: + self.client.disconnect() + except Exception: + pass + self.end_batch() + self.db.close() + self.logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = FrigateCounterService() + service.run() diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..1698f93 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,14 @@ +services: + counter: + image: python:3.11-slim + working_dir: /app + volumes: + - ./:/app + environment: + - CAMERA_NAME=cam_gudang_utara + - OBJECT_LABEL=ayam-potong + - FRIGATE_MQTT_HOST=mqtt.local + - DAILY_CUTOFF_TIME=17:00 + - BATCH_TIMEOUT_SECONDS=60 + command: > + sh -c "pip install paho-mqtt && python counter_service.py" diff --git a/env.example b/env.example new file mode 100644 index 0000000..6703c72 --- /dev/null +++ b/env.example @@ -0,0 +1,24 @@ +# MQTT broker where Frigate publishes +FRIGATE_MQTT_HOST=192.168.1.10 +FRIGATE_MQTT_PORT=1883 +FRIGATE_MQTT_USER= +FRIGATE_MQTT_PASS= + +# Frigate topic (usually frigate/events) +FRIGATE_MQTT_TOPIC=frigate/events + +# Camera & object to track +CAMERA_NAME=cam_gudang_utara +OBJECT_LABEL=ayam-potong + +# Batch ends after this many seconds with no new detections +BATCH_TIMEOUT_SECONDS=60 + +# Business-day cutoff (HH:MM). Batches running at this time are forced closed. +DAILY_CUTOFF_TIME=17:00 + +# Persistence paths +DB_PATH=./frigate_counter.db +STATE_FILE=./current_batch.json + +LOG_LEVEL=INFO diff --git a/frigate-counter-dashboard.service b/frigate-counter-dashboard.service new file mode 100644 index 0000000..870acec --- /dev/null +++ b/frigate-counter-dashboard.service @@ -0,0 +1,48 @@ +[Unit] +Description=Frigate Counter Dashboard +Documentation=https://github.com/your-repo/frigate-counter +After=network-online.target frigate-counter.service +Wants=network-online.target + +[Service] +Type=simple +User=frigate-counter +Group=frigate-counter + +WorkingDirectory=/opt/frigate-counter + +Environment="PATH=/opt/frigate-counter/venv/bin:/usr/local/bin:/usr/bin:/bin" +Environment="DB_PATH=/opt/frigate-counter/frigate_counter.db" +Environment="DASHBOARD_HOST=0.0.0.0" +Environment="DASHBOARD_PORT=5000" +Environment="FLASK_DEBUG=false" + +# Use Gunicorn for production +ExecStart=/opt/frigate-counter/venv/bin/gunicorn \ + -w 2 \ + -b 0.0.0.0:5000 \ + --access-logfile - \ + --error-logfile - \ + --capture-output \ + --enable-stdio-inheritance \ + dashboard:app + +ExecReload=/bin/kill -HUP $MAINPID + +TimeoutStopSec=30 +KillSignal=SIGTERM + +Restart=on-failure +RestartSec=5 +StartLimitInterval=60s +StartLimitBurst=3 + +# Security hardening +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +ReadWritePaths=/opt/frigate-counter +PrivateTmp=true + +[Install] +WantedBy=multi-user.target diff --git a/frigate-counter.service b/frigate-counter.service new file mode 100644 index 0000000..a13c9c4 --- /dev/null +++ b/frigate-counter.service @@ -0,0 +1,58 @@ +[Unit] +Description=Frigate Object Batch Counter Service +Documentation=https://github.com/your-repo/frigate-counter +After=network-online.target mosquitto.service +Wants=network-online.target + +[Service] +Type=simple +User=frigate-counter +Group=frigate-counter + +# Working directory where state file and DB live +WorkingDirectory=/opt/frigate-counter + +# Path to your Python virtual environment +Environment="PYTHONPATH=/opt/frigate-counter" +Environment="PATH=/opt/frigate-counter/venv/bin:/usr/local/bin:/usr/bin:/bin" +Environment="CAMERA_NAME=cam_gudang_utara" +Environment="OBJECT_LABEL=ayam-potong" +Environment="FRIGATE_MQTT_HOST=192.168.1.10" +Environment="FRIGATE_MQTT_PORT=1883" +Environment="FRIGATE_MQTT_USER=" +Environment="FRIGATE_MQTT_PASS=" +Environment="FRIGATE_MQTT_TOPIC=frigate/events" +Environment="BATCH_TIMEOUT_SECONDS=60" +Environment="DAILY_CUTOFF_TIME=17:00" +Environment="DB_PATH=/opt/frigate-counter/frigate_counter.db" +Environment="STATE_FILE=/opt/frigate-counter/current_batch.json" +Environment="LOG_LEVEL=INFO" + +# Or use an EnvironmentFile instead of inline variables: +# EnvironmentFile=/opt/frigate-counter/.env + +ExecStart=/opt/frigate-counter/venv/bin/python /opt/frigate-counter/counter_service.py +ExecReload=/bin/kill -HUP $MAINPID + +# Graceful shutdown: give the service time to persist the batch to DB +TimeoutStopSec=30 +KillSignal=SIGTERM + +# Restart policy +Restart=on-failure +RestartSec=5 +StartLimitInterval=60s +StartLimitBurst=3 + +# Security hardening (optional but recommended) +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +ReadWritePaths=/opt/frigate-counter +PrivateTmp=true +ProtectKernelTunables=true +ProtectKernelModules=true +ProtectControlGroups=true + +[Install] +WantedBy=multi-user.target diff --git a/frigate_counter.db b/frigate_counter.db new file mode 100644 index 0000000..c113801 Binary files /dev/null and b/frigate_counter.db differ diff --git a/frigate_counter.sql b/frigate_counter.sql new file mode 100644 index 0000000..ff68c0f --- /dev/null +++ b/frigate_counter.sql @@ -0,0 +1,17 @@ +-- All batches today (based on cutoff logic) +SELECT * +FROM batches +WHERE counting_date = (SELECT counting_date FROM daily_summaries ORDER BY updated_at DESC LIMIT 1) +ORDER BY batch_number; + +-- Daily summary +SELECT counting_date, + total_count AS total_ayam, + total_batches AS jumlah_batch +FROM daily_summaries +ORDER BY counting_date DESC; + +-- Average objects per batch per day +SELECT counting_date, + ROUND(CAST(total_count AS FLOAT) / total_batches, 1) AS avg_per_batch +FROM daily_summaries; diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cd1b86c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +flask>=2.3.0 +paho-mqtt>=1.6,<3.0 +gunicorn>=21.0 diff --git a/templates/dashboard.html b/templates/dashboard.html new file mode 100644 index 0000000..4bed33d --- /dev/null +++ b/templates/dashboard.html @@ -0,0 +1,796 @@ + + + + + + 🐔 ZenAI APC Dashboard + + + + + + + +
+
+
+
+
+ +
+
+

ZenAI APC Dashboard

+

Real-time batch counting analytics

+
+
+
+
+ Counting Day +

--

+
+
+
+ Live +
+
+
+
+
+ +
+ +
+ +
+
+ + LIVE + +
+
+
+ +
+ Batch #-- +
+

--

+

Current Batch Count

+
+ -- +
+
+ + +
+
+
+ +
+ Today +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Yesterday +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Average +
+

--

+

Per Day

+
+ -- days recorded +
+
+ + +
+
+
+ +
+ Best Day +
+

--

+

--

+
+ Record +
+
+
+ + +
+ +
+
+
+

Daily Trends

+

Object count over time

+
+
+ + + +
+
+
+ +
+
+ + +
+

Quick Stats

+
+
+
+
+ +
+
+

Grand Total

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Total Batches

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Avg per Batch

+

Overall average

+
+
+ -- +
+
+ +
+

Recent Activity

+
+ +
+
+
+
+ + +
+
+
+

Daily Records

+

Click on a row to view batch details

+
+
+ + + + + +
+
+ +
+ + + + + + + + + + + + + + +
DateTotal CountBatchesAvg/BatchStatusAction
+
+
+
+ + + + + + + + diff --git a/templates/dashboard.html.20260506 b/templates/dashboard.html.20260506 new file mode 100644 index 0000000..7f75fba --- /dev/null +++ b/templates/dashboard.html.20260506 @@ -0,0 +1,710 @@ + + + + + + 🐔 Ayam Counter Dashboard + + + + + + + +
+
+
+
+
+ +
+
+

Ayam Counter Dashboard

+

Real-time batch counting analytics

+
+
+
+
+ Counting Day +

--

+
+
+
+ Live +
+
+
+
+
+ +
+ +
+ +
+
+
+ +
+ Today +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Yesterday +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Average +
+

--

+

Per Day

+
+ -- days recorded +
+
+ + +
+
+
+ +
+ Best Day +
+

--

+

--

+
+ Record +
+
+
+ + +
+ +
+
+
+

Daily Trends

+

Object count over time

+
+
+ + + +
+
+
+ +
+
+ + +
+

Quick Stats

+
+
+
+
+ +
+
+

Grand Total

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Total Batches

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Avg per Batch

+

Overall average

+
+
+ -- +
+
+ +
+

Recent Activity

+
+ +
+
+
+
+ + +
+
+
+

Daily Records

+

Click on a row to view batch details

+
+
+ + + +
+
+ +
+ + + + + + + + + + + + + + +
DateTotal CountBatchesAvg/BatchStatusAction
+
+
+
+ + + + + + + diff --git a/templates/dashboard.html.20260517 b/templates/dashboard.html.20260517 new file mode 100644 index 0000000..53ee43c --- /dev/null +++ b/templates/dashboard.html.20260517 @@ -0,0 +1,739 @@ + + + + + + 🐔 Ayam Counter Dashboard + + + + + + + +
+
+
+
+
+ +
+
+

Ayam Counter Dashboard

+

Real-time batch counting analytics

+
+
+
+
+ Counting Day +

--

+
+
+
+ Live +
+
+
+
+
+ +
+ +
+ +
+
+
+ +
+ Today +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Yesterday +
+

--

+

Total Ayam Potong

+
+ -- batches +
+
+ + +
+
+
+ +
+ Average +
+

--

+

Per Day

+
+ -- days recorded +
+
+ + +
+
+
+ +
+ Best Day +
+

--

+

--

+
+ Record +
+
+
+ + +
+ +
+
+
+

Daily Trends

+

Object count over time

+
+
+ + + +
+
+
+ +
+
+ + +
+

Quick Stats

+
+
+
+
+ +
+
+

Grand Total

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Total Batches

+

All time

+
+
+ -- +
+ +
+
+
+ +
+
+

Avg per Batch

+

Overall average

+
+
+ -- +
+
+ +
+

Recent Activity

+
+ +
+
+
+
+ + +
+
+
+

Daily Records

+

Click on a row to view batch details

+
+
+ + + + + +
+
+ +
+ + + + + + + + + + + + + + +
DateTotal CountBatchesAvg/BatchStatusAction
+
+
+
+ + + + + + + +