diff --git a/DMbot.py b/DMbot.py new file mode 100644 index 0000000..ba94f4a --- /dev/null +++ b/DMbot.py @@ -0,0 +1,189 @@ +import os +import requests +import base64 +import re +import json +from datetime import datetime +from telegram import Update +from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes + +# TELEGRAM DM DIRECTUS BOT + +# Configuration +API_BASE_URL = "" +BEARER_TOKEN = "" # Replace with your actual token +TELEGRAM_TOKEN = "" # Replace with your bot token + +# UUID validation pattern +UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') + +def log_activity(message: str) -> None: + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}") + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + help_text = ( + "Hello! I can help you find addons.\n\n" + "🔍 Search by name/creator: Type any search term\n" + "🔎 Get by UUID: Paste a valid UUID\n" + "❌ Cancel ongoing search: Type /cancel or 'cancel'" + ) + await send_response(update, context, help_text) + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if context.user_data.get('awaiting_index'): + context.user_data.clear() + await send_response(update, context, "❌ Search cancelled. You can start a new search.") + log_activity("Search cancelled by user") + else: + await send_response(update, context, "⚠️ No active operation to cancel") + +async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User" + log_activity(f"Received message from {user_info}: '{update.message.text}'") + + user_input = update.message.text.strip().lower() + + # Handle cancellation first + if user_input in ['cancel', 'abort']: + await cancel(update, context) + return + + # Check if expecting index selection + if context.user_data.get('awaiting_index'): + try: + index = int(user_input) - 1 + search_results = context.user_data.get('search_results', []) + if 0 <= index < len(search_results): + selected_uuid = search_results[index]['uuid'] + context.user_data.clear() + await process_uuid(update, context, selected_uuid) + else: + await send_response(update, context, "❌ Invalid index. Type /cancel to abort.") + except ValueError: + await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort") + return + + # Check if input is UUID + if UUID_PATTERN.match(user_input): + await process_uuid(update, context, user_input) + else: + await perform_search(update, context, user_input) + +async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None: + try: + log_activity(f"Searching for: {query}") + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + + # Directus filter parameters + search_params = { + "filter": json.dumps({ + "_or": [ + {"name": {"_icontains": query}}, + {"creator": {"_icontains": query}} + ] + }), + "fields": "uuid,name,creator,version,download_hash", + "limit": "10" + } + + response = requests.get( + url=f"{API_BASE_URL}", + headers=headers, + params=search_params + ) + response.raise_for_status() + + search_data = response.json().get('data', []) + + if not search_data: + await send_response(update, context, "🔍 No results found for your query.") + return + + context.user_data['search_results'] = search_data + context.user_data['awaiting_index'] = True + + results_list = [ + f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}" + for idx, item in enumerate(search_data, 1) + ] + + response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:" + await send_response(update, context, response_message) + + except requests.exceptions.HTTPError as e: + error_msg = f"🔧 API Error: {e.response.status_code}" + await send_response(update, context, error_msg) + log_activity(f"Search API Error: {str(e)}") + except Exception as e: + log_activity(f"Search error: {str(e)}") + await send_response(update, context, "⚠️ Error processing search") + +async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None: + try: + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers) + response.raise_for_status() + + data = response.json().get('data', {}) + if data: + download_hashes = data.get('download_hash', '') + decoded_urls = [] + + if download_hashes: + for i, hash_part in enumerate(download_hashes.split(','), 1): + try: + decoded = base64.b64decode(hash_part.strip()).decode('utf-8') + decoded_urls.append(f"{i}. {decoded}") + except: + decoded_urls.append(f"{i}. Invalid hash") + + formatted_message = ( + f"UUID: {data.get('uuid', 'N/A')}\n" + f"Name: {data.get('name', 'N/A')}\n" + f"Creator: {data.get('creator', 'N/A')}\n" + f"Version: {data.get('version', 'N/A')}\n" + ) + + if decoded_urls: + formatted_message += "Download URLs:\n" + "\n".join(decoded_urls) + else: + formatted_message += "No download URLs found" + + await send_response(update, context, f"📦 Addon Information:\n{formatted_message}") + else: + await send_response(update, context, "❌ No information found for this UUID.") + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 403: + await send_response(update, context, "⛔ Access denied for this UUID") + else: + await send_response(update, context, f"🔧 API Error: {e.response.status_code}") + except Exception as e: + log_activity(f"UUID processing error: {str(e)}") + await send_response(update, context, "⚠️ Error fetching UUID details") + +async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None: + await context.bot.send_message( + chat_id=update.effective_chat.id, + text=text + ) + log_activity(f"Sent response: '{text}'") + +def main() -> None: + application = Application.builder().token(TELEGRAM_TOKEN).build() + application.add_handler(CommandHandler("start", start)) + application.add_handler(CommandHandler("cancel", cancel)) + + # Modified message handler configuration + application.add_handler(MessageHandler( + filters.TEXT & + ~filters.COMMAND & + filters.ChatType.PRIVATE, + handle_text + )) + + application.run_polling() + +if __name__ == '__main__': + main() diff --git a/DSbot.py b/DSbot.py new file mode 100644 index 0000000..07e5bcf --- /dev/null +++ b/DSbot.py @@ -0,0 +1,254 @@ +import os +import requests +import base64 +import re +import json +from datetime import datetime +import discord +from discord.ext import commands +from discord.ui import Button, View + +# DISCORD SERVER + DM DIRECTUS BOT + +# Configuration +API_BASE_URL = "" +BEARER_TOKEN = "" # Replace with your actual token +DISCORD_TOKEN = "" # Replace +ALLOWED_CHANNELS = [123456789] # REPLACE WITH YOUR CHANNEL IDS + +# UUID validation pattern +UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') + +def log_activity(message: str) -> None: + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}") + +intents = discord.Intents.default() +intents.message_content = True +bot = commands.Bot(command_prefix='!', intents=intents) + +class SearchResultsView(View): + def __init__(self, results): + super().__init__(timeout=60) + self.results = results + self.add_buttons() + + def add_buttons(self): + for idx, item in enumerate(self.results[:25], 1): + self.add_item(ResultButton(item, idx)) + +class ResultButton(Button): + def __init__(self, item, index): + super().__init__( + label=f"{index}", + style=discord.ButtonStyle.primary, + custom_id=f"result_{item['uuid']}" + ) + self.item = item + + async def callback(self, interaction: discord.Interaction): + await self.show_addon_embed(interaction) + + async def show_addon_embed(self, interaction: discord.Interaction): + data = self.item + download_hashes = data.get('download_hash', '') + decoded_urls = [] + + if download_hashes: + for i, hash_part in enumerate(download_hashes.split(','), 1): + try: + decoded = base64.b64decode(hash_part.strip()).decode('utf-8') + decoded_urls.append(f"{i}. {decoded}") + except: + decoded_urls.append(f"{i}. Invalid hash") + + embed = discord.Embed( + title=data.get('name', 'N/A'), + color=discord.Color.blue() + ) + embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False) + embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True) + embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True) + + if decoded_urls: + embed.add_field( + name="Download URLs", + value="\n".join(decoded_urls), + inline=False + ) + + await interaction.response.send_message(embed=embed, ephemeral=True) + +@bot.event +async def on_ready(): + log_activity(f'Logged in as {bot.user.name}') + +async def send_start(ctx): + embed = discord.Embed( + title="Addon Search Bot", + description="I can help you find addons!\n\n" + "🔍 **Search**: `!mpbot [search terms]`\n" + "🔎 **UUID Lookup**: `!mpbot [uuid]`\n" + "🆘 **Help**: `!mpbot help`\n" + "❌ **Cancel**: `!mpbot cancel`", + color=discord.Color.green() + ) + await ctx.send(embed=embed) + +async def send_cancel(ctx): + embed = discord.Embed( + description="⚠️ No active operation to cancel", + color=discord.Color.orange() + ) + await ctx.send(embed=embed, delete_after=5) + +@bot.event +async def on_message(message): + if message.author == bot.user: + return + + # Channel check + if message.channel.id not in ALLOWED_CHANNELS: + return + + # Command prefix check + if not message.content.startswith('!mpbot'): + return + + ctx = await bot.get_context(message) + content = message.content[len('!mpbot'):].strip() + + if not content: + await send_start(ctx) + return + + if content.lower() == 'help': + await send_start(ctx) + elif content.lower() == 'cancel': + await send_cancel(ctx) + elif UUID_PATTERN.match(content): + await process_uuid(ctx, content) + else: + await perform_search(ctx, content) + +async def perform_search(ctx, query): + try: + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + search_params = { + "filter": json.dumps({ + "_or": [ + {"name": {"_icontains": query}}, + {"creator": {"_icontains": query}} + ] + }), + "fields": "uuid,name,creator,version,download_hash", + "limit": "10" + } + + response = requests.get(API_BASE_URL, headers=headers, params=search_params) + response.raise_for_status() + + search_data = response.json().get('data', []) + + if not search_data: + embed = discord.Embed( + description="🔍 No results found for your query.", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + return + + results_list = [ + f"{idx}. **{item.get('name', 'N/A')}** by {item.get('creator', 'N/A')}" + for idx, item in enumerate(search_data, 1) + ] + + embed = discord.Embed( + title=f"Search Results for '{query}'", + description="\n".join(results_list) + "\n\nClick a button below to view details:", + color=discord.Color.blue() + ) + + view = SearchResultsView(search_data) + await ctx.send(embed=embed, view=view) + + except requests.exceptions.HTTPError as e: + embed = discord.Embed( + title="API Error", + description=f"Error code: {e.response.status_code}", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + except Exception as e: + log_activity(f"Search error: {str(e)}") + embed = discord.Embed( + title="Error", + description="⚠️ Error processing search", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + +async def process_uuid(ctx, uuid): + try: + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers) + response.raise_for_status() + + data = response.json().get('data', {}) + if data: + download_hashes = data.get('download_hash', '') + decoded_urls = [] + + if download_hashes: + for i, hash_part in enumerate(download_hashes.split(','), 1): + try: + decoded = base64.b64decode(hash_part.strip()).decode('utf-8') + decoded_urls.append(f"{i}. {decoded}") + except: + decoded_urls.append(f"{i}. Invalid hash") + + embed = discord.Embed( + title=data.get('name', 'N/A'), + color=discord.Color.blue() + ) + embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False) + embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True) + embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True) + + if decoded_urls: + embed.add_field( + name="Download URLs", + value="\n".join(decoded_urls), + inline=False + ) + + await ctx.send(embed=embed, ephemeral=True) + else: + embed = discord.Embed( + description="❌ No information found for this UUID.", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 403: + embed = discord.Embed( + description="⛔ Access denied for this UUID", + color=discord.Color.red() + ) + else: + embed = discord.Embed( + description=f"🔧 API Error: {e.response.status_code}", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + except Exception as e: + log_activity(f"UUID processing error: {str(e)}") + embed = discord.Embed( + description="⚠️ Error fetching UUID details", + color=discord.Color.red() + ) + await ctx.send(embed=embed, ephemeral=True) + +if __name__ == '__main__': + bot.run(DISCORD_TOKEN) diff --git a/GTbot.py b/GTbot.py new file mode 100644 index 0000000..b21385f --- /dev/null +++ b/GTbot.py @@ -0,0 +1,200 @@ +import os +import requests +import base64 +import re +import json +from datetime import datetime +from telegram import Update +from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes + +# TELEGRAM GROUP TOPIC DIRECTUS BOT + +# Configuration +API_BASE_URL = "" +BEARER_TOKEN = "" # Replace with your actual token +TELEGRAM_TOKEN = "" # Replace with your bot token +ALLOWED_CHAT_ID = +ALLOWED_THREAD_ID = + + +# UUID validation pattern +UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') + +def log_activity(message: str) -> None: + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}") + +def validate_topic(update: Update) -> bool: + return (update.effective_chat.id == ALLOWED_CHAT_ID and + update.effective_message.message_thread_id == ALLOWED_THREAD_ID) + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if validate_topic(update): + help_text = ( + "Hello! I can help you find addons.\n\n" + "🔍 Search by name/creator: Type any search term\n" + "🔎 Get by UUID: Paste a valid UUID\n" + "❌ Cancel ongoing search: Type /cancel or 'cancel'" + ) + await send_response(update, context, help_text) + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if context.user_data.get('awaiting_index'): + context.user_data.clear() + await send_response(update, context, "❌ Search cancelled. You can start a new search.") + log_activity("Search cancelled by user") + else: + await send_response(update, context, "⚠️ No active operation to cancel") + +async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User" + log_activity(f"Received message from {user_info}: '{update.message.text}'") + + if not validate_topic(update): + log_activity("Message ignored - not in allowed topic") + return + + user_input = update.message.text.strip().lower() + + # Handle cancellation first + if user_input in ['cancel', 'abort']: + await cancel(update, context) + return + + # Check if expecting index selection + if context.user_data.get('awaiting_index'): + try: + index = int(user_input) - 1 + search_results = context.user_data.get('search_results', []) + if 0 <= index < len(search_results): + selected_uuid = search_results[index]['uuid'] + context.user_data.clear() + await process_uuid(update, context, selected_uuid) + else: + await send_response(update, context, "❌ Invalid index. Type /cancel to abort.") + except ValueError: + await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort") + return + + # Check if input is UUID + if UUID_PATTERN.match(user_input): + await process_uuid(update, context, user_input) + else: + await perform_search(update, context, user_input) + +async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None: + try: + log_activity(f"Searching for: {query}") + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + + # Directus filter parameters + search_params = { + "filter": json.dumps({ + "_or": [ + {"name": {"_icontains": query}}, + {"creator": {"_icontains": query}} + ] + }), + "fields": "uuid,name,creator,version,download_hash", + "limit": "10" + } + + response = requests.get( + url=f"{API_BASE_URL}", + headers=headers, + params=search_params + ) + response.raise_for_status() + + # Directus returns items directly in data array + search_data = response.json().get('data', []) + + if not search_data: + await send_response(update, context, "🔍 No results found for your query.") + return + + context.user_data['search_results'] = search_data + context.user_data['awaiting_index'] = True + + results_list = [ + f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}" + for idx, item in enumerate(search_data, 1) + ] + + response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:" + await send_response(update, context, response_message) + + except requests.exceptions.HTTPError as e: + error_msg = f"🔧 API Error: {e.response.status_code}" + await send_response(update, context, error_msg) + log_activity(f"Search API Error: {str(e)}") + except Exception as e: + log_activity(f"Search error: {str(e)}") + await send_response(update, context, "⚠️ Error processing search") + +async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None: + try: + headers = {"Authorization": f"Bearer {BEARER_TOKEN}"} + response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers) + response.raise_for_status() + + data = response.json().get('data', {}) + if data: + download_hashes = data.get('download_hash', '') + decoded_urls = [] + + if download_hashes: + for i, hash_part in enumerate(download_hashes.split(','), 1): + try: + decoded = base64.b64decode(hash_part.strip()).decode('utf-8') + decoded_urls.append(f"{i}. {decoded}") + except: + decoded_urls.append(f"{i}. Invalid hash") + + formatted_message = ( + f"UUID: {data.get('uuid', 'N/A')}\n" + f"Name: {data.get('name', 'N/A')}\n" + f"Creator: {data.get('creator', 'N/A')}\n" + f"Version: {data.get('version', 'N/A')}\n" + ) + + if decoded_urls: + formatted_message += "Download URLs:\n" + "\n".join(decoded_urls) + else: + formatted_message += "No download URLs found" + + await send_response(update, context, f"📦 Addon Information:\n{formatted_message}") + else: + await send_response(update, context, "❌ No information found for this UUID.") + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 403: + await send_response(update, context, "⛔ Access denied for this UUID") + else: + await send_response(update, context, f"🔧 API Error: {e.response.status_code}") + except Exception as e: + log_activity(f"UUID processing error: {str(e)}") + await send_response(update, context, "⚠️ Error fetching UUID details") + +async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None: + await context.bot.send_message( + chat_id=update.effective_chat.id, + message_thread_id=update.effective_message.message_thread_id, + text=text + ) + log_activity(f"Sent response: '{text}'") + +def main() -> None: + application = Application.builder().token(TELEGRAM_TOKEN).build() + application.add_handler(CommandHandler("start", start)) + application.add_handler(CommandHandler("cancel", cancel)) + application.add_handler(MessageHandler( + filters.TEXT & + ~filters.COMMAND & + ~filters.Regex(re.compile(r'^(cancel|abort)$', flags=re.IGNORECASE)), + handle_text + )) + application.run_polling() + +if __name__ == '__main__': + main()