import json import os import asyncio import logging from datetime import datetime, timedelta, time from typing import Dict, List, Optional import apprise from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes import pytz from dotenv import load_dotenv # Load environment variables load_dotenv() # Configure logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) class BirthdayBot: def __init__(self, token: str): self.token = token self.data_file = 'birthdays.json' self.users_data = self.load_data() self.pending_endpoints = {} # Store pending endpoints for confirmation self.sent_reminders = set() # Track sent reminders to prevent duplicates self.application = Application.builder().token(token).build() self.setup_handlers() def load_data(self) -> Dict: """Load user data from JSON file""" if os.path.exists(self.data_file): try: with open(self.data_file, 'r') as f: return json.load(f) except json.JSONDecodeError: logger.error("Error reading JSON file, starting with empty data") return {} return {} def save_data(self): """Save user data to JSON file""" with open(self.data_file, 'w') as f: json.dump(self.users_data, f, indent=2, default=str) def get_user_data(self, user_id: str) -> Dict: """Get or create user data""" if user_id not in self.users_data: self.users_data[user_id] = { 'birthdays': {}, 'apprise_endpoints': [], 'reminders': [], 'timezone': 'UTC' } return self.users_data[user_id] def create_reminder_key(self, user_id: str, name: str, birthday_str: str, reminder: Dict, reminder_time: datetime) -> str: """Create a unique key for tracking sent reminders""" # Round reminder time to the minute to avoid sub-minute duplicates rounded_time = reminder_time.replace(second=0, microsecond=0) return f"{user_id}_{name}_{birthday_str}_{reminder['type']}_{reminder['value']}_{rounded_time.isoformat()}" def cleanup_old_reminder_keys(self): """Clean up reminder keys older than 24 hours to prevent memory buildup""" current_time = datetime.now(pytz.UTC) keys_to_remove = [] for key in self.sent_reminders: try: # Extract timestamp from the key (last part after splitting by '_') timestamp_str = key.split('_')[-1] key_time = datetime.fromisoformat(timestamp_str) # Remove keys older than 24 hours if (current_time - key_time).total_seconds() > 86400: # 24 hours keys_to_remove.append(key) except (ValueError, IndexError): # If we can't parse the timestamp, remove the key to be safe keys_to_remove.append(key) for key in keys_to_remove: self.sent_reminders.discard(key) def setup_handlers(self): """Setup command and message handlers""" # Command handlers self.application.add_handler(CommandHandler("start", self.start)) self.application.add_handler(CommandHandler("help", self.help_command)) self.application.add_handler(CommandHandler("add_birthday", self.add_birthday)) self.application.add_handler(CommandHandler("list_birthdays", self.list_birthdays)) self.application.add_handler(CommandHandler("remove_birthday", self.remove_birthday)) self.application.add_handler(CommandHandler("add_endpoint", self.add_endpoint)) self.application.add_handler(CommandHandler("list_endpoints", self.list_endpoints)) self.application.add_handler(CommandHandler("remove_endpoint", self.remove_endpoint)) self.application.add_handler(CommandHandler("add_reminder", self.add_reminder)) self.application.add_handler(CommandHandler("list_reminders", self.list_reminders)) self.application.add_handler(CommandHandler("remove_reminder", self.remove_reminder)) self.application.add_handler(CommandHandler("set_timezone", self.set_timezone)) self.application.add_handler(CommandHandler("test_notifications", self.test_notifications)) # Callback query handler for inline keyboards self.application.add_handler(CallbackQueryHandler(self.handle_callback)) async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Start command handler""" welcome_message = """ ๐ŸŽ‰ Welcome to Birthday Reminder Bot! ๐ŸŽ‰ I'll help you remember all the important birthdays and send notifications through various channels. Use /help to see all available commands. """ await update.message.reply_text(welcome_message) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Help command handler""" help_text = """ ๐Ÿ“‹ **Available Commands:** **Birthday Management:** โ€ข `/add_birthday` - Add a new birthday โ€ข `/list_birthdays` - View all birthdays โ€ข `/remove_birthday` - Remove a birthday **Notification Endpoints:** โ€ข `/add_endpoint` - Add Apprise notification endpoint โ€ข `/list_endpoints` - View all endpoints โ€ข `/remove_endpoint` - Remove an endpoint **Reminders:** โ€ข `/add_reminder` - Add reminder schedule โ€ข `/list_reminders` - View all reminders โ€ข `/remove_reminder` - Remove a reminder **Settings:** โ€ข `/set_timezone` - Set your timezone โ€ข `/test_notifications` - Test your notification setup **Birthday Format:** Use MM-DD format (e.g., 03-15 for March 15) **Apprise Format:** Any valid Apprise URL (telegram, discord, email, etc.) Examples: - Telegram: `tgram://bot_token/chat_id` - Discord: `discord://webhook_id/webhook_token` - Email: `mailto://user:pass@smtp.gmail.com` """ await update.message.reply_text(help_text, parse_mode='Markdown') async def add_birthday(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Add birthday command handler""" if len(context.args) < 2: await update.message.reply_text( "Usage: /add_birthday \n" "Example: /add_birthday John 03-15\n" "Date format: MM-DD (March 15)" ) return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) name = context.args[0] date_str = context.args[1] try: # Validate date format datetime.strptime(date_str, '%m-%d') user_data['birthdays'][name] = date_str self.save_data() await update.message.reply_text(f"โœ… Birthday added: {name} on {date_str}") except ValueError: await update.message.reply_text("โŒ Invalid date format. Use MM-DD (e.g., 03-15)") async def list_birthdays(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """List all birthdays""" user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) if not user_data['birthdays']: await update.message.reply_text("๐Ÿ“… No birthdays stored yet.") return message = "๐Ÿ“… **Your Birthdays:**\n\n" for name, date in user_data['birthdays'].items(): next_birthday = self.get_next_birthday_date(date) days_until = (next_birthday - datetime.now().date()).days message += f"โ€ข {name}: {date} ({days_until} days until next birthday)\n" await update.message.reply_text(message, parse_mode='Markdown') async def remove_birthday(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Remove birthday command handler""" if len(context.args) < 1: await update.message.reply_text("Usage: /remove_birthday ") return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) name = context.args[0] if name in user_data['birthdays']: del user_data['birthdays'][name] self.save_data() await update.message.reply_text(f"โœ… Removed birthday for {name}") else: await update.message.reply_text(f"โŒ No birthday found for {name}") async def add_endpoint(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Add Apprise endpoint with confirmation""" if len(context.args) < 1: await update.message.reply_text( "Usage: /add_endpoint \n" "Example: /add_endpoint mailto://user:pass@smtp.gmail.com" ) return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) endpoint = ' '.join(context.args) # Test the endpoint first apobj = apprise.Apprise() if not apobj.add(endpoint): await update.message.reply_text("โŒ Invalid Apprise endpoint format") return # Send test notification test_title = "๐Ÿงช Birthday Bot Test" test_message = "This is a test notification to verify your endpoint is working correctly. Please confirm if you received this message." try: success = apobj.notify(body=test_message, title=test_title) if not success: await update.message.reply_text("โŒ Failed to send test notification to this endpoint") return except Exception as e: await update.message.reply_text(f"โŒ Error testing endpoint: {str(e)}") return # Store endpoint temporarily for confirmation self.pending_endpoints[user_id] = endpoint # Create confirmation keyboard keyboard = [ [ InlineKeyboardButton("โœ… Yes, I received it", callback_data="confirm_endpoint_yes"), InlineKeyboardButton("โŒ No, I didn't receive it", callback_data="confirm_endpoint_no") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"๐Ÿ“ก Test notification sent to:\n`{self.mask_sensitive_info(endpoint)}`\n\n" "Did you receive the test notification?", reply_markup=reply_markup, parse_mode='Markdown' ) async def list_endpoints(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """List all notification endpoints""" user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) if not user_data['apprise_endpoints']: await update.message.reply_text("๐Ÿ“ก No notification endpoints configured.") return message = "๐Ÿ“ก **Notification Endpoints:**\n\n" for i, endpoint in enumerate(user_data['apprise_endpoints'], 1): # Hide sensitive information in display display_endpoint = self.mask_sensitive_info(endpoint) message += f"{i}. {display_endpoint}\n" await update.message.reply_text(message, parse_mode='Markdown') def mask_sensitive_info(self, endpoint: str) -> str: """Mask sensitive information in endpoints for display""" # Basic masking for common patterns if 'mailto://' in endpoint: parts = endpoint.split('@') if len(parts) > 1: return f"mailto://***@{parts[-1]}" elif 'tgram://' in endpoint: return "tgram://*** (Telegram)" elif 'discord://' in endpoint: return "discord://*** (Discord Webhook)" return endpoint[:20] + "..." if len(endpoint) > 20 else endpoint async def remove_endpoint(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Remove notification endpoint""" user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) if not user_data['apprise_endpoints']: await update.message.reply_text("๐Ÿ“ก No endpoints to remove.") return # Create inline keyboard with endpoints keyboard = [] for i, endpoint in enumerate(user_data['apprise_endpoints']): display_endpoint = self.mask_sensitive_info(endpoint) keyboard.append([InlineKeyboardButton( f"{i+1}. {display_endpoint}", callback_data=f"remove_endpoint_{i}" )]) reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Select endpoint to remove:", reply_markup=reply_markup ) async def add_reminder(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Add reminder schedule""" if len(context.args) < 2: await update.message.reply_text( "Usage: /add_reminder \n\n" "Types:\n" "โ€ข `minutes_before` - Minutes before birthday (e.g., 15)\n" "โ€ข `hours_before` - Hours before birthday (e.g., 24)\n" "โ€ข `days_before` - Days before birthday (e.g., 1)\n" "โ€ข `time_on_day` - Specific time on birthday (e.g., 09:00)\n" "โ€ข `time_before` - Specific time before birthday (e.g., 1:18:00 = 1 day, 18:00)" ) return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) reminder_type = context.args[0] value = context.args[1] reminder = {'type': reminder_type, 'value': value} try: # Validate reminder format self.validate_reminder(reminder) user_data['reminders'].append(reminder) self.save_data() await update.message.reply_text(f"โœ… Reminder added: {reminder_type} = {value}") except ValueError as e: await update.message.reply_text(f"โŒ Invalid reminder format: {str(e)}") def validate_reminder(self, reminder: Dict): """Validate reminder format""" reminder_type = reminder['type'] value = reminder['value'] if reminder_type in ['minutes_before', 'hours_before', 'days_before']: int(value) # Will raise ValueError if not a valid integer elif reminder_type == 'time_on_day': datetime.strptime(value, '%H:%M') # Will raise ValueError if not HH:MM elif reminder_type == 'time_before': # Format: D:HH:MM (days:hours:minutes) parts = value.split(':') if len(parts) != 3: raise ValueError("time_before format should be D:HH:MM") int(parts[0]) # days datetime.strptime(f"{parts[1]}:{parts[2]}", '%H:%M') # time else: raise ValueError(f"Unknown reminder type: {reminder_type}") async def list_reminders(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """List all reminders""" user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) if not user_data['reminders']: await update.message.reply_text("โฐ No reminders configured.") return message = "โฐ **Your Reminders:**\n\n" for i, reminder in enumerate(user_data['reminders'], 1): message += f"{i}. {reminder['type']}: {reminder['value']}\n" await update.message.reply_text(message, parse_mode='Markdown') async def remove_reminder(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Remove reminder""" if len(context.args) < 1: await update.message.reply_text("Usage: /remove_reminder ") return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) try: index = int(context.args[0]) - 1 if 0 <= index < len(user_data['reminders']): removed = user_data['reminders'].pop(index) self.save_data() await update.message.reply_text( f"โœ… Removed reminder: {removed['type']} = {removed['value']}" ) else: await update.message.reply_text("โŒ Invalid reminder number") except ValueError: await update.message.reply_text("โŒ Please provide a valid number") async def set_timezone(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Set user timezone""" if len(context.args) < 1: await update.message.reply_text( "Usage: /set_timezone \n" "Example: /set_timezone America/New_York\n" "Common timezones: UTC, US/Eastern, US/Pacific, Europe/London, Asia/Tokyo" ) return user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) timezone = context.args[0] try: pytz.timezone(timezone) # Validate timezone user_data['timezone'] = timezone self.save_data() await update.message.reply_text(f"โœ… Timezone set to {timezone}") except pytz.exceptions.UnknownTimeZoneError: await update.message.reply_text("โŒ Invalid timezone") async def test_notifications(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Test notification setup""" user_id = str(update.effective_user.id) user_data = self.get_user_data(user_id) if not user_data['apprise_endpoints']: await update.message.reply_text("โŒ No notification endpoints configured") return success_count = await self.send_notification( user_id, "๐Ÿงช Test Notification", "This is a test message from Birthday Bot!" ) total_endpoints = len(user_data['apprise_endpoints']) await update.message.reply_text( f"๐Ÿ“ก Test complete: {success_count}/{total_endpoints} endpoints successful" ) async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle inline keyboard callbacks""" query = update.callback_query await query.answer() user_id = str(query.from_user.id) if query.data.startswith("remove_endpoint_"): index = int(query.data.split("_")[-1]) user_data = self.get_user_data(user_id) if 0 <= index < len(user_data['apprise_endpoints']): removed = user_data['apprise_endpoints'].pop(index) self.save_data() await query.edit_message_text( f"โœ… Removed endpoint: {self.mask_sensitive_info(removed)}" ) else: await query.edit_message_text("โŒ Invalid endpoint selection") elif query.data == "confirm_endpoint_yes": # User confirmed they received the test notification if user_id in self.pending_endpoints: endpoint = self.pending_endpoints[user_id] user_data = self.get_user_data(user_id) user_data['apprise_endpoints'].append(endpoint) # Always ensure telegram endpoint is included telegram_endpoint = f"tgram://{self.token}/{query.message.chat.id}" if telegram_endpoint not in user_data['apprise_endpoints']: user_data['apprise_endpoints'].append(telegram_endpoint) self.save_data() del self.pending_endpoints[user_id] await query.edit_message_text( f"โœ… Endpoint added successfully!\n" f"๐Ÿ“ก {self.mask_sensitive_info(endpoint)}\n\n" f"You now have {len(user_data['apprise_endpoints'])} notification endpoint(s) configured." ) else: await query.edit_message_text("โŒ No pending endpoint to confirm") elif query.data == "confirm_endpoint_no": # User didn't receive the test notification if user_id in self.pending_endpoints: endpoint = self.pending_endpoints[user_id] del self.pending_endpoints[user_id] await query.edit_message_text( f"โŒ Endpoint not added due to failed test:\n" f"๐Ÿ“ก {self.mask_sensitive_info(endpoint)}\n\n" "Please check your endpoint configuration and try again. " "Make sure the URL format is correct and the service is accessible." ) else: await query.edit_message_text("โŒ No pending endpoint to cancel") def get_next_birthday_date(self, birthday_str: str) -> datetime.date: """Get next occurrence of birthday""" current_year = datetime.now().year month, day = map(int, birthday_str.split('-')) next_birthday = datetime(current_year, month, day).date() # If birthday already passed this year, use next year if next_birthday < datetime.now().date(): next_birthday = datetime(current_year + 1, month, day).date() return next_birthday def calculate_reminder_time(self, birthday_date: datetime.date, reminder: Dict, timezone_str: str) -> datetime: """Calculate when to send reminder based on birthday and reminder settings""" tz = pytz.timezone(timezone_str) reminder_type = reminder['type'] value = reminder['value'] if reminder_type == 'minutes_before': reminder_datetime = datetime.combine(birthday_date, time(0, 0)) reminder_datetime -= timedelta(minutes=int(value)) elif reminder_type == 'hours_before': reminder_datetime = datetime.combine(birthday_date, time(0, 0)) reminder_datetime -= timedelta(hours=int(value)) elif reminder_type == 'days_before': reminder_datetime = datetime.combine(birthday_date - timedelta(days=int(value)), time(9, 0)) elif reminder_type == 'time_on_day': reminder_time = datetime.strptime(value, '%H:%M').time() reminder_datetime = datetime.combine(birthday_date, reminder_time) elif reminder_type == 'time_before': days, time_str = value.split(':', 1) reminder_time = datetime.strptime(time_str, '%H:%M').time() target_date = birthday_date - timedelta(days=int(days)) reminder_datetime = datetime.combine(target_date, reminder_time) # Localize to user's timezone then convert to UTC reminder_datetime = tz.localize(reminder_datetime).astimezone(pytz.UTC) return reminder_datetime async def send_notification(self, user_id: str, title: str, message: str) -> int: """Send notification to all configured endpoints""" user_data = self.get_user_data(user_id) if not user_data['apprise_endpoints']: return 0 apobj = apprise.Apprise() # Add all endpoints for endpoint in user_data['apprise_endpoints']: apobj.add(endpoint) # Send notification try: success = apobj.notify(body=message, title=title) return len(user_data['apprise_endpoints']) if success else 0 except Exception as e: logger.error(f"Notification error for user {user_id}: {str(e)}") return 0 async def check_birthdays(self): """Check for upcoming birthdays and send reminders""" logger.info("Checking birthdays...") current_time = datetime.now(pytz.UTC) # Clean up old reminder keys periodically (every hour) if not hasattr(self, '_last_cleanup') or (current_time - self._last_cleanup).total_seconds() > 3600: self.cleanup_old_reminder_keys() self._last_cleanup = current_time for user_id, user_data in self.users_data.items(): if not user_data['birthdays'] or not user_data['reminders']: continue timezone_str = user_data.get('timezone', 'UTC') for name, birthday_str in user_data['birthdays'].items(): next_birthday = self.get_next_birthday_date(birthday_str) for reminder in user_data['reminders']: try: reminder_time = self.calculate_reminder_time( next_birthday, reminder, timezone_str ) # Create unique key for this reminder reminder_key = self.create_reminder_key( user_id, name, birthday_str, reminder, reminder_time ) # Check if it's time to send reminder (within 1 minute window) time_diff = abs((current_time - reminder_time).total_seconds()) if time_diff <= 60 and reminder_key not in self.sent_reminders: # Mark as sent before sending to prevent race conditions self.sent_reminders.add(reminder_key) age = datetime.now().year - 2000 # Approximate age calculation title = f"๐ŸŽ‚ Birthday Reminder: {name}" message = f"Don't forget! {name}'s birthday is coming up on {birthday_str}!" if reminder['type'] == 'time_on_day': message = f"๐ŸŽ‰ It's {name}'s birthday today! ๐ŸŽ‰" await self.send_notification(user_id, title, message) logger.info(f"Sent reminder for {name} to user {user_id}") except Exception as e: logger.error(f"Error processing reminder for {name}: {str(e)}") async def run_birthday_checker(self): """Run periodic birthday checker""" while True: try: await self.check_birthdays() await asyncio.sleep(60) # Check every minute except Exception as e: logger.error(f"Error in birthday checker: {str(e)}") await asyncio.sleep(60) async def post_init(self, application): """Initialize background tasks after the application starts""" asyncio.create_task(self.run_birthday_checker()) def run(self): """Run the bot""" # Set up post-init callback to start background tasks self.application.post_init = self.post_init # Run the bot self.application.run_polling(allowed_updates=Update.ALL_TYPES) # Configuration BOT_TOKEN = os.getenv("BOT_TOKEN") if __name__ == "__main__": if not BOT_TOKEN: print("Error: BOT_TOKEN not found in environment variables") print("Please create a .env file with your bot token:") print("BOT_TOKEN=your_bot_token_here") exit(1) bot = BirthdayBot(BOT_TOKEN) bot.run()