Files
karunng-web-test/app.py
T
2026-06-02 15:22:08 +07:00

584 lines
19 KiB
Python

from flask import Flask, abort, redirect, render_template, request, url_for
import sqlite3
import json
from datetime import datetime
import os
from pathlib import Path
app = Flask(__name__)
BASE_DIR = Path(__file__).resolve().parent
SETTINGS_DB = BASE_DIR / 'dashboard_settings.db'
DISPLAY_DATE_FORMAT = '%d-%m-%Y'
def load_env_file(env_path):
if not env_path.exists():
return
with env_path.open() as env_file:
for line in env_file:
stripped_line = line.strip()
if not stripped_line or stripped_line.startswith('#') or '=' not in stripped_line:
continue
key, value = stripped_line.split('=', 1)
os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))
def get_configured_db_path(env_name, default_file_name):
db_path = Path(os.environ.get(env_name, default_file_name)).expanduser()
if not db_path.is_absolute():
db_path = BASE_DIR / db_path
return db_path
load_env_file(BASE_DIR / '.env')
KARUNG_TUANG_DB = get_configured_db_path('KARUNG_TUANG_DB', 'karung_tuang.db')
KARUNG_MASUK_DB = get_configured_db_path('KARUNG_MASUK_DB', 'karung_masuk.db')
if KARUNG_TUANG_DB.resolve() == KARUNG_MASUK_DB.resolve():
raise RuntimeError('KARUNG_TUANG_DB and KARUNG_MASUK_DB must point to different database files.')
DB_CONFIGS = {
'tuang': {
'path': KARUNG_TUANG_DB,
'table': 'counter_data',
},
'masuk': {
'path': KARUNG_MASUK_DB,
'table': 'karung_counts',
},
}
DEFAULT_DASHBOARD_SETTINGS = {
'id': '',
'cycle_name': 'Siklus Aktif',
'cycle_start': '',
'cycle_end': '',
'saldo_awal': 0,
}
def get_db_data(db_path, table_name):
"""Fetch all data from database"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(f"SELECT id, camera_name, date, counter_value FROM {table_name} ORDER BY date DESC")
rows = cursor.fetchall()
conn.close()
data = []
for row in rows:
data.append({
'id': row[0],
'camera_name': row[1],
'date': row[2],
'counter_value': row[3]
})
return data
def load_dashboard_settings():
ensure_settings_db()
with sqlite3.connect(SETTINGS_DB) as conn:
row = conn.execute(
"""
SELECT id, cycle_name, cycle_start, cycle_end, saldo_awal
FROM cycle_settings
ORDER BY is_active DESC, id DESC
LIMIT 1
"""
).fetchone()
if not row:
return DEFAULT_DASHBOARD_SETTINGS.copy()
return {
'id': row[0],
'cycle_name': row[1],
'cycle_start': row[2],
'cycle_end': row[3],
'saldo_awal': row[4],
}
def save_dashboard_settings(settings):
ensure_settings_db()
with sqlite3.connect(SETTINGS_DB) as conn:
conn.execute('UPDATE cycle_settings SET is_active = 0')
conn.execute(
"""
INSERT INTO cycle_settings (
cycle_name,
cycle_start,
cycle_end,
saldo_awal,
is_active,
updated_at
)
VALUES (?, ?, ?, ?, 1, ?)
""",
(
settings['cycle_name'],
settings['cycle_start'],
settings['cycle_end'],
settings['saldo_awal'],
datetime.now().isoformat(timespec='seconds')
)
)
def get_cycle_options():
ensure_settings_db()
with sqlite3.connect(SETTINGS_DB) as conn:
rows = conn.execute(
"""
SELECT latest_cycle.id,
latest_cycle.cycle_name,
latest_cycle.cycle_start,
latest_cycle.cycle_end,
latest_cycle.saldo_awal
FROM cycle_settings AS latest_cycle
INNER JOIN (
SELECT cycle_name, MAX(id) AS latest_id
FROM cycle_settings
GROUP BY cycle_name
) AS grouped_cycle
ON latest_cycle.id = grouped_cycle.latest_id
ORDER BY latest_cycle.cycle_name COLLATE NOCASE
"""
).fetchall()
return [
{
'id': row[0],
'cycle_name': row[1],
'cycle_start': row[2],
'cycle_end': row[3],
'saldo_awal': row[4],
}
for row in rows
]
def select_cycle_by_name(cycle_name):
ensure_settings_db()
with sqlite3.connect(SETTINGS_DB) as conn:
row = conn.execute(
"""
SELECT id
FROM cycle_settings
WHERE cycle_name = ?
ORDER BY id DESC
LIMIT 1
""",
(cycle_name,)
).fetchone()
if not row:
abort(404, description='Cycle name was not found.')
conn.execute('UPDATE cycle_settings SET is_active = 0')
conn.execute(
"""
UPDATE cycle_settings
SET is_active = 1,
updated_at = ?
WHERE id = ?
""",
(datetime.now().isoformat(timespec='seconds'), row[0])
)
def ensure_settings_db():
with sqlite3.connect(SETTINGS_DB) as conn:
existing_schema = conn.execute(
"""
SELECT sql
FROM sqlite_master
WHERE type = 'table'
AND name = 'cycle_settings'
"""
).fetchone()
if existing_schema and 'CHECK (id = 1)' in existing_schema[0]:
conn.execute('ALTER TABLE cycle_settings RENAME TO cycle_settings_old')
create_cycle_settings_table(conn)
conn.execute(
"""
INSERT INTO cycle_settings (
cycle_name,
cycle_start,
cycle_end,
saldo_awal,
is_active,
updated_at
)
SELECT cycle_name,
cycle_start,
cycle_end,
saldo_awal,
0,
updated_at
FROM cycle_settings_old
ORDER BY id
"""
)
conn.execute('DROP TABLE cycle_settings_old')
else:
create_cycle_settings_table(conn)
ensure_settings_active_column(conn)
settings_count = conn.execute('SELECT COUNT(*) FROM cycle_settings').fetchone()[0]
if settings_count:
ensure_active_cycle(conn)
return
conn.execute(
"""
INSERT OR IGNORE INTO cycle_settings (
cycle_name,
cycle_start,
cycle_end,
saldo_awal,
is_active,
updated_at
)
VALUES (?, ?, ?, ?, 1, ?)
""",
(
DEFAULT_DASHBOARD_SETTINGS['cycle_name'],
DEFAULT_DASHBOARD_SETTINGS['cycle_start'],
DEFAULT_DASHBOARD_SETTINGS['cycle_end'],
DEFAULT_DASHBOARD_SETTINGS['saldo_awal'],
datetime.now().isoformat(timespec='seconds')
)
)
def create_cycle_settings_table(conn):
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cycle_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cycle_name TEXT NOT NULL,
cycle_start TEXT NOT NULL DEFAULT '',
cycle_end TEXT NOT NULL DEFAULT '',
saldo_awal INTEGER NOT NULL DEFAULT 0,
is_active INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
)
"""
)
def ensure_settings_active_column(conn):
columns = {
row[1]
for row in conn.execute('PRAGMA table_info(cycle_settings)').fetchall()
}
if 'is_active' not in columns:
conn.execute(
"""
ALTER TABLE cycle_settings
ADD COLUMN is_active INTEGER NOT NULL DEFAULT 0
"""
)
def ensure_active_cycle(conn):
active_count = conn.execute(
'SELECT COUNT(*) FROM cycle_settings WHERE is_active = 1'
).fetchone()[0]
if active_count:
return
latest_id = conn.execute(
'SELECT id FROM cycle_settings ORDER BY id DESC LIMIT 1'
).fetchone()[0]
conn.execute('UPDATE cycle_settings SET is_active = 1 WHERE id = ?', (latest_id,))
def parse_cycle_date(date_text, field_name):
date_text = date_text.strip()
if not date_text:
return None
for date_format in (DISPLAY_DATE_FORMAT, '%Y-%m-%d'):
try:
return datetime.strptime(date_text, date_format).date()
except ValueError:
continue
abort(400, description=f'{field_name} must use DD-MM-YYYY or YYYY-MM-DD format.')
def parse_data_date(date_text):
date_text = str(date_text).strip()
if not date_text:
return None
try:
return datetime.fromisoformat(date_text.replace('Z', '+00:00')).date()
except ValueError:
pass
for date_format in (DISPLAY_DATE_FORMAT, '%d/%m/%Y'):
try:
return datetime.strptime(date_text, date_format).date()
except ValueError:
continue
return None
def format_history_date(date_text):
parsed_date = parse_data_date(date_text)
if not parsed_date:
return date_text
return parsed_date.strftime('%Y-%m-%d')
def format_date_picker_value(date_text):
parsed_date = parse_cycle_date(date_text, 'Cycle date')
if not parsed_date:
return ''
return parsed_date.strftime('%Y-%m-%d')
def filter_data_by_cycle(data, cycle_start, cycle_end):
if not cycle_start or not cycle_end:
return []
filtered_data = []
for item in data:
item_date = parse_data_date(item['date'])
if not item_date:
continue
if item_date < cycle_start:
continue
if item_date > cycle_end:
continue
filtered_data.append(item)
return filtered_data
def is_masuk_out_item(item):
return str(item['camera_name']).lower().endswith('_out')
def get_settings_from_form():
cycle_name = request.form.get('cycle_name', '').strip()
cycle_start_raw = request.form.get('cycle_start', '').strip()
cycle_end_raw = request.form.get('cycle_end', '').strip()
saldo_awal_raw = request.form.get('saldo_awal', '0').strip()
if not cycle_name:
abort(400, description='Cycle name is required.')
if not cycle_start_raw or not cycle_end_raw:
abort(400, description='Cycle start and cycle end are required to filter historical data.')
cycle_start = parse_cycle_date(cycle_start_raw, 'Cycle start')
cycle_end = parse_cycle_date(cycle_end_raw, 'Cycle end')
if cycle_start and cycle_end and cycle_start > cycle_end:
abort(400, description='Cycle start must be before or equal to cycle end.')
try:
saldo_awal = int(saldo_awal_raw or 0)
except ValueError:
abort(400, description='Saldo awal must be a whole number.')
if saldo_awal < 0:
abort(400, description='Saldo awal cannot be negative.')
return {
'cycle_name': cycle_name,
'cycle_start': cycle_start.strftime(DISPLAY_DATE_FORMAT) if cycle_start else '',
'cycle_end': cycle_end.strftime(DISPLAY_DATE_FORMAT) if cycle_end else '',
'saldo_awal': saldo_awal,
}
def ensure_db_writable(db_path):
db_path = Path(db_path)
if not db_path.exists():
abort(404, description=f'Database file not found: {db_path}')
if not os.access(db_path, os.W_OK):
abort(403, description=f'Cannot save jumlah karung because database is not writable: {db_path}')
if not os.access(db_path.parent, os.W_OK):
abort(403, description=f'Cannot save jumlah karung because database directory is not writable: {db_path.parent}')
def update_db_data(db_path, table_name, row_id, counter_value):
"""Update one database row by primary key."""
ensure_db_writable(db_path)
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute(
f"""
UPDATE {table_name}
SET counter_value = ?
WHERE id = ?
""",
(counter_value, row_id)
)
updated_count = cursor.rowcount
except sqlite3.OperationalError as error:
abort(403, description=f'Cannot save jumlah karung to database {db_path}: {error}')
if updated_count == 0:
abort(404, description='No database row was found for the requested edit.')
def update_db_rows(db_path, table_name, row_values):
ensure_db_writable(db_path)
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
for row_id, counter_value in row_values.items():
cursor.execute(
f"""
UPDATE {table_name}
SET counter_value = ?
WHERE id = ?
""",
(counter_value, row_id)
)
if cursor.rowcount == 0:
abort(404, description='No database row was found for one of the requested edits.')
except sqlite3.OperationalError as error:
abort(403, description=f'Cannot save jumlah karung to database {db_path}: {error}')
def get_counter_value_from_form(counter_value_raw):
if not counter_value_raw:
abort(400, description='Jumlah karung is required to edit database data.')
try:
counter_value = int(counter_value_raw)
except ValueError:
abort(400, description='Jumlah karung must be a whole number.')
if counter_value < 0:
abort(400, description='Jumlah karung cannot be negative.')
return counter_value
def get_bulk_counter_values_from_form():
row_values = {}
for field_name, field_value in request.form.items():
if not field_name.startswith('counter_value_'):
continue
row_id_raw = field_name.removeprefix('counter_value_')
try:
row_id = int(row_id_raw)
except ValueError:
abort(400, description='Invalid database row id in edit form.')
row_values[row_id] = get_counter_value_from_form(field_value.strip())
if not row_values:
abort(400, description='No jumlah karung values were submitted.')
return row_values
def get_json_data(json_path):
"""Fetch current data from JSON file"""
try:
with open(json_path, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
@app.route('/database/<db_name>/<int:row_id>/edit', methods=['POST'])
def edit_db_row(db_name, row_id):
db_config = DB_CONFIGS.get(db_name)
if not db_config:
abort(404, description='Unknown database. Use tuang or masuk.')
counter_value = get_counter_value_from_form(request.form.get('counter_value', '').strip())
update_db_data(
db_config['path'],
db_config['table'],
row_id,
counter_value
)
return redirect(url_for('index'))
@app.route('/database/<db_name>/edit', methods=['POST'])
def edit_db_rows(db_name):
db_config = DB_CONFIGS.get(db_name)
if not db_config:
abort(404, description='Unknown database. Use tuang or masuk.')
update_db_rows(
db_config['path'],
db_config['table'],
get_bulk_counter_values_from_form()
)
return redirect(url_for('index'))
@app.route('/settings', methods=['POST'])
def update_dashboard_settings():
settings = get_settings_from_form()
save_dashboard_settings(settings)
return redirect(url_for('index'))
@app.route('/settings/select', methods=['POST'])
def select_dashboard_cycle():
cycle_name = request.form.get('cycle_name', '').strip()
if not cycle_name:
abort(400, description='Cycle name is required to select a cycle.')
select_cycle_by_name(cycle_name)
return redirect(url_for('index'))
@app.route('/')
def index():
dashboard_settings = load_dashboard_settings()
cycle_options = get_cycle_options()
cycle_start = parse_cycle_date(dashboard_settings['cycle_start'], 'Cycle start')
cycle_end = parse_cycle_date(dashboard_settings['cycle_end'], 'Cycle end')
cycle_start_picker_value = format_date_picker_value(dashboard_settings['cycle_start'])
cycle_end_picker_value = format_date_picker_value(dashboard_settings['cycle_end'])
# Get historical data from databases
tuang_db_data = get_db_data(DB_CONFIGS['tuang']['path'], DB_CONFIGS['tuang']['table'])
masuk_db_data = get_db_data(DB_CONFIGS['masuk']['path'], DB_CONFIGS['masuk']['table'])
tuang_db_data = filter_data_by_cycle(tuang_db_data, cycle_start, cycle_end)
masuk_db_data = filter_data_by_cycle(masuk_db_data, cycle_start, cycle_end)
for item in tuang_db_data:
item['display_date'] = format_history_date(item['date'])
# Get current date data from JSON files
tuang_json_data = get_json_data('/etc/frigate-counter/karung-tuang-2/karung_tuang.json')
masuk_json_data = get_json_data('/etc/frigate-counter/karung-masuk/karung_masuk.json')
# Calculate totals from JSON (current date)
tuang_json_total = sum(item.get('karung', 0) for item in tuang_json_data.values())
masuk_json_total = sum(item.get('karung', 0) for item in masuk_json_data.values())
# Calculate totals from database (historical)
tuang_db_total = sum(item['counter_value'] for item in tuang_db_data)
masuk_db_total = sum(
item['counter_value'] if not is_masuk_out_item(item) else -item['counter_value']
for item in masuk_db_data
)
saldo_akhir = dashboard_settings['saldo_awal'] + masuk_db_total - tuang_db_total
current_date = datetime.now().strftime(DISPLAY_DATE_FORMAT)
return render_template('index.html',
dashboard_settings=dashboard_settings,
cycle_options=cycle_options,
cycle_start_picker_value=cycle_start_picker_value,
cycle_end_picker_value=cycle_end_picker_value,
tuang_db_data=tuang_db_data,
masuk_db_data=masuk_db_data,
tuang_json_data=tuang_json_data,
masuk_json_data=masuk_json_data,
tuang_json_total=tuang_json_total,
masuk_json_total=masuk_json_total,
tuang_db_total=tuang_db_total,
masuk_db_total=masuk_db_total,
saldo_akhir=saldo_akhir,
current_date=current_date)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=8080)