REVAMPED
This commit is contained in:
192
DMbot.py
Normal file
192
DMbot.py
Normal file
@@ -0,0 +1,192 @@
|
||||
import os
|
||||
import requests
|
||||
import base64
|
||||
import re
|
||||
import json
|
||||
from datetime import datetime
|
||||
from telegram import Update
|
||||
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# TELEGRAM DM DIRECTUS BOT
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Configuration
|
||||
API_BASE_URL = os.getenv("API_BASE_URL") # Configure in .env
|
||||
BEARER_TOKEN = os.getenv("BEARER_TOKEN") # Configure in .env
|
||||
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") # Configure in .env
|
||||
|
||||
# UUID validation pattern
|
||||
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
|
||||
|
||||
def log_activity(message: str) -> None:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{timestamp}] {message}")
|
||||
|
||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
help_text = (
|
||||
"Hello! I can help you find addons.\n\n"
|
||||
"🔍 Search by name/creator: Type any search term\n"
|
||||
"🔎 Get by UUID: Paste a valid UUID\n"
|
||||
"❌ Cancel ongoing search: Type /cancel or 'cancel'"
|
||||
)
|
||||
await send_response(update, context, help_text)
|
||||
|
||||
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
if context.user_data.get('awaiting_index'):
|
||||
context.user_data.clear()
|
||||
await send_response(update, context, "❌ Search cancelled. You can start a new search.")
|
||||
log_activity("Search cancelled by user")
|
||||
else:
|
||||
await send_response(update, context, "⚠️ No active operation to cancel")
|
||||
|
||||
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User"
|
||||
log_activity(f"Received message from {user_info}: '{update.message.text}'")
|
||||
|
||||
user_input = update.message.text.strip().lower()
|
||||
|
||||
# Handle cancellation first
|
||||
if user_input in ['cancel', 'abort']:
|
||||
await cancel(update, context)
|
||||
return
|
||||
|
||||
# Check if expecting index selection
|
||||
if context.user_data.get('awaiting_index'):
|
||||
try:
|
||||
index = int(user_input) - 1
|
||||
search_results = context.user_data.get('search_results', [])
|
||||
if 0 <= index < len(search_results):
|
||||
selected_uuid = search_results[index]['uuid']
|
||||
context.user_data.clear()
|
||||
await process_uuid(update, context, selected_uuid)
|
||||
else:
|
||||
await send_response(update, context, "❌ Invalid index. Type /cancel to abort.")
|
||||
except ValueError:
|
||||
await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort")
|
||||
return
|
||||
|
||||
# Check if input is UUID
|
||||
if UUID_PATTERN.match(user_input):
|
||||
await process_uuid(update, context, user_input)
|
||||
else:
|
||||
await perform_search(update, context, user_input)
|
||||
|
||||
async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None:
|
||||
try:
|
||||
log_activity(f"Searching for: {query}")
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
|
||||
# Directus filter parameters
|
||||
search_params = {
|
||||
"filter": json.dumps({
|
||||
"_or": [
|
||||
{"name": {"_icontains": query}},
|
||||
{"creator": {"_icontains": query}}
|
||||
]
|
||||
}),
|
||||
"fields": "uuid,name,creator,version,download_hash",
|
||||
"limit": "10"
|
||||
}
|
||||
|
||||
response = requests.get(
|
||||
url=f"{API_BASE_URL}",
|
||||
headers=headers,
|
||||
params=search_params
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
search_data = response.json().get('data', [])
|
||||
|
||||
if not search_data:
|
||||
await send_response(update, context, "🔍 No results found for your query.")
|
||||
return
|
||||
|
||||
context.user_data['search_results'] = search_data
|
||||
context.user_data['awaiting_index'] = True
|
||||
|
||||
results_list = [
|
||||
f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}"
|
||||
for idx, item in enumerate(search_data, 1)
|
||||
]
|
||||
|
||||
response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:"
|
||||
await send_response(update, context, response_message)
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
error_msg = f"🔧 API Error: {e.response.status_code}"
|
||||
await send_response(update, context, error_msg)
|
||||
log_activity(f"Search API Error: {str(e)}")
|
||||
except Exception as e:
|
||||
log_activity(f"Search error: {str(e)}")
|
||||
await send_response(update, context, "⚠️ Error processing search")
|
||||
|
||||
async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None:
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json().get('data', {})
|
||||
if data:
|
||||
download_hashes = data.get('download_hash', '')
|
||||
decoded_urls = []
|
||||
|
||||
if download_hashes:
|
||||
for i, hash_part in enumerate(download_hashes.split(','), 1):
|
||||
try:
|
||||
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
|
||||
decoded_urls.append(f"{i}. {decoded}")
|
||||
except:
|
||||
decoded_urls.append(f"{i}. Invalid hash")
|
||||
|
||||
formatted_message = (
|
||||
f"UUID: {data.get('uuid', 'N/A')}\n"
|
||||
f"Name: {data.get('name', 'N/A')}\n"
|
||||
f"Creator: {data.get('creator', 'N/A')}\n"
|
||||
f"Version: {data.get('version', 'N/A')}\n"
|
||||
)
|
||||
|
||||
if decoded_urls:
|
||||
formatted_message += "Download URLs:\n" + "\n".join(decoded_urls)
|
||||
else:
|
||||
formatted_message += "No download URLs found"
|
||||
|
||||
await send_response(update, context, f"📦 Addon Information:\n{formatted_message}")
|
||||
else:
|
||||
await send_response(update, context, "❌ No information found for this UUID.")
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 403:
|
||||
await send_response(update, context, "⛔ Access denied for this UUID")
|
||||
else:
|
||||
await send_response(update, context, f"🔧 API Error: {e.response.status_code}")
|
||||
except Exception as e:
|
||||
log_activity(f"UUID processing error: {str(e)}")
|
||||
await send_response(update, context, "⚠️ Error fetching UUID details")
|
||||
|
||||
async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text=text
|
||||
)
|
||||
log_activity(f"Sent response: '{text}'")
|
||||
|
||||
def main() -> None:
|
||||
application = Application.builder().token(TELEGRAM_TOKEN).build()
|
||||
application.add_handler(CommandHandler("start", start))
|
||||
application.add_handler(CommandHandler("cancel", cancel))
|
||||
|
||||
# Modified message handler configuration
|
||||
application.add_handler(MessageHandler(
|
||||
filters.TEXT &
|
||||
~filters.COMMAND &
|
||||
filters.ChatType.PRIVATE,
|
||||
handle_text
|
||||
))
|
||||
|
||||
application.run_polling()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
257
DSbot.py
Normal file
257
DSbot.py
Normal file
@@ -0,0 +1,257 @@
|
||||
import os
|
||||
import requests
|
||||
import base64
|
||||
import re
|
||||
import json
|
||||
from datetime import datetime
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from discord.ui import Button, View
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# DISCORD SERVER + DM DIRECTUS BOT
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Configuration
|
||||
API_BASE_URL = os.getenv("API_BASE_URL") # Configure in .env
|
||||
BEARER_TOKEN = os.getenv("BEARER_TOKEN") # Configure in .env
|
||||
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") # Configure in .env
|
||||
ALLOWED_CHANNELS = [123456789] # REPLACE WITH YOUR CHANNEL IDS
|
||||
|
||||
# UUID validation pattern
|
||||
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
|
||||
|
||||
def log_activity(message: str) -> None:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{timestamp}] {message}")
|
||||
|
||||
intents = discord.Intents.default()
|
||||
intents.message_content = True
|
||||
bot = commands.Bot(command_prefix='!', intents=intents)
|
||||
|
||||
class SearchResultsView(View):
|
||||
def __init__(self, results):
|
||||
super().__init__(timeout=60)
|
||||
self.results = results
|
||||
self.add_buttons()
|
||||
|
||||
def add_buttons(self):
|
||||
for idx, item in enumerate(self.results[:25], 1):
|
||||
self.add_item(ResultButton(item, idx))
|
||||
|
||||
class ResultButton(Button):
|
||||
def __init__(self, item, index):
|
||||
super().__init__(
|
||||
label=f"{index}",
|
||||
style=discord.ButtonStyle.primary,
|
||||
custom_id=f"result_{item['uuid']}"
|
||||
)
|
||||
self.item = item
|
||||
|
||||
async def callback(self, interaction: discord.Interaction):
|
||||
await self.show_addon_embed(interaction)
|
||||
|
||||
async def show_addon_embed(self, interaction: discord.Interaction):
|
||||
data = self.item
|
||||
download_hashes = data.get('download_hash', '')
|
||||
decoded_urls = []
|
||||
|
||||
if download_hashes:
|
||||
for i, hash_part in enumerate(download_hashes.split(','), 1):
|
||||
try:
|
||||
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
|
||||
decoded_urls.append(f"{i}. {decoded}")
|
||||
except:
|
||||
decoded_urls.append(f"{i}. Invalid hash")
|
||||
|
||||
embed = discord.Embed(
|
||||
title=data.get('name', 'N/A'),
|
||||
color=discord.Color.blue()
|
||||
)
|
||||
embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False)
|
||||
embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True)
|
||||
embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True)
|
||||
|
||||
if decoded_urls:
|
||||
embed.add_field(
|
||||
name="Download URLs",
|
||||
value="\n".join(decoded_urls),
|
||||
inline=False
|
||||
)
|
||||
|
||||
await interaction.response.send_message(embed=embed, ephemeral=True)
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
log_activity(f'Logged in as {bot.user.name}')
|
||||
|
||||
async def send_start(ctx):
|
||||
embed = discord.Embed(
|
||||
title="Addon Search Bot",
|
||||
description="I can help you find addons!\n\n"
|
||||
"🔍 **Search**: `!mpbot [search terms]`\n"
|
||||
"🔎 **UUID Lookup**: `!mpbot [uuid]`\n"
|
||||
"🆘 **Help**: `!mpbot help`\n"
|
||||
"❌ **Cancel**: `!mpbot cancel`",
|
||||
color=discord.Color.green()
|
||||
)
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
async def send_cancel(ctx):
|
||||
embed = discord.Embed(
|
||||
description="⚠️ No active operation to cancel",
|
||||
color=discord.Color.orange()
|
||||
)
|
||||
await ctx.send(embed=embed, delete_after=5)
|
||||
|
||||
@bot.event
|
||||
async def on_message(message):
|
||||
if message.author == bot.user:
|
||||
return
|
||||
|
||||
# Channel check
|
||||
if message.channel.id not in ALLOWED_CHANNELS:
|
||||
return
|
||||
|
||||
# Command prefix check
|
||||
if not message.content.startswith('!mpbot'):
|
||||
return
|
||||
|
||||
ctx = await bot.get_context(message)
|
||||
content = message.content[len('!mpbot'):].strip()
|
||||
|
||||
if not content:
|
||||
await send_start(ctx)
|
||||
return
|
||||
|
||||
if content.lower() == 'help':
|
||||
await send_start(ctx)
|
||||
elif content.lower() == 'cancel':
|
||||
await send_cancel(ctx)
|
||||
elif UUID_PATTERN.match(content):
|
||||
await process_uuid(ctx, content)
|
||||
else:
|
||||
await perform_search(ctx, content)
|
||||
|
||||
async def perform_search(ctx, query):
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
search_params = {
|
||||
"filter": json.dumps({
|
||||
"_or": [
|
||||
{"name": {"_icontains": query}},
|
||||
{"creator": {"_icontains": query}}
|
||||
]
|
||||
}),
|
||||
"fields": "uuid,name,creator,version,download_hash",
|
||||
"limit": "10"
|
||||
}
|
||||
|
||||
response = requests.get(API_BASE_URL, headers=headers, params=search_params)
|
||||
response.raise_for_status()
|
||||
|
||||
search_data = response.json().get('data', [])
|
||||
|
||||
if not search_data:
|
||||
embed = discord.Embed(
|
||||
description="🔍 No results found for your query.",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
return
|
||||
|
||||
results_list = [
|
||||
f"{idx}. **{item.get('name', 'N/A')}** by {item.get('creator', 'N/A')}"
|
||||
for idx, item in enumerate(search_data, 1)
|
||||
]
|
||||
|
||||
embed = discord.Embed(
|
||||
title=f"Search Results for '{query}'",
|
||||
description="\n".join(results_list) + "\n\nClick a button below to view details:",
|
||||
color=discord.Color.blue()
|
||||
)
|
||||
|
||||
view = SearchResultsView(search_data)
|
||||
await ctx.send(embed=embed, view=view)
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
embed = discord.Embed(
|
||||
title="API Error",
|
||||
description=f"Error code: {e.response.status_code}",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
except Exception as e:
|
||||
log_activity(f"Search error: {str(e)}")
|
||||
embed = discord.Embed(
|
||||
title="Error",
|
||||
description="⚠️ Error processing search",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
|
||||
async def process_uuid(ctx, uuid):
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json().get('data', {})
|
||||
if data:
|
||||
download_hashes = data.get('download_hash', '')
|
||||
decoded_urls = []
|
||||
|
||||
if download_hashes:
|
||||
for i, hash_part in enumerate(download_hashes.split(','), 1):
|
||||
try:
|
||||
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
|
||||
decoded_urls.append(f"{i}. {decoded}")
|
||||
except:
|
||||
decoded_urls.append(f"{i}. Invalid hash")
|
||||
|
||||
embed = discord.Embed(
|
||||
title=data.get('name', 'N/A'),
|
||||
color=discord.Color.blue()
|
||||
)
|
||||
embed.add_field(name="UUID", value=data.get('uuid', 'N/A'), inline=False)
|
||||
embed.add_field(name="Creator", value=data.get('creator', 'N/A'), inline=True)
|
||||
embed.add_field(name="Version", value=data.get('version', 'N/A'), inline=True)
|
||||
|
||||
if decoded_urls:
|
||||
embed.add_field(
|
||||
name="Download URLs",
|
||||
value="\n".join(decoded_urls),
|
||||
inline=False
|
||||
)
|
||||
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
else:
|
||||
embed = discord.Embed(
|
||||
description="❌ No information found for this UUID.",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 403:
|
||||
embed = discord.Embed(
|
||||
description="⛔ Access denied for this UUID",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
else:
|
||||
embed = discord.Embed(
|
||||
description=f"🔧 API Error: {e.response.status_code}",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
except Exception as e:
|
||||
log_activity(f"UUID processing error: {str(e)}")
|
||||
embed = discord.Embed(
|
||||
description="⚠️ Error fetching UUID details",
|
||||
color=discord.Color.red()
|
||||
)
|
||||
await ctx.send(embed=embed, ephemeral=True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
bot.run(DISCORD_TOKEN)
|
||||
203
GTbot.py
Normal file
203
GTbot.py
Normal file
@@ -0,0 +1,203 @@
|
||||
import os
|
||||
import requests
|
||||
import base64
|
||||
import re
|
||||
import json
|
||||
from dotenv import load_dotenv
|
||||
from datetime import datetime
|
||||
from telegram import Update
|
||||
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
||||
|
||||
# TELEGRAM GROUP TOPIC DIRECTUS BOT
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Configuration
|
||||
API_BASE_URL = os.getenv("API_BASE_URL") # Configure in .env
|
||||
BEARER_TOKEN = os.getenv("BEARER_TOKEN") # Configure in .env
|
||||
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") # Configure in .env
|
||||
ALLOWED_CHAT_ID = os.getenv("ALLOWED_CHAT_ID") # Configure in .env
|
||||
ALLOWED_THREAD_ID = os.getenv("ALLOWED_THREAD_ID") # Configure in .env
|
||||
|
||||
|
||||
# UUID validation pattern
|
||||
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
|
||||
|
||||
def log_activity(message: str) -> None:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{timestamp}] {message}")
|
||||
|
||||
def validate_topic(update: Update) -> bool:
|
||||
return (update.effective_chat.id == ALLOWED_CHAT_ID and
|
||||
update.effective_message.message_thread_id == ALLOWED_THREAD_ID)
|
||||
|
||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
if validate_topic(update):
|
||||
help_text = (
|
||||
"Hello! I can help you find addons.\n\n"
|
||||
"🔍 Search by name/creator: Type any search term\n"
|
||||
"🔎 Get by UUID: Paste a valid UUID\n"
|
||||
"❌ Cancel ongoing search: Type /cancel or 'cancel'"
|
||||
)
|
||||
await send_response(update, context, help_text)
|
||||
|
||||
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
if context.user_data.get('awaiting_index'):
|
||||
context.user_data.clear()
|
||||
await send_response(update, context, "❌ Search cancelled. You can start a new search.")
|
||||
log_activity("Search cancelled by user")
|
||||
else:
|
||||
await send_response(update, context, "⚠️ No active operation to cancel")
|
||||
|
||||
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
user_info = f"@{update.effective_user.username}" if update.effective_user.username else "Unknown User"
|
||||
log_activity(f"Received message from {user_info}: '{update.message.text}'")
|
||||
|
||||
if not validate_topic(update):
|
||||
log_activity("Message ignored - not in allowed topic")
|
||||
return
|
||||
|
||||
user_input = update.message.text.strip().lower()
|
||||
|
||||
# Handle cancellation first
|
||||
if user_input in ['cancel', 'abort']:
|
||||
await cancel(update, context)
|
||||
return
|
||||
|
||||
# Check if expecting index selection
|
||||
if context.user_data.get('awaiting_index'):
|
||||
try:
|
||||
index = int(user_input) - 1
|
||||
search_results = context.user_data.get('search_results', [])
|
||||
if 0 <= index < len(search_results):
|
||||
selected_uuid = search_results[index]['uuid']
|
||||
context.user_data.clear()
|
||||
await process_uuid(update, context, selected_uuid)
|
||||
else:
|
||||
await send_response(update, context, "❌ Invalid index. Type /cancel to abort.")
|
||||
except ValueError:
|
||||
await send_response(update, context, "❌ Please enter a valid number or type /cancel to abort")
|
||||
return
|
||||
|
||||
# Check if input is UUID
|
||||
if UUID_PATTERN.match(user_input):
|
||||
await process_uuid(update, context, user_input)
|
||||
else:
|
||||
await perform_search(update, context, user_input)
|
||||
|
||||
async def perform_search(update: Update, context: ContextTypes.DEFAULT_TYPE, query: str) -> None:
|
||||
try:
|
||||
log_activity(f"Searching for: {query}")
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
|
||||
# Directus filter parameters
|
||||
search_params = {
|
||||
"filter": json.dumps({
|
||||
"_or": [
|
||||
{"name": {"_icontains": query}},
|
||||
{"creator": {"_icontains": query}}
|
||||
]
|
||||
}),
|
||||
"fields": "uuid,name,creator,version,download_hash",
|
||||
"limit": "10"
|
||||
}
|
||||
|
||||
response = requests.get(
|
||||
url=f"{API_BASE_URL}",
|
||||
headers=headers,
|
||||
params=search_params
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
# Directus returns items directly in data array
|
||||
search_data = response.json().get('data', [])
|
||||
|
||||
if not search_data:
|
||||
await send_response(update, context, "🔍 No results found for your query.")
|
||||
return
|
||||
|
||||
context.user_data['search_results'] = search_data
|
||||
context.user_data['awaiting_index'] = True
|
||||
|
||||
results_list = [
|
||||
f"{idx}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}"
|
||||
for idx, item in enumerate(search_data, 1)
|
||||
]
|
||||
|
||||
response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details:"
|
||||
await send_response(update, context, response_message)
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
error_msg = f"🔧 API Error: {e.response.status_code}"
|
||||
await send_response(update, context, error_msg)
|
||||
log_activity(f"Search API Error: {str(e)}")
|
||||
except Exception as e:
|
||||
log_activity(f"Search error: {str(e)}")
|
||||
await send_response(update, context, "⚠️ Error processing search")
|
||||
|
||||
async def process_uuid(update: Update, context: ContextTypes.DEFAULT_TYPE, uuid: str) -> None:
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
|
||||
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json().get('data', {})
|
||||
if data:
|
||||
download_hashes = data.get('download_hash', '')
|
||||
decoded_urls = []
|
||||
|
||||
if download_hashes:
|
||||
for i, hash_part in enumerate(download_hashes.split(','), 1):
|
||||
try:
|
||||
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
|
||||
decoded_urls.append(f"{i}. {decoded}")
|
||||
except:
|
||||
decoded_urls.append(f"{i}. Invalid hash")
|
||||
|
||||
formatted_message = (
|
||||
f"UUID: {data.get('uuid', 'N/A')}\n"
|
||||
f"Name: {data.get('name', 'N/A')}\n"
|
||||
f"Creator: {data.get('creator', 'N/A')}\n"
|
||||
f"Version: {data.get('version', 'N/A')}\n"
|
||||
)
|
||||
|
||||
if decoded_urls:
|
||||
formatted_message += "Download URLs:\n" + "\n".join(decoded_urls)
|
||||
else:
|
||||
formatted_message += "No download URLs found"
|
||||
|
||||
await send_response(update, context, f"📦 Addon Information:\n{formatted_message}")
|
||||
else:
|
||||
await send_response(update, context, "❌ No information found for this UUID.")
|
||||
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 403:
|
||||
await send_response(update, context, "⛔ Access denied for this UUID")
|
||||
else:
|
||||
await send_response(update, context, f"🔧 API Error: {e.response.status_code}")
|
||||
except Exception as e:
|
||||
log_activity(f"UUID processing error: {str(e)}")
|
||||
await send_response(update, context, "⚠️ Error fetching UUID details")
|
||||
|
||||
async def send_response(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
message_thread_id=update.effective_message.message_thread_id,
|
||||
text=text
|
||||
)
|
||||
log_activity(f"Sent response: '{text}'")
|
||||
|
||||
def main() -> None:
|
||||
application = Application.builder().token(TELEGRAM_TOKEN).build()
|
||||
application.add_handler(CommandHandler("start", start))
|
||||
application.add_handler(CommandHandler("cancel", cancel))
|
||||
application.add_handler(MessageHandler(
|
||||
filters.TEXT &
|
||||
~filters.COMMAND &
|
||||
~filters.Regex(re.compile(r'^(cancel|abort)$', flags=re.IGNORECASE)),
|
||||
handle_text
|
||||
))
|
||||
application.run_polling()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
6
example.env
Normal file
6
example.env
Normal file
@@ -0,0 +1,6 @@
|
||||
API_BASE_URL=https://directus.example.com/items/list/
|
||||
BEARER_TOKEN=<bearer token for directus>
|
||||
TELEGRAM_TOKEN=<telegram bot token from bot father>
|
||||
ALLOWED_CHAT_ID=<ID of allowed group for telegram>
|
||||
ALLOWED_THREAD_ID=<ID allowed topic in the group above for telegram>
|
||||
DISCORD_TOKEN=<discord bot token>
|
||||
Reference in New Issue
Block a user