Files
frigate_counter/frigate_counter_web.py
2026-02-14 17:00:37 +07:00

176 lines
4.6 KiB
Python

#!/usr/bin/env python3
import sqlite3
from flask import Flask, render_template, jsonify
from datetime import datetime
import os
from collections import defaultdict
# Configuration
DB_FILE = "/etc/frigate/counter_database.db"
app = Flask(__name__)
def get_db_connection():
"""Create a database connection"""
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row # This allows us to access columns by name
return conn
def get_counter_data():
"""Get all counter data from the database"""
conn = get_db_connection()
cursor = conn.cursor()
# Get all records ordered by date descending
cursor.execute("""
SELECT id, camera_name, date, counter_value
FROM counter_data
ORDER BY date DESC
""")
rows = cursor.fetchall()
conn.close()
# Convert to list of dictionaries
data = []
for row in rows:
data.append({
'id': row['id'],
'camera_name': row['camera_name'],
'date': row['date'],
'counter_value': row['counter_value']
})
return data
def get_counter_summary():
"""Get summary of counter data grouped by camera"""
conn = get_db_connection()
cursor = conn.cursor()
# Get latest counter value for each camera
cursor.execute("""
SELECT camera_name, MAX(date) as latest_date, counter_value
FROM counter_data
GROUP BY camera_name
ORDER BY latest_date DESC
""")
rows = cursor.fetchall()
conn.close()
# Convert to list of dictionaries
data = []
for row in rows:
data.append({
'camera_name': row['camera_name'],
'latest_date': row['latest_date'],
'counter_value': row['counter_value']
})
return data
def get_pivoted_data():
"""Get counter data pivoted by camera_name (rows) and date (columns)"""
conn = get_db_connection()
cursor = conn.cursor()
# Get all records ordered by date DESC (latest first)
cursor.execute("""
SELECT camera_name, date, counter_value
FROM counter_data
ORDER BY date DESC
""")
rows = cursor.fetchall()
conn.close()
# Group data by camera and date
pivoted_data = defaultdict(lambda: defaultdict(int))
dates = set()
cameras = set()
for row in rows:
camera_name = row['camera_name']
date = row['date'][:10] # Extract just the date part (YYYY-MM-DD)
counter_value = row['counter_value']
pivoted_data[camera_name][date] = counter_value
dates.add(date)
cameras.add(camera_name)
# Convert sets to sorted lists (sort dates in reverse order - latest first)
sorted_dates = sorted(dates, reverse=True)
sorted_cameras = sorted(cameras)
# Create matrix format
matrix = []
for camera in sorted_cameras:
row = {'camera_name': camera}
for date in sorted_dates:
row[date] = pivoted_data[camera][date]
matrix.append(row)
return {
'matrix': matrix,
'dates': sorted_dates,
'cameras': sorted_cameras
}
@app.route('/')
def index():
"""Main page showing counter data"""
counter_data = get_counter_data()
summary_data = get_counter_summary()
pivoted_data = get_pivoted_data()
# Add current datetime to context
current_time = datetime.now()
return render_template('index.html',
counter_data=counter_data,
summary_data=summary_data,
pivoted_data=pivoted_data,
current_time=current_time)
@app.route('/api/data')
def api_data():
"""API endpoint to get counter data as JSON"""
counter_data = get_counter_data()
return jsonify(counter_data)
@app.route('/api/summary')
def api_summary():
"""API endpoint to get counter summary as JSON"""
summary_data = get_counter_summary()
return jsonify(summary_data)
@app.route('/api/pivoted')
def api_pivoted():
"""API endpoint to get pivoted counter data as JSON"""
pivoted_data = get_pivoted_data()
return jsonify(pivoted_data)
if __name__ == '__main__':
# Create database directory if it doesn't exist
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)
# Initialize the database if needed
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS counter_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
camera_name TEXT NOT NULL,
date TIMESTAMP NOT NULL,
counter_value INTEGER NOT NULL
)
""")
conn.commit()
conn.close()
app.run(host='0.0.0.0', port=8899, debug=True)