revert upload telegram DM + group topic bots and discord version of the same
This commit is contained in:
driftywinds
2025-05-11 12:52:43 +00:00
parent 1298c4f1ba
commit 3259d5f902
3 changed files with 0 additions and 643 deletions

189
DMbot.py
View File

@@ -1,189 +0,0 @@
import os
import requests
import base64
import re
import json
from datetime import datetime
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
# TELEGRAM DM DIRECTUS BOT
# Configuration
API_BASE_URL = "<YOUR DIRECTUS API URL>"
BEARER_TOKEN = "<YOUR DIRECTUS API BEARER TOKEN>" # Replace with your actual token
TELEGRAM_TOKEN = "<YOUR TELEGRAM BOT TOKEN>" # Replace with your bot token
# UUID validation pattern
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
def log_activity(message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_text = (
"Hello! I can help you find addons.\n\n"
"🔍 Search by name/creator: Type any search term\n"
"🔎 Get by UUID: Paste a valid UUID\n"
"❌ Cancel ongoing search: Type /cancel or 'cancel'"
)
await send_response(update, context, help_text)
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if context.user_data.get('awaiting_index'):
context.user_data.clear()
await send_response(update, context, "❌ Search cancelled. You can start a new search.")
log_activity("Search cancelled by user")
else:
await send_response(update, context, "⚠️ No active operation to cancel")
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User"
log_activity(f"Received message from {user_info}: '{update.message.text}'")
user_input = update.message.text.strip().lower()
# Handle cancellation first
if user_input in ['cancel', 'abort']:
await cancel(update, context)
return
# Check if expecting index selection
if context.user_data.get('awaiting_index'):
try:
index = int(user_input) - 1
search_results = context.user_data.get('search_results', [])
if 0 <= index < len(search_results):
selected_uuid = search_results[index]['uuid']
context.user_data.clear()
await process_uuid(update, context, selected_uuid)
else:
await send_response(update, context, "❌ Invalid index. Type /cancel to abort.")
except ValueError:
await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort")
return
# Check if input is UUID
if UUID_PATTERN.match(user_input):
await process_uuid(update, context, user_input)
else:
await perform_search(update, context, user_input)
async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None:
try:
log_activity(f"Searching for: {query}")
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
# Directus filter parameters
search_params = {
"filter": json.dumps({
"_or": [
{"name": {"_icontains": query}},
{"creator": {"_icontains": query}}
]
}),
"fields": "uuid,name,creator,version,download_hash",
"limit": "10"
}
response = requests.get(
url=f"{API_BASE_URL}",
headers=headers,
params=search_params
)
response.raise_for_status()
search_data = response.json().get('data', [])
if not search_data:
await send_response(update, context, "🔍 No results found for your query.")
return
context.user_data['search_results'] = search_data
context.user_data['awaiting_index'] = True
results_list = [
f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}"
for idx, item in enumerate(search_data, 1)
]
response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:"
await send_response(update, context, response_message)
except requests.exceptions.HTTPError as e:
error_msg = f"🔧 API Error: {e.response.status_code}"
await send_response(update, context, error_msg)
log_activity(f"Search API Error: {str(e)}")
except Exception as e:
log_activity(f"Search error: {str(e)}")
await send_response(update, context, "⚠️ Error processing search")
async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None:
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
response.raise_for_status()
data = response.json().get('data', {})
if data:
download_hashes = data.get('download_hash', '')
decoded_urls = []
if download_hashes:
for i, hash_part in enumerate(download_hashes.split(','), 1):
try:
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
decoded_urls.append(f"{i}. {decoded}")
except:
decoded_urls.append(f"{i}. Invalid hash")
formatted_message = (
f"UUID: {data.get('uuid', 'N/A')}\n"
f"Name: {data.get('name', 'N/A')}\n"
f"Creator: {data.get('creator', 'N/A')}\n"
f"Version: {data.get('version', 'N/A')}\n"
)
if decoded_urls:
formatted_message += "Download URLs:\n" + "\n".join(decoded_urls)
else:
formatted_message += "No download URLs found"
await send_response(update, context, f"📦 Addon Information:\n{formatted_message}")
else:
await send_response(update, context, "❌ No information found for this UUID.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
await send_response(update, context, "⛔ Access denied for this UUID")
else:
await send_response(update, context, f"🔧 API Error: {e.response.status_code}")
except Exception as e:
log_activity(f"UUID processing error: {str(e)}")
await send_response(update, context, "⚠️ Error fetching UUID details")
async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=text
)
log_activity(f"Sent response: '{text}'")
def main() -> None:
application = Application.builder().token(TELEGRAM_TOKEN).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("cancel", cancel))
# Modified message handler configuration
application.add_handler(MessageHandler(
filters.TEXT &
~filters.COMMAND &
filters.ChatType.PRIVATE,
handle_text
))
application.run_polling()
if __name__ == '__main__':
main()

