diff --git a/bot.py b/bot.py index a035f9d..c452e57 100644 --- a/bot.py +++ b/bot.py @@ -4,6 +4,8 @@ import json import threading import requests import html +import re +from urllib.parse import urlparse, unquote from dotenv import load_dotenv from apprise import Apprise from telegram import Update @@ -15,6 +17,7 @@ from telegram.ext import ( MessageHandler, filters ) +from typing import Optional # === Load Config === load_dotenv() @@ -34,23 +37,113 @@ watchlists = {} SET_APPRISE_CONFIRM = 1 # === Utility: Timestamped logging === -def log(msg): +def log(msg: str): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True) +# === Sanitization Utilities === +# Twitch usernames are case-insensitive. We'll sanitize inputs to extract a clean +# username from URLs, @mentions, percent-encoded strings, or embedded HTML. + +ALLOWED_USERNAME_RE = re.compile(r"^[A-Za-z0-9_]+$") + + +def sanitize_username(raw: str) -> Optional[str]: + """Return a cleaned lowercase twitch username or None if it can't be parsed. + + Handles inputs like: + - "@username" + - "https://twitch.tv/username" + - "https%3A%2F%2Fwww.twitch.tv%2Fusername" + - "" + - plain usernames + """ + if not raw: + return None + + s = raw.strip() + # decode HTML entities and percent-encoding + s = html.unescape(s) + try: + s = unquote(s) + except Exception: + pass + + # If it looks like a URL (contains twitch.tv) parse and extract last path segment + if "twitch.tv" in s: + try: + parsed = urlparse(s if s.startswith("http") else "http://" + s) + parts = [p for p in parsed.path.split("/") if p] + if parts: + s = parts[-1] + except Exception: + pass + + # Remove any surrounding angle brackets and common stray characters + s = s.strip().lstrip("@").strip("<>") + + # Take the first token if the user pasted a sentence/HTML + s = s.split()[0] + + # If it's already a valid username, return it + if ALLOWED_USERNAME_RE.match(s): + return s.lower() + + # Otherwise try to remove disallowed chars and see if that leaves a valid name + cleaned = re.sub(r"[^A-Za-z0-9_]", "", s) + if cleaned and ALLOWED_USERNAME_RE.match(cleaned): + return cleaned.lower() + + # Couldn't produce a valid username + return None + + # === File Utilities === + def load_watchlists(): if not os.path.exists(WATCHLIST_FILE): return {} with open(WATCHLIST_FILE, "r", encoding="utf-8") as f: - return json.load(f) + try: + return json.load(f) + except Exception as e: + log(f"[ERROR] Failed to parse watchlists.json: {e}") + return {} + def save_watchlists(watchlists_data): with open(WATCHLIST_FILE, "w", encoding="utf-8") as f: json.dump(watchlists_data, f, indent=2) + +def clean_watchlists(): + """Sanitize all stored watchlists and remove invalid channel entries. + This helps when older entries contain full URLs or malformed values. + """ + global watchlists + changed = False + for chat_id, data in list(watchlists.items()): + channels = data.get("channels", []) or [] + new_channels = [] + for ch in channels: + san = sanitize_username(ch) + if san and san not in new_channels: + new_channels.append(san) + if new_channels != channels: + watchlists[chat_id]["channels"] = new_channels + changed = True + log(f"Cleaned channels for {chat_id}: {channels} -> {new_channels}") + if changed: + save_watchlists(watchlists) + log("Saved cleaned watchlists.json") + + +# Load & clean on startup watchlists = load_watchlists() +clean_watchlists() + # === Twitch API Utilities === + def get_app_token(): log("Requesting Twitch app token...") resp = requests.post("https://id.twitch.tv/oauth2/token", params={ @@ -59,33 +152,85 @@ def get_app_token(): "grant_type": "client_credentials" }) resp.raise_for_status() + token = resp.json().get("access_token") log("Twitch token received.") - return resp.json()["access_token"] + return token + + +def chunked(lst, n): + for i in range(0, len(lst), n): + yield lst[i:i + n] + def get_user_ids(headers, usernames): + """Given an iterable of usernames (possibly messy), sanitize and resolve their Twitch IDs. + + Returns a dict mapping sanitized_username -> user_id for resolved users. + """ + # Sanitize incoming list + usernames = [sanitize_username(u) for u in usernames] + usernames = [u for u in usernames if u] usernames = [u.lower() for u in usernames] + to_fetch = [u for u in usernames if u not in user_ids_cache] if to_fetch: log(f"Fetching Twitch user IDs for: {', '.join(to_fetch)}") - resp = requests.get("https://api.twitch.tv/helix/users", - headers=headers, - params=[('login', name) for name in to_fetch]) - resp.raise_for_status() - for user in resp.json()["data"]: - user_ids_cache[user['login']] = user['id'] - log(f"Resolved {user['login']} => {user['id']}") + for chunk in chunked(to_fetch, 100): + try: + resp = requests.get( + "https://api.twitch.tv/helix/users", + headers=headers, + params=[("login", name) for name in chunk] + ) + resp.raise_for_status() + data = resp.json().get("data", []) + for user in data: + key = user.get("login", "").lower() + user_ids_cache[key] = user.get("id") + log(f"Resolved {key} => {user_ids_cache[key]}") + except requests.exceptions.HTTPError as e: + status = getattr(e.response, "status_code", "N/A") + text = getattr(e.response, "text", str(e)) + log(f"[ERROR] Failed to fetch user ids for chunk {chunk}: {status} {text}") + + # Log unresolved names (those that didn't come back in API) + unresolved = [u for u in to_fetch if u not in user_ids_cache] + if unresolved: + log(f"[WARN] Could not resolve these usernames: {', '.join(unresolved)}") + + # Return mapping for requested usernames that we have cached return {u: user_ids_cache.get(u) for u in usernames if u in user_ids_cache} + def get_live_streams(headers, user_ids): + """Fetch live stream objects for given user_id list. Handles batching (100 per request). + + Returns dict mapping user_id -> stream_object. + """ if not user_ids: return {} - resp = requests.get("https://api.twitch.tv/helix/streams", - headers=headers, - params=[('user_id', uid) for uid in user_ids]) - resp.raise_for_status() - return {stream['user_id']: stream for stream in resp.json()["data"]} + + ids_list = list(user_ids) + result = {} + for chunk in chunked(ids_list, 100): + try: + resp = requests.get( + "https://api.twitch.tv/helix/streams", + headers=headers, + params=[("user_id", uid) for uid in chunk] + ) + resp.raise_for_status() + for stream in resp.json().get("data", []): + result[stream["user_id"]] = stream + except requests.exceptions.HTTPError as e: + status = getattr(e.response, "status_code", "N/A") + text = getattr(e.response, "text", str(e)) + log(f"[ERROR] Failed to fetch live streams for chunk {chunk}: {status} {text}") + return result + # === Notification Utility === + def send_notification(chat_id, title, body): ap = Apprise() # Always send to Telegram @@ -94,9 +239,14 @@ def send_notification(chat_id, title, body): extra_urls = watchlists.get(chat_id, {}).get("apprise_urls", []) for url in extra_urls: ap.add(url) - ap.notify(title=title, body=body) + try: + ap.notify(title=title, body=body) + except Exception as e: + log(f"[ERROR] Failed to send notification for {chat_id}: {e}") + # === Background Twitch Monitor === + def monitor_twitch(): log("Twitch monitor thread started.") token = get_app_token() @@ -109,10 +259,14 @@ def monitor_twitch(): while True: try: log("Polling Twitch for stream updates...") - # Gather all channels across all users + + # Gather all channels across all users (ensure sanitized) all_channels = set() for data in watchlists.values(): - all_channels.update(data.get("channels", [])) + for ch in data.get("channels", []): + san = sanitize_username(ch) + if san: + all_channels.add(san) # Resolve to user IDs user_id_map = get_user_ids(headers, list(all_channels)) @@ -124,7 +278,11 @@ def monitor_twitch(): # Check changes per user for chat_id, data in watchlists.items(): for username in data.get("channels", []): - uid = user_id_map.get(username.lower()) + san_username = sanitize_username(username) + if not san_username: + continue + + uid = user_id_map.get(san_username) if not uid: continue @@ -133,23 +291,36 @@ def monitor_twitch(): if is_live and not was_live: s = live_data[uid] - title = f"🔴 {username} is now LIVE!" - body = f"{s['title']}\nGame: {s['game_name']}\nViewers: {s['viewer_count']}\nhttps://twitch.tv/{username}" + title = f"🔴 {san_username} is now LIVE!" + body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{san_username}" send_notification(chat_id, title, body) - log(f"Notified {chat_id} — {username} went LIVE.") + log(f"Notified {chat_id} — {san_username} went LIVE.") elif not is_live and was_live: - title = f"⚫ {username} has gone offline." - body = f"{username} is no longer streaming.\nhttps://twitch.tv/{username}" + title = f"⚫ {san_username} has gone offline." + body = f"{san_username} is no longer streaming.\nhttps://twitch.tv/{san_username}" send_notification(chat_id, title, body) - log(f"Notified {chat_id} — {username} went OFFLINE.") + log(f"Notified {chat_id} — {san_username} went OFFLINE.") live_status = current_live + + except requests.exceptions.HTTPError as e: + status = getattr(e.response, 'status_code', None) + log(f"[ERROR] Twitch monitor HTTP error: {e} (status={status})") + # If token expired, refresh it and continue + if status == 401: + try: + token = get_app_token() + headers["Authorization"] = f"Bearer {token}" + log("Refreshed Twitch app token after 401.") + except Exception as ex: + log(f"[ERROR] Failed to refresh token: {ex}") except Exception as e: log(f"[ERROR] Twitch monitor: {e}") time.sleep(CHECK_INTERVAL) + # === Telegram Bot Commands === async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) @@ -170,6 +341,7 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): "Please remember that these will work in addition to Telegram, you will always receive updates in this chat irrespective of if you add more targets or not." ) + async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) username = update.effective_user.username or update.effective_user.full_name @@ -177,7 +349,12 @@ async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Usage: /add ") return - channel = context.args[0].lower() + raw = context.args[0] + channel = sanitize_username(raw) + if not channel: + await update.message.reply_text("⚠️ Invalid channel name. Please provide a plain username or a Twitch URL.") + return + watchlists.setdefault(chat_id, {"channels": [], "apprise_urls": []}) if channel not in watchlists[chat_id]["channels"]: @@ -200,7 +377,7 @@ async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): if uid in live_data: s = live_data[uid] title = f"🟢 {channel} is already LIVE!" - body = f"{s['title']}\nGame: {s['game_name']}\nViewers: {s['viewer_count']}\nhttps://twitch.tv/{channel}" + body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{channel}" send_notification(chat_id, title, body) log(f"Immediate notification to {username} ({chat_id}) — {channel} already LIVE.") live_status[uid] = True @@ -210,13 +387,18 @@ async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): else: await update.message.reply_text(f"⚠️ {channel} is already in your watchlist.") + async def remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) username = update.effective_user.username or update.effective_user.full_name if len(context.args) != 1: await update.message.reply_text("Usage: /remove ") return - channel = context.args[0].lower() + raw = context.args[0] + channel = sanitize_username(raw) + if not channel: + await update.message.reply_text("⚠️ Invalid channel name.") + return if chat_id in watchlists and channel in watchlists[chat_id]["channels"]: watchlists[chat_id]["channels"].remove(channel) save_watchlists(watchlists) @@ -225,6 +407,7 @@ async def remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE): else: await update.message.reply_text(f"⚠️ {channel} is not in your watchlist.") + async def list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) username = update.effective_user.username or update.effective_user.full_name @@ -240,6 +423,7 @@ async def list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE): else: await update.message.reply_text("📭 Your watchlist is empty.") + # === /setapprise flow === async def set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) @@ -265,6 +449,7 @@ async def set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("❌ The Apprise URL did not work. Please check and try again.") return ConversationHandler.END + async def confirm_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) reply = update.message.text.strip().lower() @@ -285,10 +470,12 @@ async def confirm_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): return ConversationHandler.END + async def cancel_set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("❌ Operation cancelled.") return ConversationHandler.END + async def list_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", []) @@ -303,6 +490,7 @@ async def list_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text(message, parse_mode="HTML") + async def remove_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = str(update.effective_chat.id) apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", []) @@ -326,6 +514,7 @@ async def remove_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE): log(f"User {chat_id} removed Apprise URL: {removed_url}") await update.message.reply_text(f"🗑 Removed Apprise URL:\n{removed_url}") + # === Main Entry === if __name__ == "__main__": # Start Twitch monitoring in background @@ -340,7 +529,7 @@ if __name__ == "__main__": app.add_handler(CommandHandler("list", list_channels)) app.add_handler(CommandHandler("listapprise", list_apprise)) app.add_handler(CommandHandler("rmapprise", remove_apprise)) - + # Conversation for /setapprise conv_handler = ConversationHandler( entry_points=[CommandHandler("setapprise", set_apprise)],