325 lines
10 KiB
Python
325 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Sync counter data from SQLite databases to API.
|
|
|
|
karung_tuang:
|
|
- 17:00-23:59: Syncs CURRENT date
|
|
- 00:00-16:59: Syncs PREVIOUS date (only if not already synced)
|
|
|
|
karung_masuk: Syncs PREVIOUS date from 00:00 onwards, only if data differs
|
|
(skips if already successfully synced for that date).
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
import json
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
def get_db_path(db_name):
|
|
"""Get database path in same directory as script."""
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
return os.path.join(script_dir, db_name)
|
|
|
|
|
|
def get_previous_date():
|
|
"""Get yesterday's date in YYYY-MM-DD format."""
|
|
return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
|
|
|
|
|
|
def get_sync_state_file():
|
|
"""Get path to sync state file."""
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
return os.path.join(script_dir, ".sync_state.json")
|
|
|
|
|
|
def load_sync_state():
|
|
"""Load sync state from file."""
|
|
state_file = get_sync_state_file()
|
|
if os.path.exists(state_file):
|
|
try:
|
|
with open(state_file, "r") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
return {}
|
|
return {}
|
|
|
|
|
|
def save_sync_state(state):
|
|
"""Save sync state to file."""
|
|
state_file = get_sync_state_file()
|
|
try:
|
|
with open(state_file, "w") as f:
|
|
json.dump(state, f)
|
|
except IOError as e:
|
|
print(f"Warning: Could not save sync state: {e}")
|
|
|
|
|
|
def is_already_synced(state, db_name, date):
|
|
"""Check if database was already synced for given date."""
|
|
return state.get(db_name, {}).get("last_synced_date") == date
|
|
|
|
|
|
def mark_synced(state, db_name, date):
|
|
"""Mark database as synced for given date."""
|
|
if db_name not in state:
|
|
state[db_name] = {}
|
|
state[db_name]["last_synced_date"] = date
|
|
|
|
|
|
def get_current_date():
|
|
"""Get today's date in YYYY-MM-DD format."""
|
|
return datetime.now().strftime("%Y-%m-%d")
|
|
|
|
|
|
def get_local_data(db_path, table_name, date):
|
|
"""Get counter data from SQLite database for a specific date."""
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
if table_name == "karung_counts":
|
|
cursor.execute(
|
|
"SELECT camera_name, date, counter_value FROM karung_counts WHERE date = ?",
|
|
(date,),
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT camera_name, date(date) as d, counter_value FROM counter_data WHERE date(date) = ?",
|
|
(date,),
|
|
)
|
|
|
|
results = cursor.fetchall()
|
|
conn.close()
|
|
|
|
data = {}
|
|
for camera_name, date_val, counter_value in results:
|
|
key = f"{camera_name}_{date_val}"
|
|
data[key] = {
|
|
"camera_name": camera_name,
|
|
"date": date_val,
|
|
"value": counter_value,
|
|
}
|
|
return data
|
|
|
|
|
|
def fetch_api_data(uuid):
|
|
"""Fetch counter data from API."""
|
|
url = f"https://dashboard.cpsp.id/api/cpsp/counter/{uuid}/"
|
|
|
|
req = urllib.request.Request(url, method="GET")
|
|
req.add_header("Accept", "application/json")
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as response:
|
|
result = json.loads(response.read().decode("utf-8"))
|
|
if result.get("success") and "data" in result:
|
|
return result["data"].get("camera_counter", [])
|
|
except urllib.error.HTTPError as e:
|
|
print(f"HTTP Error {e.code}: {e.reason}")
|
|
except urllib.error.URLError as e:
|
|
print(f"URL Error: {e.reason}")
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSON Decode Error: {e}")
|
|
|
|
return []
|
|
|
|
|
|
def update_api(uuid, camera_name, date, value):
|
|
"""Update counter data via API (dummy endpoint)."""
|
|
# Dummy endpoint - replace with actual endpoint when available
|
|
url = f"https://dashboard.cpsp.id/api/cpsp/counter/{uuid}/update/"
|
|
|
|
payload = {"camera_name": camera_name, "date": date, "value": value}
|
|
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(url, data=data, method="POST")
|
|
req.add_header("Content-Type", "application/json")
|
|
req.add_header("Accept", "application/json")
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as response:
|
|
result = json.loads(response.read().decode("utf-8"))
|
|
print(f"Updated {camera_name} for {date}: {value}")
|
|
return result
|
|
except urllib.error.HTTPError as e:
|
|
print(f"HTTP Error {e.code} updating {camera_name}: {e.reason}")
|
|
# For dummy endpoint, just log the intended update
|
|
print(f"[DUMMY] Would update {camera_name} for {date} with value {value}")
|
|
return None
|
|
except urllib.error.URLError as e:
|
|
print(f"URL Error: {e.reason}")
|
|
print(f"[DUMMY] Would update {camera_name} for {date} with value {value}")
|
|
return None
|
|
|
|
|
|
def sync_database(db_config, date, uuid, state=None, skip_if_synced=False):
|
|
"""Sync a single database with API.
|
|
|
|
Args:
|
|
db_config: Database configuration dict
|
|
date: Date string to sync
|
|
uuid: API UUID
|
|
state: Optional sync state dict
|
|
skip_if_synced: If True, skip sync if already marked as synced for this date
|
|
"""
|
|
db_name = db_config.get("name", "unknown")
|
|
|
|
# Check if already synced (for karung_masuk to avoid redundant syncs)
|
|
if skip_if_synced and state and is_already_synced(state, db_name, date):
|
|
print(f"Skipping {db_name} - already synced for {date}")
|
|
return True
|
|
|
|
db_path = get_db_path(db_config["db_file"])
|
|
|
|
if not os.path.exists(db_path):
|
|
print(f"Database not found: {db_path}")
|
|
return False
|
|
|
|
# Get local data
|
|
local_data = get_local_data(db_path, db_config["table"], date)
|
|
|
|
if not local_data:
|
|
print(f"No local data found for {date} in {db_config['db_file']}")
|
|
return False
|
|
|
|
# Get API data
|
|
api_data = fetch_api_data(uuid)
|
|
|
|
# Build API data map
|
|
api_map = {}
|
|
for item in api_data:
|
|
key = f"{item['camera_name']}_{item['date']}"
|
|
api_map[key] = item["value"]
|
|
|
|
has_mismatch = False
|
|
|
|
# Compare and sync
|
|
for key, local_record in local_data.items():
|
|
camera_name = local_record["camera_name"]
|
|
local_value = local_record["value"]
|
|
|
|
if key not in api_map:
|
|
print(f"New record: {camera_name} for {date} = {local_value}")
|
|
update_api(uuid, camera_name, date, local_value)
|
|
has_mismatch = True
|
|
elif api_map[key] != local_value:
|
|
print(
|
|
f"Mismatch for {camera_name} on {date}: API={api_map[key]}, Local={local_value}"
|
|
)
|
|
update_api(uuid, camera_name, date, local_value)
|
|
has_mismatch = True
|
|
else:
|
|
print(f"Synced: {camera_name} for {date} = {local_value}")
|
|
|
|
# Mark as synced if no mismatches found (all data matches)
|
|
if not has_mismatch and state:
|
|
mark_synced(state, db_name, date)
|
|
|
|
return True
|
|
|
|
|
|
def main():
|
|
"""Main sync function.
|
|
|
|
karung_tuang:
|
|
- 17:00-23:59: Sync CURRENT date
|
|
- 00:00-16:59: Sync PREVIOUS date (only if not already synced)
|
|
|
|
karung_masuk: Sync PREVIOUS date from 00:00 onwards, only if data differs.
|
|
"""
|
|
uuid = os.environ.get("COUNTER_UUID")
|
|
if not uuid:
|
|
print("Error: COUNTER_UUID environment variable not set")
|
|
return
|
|
|
|
now = datetime.now()
|
|
hour = now.hour
|
|
|
|
# Load sync state to track which dates have been synced
|
|
sync_state = load_sync_state()
|
|
|
|
# Database configurations
|
|
db_configs = {
|
|
"karung_tuang": {
|
|
"name": "karung_tuang",
|
|
"db_file": "karung_tuang.db",
|
|
"table": "counter_data",
|
|
# 17:00-23:59: sync current date
|
|
# 00:00-16:59: sync previous date (if not already synced)
|
|
"evening_start": 17,
|
|
"use_current_date_evening": True,
|
|
"use_current_date_morning": False,
|
|
"skip_if_synced_morning": True,
|
|
},
|
|
"karung_masuk": {
|
|
"name": "karung_masuk",
|
|
"db_file": "karung_masuk.db",
|
|
"table": "karung_counts",
|
|
"start_hour": 0, # Sync starts at 00:00
|
|
"use_current_date": False, # Sync previous date
|
|
"skip_if_synced": True, # Skip if already synced for this date
|
|
},
|
|
}
|
|
|
|
# Get dates
|
|
prev_date = get_previous_date()
|
|
curr_date = get_current_date()
|
|
|
|
print(f"Current hour: {hour}:00")
|
|
print(f"Previous date: {prev_date}")
|
|
print(f"Current date: {curr_date}")
|
|
|
|
# Sync databases based on current hour
|
|
for name, config in db_configs.items():
|
|
# Handle karung_tuang with morning/evening windows
|
|
if "evening_start" in config:
|
|
evening_start = config["evening_start"]
|
|
|
|
if hour >= evening_start:
|
|
# Evening window (17:00-23:59): sync current date
|
|
sync_date = curr_date
|
|
skip_if_synced = False
|
|
print(f"\nSyncing {name} (evening window {evening_start:02d}:00-23:59) for date {sync_date}...")
|
|
else:
|
|
# Morning window (00:00-16:59): sync previous date, skip if already synced
|
|
sync_date = prev_date
|
|
skip_if_synced = config.get("skip_if_synced_morning", True)
|
|
print(f"\nSyncing {name} (morning window 00:00-{evening_start:02d}:00) for date {sync_date}...")
|
|
|
|
sync_database(
|
|
config,
|
|
sync_date,
|
|
uuid,
|
|
state=sync_state,
|
|
skip_if_synced=skip_if_synced,
|
|
)
|
|
else:
|
|
# Standard behavior for other databases (karung_masuk)
|
|
start_hour = config["start_hour"]
|
|
end_hour = config.get("end_hour", 24)
|
|
|
|
if hour >= start_hour and hour < end_hour:
|
|
sync_date = curr_date if config.get("use_current_date", False) else prev_date
|
|
|
|
print(f"\nSyncing {name} for date {sync_date}...")
|
|
sync_database(
|
|
config,
|
|
sync_date,
|
|
uuid,
|
|
state=sync_state,
|
|
skip_if_synced=config.get("skip_if_synced", False),
|
|
)
|
|
else:
|
|
print(f"\nSkipping {name} - starts at {start_hour:02d}:00")
|
|
|
|
# Save sync state
|
|
save_sync_state(sync_state)
|
|
|
|
print("\nSync complete.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|