"""Twitch go-live notifier driven by a Telegram bot.

Each Telegram chat keeps its own watchlist of Twitch channels (plus optional
extra Apprise notification URLs) in ``watchlists.json``. A background thread
polls the Twitch Helix API and sends a notification whenever a watched
channel goes live or offline.
"""
import asyncio
import html
import json
import os
import re
import threading
import time
from typing import Optional
from urllib.parse import unquote, urlparse

import requests
from apprise import Apprise
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    ConversationHandler,
    MessageHandler,
    filters,
)

# === Load Config ===
load_dotenv()

CLIENT_ID = os.getenv("TWITCH_CLIENT_ID")
CLIENT_SECRET = os.getenv("TWITCH_CLIENT_SECRET")
BOT_TOKEN = os.getenv("BOT_TOKEN")
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60"))
WATCHLIST_FILE = os.path.join(os.path.dirname(__file__), "watchlists.json")

# Upper bound (seconds) for any single Twitch HTTP call. Without a timeout a
# stalled connection would block the monitor thread forever.
REQUEST_TIMEOUT = 15

# === Globals ===
live_status = {}     # {twitch_user_id: bool}
user_ids_cache = {}  # {username: user_id}
watchlists = {}      # {chat_id: {"channels": [...], "apprise_urls": [...]}}

# === Conversation states ===
SET_APPRISE_CONFIRM = 1


# === Utility: Timestamped logging ===
def log(msg: str):
    """Print *msg* to stdout prefixed with a local timestamp."""
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)


# === Sanitization Utilities ===
# Twitch usernames are case-insensitive. We sanitize inputs to extract a clean
# username from URLs, @mentions, percent-encoded strings, or embedded HTML.
ALLOWED_USERNAME_RE = re.compile(r"^[A-Za-z0-9_]+$")

# Finds "twitch.tv" plus whatever path follows it, stopping at whitespace,
# quotes or angle brackets so a link embedded in HTML is handled too.
_TWITCH_URL_RE = re.compile(r"twitch\.tv(/[^\s\"'<>]*)?", re.IGNORECASE)


def sanitize_username(raw: str) -> Optional[str]:
    """Return a cleaned lowercase Twitch username, or None if none can be parsed.

    Handles inputs like:
      - "@username"
      - "https://twitch.tv/username" (also with extra path, query or fragment)
      - "https%3A%2F%2Fwww.twitch.tv%2Fusername"
      - HTML containing a twitch.tv link
      - plain usernames
    """
    if not raw or not isinstance(raw, str):
        return None

    # Decode HTML entities first, then percent-encoding.
    text = unquote(html.unescape(raw.strip()))

    url_match = _TWITCH_URL_RE.search(text)
    if url_match:
        # The channel name is the FIRST path segment
        # (twitch.tv/<name>/videos must not yield "videos"). urlparse drops
        # any ?query or #fragment for us.
        path = urlparse("http://twitch.tv" + (url_match.group(1) or "")).path
        segments = [seg for seg in path.split("/") if seg]
        if segments and ALLOWED_USERNAME_RE.fullmatch(segments[0]):
            return segments[0].lower()
        # A Twitch URL without a usable channel segment is not a username.
        return None

    # Not a URL: drop a leading "@" / surrounding angle brackets and keep only
    # the first whitespace-separated token.
    tokens = text.strip().lstrip("@").strip("<>").split()
    if not tokens:
        return None
    token = tokens[0]

    if ALLOWED_USERNAME_RE.fullmatch(token):
        return token.lower()

    # Last resort: strip disallowed characters and see whether anything is left.
    cleaned = re.sub(r"[^A-Za-z0-9_]", "", token)
    return cleaned.lower() or None


# === File Utilities ===
def _empty_entry():
    """Return a fresh, empty per-chat watchlist record."""
    return {"channels": [], "apprise_urls": []}


