Archives
Trending
Support
Login
clear text
XML
Django
JavaScript
MATLAB
C
C++
Python
SQL
Shell
Markdown
YAML
JSON
CSS
PHP
Java
Ruby
Go
Rust
Swift
Kotlin
TypeScript
Perl
Lua
R
Scala
Haskell
Groovy
Dart
Clojure
VB.NET
Objective-C
PowerShell
Bash
CoffeeScript
Verilog
import re import os import requests import time from datetime import datetime, date from collections import defaultdict import configparser import logging import schedule import threading import sys # Налаштування логів LOG_FILE_PATH = '/opt/olt_monitor/olt_monitor.log' LAN_EVENTS_LOG = '/opt/olt_monitor/lan_events.log' SENT_MESSAGES_FILE = '/opt/olt_monitor/sent_messages.txt' LAST_RESET_FILE = '/opt/olt_monitor/last_log_reset.txt' LAST_ACTIVATION_FILE = '/opt/olt_monitor/last_activation.txt' # Створення lan_events.log при старті, якщо не існує if not os.path.exists(LAN_EVENTS_LOG): try: with open(LAN_EVENTS_LOG, 'w', encoding='utf-8') as f: f.write('') logging.info(f"Створено файл {LAN_EVENTS_LOG}") except Exception as e: logging.error(f"Помилка створення {LAN_EVENTS_LOG}: {e}") raise # Налаштування логгера для lan_events.log lan_logger = logging.getLogger('lan_events') lan_handler = logging.FileHandler(LAN_EVENTS_LOG, encoding='utf-8') lan_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) lan_logger.addHandler(lan_handler) lan_logger.setLevel(logging.INFO) # Змінна для відстеження часу попереджень last_size_warning = {} # Налаштування логування в консоль і olt_monitor.log logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[logging.StreamHandler(), logging.FileHandler(LOG_FILE_PATH, encoding='utf-8')] ) # Завантаження конфігурації config = configparser.ConfigParser() try: config.read('/opt/olt_monitor/config.ini') except Exception as e: logging.error(f"Помилка читання config.ini: {e}") raise # Конфігурація try: TELEGRAM_BOT_TOKEN = config.get('Settings', 'TelegramBotToken') TELEGRAM_CHAT_IDS = config.get('Settings', 'TelegramChatIDs').split(',') REPORT_TIME = config.get('Settings', 'ReportTime', fallback='06:00') LOG_FILE = config.get('Settings', 'LogFile', fallback='/var/log/zte.log') LAST_POSITION_FILE = config.get('Settings', 'LastPositionFile', fallback='/opt/olt_monitor/last_position.txt') CHECK_INTERVAL = config.getint('Settings', 'CheckInterval', fallback=5) except Exception as e: logging.error(f"Помилка парсингу конфігурації: {e}") raise # Завантажуємо відправлені повідомлення def load_sent_messages(): if not os.path.exists(SENT_MESSAGES_FILE): return set() try: with open(SENT_MESSAGES_FILE, 'r', encoding='utf-8') as f: return set(line.strip() for line in f if line.strip()) except Exception as e: logging.error(f"Помилка читання файлу {SENT_MESSAGES_FILE}: {e}") return set() # Зберігаємо нове повідомлення def save_sent_message(msg, sent_messages): try: with open(SENT_MESSAGES_FILE, 'a', encoding='utf-8') as f: f.write(msg + '\n') sent_messages.add(msg) except Exception as e: logging.error(f"Помилка збереження повідомлення в {SENT_MESSAGES_FILE}: {e}") # Перевірка часу останнього повідомлення про активацію def can_send_activation(): if not os.path.exists(LAST_ACTIVATION_FILE): return True try: with open(LAST_ACTIVATION_FILE, 'r') as f: last_time = float(f.read().strip()) return (time.time() - last_time) > 300 # 5 хвилин except Exception as e: logging.error(f"Помилка читання {LAST_ACTIVATION_FILE}: {e}") return True # Зберігаємо час активації def save_activation_time(): try: with open(LAST_ACTIVATION_FILE, 'w') as f: f.write(str(time.time())) except Exception as e: logging.error(f"Помилка збереження {LAST_ACTIVATION_FILE}: {e}") # Відправка повідомлення в Telegram def send_telegram(msg, sent_messages): if msg in sent_messages: logging.warning(f"Повідомлення вже відправлено, пропущено: {msg}") return url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" for chat_id in TELEGRAM_CHAT_IDS: try: response = requests.post(url, data={"chat_id": chat_id.strip(), "text": msg}, timeout=10) response.raise_for_status() logging.info(f"Повідомлення успішно відправлено в Telegram (chat_id: {chat_id}): {msg}") save_sent_message(msg, sent_messages) except requests.RequestException as e: error_msg = f"Помилка відправки в Telegram для chat_id {chat_id}: {e}, response: {response.text if 'response' in locals() else 'немає відповіді'}" logging.error(error_msg) critical_msg = f"⚠️ Критична помилка в olt_monitor.py:\n{error_msg}" if critical_msg not in sent_messages: try: for critical_chat_id in TELEGRAM_CHAT_IDS: requests.post(url, data={"chat_id": critical_chat_id.strip(), "text": critical_msg}, timeout=10) save_sent_message(critical_msg, sent_messages) except Exception as critical_e: logging.error(f"Не вдалося відправити критичну помилку в Telegram: {critical_e}") # Відправка звіту про події LAN у Telegram (з очищенням lan_events.log) def send_lan_summary_to_telegram(sent_messages): logging.info("Початок створення звіту про події LAN") summary = defaultdict(lambda: {'los': 0, 'restore': 0}) today = str(date.today()) try: if not os.path.exists(LAN_EVENTS_LOG): logging.info(f"Файл {LAN_EVENTS_LOG} не існує") else: with open(LAN_EVENTS_LOG, 'r', encoding='utf-8') as f: lines_processed = 0 current_entry = [] for line in f: line = line.strip() if not line: continue # Начало новой записи определяется по временной метке (YYYY-MM-DD HH:MM:SS, с опциональными миллисекундами) if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:,\d+)?", line): if current_entry: # Обрабатываем предыдущую запись lines_processed += 1 entry_text = '\n'.join(current_entry) logging.debug(f"Обробка запису у {LAN_EVENTS_LOG}: {entry_text}") # Извлекаем порт, OLT и номер ONU port_match = re.search(r"Порт: (gpon-onu_\d+/\d+/\d+(?::\d+)?)", entry_text) olt_match = re.search(r"OLT: (\S+) \((\d+\.\d+\.\d+\.\d+)\)", entry_text) onu_match = re.search(r"ONU №(\d+)", entry_text) if port_match and olt_match and onu_match: port = port_match.group(1) olt_name = olt_match.group(1) olt_ip = olt_match.group(2) onu_number = onu_match.group(1) key = (port, olt_name, olt_ip, onu_number) if "LAN LOS знайдено" in entry_text: summary[key]['los'] += 1 logging.debug(f"Знайдено LAN LOS для {key} у {LAN_EVENTS_LOG}") elif "LAN LOS Restore знайдено" in entry_text: summary[key]['restore'] += 1 logging.debug(f"Знайдено LAN LOS Restore для {key} у {LAN_EVENTS_LOG}") else: logging.debug(f"Запис не відповідає формату port/olt/onu: {entry_text}") current_entry = [] current_entry.append(line) else: current_entry.append(line) # Обработка последней записи if current_entry: lines_processed += 1 entry_text = '\n'.join(current_entry) logging.debug(f"Обробка останнього запису у {LAN_EVENTS_LOG}: {entry_text}") port_match = re.search(r"Порт: (gpon-onu_\d+/\d+/\d+(?::\d+)?)", entry_text) olt_match = re.search(r"OLT: (\S+) \((\d+\.\d+\.\d+\.\d+)\)", entry_text) onu_match = re.search(r"ONU №(\d+)", entry_text) if port_match and olt_match and onu_match: port = port_match.group(1) olt_name = olt_match.group(1) olt_ip = olt_match.group(2) onu_number = onu_match.group(1) key = (port, olt_name, olt_ip, onu_number) if "LAN LOS знайдено" in entry_text: summary[key]['los'] += 1 logging.debug(f"Знайдено LAN LOS для {key} у {LAN_EVENTS_LOG}") elif "LAN LOS Restore знайдено" in entry_text: summary[key]['restore'] += 1 logging.debug(f"Знайдено LAN LOS Restore для {key} у {LAN_EVENTS_LOG}") else: logging.debug(f"Останній запис не відповідає формату: {entry_text}") logging.info(f"Оброблено {lines_processed} записів у {LAN_EVENTS_LOG}") # Формирование отчета if summary: msg = f"📊 Звіт подій LAN за {today}:\n\n" for (port, olt_name, olt_ip, onu_number), counts in sorted(summary.items()): total = counts['los'] + counts['restore'] msg += ( f"🔢 ONU №{onu_number}\n" f"📍 Порт: {port}\n" f"🖥 OLT: {olt_name} ({olt_ip})\n" f"🔴 Втрата LAN: {counts['los']}\n" f"🟢 LAN відновлено: {counts['restore']}\n" f"🔢 Загалом: {total}\n\n" ) else: msg = f"📊 Звіт подій LAN за {today}:\n\n⚠️ Подій LAN LOS або Restore не знайдено" send_telegram(msg.strip(), sent_messages) logging.info("Звіт про події LAN відправлено в Telegram") # Очищаем lan_events.log после отправки try: with open(LAN_EVENTS_LOG, 'w', encoding='utf-8') as f: f.write('') logging.info(f"Файл {LAN_EVENTS_LOG} очищено після відправки звіту") except Exception as e: logging.error(f"Помилка очищення {LAN_EVENTS_LOG}: {e}") except Exception as e: logging.error(f"Помилка створення звіту LAN: {e}") # Ручна перевірка звіту без очищення lan_events.log def manual_lan_summary(): sent_messages = load_sent_messages() logging.info("Початок ручної перевірки звіту про події LAN") summary = defaultdict(lambda: {'los': 0, 'restore': 0}) today = str(date.today()) try: if not os.path.exists(LAN_EVENTS_LOG): logging.info(f"Файл {LAN_EVENTS_LOG} не існує") else: with open(LAN_EVENTS_LOG, 'r', encoding='utf-8') as f: lines_processed = 0 current_entry = [] for line in f: line = line.strip() if not line: continue # Начало новой записи определяется по временной метке if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:,\d+)?", line): if current_entry: # Обрабатываем предыдущую запись lines_processed += 1 entry_text = '\n'.join(current_entry) logging.debug(f"Обробка запису у {LAN_EVENTS_LOG}: {entry_text}") port_match = re.search(r"Порт: (gpon-onu_\d+/\d+/\d+(?::\d+)?)", entry_text) olt_match = re.search(r"OLT: (\S+) \((\d+\.\d+\.\d+\.\d+)\)", entry_text) onu_match = re.search(r"ONU №(\d+)", entry_text) if port_match and olt_match and onu_match: port = port_match.group(1) olt_name = olt_match.group(1) olt_ip = olt_match.group(2) onu_number = onu_match.group(1) key = (port, olt_name, olt_ip, onu_number) if "LAN LOS знайдено" in entry_text: summary[key]['los'] += 1 logging.debug(f"Знайдено LAN LOS для {key} у {LAN_EVENTS_LOG}") elif "LAN LOS Restore знайдено" in entry_text: summary[key]['restore'] += 1 logging.debug(f"Знайдено LAN LOS Restore для {key} у {LAN_EVENTS_LOG}") else: logging.debug(f"Запис не відповідає формату port/olt/onu: {entry_text}") current_entry = [] current_entry.append(line) else: current_entry.append(line) # Обработка последней записи if current_entry: lines_processed += 1 entry_text = '\n'.join(current_entry) logging.debug(f"Обробка останнього запису у {LAN_EVENTS_LOG}: {entry_text}") port_match = re.search(r"Порт: (gpon-onu_\d+/\d+/\d+(?::\d+)?)", entry_text) olt_match = re.search(r"OLT: (\S+) \((\d+\.\d+\.\d+\.\d+)\)", entry_text) onu_match = re.search(r"ONU №(\d+)", entry_text) if port_match and olt_match and onu_match: port = port_match.group(1) olt_name = olt_match.group(1) olt_ip = olt_match.group(2) onu_number = onu_match.group(1) key = (port, olt_name, olt_ip, onu_number) if "LAN LOS знайдено" in entry_text: summary[key]['los'] += 1 logging.debug(f"Знайдено LAN LOS для {key} у {LAN_EVENTS_LOG}") elif "LAN LOS Restore знайдено" in entry_text: summary[key]['restore'] += 1 logging.debug(f"Знайдено LAN LOS Restore для {key} у {LAN_EVENTS_LOG}") else: logging.debug(f"Останній запис не відповідає формату: {entry_text}") logging.info(f"Оброблено {lines_processed} записів у {LAN_EVENTS_LOG}") # Формирование отчета if summary: msg = f"📊 Ручний звіт подій LAN за {today}:\n\n" for (port, olt_name, olt_ip, onu_number), counts in sorted(summary.items()): total = counts['los'] + counts['restore'] msg += ( f"🔢 ONU №{onu_number}\n" f"📍 Порт: {port}\n" f"🖥 OLT: {olt_name} ({olt_ip})\n" f"🔴 Втрата LAN: {counts['los']}\n" f"🟢 LAN відновлено: {counts['restore']}\n" f"🔢 Загалом: {total}\n\n" ) else: msg = f"📊 Ручний звіт подій LAN за {today}:\n\n⚠️ Подій LAN LOS або Restore не знайдено" send_telegram(msg.strip(), sent_messages) logging.info("Ручний звіт про події LAN відправлено в Telegram") except Exception as e: logging.error(f"Помилка створення ручного звіту LAN: {e}") # Налаштування логування та перезапису файлів def setup_logging_and_messages(): try: logging.info("Початок налаштування логування") last_reset_date = None if os.path.exists(LAST_RESET_FILE): with open(LAST_RESET_FILE, 'r') as f: last_reset_date = f.read().strip() logging.info(f"Прочитано дату останнього перезапису: {last_reset_date}") else: logging.info(f"Файл {LAST_RESET_FILE} не існує, буде створено") today = str(date.today()) logging.info(f"Поточна дата: {today}") # Перевірка розміру логу log_size_mb = os.path.getsize(LOG_FILE_PATH) / (1024 * 1024) if os.path.exists(LOG_FILE_PATH) else 0 logging.info("Розмір логу: " + str(round(log_size_mb, 2)) + " МБ") if last_reset_date != today: sent_messages = load_sent_messages() try: logging.basicConfig( filename=LOG_FILE_PATH, level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', filemode='w', force=True ) logging.info(f"Лог перезаписано для нової доби (size={log_size_mb:.2f} МБ)") with open(SENT_MESSAGES_FILE, 'w', encoding='utf-8') as f: f.write('') logging.info("Файл sent_messages.txt перезаписано для нової доби") with open(LAST_RESET_FILE, 'w') as f: f.write(today) logging.info(f"Оновлено {LAST_RESET_FILE} з датою {today}") except Exception as e: logging.error(f"Помилка при перезаписі файлів: {e}") raise else: logging.basicConfig( filename=LOG_FILE_PATH, level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', filemode='a', force=True ) logging.info("Дата не змінилася, використовується режим додавання") except Exception as e: logging.error(f"Критична помилка налаштування: {e}") print(f"Критична помилка налаштування: {e}") raise # Перевірка розміру файлів def check_file_size(file_path, file_name, max_size_mb=10): try: if os.path.exists(file_path): size_mb = os.path.getsize(file_path) / (1024 * 1024) if size_mb > max_size_mb: current_time = time.time() last_warning_time = last_size_warning.get(file_name, 0) if current_time - last_warning_time > 600: logging.warning(f"Розмір {file_name} перевищує {max_size_mb} МБ: {size_mb:.2f} МБ") last_size_warning[file_name] = current_time except Exception as e: logging.error(f"Помилка перевірки розміру {file_name}: {e}") # Глобальні змінні current_interface = None current_olt_name = None current_olt_ip = None current_description = None onu_add_buffer = defaultdict(dict) # Завантажуємо останню позицію def load_last_position(): if not os.path.exists(LAST_POSITION_FILE): return 0, os.stat(LOG_FILE).st_ino if os.path.exists(LOG_FILE) else 0 try: with open(LAST_POSITION_FILE, 'r') as f: pos = f.read().strip() if pos == '': return 0, os.stat(LOG_FILE).st_ino return int(pos), os.stat(LOG_FILE).st_ino except Exception as e: logging.error(f"Помилка читання позиції з {LAST_POSITION_FILE}: {e}") return 0, os.stat(LOG_FILE).st_ino if os.path.exists(LOG_FILE) else 0 # Зберігаємо останню позицію def save_last_position(position): try: with open(LAST_POSITION_FILE, 'w') as f: f.write(str(position)) except Exception as e: logging.error(f"Помилка збереження позиції в {LAST_POSITION_FILE}: {e}") # Парсинг логу def parse_log(file_obj, start_pos, sent_messages): global current_interface, current_olt_name, current_olt_ip, current_description try: file_obj.seek(start_pos) file_size = os.path.getsize(LOG_FILE) if start_pos > file_size: logging.warning(f"start_pos ({start_pos}) більше file_size ({file_size}), скидаємо на 0") start_pos = 0 file_obj.seek(0) lines = file_obj.readlines() end_pos = file_obj.tell() except Exception as e: logging.error(f"Помилка читання файлу логу: {e}") return start_pos for line in lines: line = line.strip().replace('#012', '').replace('#015', '') if not line: continue # Витягуємо ім'я та IP OLT m_olt = re.search(r"\[(\S+) (\d+\.\d+\.\d+\.\d+)\]", line) if m_olt: current_olt_name = m_olt.group(1) current_olt_ip = m_olt.group(2) # Витягуємо час time_match = re.match(r"^\w+\s+\d+\s+(\d{2}:\d{2}:\d{2})", line) time_str = time_match.group(1) if time_match else datetime.now().strftime("%H:%M:%S") try: event_time = datetime.strptime(time_str, "%H:%M:%S") event_time = event_time.replace(year=datetime.now().year, month=datetime.now().month, day=datetime.now().day) except ValueError as e: logging.error(f"Помилка формату часу {time_str}: {e}") continue # Перевіряємо OLT перед обробкою подій if not current_olt_name or not current_olt_ip: logging.warning(f"Пропущено подію через відсутність OLT: {line}") continue # Витягуємо інтерфейс m_intf = re.search(r"(?:interface\s+)(gpon-olt_\d+/\d+/\d+|gpon-onu_\d+/\d+/\d+:\d+)", line, re.IGNORECASE) if m_intf: current_interface = m_intf.group(1) continue # Витягуємо description m_desc = re.search(r"description\s+(\S+)", line, re.IGNORECASE) if m_desc: current_description = m_desc.group(1) for key, data in list(onu_add_buffer.items()): if key[2] == current_interface or key[2].replace("gpon-olt_", "gpon-onu_") + f":{key[3]}" == current_interface: data['description'] = current_description desc_str = f"\n🏷 Опис: {data['description']}" if data['description'] else "" msg = ( f"✅ ONU додано\n\n" f"🕒 Час: {data['time_str']}\n" f"🔢 ONU №{data['onu_number']}\n" f"📍 Порт: {data['onu_iface']}{desc_str}\n" f"🖥 OLT: {key[0]} ({key[1]})" ) send_telegram(msg, sent_messages) del onu_add_buffer[key] continue # LAN LOS Alarm m_lan_los = re.search(r"(?:GponRm notify:.*)?
\s*SubType:\d+\s*Pos:\d+\s*ONU Uni lan los\. alarm", line, re.IGNORECASE) if m_lan_los: m_iface_num = re.search(r"<(gpon-onu_\d+/\d+/\d+):(\d+)>", line) if m_iface_num: onu_iface = m_iface_num.group(1) onu_num = m_iface_num.group(2) desc_str = f"\n🏷 Опис: {current_description}" if current_description and current_interface == onu_iface else "" msg = ( f"🔴 Втрата LAN-з'єднання\n\n" f"🕒 Час: {time_str}\n" f"🔢 ONU №{onu_num}\n" f"📍 Порт: {onu_iface}{desc_str}\n" f"🖥 OLT: {current_olt_name} ({current_olt_ip})" ) lan_logger.warning(f"LAN LOS знайдено: {msg}") # Записываем только в lan_events.log current_description = None continue # LAN LOS Restore m_lan_restore = re.search(r"(?:GponRm notify:.*)?
\s*SubType:\d+\s*Pos:\d+\s*ONU Uni lan los\. restore", line, re.IGNORECASE) if m_lan_restore: m_iface_num = re.search(r"<(gpon-onu_\d+/\d+/\d+):(\d+)>", line) if m_iface_num: onu_iface = m_iface_num.group(1) onu_num = m_iface_num.group(2) desc_str = f"\n🏷 Опис: {current_description}" if current_description and current_interface == onu_iface else "" msg = ( f"🟢 LAN-з'єднання відновлено\n\n" f"🕒 Час: {time_str}\n" f"🔢 ONU №{onu_num}\n" f"📍 Порт: {onu_iface}{desc_str}\n" f"🖥 OLT: {current_olt_name} ({current_olt_ip})" ) lan_logger.warning(f"LAN LOS Restore знайдено: {msg}") # Записываем только в lan_events.log current_description = None continue # Видалення ONU m_no_onu = re.search(r"\b(?:no\s+onu|ont delete)\s+(\d+)(?:\s+\d+)?|ont delete\s+(\d+/\d+/\d+)\s+(\d+)", line, re.IGNORECASE) if m_no_onu: onu_iface = m_no_onu.group(2) or current_interface or "невідомий" onu_number = m_no_onu.group(1) or m_no_onu.group(3) desc_str = f"\n🏷 Опис: {current_description}" if current_description and current_interface == onu_iface else "" msg = ( f"❌ ONU видалено\n\n" f"🕒 Час: {time_str}\n" f"🔢 ONU №{onu_number}\n" f"📍 Порт: {onu_iface}{desc_str}\n" f"🖥 OLT: {current_olt_name} ({current_olt_ip})" ) send_telegram(msg, sent_messages) current_description = None key = (current_olt_name, current_olt_ip, onu_iface, onu_number) if key in onu_add_buffer: del onu_add_buffer[key] continue # Додавання ONU m_add_onu = re.search(r"\b(?:onu\s+add|ont add)\s+(\d+)\s+(\d+)|ont add\s+(\d+/\d+/\d+)\s+(\d+)|onu\s+(\d+)\s+type\s+\S+\s+sn\s+\S+", line, re.IGNORECASE) if m_add_onu: onu_iface = m_add_onu.group(1) or m_add_onu.group(3) or current_interface or "невідомий" onu_number = m_add_onu.group(2) or m_add_onu.group(4) or m_add_onu.group(5) key = (current_olt_name, current_olt_ip, onu_iface, onu_number) onu_add_buffer[key] = { 'time_str': time_str, 'onu_iface': onu_iface, 'onu_number': onu_number, 'description': current_description if current_interface == onu_iface else None, 'timestamp': datetime.now() } current_description = None continue # Обробка відкладених подій ONU current_time = datetime.now() for key, data in list(onu_add_buffer.items()): if (current_time - data['timestamp']).seconds >= 5: desc_str = f"\n🏷 Опис: {data['description']}" if data['description'] else "" msg = ( f"✅ ONU додано\n\n" f"🕒 Час: {data['time_str']}\n" f"🔢 ONU №{data['onu_number']}\n" f"📍 Порт: {data['onu_iface']}{desc_str}\n" f"🖥 OLT: {key[0]} ({key[1]})" ) send_telegram(msg, sent_messages) del onu_add_buffer[key] return end_pos # Моніторинг логу def monitor_log(): sent_messages = load_sent_messages() if can_send_activation(): test_msg = f"🔔 Моніторинг OLT активовано\n\n🕒 Час: {datetime.now().strftime('%H:%M:%S')}" send_telegram(test_msg, sent_messages) save_activation_time() last_pos, last_inode = load_last_position() while True: try: check_file_size(LOG_FILE_PATH, "olt_monitor.log", max_size_mb=10) check_file_size(SENT_MESSAGES_FILE, "sent_messages.txt", max_size_mb=10) check_file_size(LAN_EVENTS_LOG, "lan_events.log", max_size_mb=10) if not os.path.exists(LOG_FILE): logging.error(f"Файл логу {LOG_FILE} не існує. Очікування 60 секунд") time.sleep(60) continue current_inode = os.stat(LOG_FILE).st_ino if current_inode != last_inode: logging.info(f"Виявлено новий inode для {LOG_FILE}, скидання позиції") last_pos, last_inode = 0, current_inode with open(LOG_FILE, "r", encoding="utf-8") as f: new_pos = parse_log(f, last_pos, sent_messages) if new_pos != last_pos: save_last_position(new_pos) last_pos = new_pos time.sleep(CHECK_INTERVAL) except Exception as e: logging.error(f"Критична помилка при обробці логу: {e}") time.sleep(60) # Планувальник для щоденного звіту def run_scheduler(): sent_messages = load_sent_messages() schedule.every().day.at(REPORT_TIME).do(send_lan_summary_to_telegram, sent_messages) while True: schedule.run_pending() time.sleep(60) if __name__ == "__main__": try: setup_logging_and_messages() # Перевіряємо аргумент командного рядка if len(sys.argv) > 1 and sys.argv[1] == "--manual-report": manual_lan_summary() else: # Запускаємо планувальник у окремому потоці scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) scheduler_thread.start() monitor_log() except Exception as e: logging.error(f"Помилка запуску програми: {e}") raise
Mark as private
for 30 minutes
for 6 hours
for 1 day
for 1 week
for 1 month
for 1 year