First Commit

This commit is contained in:
2025-12-24 14:11:28 +07:00
parent 6545359337
commit e4a0134e78
3 changed files with 337 additions and 0 deletions

163
get_host.py Normal file
View File

@@ -0,0 +1,163 @@
import sys
import logging
from zabbix_utils import ZabbixAPI
from datetime import datetime, timedelta
file_handler = logging.FileHandler(filename="tmp.log")
stdout_handler = logging.StreamHandler(stream=sys.stdout)
handlers = [stdout_handler]
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
handlers=handlers,
)
logger = logging.getLogger(__name__)
ZABBIX_URL = "https://nm.vn.backone.cloud/api_jsonrpc.php"
API_TOKEN = "dac06ab72f7c64392ff23adc021a61fd27397b1790406d97bd883c6d4ae7b628"
TRIGGER_STRING = "Zabbix agent is not available"
DAY_INACTIVE = 90
# Initialize the ZabbixAPI object and log in using the token
try:
api = ZabbixAPI(url=ZABBIX_URL)
# The login method accepts the 'token' parameter for token-based authentication
api.login(token=API_TOKEN)
logging.info("Authentication successful!")
except Exception as e:
logging.error(f"Authentication failed or an error occurred: {e}")
sys.exit(1)
def get_enabled_hosts_with_triggers():
"""Retrieves all enabled hosts that have active triggers."""
try:
# Use host.get to retrieve hosts that have enabled triggers
# `with_monitored_triggers` flag returns only hosts that have enabled triggers
hosts = api.host.get(
output=["hostid", "name"],
hostids=["10084", "10747", "10720", "10949", "11287", "10715", "10714"],
# filter={"status": 0}, # Filter for enabled hosts (0: enabled; 1: disabled)
# selectTriggers=["triggerid", "description", "status"],
# with_monitored_triggers=True,
)
return hosts
except Exception as e:
logging.error(f"Error fetching hosts: {e}")
return []
def get_specific_trigger_alerts(triggerids):
"""Retrieves current alerts (problems) for a list of trigger IDs."""
if not triggerids:
return []
try:
# Use alert.get to retrieve problem alerts generated by the specified triggers
alerts = api.alert.get(
output="extend",
triggerids=triggerids,
# Fetch only problem alerts, not all messages
# This is complex with alert.get, it's easier to check the 'status' of an event in the problems screen.
# To get *current problems* via API, it's better to use the problem.get method.
# For simplicity using alert.get:
# We can filter for alerts generated after a certain time, but it's not a real 'current problems' status.
# Let's use problem.get as it is more direct for "current alerts"
)
return alerts
except Exception as e:
logging.error(f"Error fetching alerts: {e}")
return []
def get_current_problems_by_triggers(triggerids):
"""Retrieves current open problems associated with a list of trigger IDs using problem.get."""
if not triggerids:
return []
try:
problems = api.problem.get(
output=["objectid", "eventid", "name", "severity", "clock"],
objectids=triggerids,
# Show only unresolved problems
acknowledged=False, # Optional: filter only unacknowledged problems
recent=True, # Show most recent problems
# sortfield=["clock"],
# sortorder="DESC",
)
return problems
except Exception as e:
logging.error(f"Error fetching problems: {e}")
return []
def disable_hosts(hostids):
if not hostids:
return False
for hostid in hostids:
api.host.update(hostid=hostid, status=1)
logging.info(f"(Host with ID: {hostid}) has been disabled.")
return True
# Main execution
if __name__ == "__main__":
logging.info(f"Get all Hosts with Trigger contains {TRIGGER_STRING}")
enabled_hosts = get_enabled_hosts_with_triggers()
logging.info(enabled_hosts)
sys.exit(0)
if enabled_hosts:
logging.info(f"Found {len(enabled_hosts)} enabled hosts with triggers:")
all_trigger_ids = []
trigger_to_host = {}
for host in enabled_hosts:
logging.debug(f"- Host: {host['name']} (ID: {host['hostid']})")
for trigger in host.get("triggers", []):
# print(
# f" Trigger: {trigger['description']} (ID: {trigger['triggerid']}, Status: {'Enabled' if trigger['status'] == '0' else 'Disabled'})"
# )
if (
trigger["status"] == "0"
and TRIGGER_STRING in trigger["description"]
): # Collect only enabled trigger IDs
all_trigger_ids.append(trigger["triggerid"])
# hosts_objects["id"] = host["hostid"]
trigger_to_host[trigger["triggerid"]] = host["hostid"]
# print(hosts_objects)
hosts_with_problem = []
if all_trigger_ids:
# if hosts_objects:
logger.info(
f"Fetching current open problems for {len(all_trigger_ids)} enabled triggers..."
)
# current_problems = get_current_problems_by_triggers_new(hosts_objects)
current_problems = get_current_problems_by_triggers(all_trigger_ids)
logging.info(f"Current Open Problems ({len(current_problems)})")
current_time = datetime.now()
time_threshold = current_time - timedelta(DAY_INACTIVE)
timestamp_threshold = int(time_threshold.timestamp())
logging.info(f"Time Threshold: {timestamp_threshold}")
for problem in current_problems:
logger.debug(
f"* {trigger_to_host[problem["objectid"]]} PROBLEM: {problem['name']} (Severity: {problem['severity']}, Time: {problem['clock']})"
)
if int(problem["clock"]) < timestamp_threshold:
hosts_with_problem.append(trigger_to_host[problem["objectid"]])
logging.debug(f"Host with Problem: {hosts_with_problem}")
logging.info(f"Found {len(hosts_with_problem)} hosts with problem")
disable_hosts(hosts_with_problem)
else:
logger.info("\nNo enabled triggers found to check for problems.")
else:
logger.info("No enabled hosts with triggers found.")

