#!/usr/bin/python3
"""Count Frigate snapshot clips for one camera and store the totals in SQLite.

The script scans the clips directory for ``.jpg`` files whose name contains
the camera name, keeps those whose embedded timestamp falls inside a
17:00-to-17:00 window, counts them (raw and de-duplicated) and, when an
explicit date was given on the command line, writes both counts to the
``counter`` table.
"""
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Tuple

# Directory that holds the snapshot clips.
CLIPS_DIR = "/var/lib/docker/volumes/frigate_storage/_data/clips"
# SQLite database that receives the per-day counters.
DB_FILE = "/etc/frigate/counter.db"
# Snapshots closer together than this many seconds count as one event.
MIN_GAP_SECONDS = 30
# Hour of day (local time) at which one counting window ends and the next starts.
DAY_CUTOFF_HOUR = 17
DATE_FORMAT = "%Y-%m-%d"


def list_files_with_extension(directory_path, extension) -> List[str]:
    """
    Lists the regular files in a directory that have a specific extension.

    Args:
        directory_path (str): The path to the directory.
        extension (str): The desired file extension including the dot
            (e.g., '.txt', '.py'). Compared against ``Path.suffix``, so the
            match is case-sensitive.

    Returns:
        list: File names (not full paths) of the matching files, in the
        arbitrary order produced by ``Path.iterdir()``.
    """
    return [
        entry.name
        for entry in Path(directory_path).iterdir()
        if entry.is_file() and entry.suffix == extension
    ]


def list_files_pathlib(directory_path) -> List[str]:
    """Lists the names of all regular files in a given directory."""
    return [entry.name for entry in Path(directory_path).iterdir() if entry.is_file()]


def get_start_end_timestamp(data_date: Optional[str] = None) -> Tuple[int, int]:
    """
    Computes the counting window as a pair of Unix timestamps.

    Datetimes are naive, so the window is expressed in the local time zone
    of the machine running the script.

    Args:
        data_date (str): Optional date in '%Y-%m-%d' format. When given, the
            window runs from 17:00 on the previous day to 17:00 on that day.
            When omitted (or empty), the window runs from the most recent
            17:00 up to the current time.

    Returns:
        tuple: ``(start_ts, end_ts)`` as integers.

    Raises:
        SystemExit: With status 1 when ``data_date`` is not a valid date.
    """
    if data_date:
        try:
            end_date = datetime.strptime(data_date, DATE_FORMAT)
        except ValueError:
            # The original handler formatted an undefined name here, which
            # turned a bad date into a NameError instead of this message.
            print(f"Value {data_date} must be in format '{DATE_FORMAT}'")
            sys.exit(1)
        end_time = end_date.replace(
            hour=DAY_CUTOFF_HOUR, minute=0, second=0, microsecond=0
        )
        start_time = end_time - timedelta(days=1)
    else:
        end_time = datetime.now()
        start_time = end_time
        if end_time.hour < DAY_CUTOFF_HOUR:
            # Before today's cutoff the current window opened yesterday.
            start_time -= timedelta(days=1)
        start_time = start_time.replace(
            hour=DAY_CUTOFF_HOUR, minute=0, second=0, microsecond=0
        )

    return int(start_time.timestamp()), int(end_time.timestamp())


def get_ts_from_file(file: str) -> int:
    """
    Extracts the timestamp embedded in a clip file name.

    The name is split on '-' and the second field is parsed as a number,
    e.g. 'camera-1695456789.123456-abc.jpg' gives 1695456789.

    Args:
        file (str): The file name.

    Returns:
        int: The second '-'-separated field, truncated to whole seconds.

    Raises:
        IndexError: If the name contains no '-'.
        ValueError: If the second field is not numeric.
    """
    return int(float(file.split("-")[1]))


def get_files(
    start_ts: int = 0,
    end_ts: int = 0,
    camera_name: Optional[str] = None,
    directory: str = CLIPS_DIR,
) -> List[str]:
    """
    Collects the '.jpg' clips of one camera that fall inside a time window.

    Args:
        start_ts (int): Inclusive lower bound of the window.
        end_ts (int): Exclusive upper bound; 0 means "no upper bound".
        camera_name (str): Substring the file name must contain. ``None``
            disables the camera filter.
        directory (str): Directory to scan; defaults to the clips directory.

    Returns:
        list: Matching file names sorted by ascending timestamp, which is
        the order ``check_duplicate`` relies on.
    """
    matches = []
    for name in list_files_with_extension(directory, ".jpg"):
        # NOTE: substring match, so 'cam_1' also matches 'cam_10' files.
        if camera_name is not None and camera_name not in name:
            continue
        try:
            ts = get_ts_from_file(name)
        except (IndexError, ValueError):
            # A stray .jpg without an embedded timestamp must not abort the run.
            continue
        if ts < start_ts:
            continue
        if end_ts != 0 and ts >= end_ts:
            continue
        matches.append((ts, name))

    matches.sort()
    return [name for _, name in matches]


