Files
karung-tuang_frigate-counter/counter_snapshot.py
2026-04-08 12:27:26 +07:00

178 lines
5.3 KiB
Python
Executable File

#!/usr/bin/python3
import sys
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta
def list_files_with_extension(directory_path, extension):
"""
Lists files in a directory with a specific extension using pathlib.Path.glob().
Args:
directory_path (str): The path to the directory.
extension (str): The desired file extension (e.g., '.txt', '.py').
Returns:
list: A list of Path objects for files with the specified extension.
"""
path_obj = Path(directory_path)
file_list = [f.name for f in path_obj.iterdir() if f.is_file() and f.suffix == extension]
return file_list
def list_files_pathlib(directory_path):
"""Lists all files in a given directory using the pathlib module."""
files_list = []
path = Path(directory_path)
for item in path.iterdir():
if item.is_file():
files_list.append(item.name) # Appends only the file name
return files_list
def get_start_end_timestamp(data_date: str = None) -> [int, int]:
start_ts = end_ts = 0
if data_date:
format_string = "%Y-%m-%d"
try:
end_date = datetime.strptime(data_date, format_string)
end_time = end_date.replace(hour=17, minute=0, second=0)
start_time = end_time - timedelta(days=1)
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
except ValueError:
print(f"Value {start_date} must be in format '%Y-%m-%d'")
sys.exit(1)
else:
start_time = end_time = datetime.now()
if end_time.hour < 17:
start_time = end_time - timedelta(days=1)
start_time = start_time.replace(hour=17, minute=0, second=0, microsecond=0)
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
return start_ts, end_ts
def get_ts_from_file(file: str) -> int:
file_split = file.split("-")
ts_int = int(float(file_split[1]))
return ts_int
def get_files(start_ts: int = 0, end_ts: int = 0, camera_name: str = None) -> list:
directory = "/var/lib/docker/volumes/frigate_storage/_data/clips" # Current directory
files = list_files_with_extension(directory, ".jpg")
files_list = []
for file in files:
ts_int = get_ts_from_file(file)
if ts_int >= start_ts:
if end_ts == 0:
if camera_name in file:
files_list.append(file)
elif ts_int < end_ts:
if camera_name in file:
files_list.append(file)
return files_list
def check_duplicate(files: list = []) -> list:
DETIK_ANTAR_KARUNG = 30
files_nodup = []
for i in range(len(files)-1):
ts_current = get_ts_from_file(files[i])
ts_next = get_ts_from_file(files[i+1])
if ts_next - ts_current > DETIK_ANTAR_KARUNG:
files_nodup.append(files[i])
return files_nodup
def store_in_db(date_ts: int, camera_name: str, value: int, value_nodup: int):
print(str(datetime.now()), f"Get {date_ts} {camera_name} {value} {value_nodup}")
DB_FILE = "/etc/frigate/counter.db"
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Create DB
create_sql = '''CREATE TABLE IF NOT EXISTS counter (
id INTEGER PRIMARY KEY,
date_ts INTEGER,
camera_name TEXT,
value INTEGER,
value_nodup INTEGER
)
'''
cursor.execute(create_sql)
# Start Checkin DB
allrows = []
try:
cursor.execute("SELECT * FROM counter where date_ts=? and camera_name=?", (date_ts, camera_name))
allrows = cursor.fetchall()
except sqlite3.OperationalError:
print(str(datetime.now()), f"Error")
if len(allrows):
# UPDATE
print(str(datetime.now()), f"Found record. Update DB")
update_sql = "UPDATE counter SET value = ?, value_nodup = ? WHERE date_ts = ? and camera_name = ?"
cursor.execute(update_sql, (date_ts, camera_name, value, value_nodup))
else:
# INSERT
print(str(datetime.now()), f"Insert into DB")
insert_sql = "INSERT INTO counter (date_ts, camera_name, value, value_nodup) VALUES (?, ?, ?, ?)"
cursor.execute(insert_sql, (date_ts, camera_name, value, value_nodup))
conn.commit()
conn.close()
end_date = None
if len(sys.argv) > 2:
camera_name = sys.argv[1]
end_date = sys.argv[2]
start_ts, end_ts = get_start_end_timestamp(end_date)
elif len(sys.argv) > 1:
camera_name = sys.argv[1]
start_ts, end_ts = get_start_end_timestamp()
else:
print("================")
print("Counter Snapshot")
print("================\n")
print(f"Usage:")
print(f"\tpython3 {sys.argv[0]} [camera_name] [data_date]\n")
print(f"Example:")
print(f"\tpython3 {sys.argv[0]} kandang_1_karung_pakan")
print(f"\tpython3 {sys.argv[0]} kandang_1_karung_pakan 2025-09-23 -> data from 2025-09-22 17:00:00 to 2025-09-23 16:59:59")
sys.exit(1)
#print(start_ts, end_ts, camera_name)
files = get_files(start_ts, end_ts, camera_name)
files_nodup = check_duplicate(files)
value = len(files)
value_nodup = len(files_nodup)
#print(value)
#print(value_nodup)
if end_date:
store_in_db(end_ts, camera_name, value, value_nodup)
print(str(datetime.now()), f"Store in DB {end_ts}/{camera_name}/{value}/{value_nodup}")
#for file in files_nodup:
# print(file)