Files
twitchrise-bot/bot.py

547 lines
20 KiB
Python

import os
import time
import json
import threading
import requests
import html
import re
from urllib.parse import urlparse, unquote
from dotenv import load_dotenv
from apprise import Apprise
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
ConversationHandler,
MessageHandler,
filters
)
from typing import Optional
# === Load Config ===
load_dotenv()
CLIENT_ID = os.getenv("TWITCH_CLIENT_ID")
CLIENT_SECRET = os.getenv("TWITCH_CLIENT_SECRET")
BOT_TOKEN = os.getenv("BOT_TOKEN")
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60"))
WATCHLIST_FILE = os.path.join(os.path.dirname(__file__), "watchlists.json")
# === Globals ===
live_status = {} # {twitch_user_id: bool}
user_ids_cache = {} # {username: user_id}
watchlists = {}
# === Conversation states ===
SET_APPRISE_CONFIRM = 1
# === Utility: Timestamped logging ===
def log(msg: str):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)
# === Sanitization Utilities ===
# Twitch usernames are case-insensitive. We'll sanitize inputs to extract a clean
# username from URLs, @mentions, percent-encoded strings, or embedded HTML.
ALLOWED_USERNAME_RE = re.compile(r"^[A-Za-z0-9_]+$")
def sanitize_username(raw: str) -> Optional[str]:
"""Return a cleaned lowercase twitch username or None if it can't be parsed.
Handles inputs like:
- "@username"
- "https://twitch.tv/username"
- "https%3A%2F%2Fwww.twitch.tv%2Fusername"
- "<username>"
- plain usernames
"""
if not raw:
return None
s = raw.strip()
# decode HTML entities and percent-encoding
s = html.unescape(s)
try:
s = unquote(s)
except Exception:
pass
# If it looks like a URL (contains twitch.tv) parse and extract last path segment
if "twitch.tv" in s:
try:
parsed = urlparse(s if s.startswith("http") else "http://" + s)
parts = [p for p in parsed.path.split("/") if p]
if parts:
s = parts[-1]
except Exception:
pass
# Remove any surrounding angle brackets and common stray characters
s = s.strip().lstrip("@").strip("<>")
# Take the first token if the user pasted a sentence/HTML
s = s.split()[0]
# If it's already a valid username, return it
if ALLOWED_USERNAME_RE.match(s):
return s.lower()
# Otherwise try to remove disallowed chars and see if that leaves a valid name
cleaned = re.sub(r"[^A-Za-z0-9_]", "", s)
if cleaned and ALLOWED_USERNAME_RE.match(cleaned):
return cleaned.lower()
# Couldn't produce a valid username
return None
# === File Utilities ===
def load_watchlists():
if not os.path.exists(WATCHLIST_FILE):
return {}
with open(WATCHLIST_FILE, "r", encoding="utf-8") as f:
try:
return json.load(f)
except Exception as e:
log(f"[ERROR] Failed to parse watchlists.json: {e}")
return {}
def save_watchlists(watchlists_data):
with open(WATCHLIST_FILE, "w", encoding="utf-8") as f:
json.dump(watchlists_data, f, indent=2)
def clean_watchlists():
"""Sanitize all stored watchlists and remove invalid channel entries.
This helps when older entries contain full URLs or malformed values.
"""
global watchlists
changed = False
for chat_id, data in list(watchlists.items()):
channels = data.get("channels", []) or []
new_channels = []
for ch in channels:
san = sanitize_username(ch)
if san and san not in new_channels:
new_channels.append(san)
if new_channels != channels:
watchlists[chat_id]["channels"] = new_channels
changed = True
log(f"Cleaned channels for {chat_id}: {channels} -> {new_channels}")
if changed:
save_watchlists(watchlists)
log("Saved cleaned watchlists.json")
# Load & clean on startup
watchlists = load_watchlists()
clean_watchlists()
# === Twitch API Utilities ===
def get_app_token():
log("Requesting Twitch app token...")
resp = requests.post("https://id.twitch.tv/oauth2/token", params={
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "client_credentials"
})
resp.raise_for_status()
token = resp.json().get("access_token")
log("Twitch token received.")
return token
def chunked(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
def get_user_ids(headers, usernames):
"""Given an iterable of usernames (possibly messy), sanitize and resolve their Twitch IDs.
Returns a dict mapping sanitized_username -> user_id for resolved users.
"""
# Sanitize incoming list
usernames = [sanitize_username(u) for u in usernames]
usernames = [u for u in usernames if u]
usernames = [u.lower() for u in usernames]
to_fetch = [u for u in usernames if u not in user_ids_cache]
if to_fetch:
log(f"Fetching Twitch user IDs for: {', '.join(to_fetch)}")
for chunk in chunked(to_fetch, 100):
try:
resp = requests.get(
"https://api.twitch.tv/helix/users",
headers=headers,
params=[("login", name) for name in chunk]
)
resp.raise_for_status()
data = resp.json().get("data", [])
for user in data:
key = user.get("login", "").lower()
user_ids_cache[key] = user.get("id")
log(f"Resolved {key} => {user_ids_cache[key]}")
except requests.exceptions.HTTPError as e:
status = getattr(e.response, "status_code", "N/A")
text = getattr(e.response, "text", str(e))
log(f"[ERROR] Failed to fetch user ids for chunk {chunk}: {status} {text}")
# Log unresolved names (those that didn't come back in API)
unresolved = [u for u in to_fetch if u not in user_ids_cache]
if unresolved:
log(f"[WARN] Could not resolve these usernames: {', '.join(unresolved)}")
# Return mapping for requested usernames that we have cached
return {u: user_ids_cache.get(u) for u in usernames if u in user_ids_cache}
def get_live_streams(headers, user_ids):
"""Fetch live stream objects for given user_id list. Handles batching (100 per request).
Returns dict mapping user_id -> stream_object.
"""
if not user_ids:
return {}
ids_list = list(user_ids)
result = {}
for chunk in chunked(ids_list, 100):
try:
resp = requests.get(
"https://api.twitch.tv/helix/streams",
headers=headers,
params=[("user_id", uid) for uid in chunk]
)
resp.raise_for_status()
for stream in resp.json().get("data", []):
result[stream["user_id"]] = stream
except requests.exceptions.HTTPError as e:
status = getattr(e.response, "status_code", "N/A")
text = getattr(e.response, "text", str(e))
log(f"[ERROR] Failed to fetch live streams for chunk {chunk}: {status} {text}")
return result
# === Notification Utility ===
def send_notification(chat_id, title, body):
ap = Apprise()
# Always send to Telegram
ap.add(f"tgram://{BOT_TOKEN}/{chat_id}")
# Add extra user URLs if available
extra_urls = watchlists.get(chat_id, {}).get("apprise_urls", [])
for url in extra_urls:
ap.add(url)
try:
ap.notify(title=title, body=body)
except Exception as e:
log(f"[ERROR] Failed to send notification for {chat_id}: {e}")
# === Background Twitch Monitor ===
def monitor_twitch():
log("Twitch monitor thread started.")
token = get_app_token()
headers = {
"Client-ID": CLIENT_ID,
"Authorization": f"Bearer {token}"
}
global live_status
while True:
try:
log("Polling Twitch for stream updates...")
# Gather all channels across all users (ensure sanitized)
all_channels = set()
for data in watchlists.values():
for ch in data.get("channels", []):
san = sanitize_username(ch)
if san:
all_channels.add(san)
# Resolve to user IDs
user_id_map = get_user_ids(headers, list(all_channels))
# Poll live streams
live_data = get_live_streams(headers, user_id_map.values())
current_live = {uid: True for uid in live_data}
# Check changes per user
for chat_id, data in watchlists.items():
for username in data.get("channels", []):
san_username = sanitize_username(username)
if not san_username:
continue
uid = user_id_map.get(san_username)
if not uid:
continue
was_live = live_status.get(uid, False)
is_live = current_live.get(uid, False)
if is_live and not was_live:
s = live_data[uid]
title = f"🔴 {san_username} is now LIVE!"
body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{san_username}"
send_notification(chat_id, title, body)
log(f"Notified {chat_id}{san_username} went LIVE.")
elif not is_live and was_live:
title = f"{san_username} has gone offline."
body = f"{san_username} is no longer streaming.\nhttps://twitch.tv/{san_username}"
send_notification(chat_id, title, body)
log(f"Notified {chat_id}{san_username} went OFFLINE.")
live_status = current_live
except requests.exceptions.HTTPError as e:
status = getattr(e.response, 'status_code', None)
log(f"[ERROR] Twitch monitor HTTP error: {e} (status={status})")
# If token expired, refresh it and continue
if status == 401:
try:
token = get_app_token()
headers["Authorization"] = f"Bearer {token}"
log("Refreshed Twitch app token after 401.")
except Exception as ex:
log(f"[ERROR] Failed to refresh token: {ex}")
except Exception as e:
log(f"[ERROR] Twitch monitor: {e}")
time.sleep(CHECK_INTERVAL)
# === Telegram Bot Commands ===
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
username = update.effective_user.username or update.effective_user.full_name
if chat_id not in watchlists:
watchlists[chat_id] = {"channels": [], "apprise_urls": []}
save_watchlists(watchlists)
log(f"User {username} ({chat_id}) started the bot.")
await update.message.reply_text(
"👋 Welcome to the Twitchrise bot for Telegram!\n\n"
"Use /add <channel> to monitor a Twitch channel.\n"
"Use /remove <channel> to stop monitoring the channel.\n"
"Use /list to see your monitor list.\n"
"Use /setapprise <url> to add extra notification targets.\n"
"Use /rmapprise <number> to remove already added notification targets.\n"
"Use /listapprise to list all added notification targets.\n\n"
"You can see the supported URLs for notification targets like Discord, Gotify etc. and their formats here - https://github.com/caronc/apprise#supported-notifications. \n\n"
"Please remember that these will work in addition to Telegram, you will always receive updates in this chat irrespective of if you add more targets or not."
)
async def add_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
username = update.effective_user.username or update.effective_user.full_name
if len(context.args) != 1:
await update.message.reply_text("Usage: /add <channel_name>")
return
raw = context.args[0]
channel = sanitize_username(raw)
if not channel:
await update.message.reply_text("⚠️ Invalid channel name. Please provide a plain username or a Twitch URL.")
return
watchlists.setdefault(chat_id, {"channels": [], "apprise_urls": []})
if channel not in watchlists[chat_id]["channels"]:
watchlists[chat_id]["channels"].append(channel)
save_watchlists(watchlists)
log(f"User {username} ({chat_id}) added channel: {channel}")
await update.message.reply_text(f"✅ Added {channel} to your watchlist.")
# === Check if channel is live immediately ===
try:
token = get_app_token()
headers = {
"Client-ID": CLIENT_ID,
"Authorization": f"Bearer {token}"
}
user_id_map = get_user_ids(headers, [channel])
uid = user_id_map.get(channel)
if uid:
live_data = get_live_streams(headers, [uid])
if uid in live_data:
s = live_data[uid]
title = f"🟢 {channel} is already LIVE!"
body = f"{s.get('title')}\nGame: {s.get('game_name')}\nViewers: {s.get('viewer_count')}\nhttps://twitch.tv/{channel}"
send_notification(chat_id, title, body)
log(f"Immediate notification to {username} ({chat_id}) — {channel} already LIVE.")
live_status[uid] = True
except Exception as e:
log(f"[ERROR] Live check on add failed: {e}")
else:
await update.message.reply_text(f"⚠️ {channel} is already in your watchlist.")
async def remove_channel(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
username = update.effective_user.username or update.effective_user.full_name
if len(context.args) != 1:
await update.message.reply_text("Usage: /remove <channel_name>")
return
raw = context.args[0]
channel = sanitize_username(raw)
if not channel:
await update.message.reply_text("⚠️ Invalid channel name.")
return
if chat_id in watchlists and channel in watchlists[chat_id]["channels"]:
watchlists[chat_id]["channels"].remove(channel)
save_watchlists(watchlists)
log(f"User {username} ({chat_id}) removed channel: {channel}")
await update.message.reply_text(f"🗑 Removed {channel} from your watchlist.")
else:
await update.message.reply_text(f"⚠️ {channel} is not in your watchlist.")
async def list_channels(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
username = update.effective_user.username or update.effective_user.full_name
channels = watchlists.get(chat_id, {}).get("channels", [])
log(f"User {username} ({chat_id}) requested watchlist: {channels}")
if channels:
message = "📜 Your watchlist: (tap to copy)\n"
for ch in channels:
safe_ch = html.escape(ch)
message += f"• <code>{safe_ch}</code>\n"
await update.message.reply_text(message, parse_mode="HTML")
else:
await update.message.reply_text("📭 Your watchlist is empty.")
# === /setapprise flow ===
async def set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
if len(context.args) != 1:
await update.message.reply_text("Usage: /setapprise <apprise_url>")
return ConversationHandler.END
url = context.args[0]
context.user_data["pending_apprise_url"] = url
# Test the URL
ap = Apprise()
ap.add(url)
worked = ap.notify(title="Test Notification", body="If you see this, the Apprise URL works!")
if worked:
await update.message.reply_text(
"✅ Test notification sent successfully.\n"
"Do you want to save this URL for future alerts? Please reply 'yes' or 'no'"
)
return SET_APPRISE_CONFIRM
else:
await update.message.reply_text("❌ The Apprise URL did not work. Please check and try again.")
return ConversationHandler.END
async def confirm_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
reply = update.message.text.strip().lower()
if reply in ("yes", "y"):
url = context.user_data.get("pending_apprise_url")
if not url:
await update.message.reply_text("⚠️ No pending URL found.")
return ConversationHandler.END
watchlists.setdefault(chat_id, {"channels": [], "apprise_urls": []})
if url not in watchlists[chat_id]["apprise_urls"]:
watchlists[chat_id]["apprise_urls"].append(url)
save_watchlists(watchlists)
log(f"User {chat_id} saved Apprise URL: {url}")
await update.message.reply_text("💾 Saved your Apprise URL.")
else:
await update.message.reply_text("❌ Not saved.")
return ConversationHandler.END
async def cancel_set_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("❌ Operation cancelled.")
return ConversationHandler.END
async def list_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", [])
if not apprise_urls:
await update.message.reply_text("📭 You have no saved Apprise URLs.")
return
message = "🔗 Your saved Apprise URLs: (tap to copy)\n"
for i, url in enumerate(apprise_urls, start=1):
safe_url = html.escape(url) # escape &, <, >
message += f"({i}) <code>{safe_url}</code>\n"
await update.message.reply_text(message, parse_mode="HTML")
async def remove_apprise(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
apprise_urls = watchlists.get(chat_id, {}).get("apprise_urls", [])
if len(context.args) != 1:
await update.message.reply_text("Usage: /rmapprise <number>")
return
try:
index = int(context.args[0])
if index < 1 or index > len(apprise_urls):
await update.message.reply_text("⚠️ Invalid number. Use /listapprise to see saved URLs.")
return
except ValueError:
await update.message.reply_text("⚠️ Please provide a valid number.")
return
removed_url = apprise_urls.pop(index - 1)
watchlists[chat_id]["apprise_urls"] = apprise_urls
save_watchlists(watchlists)
log(f"User {chat_id} removed Apprise URL: {removed_url}")
await update.message.reply_text(f"🗑 Removed Apprise URL:\n{removed_url}")
# === Main Entry ===
if __name__ == "__main__":
# Start Twitch monitoring in background
t = threading.Thread(target=monitor_twitch, daemon=True)
t.start()
# Start Telegram bot
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("add", add_channel))
app.add_handler(CommandHandler("remove", remove_channel))
app.add_handler(CommandHandler("list", list_channels))
app.add_handler(CommandHandler("listapprise", list_apprise))
app.add_handler(CommandHandler("rmapprise", remove_apprise))
# Conversation for /setapprise
conv_handler = ConversationHandler(
entry_points=[CommandHandler("setapprise", set_apprise)],
states={
SET_APPRISE_CONFIRM: [
MessageHandler(filters.TEXT & ~filters.COMMAND, confirm_apprise)
]
},
fallbacks=[CommandHandler("cancel", cancel_set_apprise)]
)
app.add_handler(conv_handler)
log("Telegram bot is running...")
app.run_polling()