def check_duplicate(
    files: Optional[List[str]] = None, min_gap_seconds: int = MIN_GAP_SECONDS
) -> List[str]:
    """
    Collapses bursts of snapshots into a single representative file each.

    Files are ordered by timestamp; a file is kept when the next file is
    more than ``min_gap_seconds`` later, so every burst is represented by
    its last snapshot. The final file has no successor and therefore always
    closes a burst (the original loop dropped it, under-counting by one).

    Args:
        files (list): Clip file names with an embedded timestamp.
        min_gap_seconds (int): Largest gap still treated as the same burst.

    Returns:
        list: One file name per burst, in ascending timestamp order.

    Raises:
        IndexError, ValueError: If a name has no parsable timestamp.
    """
    if not files:
        return []

    stamped = sorted((get_ts_from_file(name), name) for name in files)
    kept = [
        name
        for (ts, name), (next_ts, _) in zip(stamped, stamped[1:])
        if next_ts - ts > min_gap_seconds
    ]
    kept.append(stamped[-1][1])
    return kept


def store_in_db(
    date_ts: int,
    camera_name: str,
    value: int,
    value_nodup: int,
    db_file: str = DB_FILE,
):
    """
    Inserts or updates the counter row for one camera and one window.

    Args:
        date_ts (int): Timestamp identifying the window (its end).
        camera_name (str): Camera the counts belong to.
        value (int): Raw number of snapshots.
        value_nodup (int): Number of snapshots after de-duplication.
        db_file (str): Path of the SQLite database file.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails;
            the transaction is rolled back and the connection is closed.
    """
    print(str(datetime.now()), f"Get {date_ts} {camera_name} {value} {value_nodup}")

    create_sql = '''CREATE TABLE IF NOT EXISTS counter (
        id INTEGER PRIMARY KEY,
        date_ts INTEGER,
        camera_name TEXT,
        value INTEGER,
        value_nodup INTEGER
    )
    '''
    select_sql = "SELECT 1 FROM counter WHERE date_ts = ? AND camera_name = ? LIMIT 1"
    update_sql = (
        "UPDATE counter SET value = ?, value_nodup = ? "
        "WHERE date_ts = ? and camera_name = ?"
    )
    insert_sql = (
        "INSERT INTO counter (date_ts, camera_name, value, value_nodup) "
        "VALUES (?, ?, ?, ?)"
    )

    # closing() guarantees the connection is released; the inner ``with conn``
    # commits on success and rolls back if any statement raises.
    with closing(sqlite3.connect(db_file)) as conn:
        with conn:
            conn.execute(create_sql)
            existing = conn.execute(select_sql, (date_ts, camera_name)).fetchone()
            if existing is not None:
                print(str(datetime.now()), "Found record. Update DB")
                # Parameters must follow the placeholder order: SET values
                # first, then the WHERE keys.
                conn.execute(update_sql, (value, value_nodup, date_ts, camera_name))
            else:
                print(str(datetime.now()), "Insert into DB")
                conn.execute(insert_sql, (date_ts, camera_name, value, value_nodup))


def print_usage(script_name: str) -> None:
    """Prints the command-line help text."""
    print("================")
    print("Counter Snapshot")
    print("================\n")
    print("Usage:")
    print(f"\tpython3 {script_name} [camera_name] [data_date]\n")
    print("Example:")
    print(f"\tpython3 {script_name} kandang_1_karung_pakan")
    print(f"\tpython3 {script_name} kandang_1_karung_pakan 2025-09-23 -> data from 2025-09-22 17:00:00 to 2025-09-23 16:59:59")


def main(argv: Optional[List[str]] = None) -> None:
    """
    Command-line entry point.

    Args:
        argv (list): Argument vector; defaults to ``sys.argv``. ``argv[1]`` is
            the camera name and the optional ``argv[2]`` is the data date.
            The counts are written to the database only when a date is given.

    Raises:
        SystemExit: With status 1 when no camera name is supplied.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 2:
        print_usage(args[0])
        sys.exit(1)

    camera_name = args[1]
    end_date = args[2] if len(args) > 2 else None
    start_ts, end_ts = get_start_end_timestamp(end_date)

    files = get_files(start_ts, end_ts, camera_name)
    files_nodup = check_duplicate(files)
    value = len(files)
    value_nodup = len(files_nodup)

    if end_date:
        store_in_db(end_ts, camera_name, value, value_nodup)
        print(str(datetime.now()), f"Store in DB {end_ts}/{camera_name}/{value}/{value_nodup}")


if __name__ == "__main__":
    main()