From 0b5701624adf1b4ffe78a84fcc23e80c31120d29 Mon Sep 17 00:00:00 2001 From: collecting Date: Fri, 16 Jan 2026 17:42:54 -0500 Subject: [PATCH] Service: Sockets: Fix busy-waiting CPU starvation and Close/Socket race conditions --- src/core/hle/service/sockets/bsd.cpp | 10 +- src/core/internal_network/socket_proxy.cpp | 113 +++++++++++++-------- src/core/internal_network/socket_proxy.h | 3 + 3 files changed, 81 insertions(+), 45 deletions(-) diff --git a/src/core/hle/service/sockets/bsd.cpp b/src/core/hle/service/sockets/bsd.cpp index c9a77203b..11c65754a 100644 --- a/src/core/hle/service/sockets/bsd.cpp +++ b/src/core/hle/service/sockets/bsd.cpp @@ -531,6 +531,9 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco UNIMPLEMENTED_IF_MSG(unk_flag, "Unknown flag in type"); type = static_cast(static_cast(type) & ~0x20000000); + // Lock the table before searching for or creating a descriptor + std::lock_guard table_lock(fd_table_mutex); + const s32 fd = FindFreeFileDescriptorHandle(); if (fd < 0) { LOG_ERROR(Service, "No more file descriptors available"); @@ -539,7 +542,6 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco file_descriptors[fd] = FileDescriptor{}; FileDescriptor& descriptor = *file_descriptors[fd]; - // ENONMEM might be thrown here auto room_member = room_network.GetRoomMember().lock(); const bool using_proxy = room_member && room_member->IsConnected(); @@ -547,23 +549,21 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco LOG_INFO(Service, "New socket fd={} domain={} type={} protocol={} proxy={}", fd, domain, type, protocol, using_proxy); - // Store socket type information for pooling descriptor.domain = Translate(domain); descriptor.type = Translate(type); descriptor.protocol = Translate(protocol); descriptor.is_connection_based = IsConnectionBased(type); - // Try to reuse a socket from the pool if using proxy if (using_proxy) { SocketPoolKey key{descriptor.domain, descriptor.type, descriptor.protocol}; - std::lock_guard lock(socket_pool_mutex); + std::lock_guard pool_lock(socket_pool_mutex); auto it = socket_pool.find(key); if (it != socket_pool.end() && !it->second.empty()) { descriptor.socket = it->second.back(); it->second.pop_back(); - // call Initialize here so socket_proxy.cpp functions work + // Reset the socket state so 'closed' is false and the queue is empty descriptor.socket->Initialize(descriptor.domain, descriptor.type, descriptor.protocol); LOG_DEBUG(Service, "Reused socket from pool for fd={}", fd); diff --git a/src/core/internal_network/socket_proxy.cpp b/src/core/internal_network/socket_proxy.cpp index b6368e4b5..48455d849 100644 --- a/src/core/internal_network/socket_proxy.cpp +++ b/src/core/internal_network/socket_proxy.cpp @@ -34,30 +34,24 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) { const auto my_ip = room_member->GetFakeIpAddress(); - // If the sender (local_endpoint) is OUR IP, ignore it. - // We don't want to process our own sent packets. if (packet.local_endpoint.ip == my_ip) { return; } - // Only accept packets meant for us or actual broadcasts. if (packet.remote_endpoint.ip != my_ip && !packet.broadcast) { return; } - // PROTOCOL & PORT CHECK if (protocol != packet.protocol || local_endpoint.portno != packet.remote_endpoint.portno || closed) { stats.packets_dropped++; return; } - // BROADCAST CHECK if (!broadcast && packet.broadcast) { stats.packets_dropped++; return; } - // DECOMPRESSION & QUEUEING auto decompressed = packet; decompressed.data = Common::Compression::DecompressDataZSTD(packet.data); if (decompressed.data.empty() && !packet.data.empty()) { @@ -65,10 +59,14 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) { return; } - std::lock_guard guard(packets_mutex); - received_packets.push(decompressed); - stats.packets_received++; - stats.bytes_received += decompressed.data.size(); + { + std::lock_guard guard(packets_mutex); + received_packets.push(decompressed); + stats.packets_received++; + stats.bytes_received += decompressed.data.size(); + } + // Wake up RecvFrom immediately + cv_packet_received.notify_all(); } template @@ -155,34 +153,65 @@ std::pair ProxySocket::RecvFrom(int flags, std::span message, So ASSERT(flags == 0); ASSERT(message.size() < static_cast(std::numeric_limits::max())); - // TODO (flTobi): Verify the timeout behavior and break when connection is lost - const auto timestamp = std::chrono::steady_clock::now(); - // When receive_timeout is set to zero, the socket is supposed to wait indefinitely until a - // packet arrives. In order to prevent lost packets from hanging the emulation thread, we set - // the timeout to 5s instead - const auto timeout = receive_timeout == 0 ? 5000 : receive_timeout; - while (true) { - { - std::lock_guard guard(packets_mutex); - if (received_packets.size() > 0) { - return ReceivePacket(flags, message, addr, message.size()); + const auto timeout_ms = receive_timeout == 0 ? 5000 : receive_timeout; + + std::unique_lock lock(packets_mutex); + + // If not blocking and no packets, return immediately + if (received_packets.empty() && !blocking) { + return {-1, Errno::AGAIN}; + } + + bool signaled = cv_packet_received.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { + return !received_packets.empty() || closed; + }); + + if (closed) { + return {-1, Errno::BADF}; + } + + if (!signaled) { + return {-1, Errno::TIMEDOUT}; + } + + // Packet is ready, process it while still holding the lock + ProxyPacket& packet = received_packets.front(); + if (addr) { + addr->family = Domain::INET; + addr->ip = packet.local_endpoint.ip; + addr->portno = packet.local_endpoint.portno; + } + + bool peek = (flags & FLAG_MSG_PEEK) != 0; + std::size_t read_bytes; + std::size_t max_length = message.size(); + + if (packet.data.size() > max_length) { + read_bytes = max_length; + std::memcpy(message.data(), packet.data.data(), max_length); + + if (protocol == Protocol::UDP) { + if (!peek) { + received_packets.pop(); + } + return {-1, Errno::MSGSIZE}; + } else if (protocol == Protocol::TCP) { + if (!peek) { + std::vector numArray; + numArray.reserve(packet.data.size() - max_length); + std::copy(packet.data.begin() + max_length, packet.data.end(), std::back_inserter(numArray)); + packet.data = std::move(numArray); } } - - if (!blocking) { - return {-1, Errno::AGAIN}; - } - - std::this_thread::yield(); - - const auto time_diff = std::chrono::steady_clock::now() - timestamp; - const auto time_diff_ms = - std::chrono::duration_cast(time_diff).count(); - - if (time_diff_ms > timeout) { - return {-1, Errno::TIMEDOUT}; + } else { + read_bytes = packet.data.size(); + std::memcpy(message.data(), packet.data.data(), read_bytes); + if (!peek) { + received_packets.pop(); } } + + return {static_cast(read_bytes), Errno::SUCCESS}; } std::pair ProxySocket::ReceivePacket(int flags, std::span message, SockAddrIn* addr, @@ -307,15 +336,19 @@ std::pair ProxySocket::SendTo(u32 flags, std::span message } Errno ProxySocket::Close() { - std::lock_guard guard(packets_mutex); - fd = INVALID_SOCKET; - closed = true; + { + std::lock_guard guard(packets_mutex); + fd = INVALID_SOCKET; + closed = true; - // Flush any pending packets so they don't get processed after closure - while (!received_packets.empty()) { - received_packets.pop(); + while (!received_packets.empty()) { + received_packets.pop(); + } } + // Wake up any threads stuck in RecvFrom so they can close properly + cv_packet_received.notify_all(); + return Errno::SUCCESS; } diff --git a/src/core/internal_network/socket_proxy.h b/src/core/internal_network/socket_proxy.h index 56efc557b..bb2bef8d1 100644 --- a/src/core/internal_network/socket_proxy.h +++ b/src/core/internal_network/socket_proxy.h @@ -1,9 +1,11 @@ // SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project +// SPDX-FileCopyrightText: Copyright 2026 citron Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later #pragma once #include +#include #include #include #include @@ -92,6 +94,7 @@ private: Protocol protocol; std::mutex packets_mutex; + std::condition_variable cv_packet_received; RoomNetwork& room_network;