#!/usr/bin/env python3
"""
Frigate Counter Dashboard

A beautiful, interactive Flask web app for viewing daily batch counting data.
"""

import csv
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from io import StringIO

from flask import Flask, render_template, jsonify, request, Response
from werkzeug.serving import WSGIRequestHandler

app = Flask(__name__, template_folder='templates')
# NOTE(review): the fallback key is a public constant; set SECRET_KEY in the
# environment for any deployment that relies on signed sessions/cookies.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'frigate-counter-dashboard')

DB_PATH = os.getenv('DB_PATH', 'frigate_counter.db')

# Totals for a single counting date (used for "today" and "yesterday").
_DAY_TOTALS_SQL = """
    SELECT COALESCE(total_count, 0) as total_count,
           COALESCE(total_batches, 0) as total_batches
    FROM daily_summaries
    WHERE counting_date = ?
"""

# Per-day rows from a start date onwards. NULLIF keeps the average NULL
# (rendered as 0 by callers) when a day has zero batches.
_DAILY_ROWS_SQL = """
    SELECT counting_date,
           total_count,
           total_batches,
           ROUND(CAST(total_count AS FLOAT) / NULLIF(total_batches, 0), 1)
               as avg_per_batch
    FROM daily_summaries
    WHERE counting_date >= ?
    ORDER BY counting_date ASC
"""

# Batches of one counting date, with the duration derived from the timestamps.
_DAY_BATCHES_SQL = """
    SELECT batch_number,
           count,
           start_time,
           end_time,
           ROUND(
               (julianday(end_time) - julianday(start_time)) * 24 * 60, 1
           ) as duration_minutes
    FROM batches
    WHERE counting_date = ?
    ORDER BY batch_number ASC
"""