173
main.py Normal file
View File

@@ -0,0 +1,173 @@
import os
import sys
import logging
from zabbix_utils import ZabbixAPI
from datetime import datetime, timedelta
file_handler = logging.FileHandler(filename="tmp.log")
stdout_handler = logging.StreamHandler(stream=sys.stdout)
handlers = [stdout_handler]
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
handlers=handlers,
)
logger = logging.getLogger(__name__)
ZABBIX_URL = os.getenv("ZABBIX_URL", "https://nm.vn.backone.cloud/api_jsonrpc.php")
API_TOKEN = os.getenv(
"API_TOKEN", "dac06ab72f7c64392ff23adc021a61fd27397b1790406d97bd883c6d4ae7b628"
)
TRIGGER_STRING = os.getenv("TRIGGER_STRING", "Zabbix agent is not available")
TRIGGER_DAY = int(os.getenv("TRIGGER_DAY", 90))
# Initialize the ZabbixAPI object and log in using the token
try:
api = ZabbixAPI(url=ZABBIX_URL)
# The login method accepts the 'token' parameter for token-based authentication
api.login(token=API_TOKEN)
logging.info("Authentication successful!")
except Exception as e:
logging.error(f"Authentication failed or an error occurred: {e}")
def get_enabled_hosts_with_triggers():
"""Retrieves all enabled hosts that have active triggers."""
try:
# Use host.get to retrieve hosts that have enabled triggers
# `with_monitored_triggers` flag returns only hosts that have enabled triggers
hosts = api.host.get(
output=["hostid", "name"],
filter={"status": 0}, # Filter for enabled hosts (0: enabled; 1: disabled)
selectTriggers=["triggerid", "description", "status"],
with_monitored_triggers=True,
)
return hosts
except Exception as e:
logging.error(f"Error fetching hosts: {e}")
return []
def get_specific_trigger_alerts(triggerids):
"""Retrieves current alerts (problems) for a list of trigger IDs."""
if not triggerids:
return []
try:
# Use alert.get to retrieve problem alerts generated by the specified triggers
alerts = api.alert.get(
output="extend",
triggerids=triggerids,
# Fetch only problem alerts, not all messages
# This is complex with alert.get, it's easier to check the 'status' of an event in the problems screen.
# To get *current problems* via API, it's better to use the problem.get method.
# For simplicity using alert.get:
# We can filter for alerts generated after a certain time, but it's not a real 'current problems' status.
# Let's use problem.get as it is more direct for "current alerts"
)
return alerts
except Exception as e:
logging.error(f"Error fetching alerts: {e}")
return []
def get_current_problems_by_triggers(triggerids):
"""Retrieves current open problems associated with a list of trigger IDs using problem.get."""
if not triggerids:
return []
try:
problems = api.problem.get(
output=["objectid", "eventid", "name", "severity", "clock"],
objectids=triggerids,
# Show only unresolved problems
acknowledged=False, # Optional: filter only unacknowledged problems
recent=True, # Show most recent problems
# sortfield=["clock"],
# sortorder="DESC",
)
return problems
except Exception as e:
logging.error(f"Error fetching problems: {e}")
return []
def disable_hosts(hosts_list):
if not hosts_list:
return False
for host in hosts_list:
api.host.update(hostid=host["hostid"], status=1)
logging.info(f"(Host: {host["name"]}, ID: {host["hostid"]}) has been disabled.")
return True
# Main execution
if __name__ == "__main__":
logging.info(
f'Get all Hosts with Trigger contains "{TRIGGER_STRING}" for {TRIGGER_DAY} days'
)
enabled_hosts = get_enabled_hosts_with_triggers()
if enabled_hosts:
logging.info(f"Found {len(enabled_hosts)} enabled hosts with triggers:")
all_trigger_ids = []
trigger_to_host = {}
for host in enabled_hosts:
logging.debug(f"- Host: {host['name']} (ID: {host['hostid']})")
for trigger in host.get("triggers", []):
# print(
# f" Trigger: {trigger['description']} (ID: {trigger['triggerid']}, Status: {'Enabled' if trigger['status'] == '0' else 'Disabled'})"
# )
if (
trigger["status"] == "0"
and TRIGGER_STRING in trigger["description"]
): # Collect only enabled trigger IDs
all_trigger_ids.append(trigger["triggerid"])
# hosts_objects["id"] = host["hostid"]
trigger_to_host[trigger["triggerid"]] = {
"hostid": host["hostid"],
"name": host["name"],
}
# trigger_to_host[trigger["triggerid"]]["hostid"] = host["hostid"]
# trigger_to_host[trigger["triggerid"]]["name"] = host["name"]
# print(hosts_objects)
hosts_with_problem = []
if all_trigger_ids:
# if hosts_objects:
logger.info(
f"Fetching current open problems for {len(all_trigger_ids)} enabled triggers..."
)
# current_problems = get_current_problems_by_triggers_new(hosts_objects)
current_problems = get_current_problems_by_triggers(all_trigger_ids)
logging.info(f"Current Open Problems ({len(current_problems)})")
current_time = datetime.now()
time_threshold = current_time - timedelta(TRIGGER_DAY)
timestamp_threshold = int(time_threshold.timestamp())
logging.info(f"Time Threshold: {timestamp_threshold}")
for problem in current_problems:
logger.debug(
f"* {trigger_to_host[problem["objectid"]]['name']} PROBLEM: {problem['name']} (Severity: {problem['severity']}, Time: {problem['clock']})"
)
if int(problem["clock"]) < timestamp_threshold:
hosts_with_problem.append(
trigger_to_host[problem["objectid"]]
# trigger_to_host[problem["objectid"]]["hostid"]
)
logging.debug(f"Host with Problem: {hosts_with_problem}")
logging.info(f"Found {len(hosts_with_problem)} hosts with problem")
# print(hosts_with_problem)
disable_hosts(hosts_with_problem)
else:
logger.info("\nNo enabled triggers found to check for problems.")
else:
logger.info("No enabled hosts with triggers found.")

1
requirements.txt Normal file
View File

@@ -0,0 +1 @@
zabbix-utils==2.0.4