254
DSbot.py
View File

@@ -1,254 +0,0 @@
import os
import requests
import base64
import re
import json
from datetime import datetime
import discord
from discord.ext import commands
from discord.ui import Button, View
# DISCORD SERVER + DM DIRECTUS BOT
# Configuration
API_BASE_URL = "<YOUR DIRECTUS API URL>"
BEARER_TOKEN = "<YOUR DIRECTUS API BEARER TOKEN>" # Replace with your actual token
DISCORD_TOKEN = "<YOUR DISCORD BOT TOKEN>" # Replace
ALLOWED_CHANNELS = [123456789] # REPLACE WITH YOUR CHANNEL IDS
# UUID validation pattern
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
def log_activity(message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix='!', intents=intents)
class SearchResultsView(View):
def __init__(self, results):
super().__init__(timeout=60)
self.results = results
self.add_buttons()
def add_buttons(self):
for idx, item in enumerate(self.results[:25], 1):
self.add_item(ResultButton(item, idx))
class ResultButton(Button):
def __init__(self, item, index):
super().__init__(
label=f"{index}",
style=discord.ButtonStyle.primary,
custom_id=f"result_{item['uuid']}"
)
self.item = item
async def callback(self, interaction: discord.Interaction):
await self.show_addon_embed(interaction)
async def show_addon_embed(self, interaction: discord.Interaction):
data = self.item
download_hashes = data.get('download_hash', '')
decoded_urls = []
if download_hashes:
for i, hash_part in enumerate(download_hashes.split(','), 1):
try:
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
decoded_urls.append(f"{i}. {decoded}")
except:
decoded_urls.append(f"{i}. Invalid hash")
embed = discord.Embed(
title=data.get('name', 'N/A'),
color=discord.Color.blue()
)
embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False)
embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True)
embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True)
if decoded_urls:
embed.add_field(
name="Download URLs",
value="\n".join(decoded_urls),
inline=False
)
await interaction.response.send_message(embed=embed, ephemeral=True)
@bot.event
async def on_ready():
log_activity(f'Logged in as {bot.user.name}')
async def send_start(ctx):
embed = discord.Embed(
title="Addon Search Bot",
description="I can help you find addons!\n\n"
"🔍 **Search**: `!mpbot [search terms]`\n"
"🔎 **UUID Lookup**: `!mpbot [uuid]`\n"
"🆘 **Help**: `!mpbot help`\n"
"❌ **Cancel**: `!mpbot cancel`",
color=discord.Color.green()
)
await ctx.send(embed=embed)
async def send_cancel(ctx):
embed = discord.Embed(
description="⚠️ No active operation to cancel",
color=discord.Color.orange()
)
await ctx.send(embed=embed, delete_after=5)
@bot.event
async def on_message(message):
if message.author == bot.user:
return
# Channel check
if message.channel.id not in ALLOWED_CHANNELS:
return
# Command prefix check
if not message.content.startswith('!mpbot'):
return
ctx = await bot.get_context(message)
content = message.content[len('!mpbot'):].strip()
if not content:
await send_start(ctx)
return
if content.lower() == 'help':
await send_start(ctx)
elif content.lower() == 'cancel':
await send_cancel(ctx)
elif UUID_PATTERN.match(content):
await process_uuid(ctx, content)
else:
await perform_search(ctx, content)
async def perform_search(ctx, query):
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
search_params = {
"filter": json.dumps({
"_or": [
{"name": {"_icontains": query}},
{"creator": {"_icontains": query}}
]
}),
"fields": "uuid,name,creator,version,download_hash",
"limit": "10"
}
response = requests.get(API_BASE_URL, headers=headers, params=search_params)
response.raise_for_status()
search_data = response.json().get('data', [])
if not search_data:
embed = discord.Embed(
description="🔍 No results found for your query.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
return
results_list = [
f"{idx}. **{item.get('name', 'N/A')}** by {item.get('creator', 'N/A')}"
for idx, item in enumerate(search_data, 1)
]
embed = discord.Embed(
title=f"Search Results for '{query}'",
description="\n".join(results_list) + "\n\nClick a button below to view details:",
color=discord.Color.blue()
)
view = SearchResultsView(search_data)
await ctx.send(embed=embed, view=view)
except requests.exceptions.HTTPError as e:
embed = discord.Embed(
title="API Error",
description=f"Error code: {e.response.status_code}",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
except Exception as e:
log_activity(f"Search error: {str(e)}")
embed = discord.Embed(
title="Error",
description="⚠️ Error processing search",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
async def process_uuid(ctx, uuid):
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
response.raise_for_status()
data = response.json().get('data', {})
if data:
download_hashes = data.get('download_hash', '')
decoded_urls = []
if download_hashes:
for i, hash_part in enumerate(download_hashes.split(','), 1):
try:
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
decoded_urls.append(f"{i}. {decoded}")
except:
decoded_urls.append(f"{i}. Invalid hash")
embed = discord.Embed(
title=data.get('name', 'N/A'),
color=discord.Color.blue()
)
embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False)
embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True)
embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True)
if decoded_urls:
embed.add_field(
name="Download URLs",
value="\n".join(decoded_urls),
inline=False
)
await ctx.send(embed=embed, ephemeral=True)
else:
embed = discord.Embed(
description="❌ No information found for this UUID.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
embed = discord.Embed(
description="⛔ Access denied for this UUID",
color=discord.Color.red()
)
else:
embed = discord.Embed(
description=f"🔧 API Error: {e.response.status_code}",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
except Exception as e:
log_activity(f"UUID processing error: {str(e)}")
embed = discord.Embed(
description="⚠️ Error fetching UUID details",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
if __name__ == '__main__':
bot.run(DISCORD_TOKEN)

200
GTbot.py
View File

@@ -1,200 +0,0 @@
import os
import requests
import base64
import re
import json
from datetime import datetime
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
# TELEGRAM GROUP TOPIC DIRECTUS BOT
# Configuration
API_BASE_URL = "<YOUR DIRECTUS API URL>"
BEARER_TOKEN = "<YOUR DIRECTUS API BEARER TOKEN>" # Replace with your actual token
TELEGRAM_TOKEN = "<YOUR TELEGRAM BOT TOKEN>" # Replace with your bot token
ALLOWED_CHAT_ID = <YOUR TELEGRAM GROUP ID>
ALLOWED_THREAD_ID = <YOUR TELEGRAM TOPIC ID>
# UUID validation pattern
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
def log_activity(message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def validate_topic(update: Update) -> bool:
return (update.effective_chat.id == ALLOWED_CHAT_ID and
update.effective_message.message_thread_id == ALLOWED_THREAD_ID)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if validate_topic(update):
help_text = (
"Hello! I can help you find addons.\n\n"
"🔍 Search by name/creator: Type any search term\n"
"🔎 Get by UUID: Paste a valid UUID\n"
"❌ Cancel ongoing search: Type /cancel or 'cancel'"
)
await send_response(update, context, help_text)
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if context.user_data.get('awaiting_index'):
context.user_data.clear()
await send_response(update, context, "❌ Search cancelled. You can start a new search.")
log_activity("Search cancelled by user")
else:
await send_response(update, context, "⚠️ No active operation to cancel")
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User"
log_activity(f"Received message from {user_info}: '{update.message.text}'")
if not validate_topic(update):
log_activity("Message ignored - not in allowed topic")
return
user_input = update.message.text.strip().lower()
# Handle cancellation first
if user_input in ['cancel', 'abort']:
await cancel(update, context)
return
# Check if expecting index selection
if context.user_data.get('awaiting_index'):
try:
index = int(user_input) - 1
search_results = context.user_data.get('search_results', [])
if 0 <= index < len(search_results):
selected_uuid = search_results[index]['uuid']
context.user_data.clear()
await process_uuid(update, context, selected_uuid)
else:
await send_response(update, context, "❌ Invalid index. Type /cancel to abort.")
except ValueError:
await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort")
return
# Check if input is UUID
if UUID_PATTERN.match(user_input):
await process_uuid(update, context, user_input)
else:
await perform_search(update, context, user_input)
async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None:
try:
log_activity(f"Searching for: {query}")
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
# Directus filter parameters
search_params = {
"filter": json.dumps({
"_or": [
{"name": {"_icontains": query}},
{"creator": {"_icontains": query}}
]
}),
"fields": "uuid,name,creator,version,download_hash",
"limit": "10"
}
response = requests.get(
url=f"{API_BASE_URL}",
headers=headers,
params=search_params
)
response.raise_for_status()
# Directus returns items directly in data array
search_data = response.json().get('data', [])
if not search_data:
await send_response(update, context, "🔍 No results found for your query.")
return
context.user_data['search_results'] = search_data
context.user_data['awaiting_index'] = True
results_list = [
f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}"
for idx, item in enumerate(search_data, 1)
]
response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:"
await send_response(update, context, response_message)
except requests.exceptions.HTTPError as e:
error_msg = f"🔧 API Error: {e.response.status_code}"
await send_response(update, context, error_msg)
log_activity(f"Search API Error: {str(e)}")
except Exception as e:
log_activity(f"Search error: {str(e)}")
await send_response(update, context, "⚠️ Error processing search")
async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None:
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
response.raise_for_status()
data = response.json().get('data', {})
if data:
download_hashes = data.get('download_hash', '')
decoded_urls = []
if download_hashes:
for i, hash_part in enumerate(download_hashes.split(','), 1):
try:
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
decoded_urls.append(f"{i}. {decoded}")
except:
decoded_urls.append(f"{i}. Invalid hash")
formatted_message = (
f"UUID: {data.get('uuid', 'N/A')}\n"
f"Name: {data.get('name', 'N/A')}\n"
f"Creator: {data.get('creator', 'N/A')}\n"
f"Version: {data.get('version', 'N/A')}\n"
)
if decoded_urls:
formatted_message += "Download URLs:\n" + "\n".join(decoded_urls)
else:
formatted_message += "No download URLs found"
await send_response(update, context, f"📦 Addon Information:\n{formatted_message}")
else:
await send_response(update, context, "❌ No information found for this UUID.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
await send_response(update, context, "⛔ Access denied for this UUID")
else:
await send_response(update, context, f"🔧 API Error: {e.response.status_code}")
except Exception as e:
log_activity(f"UUID processing error: {str(e)}")
await send_response(update, context, "⚠️ Error fetching UUID details")
async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
await context.bot.send_message(
chat_id=update.effective_chat.id,
message_thread_id=update.effective_message.message_thread_id,
text=text
)
log_activity(f"Sent response: '{text}'")
def main() -> None:
application = Application.builder().token(TELEGRAM_TOKEN).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("cancel", cancel))
application.add_handler(MessageHandler(
filters.TEXT &
~filters.COMMAND &
~filters.Regex(re.compile(r'^(cancel|abort)$', flags=re.IGNORECASE)),
handle_text
))
application.run_polling()
if __name__ == '__main__':
main()