# ------------------------------------------------------------------ #
# Database helpers
# ------------------------------------------------------------------ #
def get_db():
    """Open a new SQLite connection to DB_PATH with name-addressable rows.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def get_counting_date(dt=None, cutoff_str="20:00"):
    """Replicate the service logic for determining the counting date.

    Args:
        dt: Moment to classify; defaults to the current local time.
        cutoff_str: Daily cutoff as "HH:MM".

    Returns:
        ISO date string: the calendar date of ``dt`` when its time is before
        the cutoff, otherwise the following calendar date.
    """
    if dt is None:
        dt = datetime.now()
    cutoff = datetime.strptime(cutoff_str, "%H:%M").time()
    # NOTE(review): times at/after the cutoff roll forward to the next day.
    # Earlier revisions experimented with the opposite direction - confirm
    # this still matches the counting service.
    if dt.time() < cutoff:
        return dt.date().isoformat()
    return (dt.date() + timedelta(days=1)).isoformat()


def _window_start(days):
    """Return the ISO date ``days`` days before now, saturating on overflow.

    Extreme ``days`` values would otherwise raise OverflowError and surface
    as a 500; they are clamped to the earliest/latest representable date.
    """
    try:
        return (datetime.now() - timedelta(days=days)).date().isoformat()
    except OverflowError:
        limit = datetime.min if days > 0 else datetime.max
        return limit.date().isoformat()


def _is_iso_date(value):
    """Return True when ``value`` parses as a YYYY-MM-DD date."""
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        return False
    return True


def _fetch_all(sql, params=()):
    """Run ``sql`` on a fresh connection and return every row.

    The connection is closed even when the query raises.
    """
    with closing(get_db()) as conn:
        return conn.execute(sql, params).fetchall()


def _csv_response(header, rows, filename):
    """Build a CSV attachment response from a header row and data rows."""
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(header)
    writer.writerows(rows)
    return Response(
        buffer.getvalue(),
        mimetype='text/csv',
        headers={
            'Content-Disposition': f'attachment; filename={filename}',
            'Content-Type': 'text/csv; charset=utf-8',
        },
    )


# ------------------------------------------------------------------ #
# Routes
# ------------------------------------------------------------------ #
@app.route('/')
def index():
    """Main dashboard page."""
    return render_template('dashboard.html')


@app.route('/api/summary')
def api_summary():
    """Get summary statistics for the dashboard cards."""
    # Today's counting date (based on cutoff) and the day before it.
    today = get_counting_date()
    yesterday = (
        datetime.strptime(today, "%Y-%m-%d").date() - timedelta(days=1)
    ).isoformat()

    with closing(get_db()) as conn:
        cur = conn.cursor()

        today_row = cur.execute(_DAY_TOTALS_SQL, (today,)).fetchone()
        yesterday_row = cur.execute(_DAY_TOTALS_SQL, (yesterday,)).fetchone()

        # All-time totals
        all_time = cur.execute("""
            SELECT COALESCE(SUM(total_count), 0) as grand_total,
                   COALESCE(SUM(total_batches), 0) as grand_batches,
                   COUNT(DISTINCT counting_date) as total_days
            FROM daily_summaries
        """).fetchone()

        # Average per day (NULL when the table is empty)
        avg = cur.execute("""
            SELECT ROUND(AVG(total_count), 1) as avg_per_day
            FROM daily_summaries
        """).fetchone()

        # Best day (no row when the table is empty)
        best = cur.execute("""
            SELECT counting_date, total_count
            FROM daily_summaries
            ORDER BY total_count DESC
            LIMIT 1
        """).fetchone()

    return jsonify({
        'today': {
            'date': today,
            'total_count': today_row['total_count'] if today_row else 0,
            'total_batches': today_row['total_batches'] if today_row else 0,
        },
        'yesterday': {
            'date': yesterday,
            'total_count': yesterday_row['total_count'] if yesterday_row else 0,
            'total_batches': yesterday_row['total_batches'] if yesterday_row else 0,
        },
        'all_time': {
            'grand_total': all_time['grand_total'],
            'grand_batches': all_time['grand_batches'],
            'total_days': all_time['total_days'],
        },
        'average_per_day': avg['avg_per_day'] or 0,
        'best_day': {
            'date': best['counting_date'] if best else None,
            'count': best['total_count'] if best else 0,
        },
    })


@app.route('/api/daily-data')
def api_daily_data():
    """Get daily data for charts and table.

    Query args:
        days: Size of the look-back window in days (default 30).
    """
    days = request.args.get('days', 30, type=int)
    rows = _fetch_all(_DAILY_ROWS_SQL, (_window_start(days),))

    return jsonify([
        {
            'date': row['counting_date'],
            'total_count': row['total_count'],
            'total_batches': row['total_batches'],
            'avg_per_batch': row['avg_per_batch'] or 0,
        }
        for row in rows
    ])


# The <date> URL variable is what supplies the view's ``date`` argument;
# without it Flask calls the view with no arguments and it raises TypeError.
@app.route('/api/day-detail/<date>')
def api_day_detail(date):
    """Get detailed batch information for a specific day."""
    with closing(get_db()) as conn:
        cur = conn.cursor()
        batch_rows = cur.execute(_DAY_BATCHES_SQL, (date,)).fetchall()

        # Summary for the day
        summary = cur.execute("""
            SELECT total_count, total_batches
            FROM daily_summaries
            WHERE counting_date = ?
        """, (date,)).fetchone()

    batches = [
        {
            'batch_number': row['batch_number'],
            'count': row['count'],
            'start_time': row['start_time'],
            'end_time': row['end_time'],
            # NULL durations (missing timestamps) are reported as 0.
            'duration_minutes': row['duration_minutes'] or 0,
        }
        for row in batch_rows
    ]
    total_duration = sum(batch['duration_minutes'] for batch in batches)

    return jsonify({
        'date': date,
        'total_count': summary['total_count'] if summary else 0,
        'total_batches': summary['total_batches'] if summary else 0,
        'total_duration_minutes': round(total_duration, 1),
        'avg_duration_minutes': (
            round(total_duration / len(batches), 1) if batches else 0
        ),
        'batches': batches,
    })


@app.route('/api/recent-batches')
def api_recent_batches():
    """Get the most recent batches across all days.

    Query args:
        limit: Maximum number of batches to return (default 10).
    """
    limit = request.args.get('limit', 10, type=int)
    rows = _fetch_all("""
        SELECT counting_date,
               batch_number,
               count,
               start_time,
               end_time,
               ROUND(
                   (julianday(end_time) - julianday(start_time)) * 24 * 60, 1
               ) as duration_minutes
        FROM batches
        ORDER BY end_time DESC
        LIMIT ?
    """, (limit,))

    return jsonify([
        {
            'date': row['counting_date'],
            'batch_number': row['batch_number'],
            'count': row['count'],
            'start_time': row['start_time'],
            'end_time': row['end_time'],
            'duration_minutes': row['duration_minutes'] or 0,
        }
        for row in rows
    ])


@app.route('/api/available-dates')
def api_available_dates():
    """Get list of all available counting dates."""
    rows = _fetch_all("""
        SELECT counting_date, total_count, total_batches
        FROM daily_summaries
        ORDER BY counting_date DESC
    """)

    return jsonify([
        {
            'date': row['counting_date'],
            'total_count': row['total_count'],
            'total_batches': row['total_batches'],
        }
        for row in rows
    ])


# ------------------------------------------------------------------ #
# CSV Export Routes
# ------------------------------------------------------------------ #
@app.route('/api/export-daily-csv')
def export_daily_csv():
    """Export daily summary records as CSV.

    Query args:
        days: Size of the look-back window in days (default 30).
    """
    days = request.args.get('days', 30, type=int)
    rows = _fetch_all(_DAILY_ROWS_SQL, (_window_start(days),))

    filename = f"daily_records_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    return _csv_response(
        ['Date', 'Total Count', 'Total Batches', 'Avg per Batch'],
        (
            [
                row['counting_date'],
                row['total_count'],
                row['total_batches'],
                row['avg_per_batch'] or 0,
            ]
            for row in rows
        ),
        filename,
    )


# See api_day_detail: the <date> URL variable feeds the ``date`` argument.
@app.route('/api/export-day-csv/<date>')
def export_day_csv(date):
    """Export batch details for a specific day as CSV.

    Returns 400 when ``date`` is not a YYYY-MM-DD value, because it is
    embedded verbatim in the Content-Disposition header.
    """
    if not _is_iso_date(date):
        return jsonify({'error': 'date must be in YYYY-MM-DD format'}), 400

    rows = _fetch_all(_DAY_BATCHES_SQL, (date,))

    return _csv_response(
        ['Batch Number', 'Count', 'Start Time', 'End Time', 'Duration (min)'],
        (
            [
                row['batch_number'],
                row['count'],
                row['start_time'],
                row['end_time'],
                row['duration_minutes'] or 0,
            ]
            for row in rows
        ),
        f"day_detail_{date}.csv",
    )


# ------------------------------------------------------------------ #
# Run
# ------------------------------------------------------------------ #
if __name__ == '__main__':
    # Have the development server speak HTTP/1.1 (enables keep-alive).
    # This does not affect request logging.
    WSGIRequestHandler.protocol_version = "HTTP/1.1"

    port = int(os.getenv('DASHBOARD_PORT', 80))
    host = os.getenv('DASHBOARD_HOST', '0.0.0.0')
    debug = os.getenv('FLASK_DEBUG', 'false').lower() == 'true'

    print(f"🚀 Dashboard running at http://{host}:{port}")
    app.run(host=host, port=port, debug=debug)