Service: Sockets: Fix busy-waiting CPU starvation and Close/Socket race conditions

This commit is contained in:
collecting
2026-01-16 17:42:54 -05:00
parent 31d0bca2da
commit 0b5701624a
3 changed files with 81 additions and 45 deletions

View File

@@ -531,6 +531,9 @@ std::pair<s32, Errno> BSD::SocketImpl(Domain domain, Type type, Protocol protoco
UNIMPLEMENTED_IF_MSG(unk_flag, "Unknown flag in type");
type = static_cast<Type>(static_cast<u32>(type) & ~0x20000000);
// Lock the table before searching for or creating a descriptor
std::lock_guard table_lock(fd_table_mutex);
const s32 fd = FindFreeFileDescriptorHandle();
if (fd < 0) {
LOG_ERROR(Service, "No more file descriptors available");
@@ -539,7 +542,6 @@ std::pair<s32, Errno> BSD::SocketImpl(Domain domain, Type type, Protocol protoco
file_descriptors[fd] = FileDescriptor{};
FileDescriptor& descriptor = *file_descriptors[fd];
// ENONMEM might be thrown here
auto room_member = room_network.GetRoomMember().lock();
const bool using_proxy = room_member && room_member->IsConnected();
@@ -547,23 +549,21 @@ std::pair<s32, Errno> BSD::SocketImpl(Domain domain, Type type, Protocol protoco
LOG_INFO(Service, "New socket fd={} domain={} type={} protocol={} proxy={}",
fd, domain, type, protocol, using_proxy);
// Store socket type information for pooling
descriptor.domain = Translate(domain);
descriptor.type = Translate(type);
descriptor.protocol = Translate(protocol);
descriptor.is_connection_based = IsConnectionBased(type);
// Try to reuse a socket from the pool if using proxy
if (using_proxy) {
SocketPoolKey key{descriptor.domain, descriptor.type, descriptor.protocol};
std::lock_guard lock(socket_pool_mutex);
std::lock_guard pool_lock(socket_pool_mutex);
auto it = socket_pool.find(key);
if (it != socket_pool.end() && !it->second.empty()) {
descriptor.socket = it->second.back();
it->second.pop_back();
// call Initialize here so socket_proxy.cpp functions work
// Reset the socket state so 'closed' is false and the queue is empty
descriptor.socket->Initialize(descriptor.domain, descriptor.type, descriptor.protocol);
LOG_DEBUG(Service, "Reused socket from pool for fd={}", fd);

View File

@@ -34,30 +34,24 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) {
const auto my_ip = room_member->GetFakeIpAddress();
// If the sender (local_endpoint) is OUR IP, ignore it.
// We don't want to process our own sent packets.
if (packet.local_endpoint.ip == my_ip) {
return;
}
// Only accept packets meant for us or actual broadcasts.
if (packet.remote_endpoint.ip != my_ip && !packet.broadcast) {
return;
}
// PROTOCOL & PORT CHECK
if (protocol != packet.protocol || local_endpoint.portno != packet.remote_endpoint.portno || closed) {
stats.packets_dropped++;
return;
}
// BROADCAST CHECK
if (!broadcast && packet.broadcast) {
stats.packets_dropped++;
return;
}
// DECOMPRESSION & QUEUEING
auto decompressed = packet;
decompressed.data = Common::Compression::DecompressDataZSTD(packet.data);
if (decompressed.data.empty() && !packet.data.empty()) {
@@ -65,10 +59,14 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) {
return;
}
std::lock_guard guard(packets_mutex);
received_packets.push(decompressed);
stats.packets_received++;
stats.bytes_received += decompressed.data.size();
{
std::lock_guard guard(packets_mutex);
received_packets.push(decompressed);
stats.packets_received++;
stats.bytes_received += decompressed.data.size();
}
// Wake up RecvFrom immediately
cv_packet_received.notify_all();
}
template <typename T>
@@ -155,34 +153,65 @@ std::pair<s32, Errno> ProxySocket::RecvFrom(int flags, std::span<u8> message, So
ASSERT(flags == 0);
ASSERT(message.size() < static_cast<size_t>(std::numeric_limits<int>::max()));
// TODO (flTobi): Verify the timeout behavior and break when connection is lost
const auto timestamp = std::chrono::steady_clock::now();
// When receive_timeout is set to zero, the socket is supposed to wait indefinitely until a
// packet arrives. In order to prevent lost packets from hanging the emulation thread, we set
// the timeout to 5s instead
const auto timeout = receive_timeout == 0 ? 5000 : receive_timeout;
while (true) {
{
std::lock_guard guard(packets_mutex);
if (received_packets.size() > 0) {
return ReceivePacket(flags, message, addr, message.size());
const auto timeout_ms = receive_timeout == 0 ? 5000 : receive_timeout;
std::unique_lock lock(packets_mutex);
// If not blocking and no packets, return immediately
if (received_packets.empty() && !blocking) {
return {-1, Errno::AGAIN};
}
bool signaled = cv_packet_received.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] {
return !received_packets.empty() || closed;
});
if (closed) {
return {-1, Errno::BADF};
}
if (!signaled) {
return {-1, Errno::TIMEDOUT};
}
// Packet is ready, process it while still holding the lock
ProxyPacket& packet = received_packets.front();
if (addr) {
addr->family = Domain::INET;
addr->ip = packet.local_endpoint.ip;
addr->portno = packet.local_endpoint.portno;
}
bool peek = (flags & FLAG_MSG_PEEK) != 0;
std::size_t read_bytes;
std::size_t max_length = message.size();
if (packet.data.size() > max_length) {
read_bytes = max_length;
std::memcpy(message.data(), packet.data.data(), max_length);
if (protocol == Protocol::UDP) {
if (!peek) {
received_packets.pop();
}
return {-1, Errno::MSGSIZE};
} else if (protocol == Protocol::TCP) {
if (!peek) {
std::vector<u8> numArray;
numArray.reserve(packet.data.size() - max_length);
std::copy(packet.data.begin() + max_length, packet.data.end(), std::back_inserter(numArray));
packet.data = std::move(numArray);
}
}
if (!blocking) {
return {-1, Errno::AGAIN};
}
std::this_thread::yield();
const auto time_diff = std::chrono::steady_clock::now() - timestamp;
const auto time_diff_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(time_diff).count();
if (time_diff_ms > timeout) {
return {-1, Errno::TIMEDOUT};
} else {
read_bytes = packet.data.size();
std::memcpy(message.data(), packet.data.data(), read_bytes);
if (!peek) {
received_packets.pop();
}
}
return {static_cast<s32>(read_bytes), Errno::SUCCESS};
}
std::pair<s32, Errno> ProxySocket::ReceivePacket(int flags, std::span<u8> message, SockAddrIn* addr,
@@ -307,15 +336,19 @@ std::pair<s32, Errno> ProxySocket::SendTo(u32 flags, std::span<const u8> message
}
Errno ProxySocket::Close() {
std::lock_guard guard(packets_mutex);
fd = INVALID_SOCKET;
closed = true;
{
std::lock_guard guard(packets_mutex);
fd = INVALID_SOCKET;
closed = true;
// Flush any pending packets so they don't get processed after closure
while (!received_packets.empty()) {
received_packets.pop();
while (!received_packets.empty()) {
received_packets.pop();
}
}
// Wake up any threads stuck in RecvFrom so they can close properly
cv_packet_received.notify_all();
return Errno::SUCCESS;
}

View File

@@ -1,9 +1,11 @@
// SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project
// SPDX-FileCopyrightText: Copyright 2026 citron Emulator Project
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include <mutex>
#include <condition_variable>
#include <span>
#include <vector>
#include <queue>
@@ -92,6 +94,7 @@ private:
Protocol protocol;
std::mutex packets_mutex;
std::condition_variable cv_packet_received;
RoomNetwork& room_network;