Files
marketplace-bay-bot/matrix/MTbot.py
2025-05-14 05:51:03 +00:00

175 lines
6.6 KiB
Python

import os
import requests
import base64
import re
import json
import asyncio
from datetime import datetime
from nio import AsyncClient, MatrixRoom, RoomMessageText, RoomMessageFormatted
from dotenv import load_dotenv
# MATRIX DIRECTUS BOT
load_dotenv()
# Configuration
API_BASE_URL = os.getenv("API_BASE_URL") # Configure in .env
BEARER_TOKEN = os.getenv("BEARER_TOKEN") # Configure in .env
MATRIX_HOMESERVER = os.getenv("MATRIX_HOMESERVER") # Configure in .env
MATRIX_USER = os.getenv("MATRIX_USER") # Configure in .env
MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD") # Configure in .env
BOT_USER_ID = os.getenv("MATRIX_USER") # Use the bot's Matrix ID to ignore its own messages
ALLOWED_ROOMS = os.getenv("ALLOWED_ROOMS", "").split(",") # Comma-separated list of allowed room IDs
# UUID validation pattern
UUID_PATTERN = re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$')
# In-memory search tracking
active_searches = {}
# Track bot startup time
BOT_START_TIME = datetime.now()
async def log_activity(message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
async def send_response(client: AsyncClient, room_id: str, message: str) -> None:
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message
}
)
await log_activity(f"Sent response: '{message}'")
async def process_message(client: AsyncClient, room: MatrixRoom, event: RoomMessageText) -> None:
# Ignore messages from the bot itself
if event.sender == BOT_USER_ID:
return
room_id = room.room_id
# Check if the room is allowed
if room_id not in ALLOWED_ROOMS:
await log_activity(f"Ignored message from unauthorized room: {room_id}")
return
# Ignore messages sent before the bot started
if event.server_timestamp / 1000 < BOT_START_TIME.timestamp():
await log_activity(f"Ignored old message from {room_id}")
return
user_message = event.body.strip().lower()
sender = event.sender
await log_activity(f"Received message from {sender} in {room_id}: '{user_message}'")
# Handle cancellation
if user_message in ['cancel', 'abort']:
active_searches.pop(room_id, None)
await send_response(client, room_id, "❌ Operation cancelled. You can start a new search.")
return
# Check if room has an active search
if room_id in active_searches:
try:
index = int(user_message) - 1
search_results = active_searches[room_id]
if 0 <= index < len(search_results):
selected_uuid = search_results[index]['uuid']
del active_searches[room_id] # Clear active search after selection
await process_uuid(client, room_id, selected_uuid)
return
else:
await send_response(client, room_id, "❌ Invalid index. Please enter a valid number or type 'cancel' to abort.")
return
except ValueError:
await send_response(client, room_id, "❌ Please enter a valid number or type 'cancel' to abort.")
return
# Check if input is UUID
if UUID_PATTERN.match(user_message):
await process_uuid(client, room_id, user_message)
else:
await perform_search(client, room_id, user_message)
async def perform_search(client: AsyncClient, room_id: str, query: str) -> None:
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
search_params = {
"filter": json.dumps({
"_or": [
{"name": {"_icontains": query}},
{"creator": {"_icontains": query}}
]
}),
"fields": "uuid,name,creator,version,download_hash",
"limit": "10"
}
response = requests.get(url=f"{API_BASE_URL}", headers=headers, params=search_params)
response.raise_for_status()
data = response.json().get('data', [])
if not data:
await send_response(client, room_id, "🔍 No results found for your query.")
return
active_searches[room_id] = data # Track active search for the room
results_list = [f"{idx + 1}. {item.get('name', 'N/A')} by {item.get('creator', 'N/A')}" for idx, item in enumerate(data)]
response_message = "🔍 Search Results:\n" + "\n".join(results_list) + "\n\nReply with the number to view details, or type 'cancel' to abort."
await send_response(client, room_id, response_message)
except Exception as e:
await log_activity(f"Search error: {str(e)}")
await send_response(client, room_id, "⚠️ Error processing search")
async def process_uuid(client: AsyncClient, room_id: str, uuid: str) -> None:
try:
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
response = requests.get(f"{API_BASE_URL}{uuid}", headers=headers)
response.raise_for_status()
data = response.json().get('data', {})
if not data:
await send_response(client, room_id, "❌ No information found for this UUID.")
return
download_hashes = data.get('download_hash', '')
decoded_urls = []
if download_hashes:
for i, hash_part in enumerate(download_hashes.split(','), 1):
try:
decoded = base64.b64decode(hash_part.strip()).decode('utf-8')
decoded_urls.append(f"{i}. {decoded}")
except:
decoded_urls.append(f"{i}. Invalid hash")
formatted_message = (
f"UUID: {data.get('uuid', 'N/A')}\n"
f"Name: {data.get('name', 'N/A')}\n"
f"Creator: {data.get('creator', 'N/A')}\n"
f"Version: {data.get('version', 'N/A')}\n"
)
if decoded_urls:
formatted_message += "Download URLs:\n" + "\n".join(decoded_urls)
else:
formatted_message += "No download URLs found"
await send_response(client, room_id, f"📦 Addon Information:\n{formatted_message}")
except Exception as e:
await log_activity(f"UUID processing error: {str(e)}")
await send_response(client, room_id, "⚠️ Error fetching UUID details")
async def main() -> None:
client = AsyncClient(MATRIX_HOMESERVER, MATRIX_USER)
await client.login(MATRIX_PASSWORD)
await log_activity(f"Logged in as {MATRIX_USER}")
client.add_event_callback(lambda room, event: process_message(client, room, event), RoomMessageText)
await client.sync_forever(timeout=30000)
if __name__ == '__main__':
asyncio.run(main())