def load_watchlists():
    """Load the watchlists from WATCHLIST_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object at the top level.
    """
    if not os.path.exists(WATCHLIST_FILE):
        return {}
    try:
        with open(WATCHLIST_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
        log(f"[ERROR] Failed to parse watchlists.json: {e}")
        return {}
    if not isinstance(data, dict):
        log("[ERROR] watchlists.json does not contain a JSON object; ignoring it.")
        return {}
    return data


def save_watchlists(watchlists_data):
    """Write *watchlists_data* to WATCHLIST_FILE as indented JSON."""
    with open(WATCHLIST_FILE, "w", encoding="utf-8") as f:
        json.dump(watchlists_data, f, indent=2)


def clean_watchlists():
    """Normalize the in-memory watchlists and persist them if anything changed.

    Channel entries are sanitized and de-duplicated (older files may contain
    full URLs or malformed values), and every chat record is guaranteed to
    have list-valued "channels" and "apprise_urls" keys so handlers can index
    them without a KeyError.
    """
    changed = False
    for chat_id, data in list(watchlists.items()):
        if not isinstance(data, dict):
            log(f"[WARN] Replacing malformed watchlist entry for {chat_id}: {data!r}")
            watchlists[chat_id] = _empty_entry()
            changed = True
            continue

        stored = data.get("channels")
        channels = stored if isinstance(stored, list) else []
        new_channels = []
        for ch in channels:
            san = sanitize_username(ch)
            if san and san not in new_channels:
                new_channels.append(san)
        # Compare against what is actually stored so a missing/invalid
        # "channels" key is repaired as well.
        if stored != new_channels:
            data["channels"] = new_channels
            changed = True
            log(f"Cleaned channels for {chat_id}: {channels} -> {new_channels}")

        if not isinstance(data.get("apprise_urls"), list):
            data["apprise_urls"] = []
            changed = True

    if changed:
        save_watchlists(watchlists)
        log("Saved cleaned watchlists.json")


# Load & clean on startup
watchlists = load_watchlists()
clean_watchlists()


# === Twitch API Utilities ===
def get_app_token():
    """Request a Twitch app access token via the client-credentials grant.

    Raises:
        requests.exceptions.RequestException: on network or HTTP failure.
        RuntimeError: if the response carries no access token.
    """
    log("Requesting Twitch app token...")
    # Credentials go in the form body rather than the query string so the
    # client secret does not end up in URLs (and therefore in proxy logs).
    resp = requests.post(
        "https://id.twitch.tv/oauth2/token",
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "grant_type": "client_credentials",
        },
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    token = resp.json().get("access_token")
    if not token:
        raise RuntimeError("Twitch token response did not contain an access_token")
    log("Twitch token received.")
    return token


def _twitch_headers(token):
    """Build the request headers used for Helix API calls."""
    return {"Client-ID": CLIENT_ID, "Authorization": f"Bearer {token}"}


def chunked(lst, n):
    """Yield successive slices of *lst* that are at most *n* items long."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def get_user_ids(headers, usernames):
    """Resolve Twitch user IDs for an iterable of (possibly messy) usernames.

    Names are sanitized and de-duplicated; results are memoized in
    ``user_ids_cache``. Returns a dict mapping sanitized_username -> user_id
    for every requested name that could be resolved.

    Raises:
        requests.exceptions.HTTPError: only for a 401 response, so the caller
            can refresh its token. Other HTTP errors are logged and the
            affected names are simply left unresolved.
    """
    cleaned = (sanitize_username(u) for u in usernames)
    names = list(dict.fromkeys(u for u in cleaned if u))  # ordered de-dupe

    to_fetch = [u for u in names if u not in user_ids_cache]
    if to_fetch:
        log(f"Fetching Twitch user IDs for: {', '.join(to_fetch)}")
        for chunk in chunked(to_fetch, 100):  # Helix accepts up to 100 logins
            try:
                resp = requests.get(
                    "https://api.twitch.tv/helix/users",
                    headers=headers,
                    params=[("login", name) for name in chunk],
                    timeout=REQUEST_TIMEOUT,
                )
                resp.raise_for_status()
            except requests.exceptions.HTTPError as e:
                status = getattr(e.response, "status_code", "N/A")
                text = getattr(e.response, "text", str(e))
                log(f"[ERROR] Failed to fetch user ids for chunk {chunk}: {status} {text}")
                if status == 401:
                    raise  # an expired token must reach the caller
                continue

            for user in resp.json().get("data", []):
                key = (user.get("login") or "").lower()
                uid = user.get("id")
                if not key or not uid:
                    continue
                user_ids_cache[key] = uid
                log(f"Resolved {key} => {uid}")

        # Log unresolved names (those that didn't come back from the API)
        unresolved = [u for u in to_fetch if u not in user_ids_cache]
        if unresolved:
            log(f"[WARN] Could not resolve these usernames: {', '.join(unresolved)}")

    return {u: user_ids_cache[u] for u in names if u in user_ids_cache}


def get_live_streams(headers, user_ids):
    """Fetch live stream objects for the given user IDs (batched, 100 per call).

    Returns a dict mapping user_id -> stream_object for users that are live.

    Raises:
        requests.exceptions.HTTPError: if any batch fails. Returning a partial
            result would be indistinguishable from "those users are offline"
            and would trigger false offline/live notifications.
    """
    ids_list = [uid for uid in user_ids if uid] if user_ids else []
    if not ids_list:
        return {}

    result = {}
    for chunk in chunked(ids_list, 100):
        params = [("user_id", uid) for uid in chunk]
        # The page size defaults to 20; ask for 100 so a full chunk of live
        # channels fits in a single page.
        params.append(("first", 100))
        try:
            resp = requests.get(
                "https://api.twitch.tv/helix/streams",
                headers=headers,
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
        except requests.exceptions.HTTPError as e:
            status = getattr(e.response, "status_code", "N/A")
            text = getattr(e.response, "text", str(e))
            log(f"[ERROR] Failed to fetch live streams for chunk {chunk}: {status} {text}")
            raise
        for stream in resp.json().get("data", []):
            result[stream["user_id"]] = stream
    return result


# === Notification Utility ===
def send_notification(chat_id, title, body):
    """Send a notification to the chat's Telegram plus any extra Apprise URLs.

    Failures are logged, never raised.
    """
    ap = Apprise()

    # Always send to Telegram
    ap.add(f"tgram://{BOT_TOKEN}/{chat_id}")

    # Add extra user URLs if available
    for url in list(watchlists.get(chat_id, {}).get("apprise_urls", [])):
        ap.add(url)

    try:
        delivered = ap.notify(title=title, body=body)
    except Exception as e:
        log(f"[ERROR] Failed to send notification for {chat_id}: {e}")
        return
    if not delivered:
        # notify() reports failure through its return value, not an exception.
        log(f"[WARN] One or more notification targets failed for {chat_id}.")


# === Background Twitch Monitor ===
def _poll_once(headers):
    """Run one poll cycle: detect live/offline transitions and notify chats."""
    global live_status

    # Work on a snapshot: bot handlers mutate `watchlists` from another
    # thread, and iterating the live dict could raise
    # "dictionary changed size during iteration".
    snapshot = [
        (chat_id, list(data.get("channels", [])))
        for chat_id, data in list(watchlists.items())
        if isinstance(data, dict)
    ]

    # Gather all channels across all chats (ensure sanitized)
    all_channels = set()
    for _, channels in snapshot:
        for ch in channels:
            san = sanitize_username(ch)
            if san:
                all_channels.add(san)

    user_id_map = get_user_ids(headers, sorted(all_channels))
    live_data = get_live_streams(headers, user_id_map.values())
    current_live = {uid: True for uid in live_data}

    # Check changes per chat
    for chat_id, channels in snapshot:
        for username in channels:
            san_username = sanitize_username(username)
            if not san_username:
                continue
            uid = user_id_map.get(san_username)
            if not uid:
                continue

            was_live = live_status.get(uid, False)
            is_live = current_live.get(uid, False)

            if is_live and not was_live:
                s = live_data[uid]
                title = f"🔴 {san_username} is now LIVE!"
                body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{san_username}"
                send_notification(chat_id, title, body)
                log(f"Notified {chat_id} — {san_username} went LIVE.")
            elif not is_live and was_live:
                title = f"⚫ {san_username} has gone offline."
                body = f"{san_username} is no longer streaming.\nhttps://twitch.tv/{san_username}"
                send_notification(chat_id, title, body)
                log(f"Notified {chat_id} — {san_username} went OFFLINE.")

    # Only reached when the whole cycle succeeded, so a failed request never
    # wipes the previously known state.
    live_status = current_live


def monitor_twitch():
    """Poll Twitch forever, sleeping CHECK_INTERVAL seconds between cycles.

    Intended to run in a daemon thread. The app token is requested lazily and
    requested again after a 401, so neither a failed first token request nor
    an expired token ends the loop.
    """
    log("Twitch monitor thread started.")
    headers = None

    while True:
        try:
            if headers is None:
                headers = _twitch_headers(get_app_token())
            log("Polling Twitch for stream updates...")
            _poll_once(headers)
        except requests.exceptions.HTTPError as e:
            status = getattr(e.response, "status_code", None)
            log(f"[ERROR] Twitch monitor HTTP error: {e} (status={status})")
            if status == 401:
                # Token expired or revoked: drop it so the next cycle
                # requests a fresh one.
                headers = None
                log("Twitch token rejected (401); a new one will be requested on the next poll.")
        except Exception as e:
            log(f"[ERROR] Twitch monitor: {e}")

        time.sleep(CHECK_INTERVAL)


# === Telegram Bot Helpers ===
def _display_name(update: Update) -> str:
    """Return a printable name for the user behind *update*, for logging."""
    user = update.effective_user
    if user is None:  # e.g. channel posts have no user
        return "unknown"
    return user.username or user.full_name


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor.

    Keeps network calls made with `requests`/Apprise from stalling the bot's
    event loop while they wait.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


def _notify_if_already_live(chat_id, channel, username):
    """Send an immediate notification if *channel* is live right now.

    Blocking; errors are logged and swallowed because this is a best-effort
    extra on top of the regular polling.
    """
    try:
        headers = _twitch_headers(get_app_token())
        uid = get_user_ids(headers, [channel]).get(channel)
        if not uid:
            return
        live_data = get_live_streams(headers, [uid])
        if uid in live_data:
            s = live_data[uid]
            title = f"🟢 {channel} is already LIVE!"
            body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{channel}"
            send_notification(chat_id, title, body)
            log(f"Immediate notification to {username} ({chat_id}) — {channel} already LIVE.")
            # Mark as live so the monitor does not announce it a second time.
            live_status[uid] = True
    except Exception as e:
        log(f"[ERROR] Live check on add failed: {e}")


def _test_apprise_url(url) -> bool:
    """Send a test notification to *url*; return True only if it succeeded."""
    try:
        ap = Apprise()
        if not ap.add(url):
            return False
        return bool(ap.notify(
            title="Test Notification",
            body="If you see this, the Apprise URL works!",
        ))
    except Exception as e:
        log(f"[ERROR] Apprise test failed: {e}")
        return False


# === Telegram Bot Commands ===
# Handlers reply via `update.effective_message`: CommandHandler also fires for
# edited messages, where `update.message` is None.
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/start: register the chat and show the help text."""
    chat_id = str(update.effective_chat.id)
    username = _display_name(update)

    if chat_id not in watchlists:
        watchlists[chat_id] = _empty_entry()
        save_watchlists(watchlists)

    log(f"User {username} ({chat_id}) started the bot.")
    await update.effective_message.reply_text(
        "👋 Welcome to the Twitchrise bot for Telegram!\n\n"
        "Use /add to monitor a Twitch channel.\n"
        "Use /remove to stop monitoring the channel.\n"
        "Use /list to see your monitor list.\n"
        "Use /setapprise to add extra notification targets.\n"
        "Use /rmapprise to remove already added notification targets.\n"
        "Use /listapprise to list all added notification targets.\n\n"
        "You can see the supported URLs for notification targets like Discord, Gotify etc. and their formats here - https://github.com/caronc/apprise#supported-notifications. \n\n"
        "Please remember that these will work in addition to Telegram, you will always receive updates in this chat irrespective of if you add more targets or not."
    )


async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/add <channel>: add a channel to this chat's watchlist."""
    chat_id = str(update.effective_chat.id)
    username = _display_name(update)
    message = update.effective_message

    if len(context.args) != 1:
        await message.reply_text("Usage: /add <channel>")
        return

    channel = sanitize_username(context.args[0])
    if not channel:
        await message.reply_text("⚠️ Invalid channel name. Please provide a plain username or a Twitch URL.")
        return

    entry = watchlists.setdefault(chat_id, _empty_entry())
    channels = entry.setdefault("channels", [])
    if channel in channels:
        await message.reply_text(f"⚠️ {channel} is already in your watchlist.")
        return

    channels.append(channel)
    save_watchlists(watchlists)
    log(f"User {username} ({chat_id}) added channel: {channel}")
    await message.reply_text(f"✅ Added {channel} to your watchlist.")

    # === Check if channel is live immediately (off the event loop) ===
    await _run_blocking(_notify_if_already_live, chat_id, channel, username)


async def remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/remove <channel>: remove a channel from this chat's watchlist."""
    chat_id = str(update.effective_chat.id)
    username = _display_name(update)
    message = update.effective_message

    if len(context.args) != 1:
        await message.reply_text("Usage: /remove <channel>")
        return

    channel = sanitize_username(context.args[0])
    if not channel:
        await message.reply_text("⚠️ Invalid channel name.")
        return

    channels = watchlists.get(chat_id, {}).get("channels", [])
    if channel not in channels:
        await message.reply_text(f"⚠️ {channel} is not in your watchlist.")
        return

    channels.remove(channel)
    save_watchlists(watchlists)
    log(f"User {username} ({chat_id}) removed channel: {channel}")
    await message.reply_text(f"🗑 Removed {channel} from your watchlist.")


async def list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/list: show this chat's watchlist."""
    chat_id = str(update.effective_chat.id)
    username = _display_name(update)
    message = update.effective_message

    channels = watchlists.get(chat_id, {}).get("channels", [])
    log(f"User {username} ({chat_id}) requested watchlist: {channels}")

    if not channels:
        await message.reply_text("📭 Your watchlist is empty.")
        return

    # <code> is what makes an entry tap-to-copy in Telegram's HTML mode.
    lines = ["📜 Your watchlist: (tap to copy)"]
    lines.extend(f"• <code>{html.escape(ch)}</code>" for ch in channels)
    await message.reply_text("\n".join(lines) + "\n", parse_mode="HTML")


# === /setapprise flow ===
async def set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/setapprise <apprise_url>: test a URL, then ask whether to save it."""
    message = update.effective_message

    if len(context.args) != 1:
        await message.reply_text("Usage: /setapprise <apprise_url>")
        return ConversationHandler.END

    url = context.args[0]
    context.user_data["pending_apprise_url"] = url

    # Test the URL without blocking the event loop.
    worked = await _run_blocking(_test_apprise_url, url)

    if worked:
        await message.reply_text(
            "✅ Test notification sent successfully.\n"
            "Do you want to save this URL for future alerts? Please reply 'yes' or 'no'"
        )
        return SET_APPRISE_CONFIRM

    context.user_data.pop("pending_apprise_url", None)
    await message.reply_text("❌ The Apprise URL did not work. Please check and try again.")
    return ConversationHandler.END


async def confirm_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the yes/no answer that follows a successful /setapprise test."""
    chat_id = str(update.effective_chat.id)
    message = update.effective_message
    reply = (message.text or "").strip().lower()

    # Always clear the pending URL so it cannot leak into a later conversation.
    url = context.user_data.pop("pending_apprise_url", None)

    if reply not in ("yes", "y"):
        await message.reply_text("❌ Not saved.")
        return ConversationHandler.END

    if not url:
        await message.reply_text("⚠️ No pending URL found.")
        return ConversationHandler.END

    entry = watchlists.setdefault(chat_id, _empty_entry())
    urls = entry.setdefault("apprise_urls", [])
    if url not in urls:
        urls.append(url)
        save_watchlists(watchlists)
        log(f"User {chat_id} saved Apprise URL: {url}")
    await message.reply_text("💾 Saved your Apprise URL.")
    return ConversationHandler.END


async def cancel_set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/cancel: abort the /setapprise conversation."""
    context.user_data.pop("pending_apprise_url", None)
    await update.effective_message.reply_text("❌ Operation cancelled.")
    return ConversationHandler.END


async def list_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/listapprise: show this chat's saved Apprise URLs, numbered from 1."""
    chat_id = str(update.effective_chat.id)
    message = update.effective_message
    apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", [])

    if not apprise_urls:
        await message.reply_text("📭 You have no saved Apprise URLs.")
        return

    lines = ["🔗 Your saved Apprise URLs: (tap to copy)"]
    for i, url in enumerate(apprise_urls, start=1):
        safe_url = html.escape(url)  # escape &, <, >
        lines.append(f"({i}) <code>{safe_url}</code>")
    await message.reply_text("\n".join(lines) + "\n", parse_mode="HTML")


async def remove_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/rmapprise <number>: remove a saved Apprise URL by its list number."""
    chat_id = str(update.effective_chat.id)
    message = update.effective_message
    apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", [])

    if len(context.args) != 1:
        await message.reply_text("Usage: /rmapprise <number>")
        return

    try:
        index = int(context.args[0])
    except ValueError:
        await message.reply_text("⚠️ Please provide a valid number.")
        return

    if index < 1 or index > len(apprise_urls):
        await message.reply_text("⚠️ Invalid number. Use /listapprise to see saved URLs.")
        return

    # `apprise_urls` is the stored list itself, so popping updates the
    # watchlist in place.
    removed_url = apprise_urls.pop(index - 1)
    save_watchlists(watchlists)

    log(f"User {chat_id} removed Apprise URL: {removed_url}")
    await message.reply_text(f"🗑 Removed Apprise URL:\n{removed_url}")


# === Main Entry ===
def main():
    """Validate configuration, start the monitor thread and run the bot."""
    required = (
        ("TWITCH_CLIENT_ID", CLIENT_ID),
        ("TWITCH_CLIENT_SECRET", CLIENT_SECRET),
        ("BOT_TOKEN", BOT_TOKEN),
    )
    missing = [name for name, value in required if not value]
    if missing:
        raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")

    # Start Twitch monitoring in background
    t = threading.Thread(target=monitor_twitch, daemon=True)
    t.start()

    # Start Telegram bot
    app = ApplicationBuilder().token(BOT_TOKEN).build()

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("add", add_channel))
    app.add_handler(CommandHandler("remove", remove_channel))
    app.add_handler(CommandHandler("list", list_channels))
    app.add_handler(CommandHandler("listapprise", list_apprise))
    app.add_handler(CommandHandler("rmapprise", remove_apprise))

    # Conversation for /setapprise
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler("setapprise", set_apprise)],
        states={
            SET_APPRISE_CONFIRM: [
                MessageHandler(filters.TEXT & ~filters.COMMAND, confirm_apprise)
            ]
        },
        fallbacks=[CommandHandler("cancel", cancel_set_apprise)],
    )
    app.add_handler(conv_handler)

    log("Telegram bot is running...")
    app.run_polling()


if __name__ == "__main__":
    main()