diff --git a/src/core/hle/service/sockets/bsd.cpp b/src/core/hle/service/sockets/bsd.cpp index 11c65754a..c9a77203b 100644 --- a/src/core/hle/service/sockets/bsd.cpp +++ b/src/core/hle/service/sockets/bsd.cpp @@ -531,9 +531,6 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco UNIMPLEMENTED_IF_MSG(unk_flag, "Unknown flag in type"); type = static_cast(static_cast(type) & ~0x20000000); - // Lock the table before searching for or creating a descriptor - std::lock_guard table_lock(fd_table_mutex); - const s32 fd = FindFreeFileDescriptorHandle(); if (fd < 0) { LOG_ERROR(Service, "No more file descriptors available"); @@ -542,6 +539,7 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco file_descriptors[fd] = FileDescriptor{}; FileDescriptor& descriptor = *file_descriptors[fd]; + // ENONMEM might be thrown here auto room_member = room_network.GetRoomMember().lock(); const bool using_proxy = room_member && room_member->IsConnected(); @@ -549,21 +547,23 @@ std::pair BSD::SocketImpl(Domain domain, Type type, Protocol protoco LOG_INFO(Service, "New socket fd={} domain={} type={} protocol={} proxy={}", fd, domain, type, protocol, using_proxy); + // Store socket type information for pooling descriptor.domain = Translate(domain); descriptor.type = Translate(type); descriptor.protocol = Translate(protocol); descriptor.is_connection_based = IsConnectionBased(type); + // Try to reuse a socket from the pool if using proxy if (using_proxy) { SocketPoolKey key{descriptor.domain, descriptor.type, descriptor.protocol}; - std::lock_guard pool_lock(socket_pool_mutex); + std::lock_guard lock(socket_pool_mutex); auto it = socket_pool.find(key); if (it != socket_pool.end() && !it->second.empty()) { descriptor.socket = it->second.back(); it->second.pop_back(); - // Reset the socket state so 'closed' is false and the queue is empty + // call Initialize here so socket_proxy.cpp functions work descriptor.socket->Initialize(descriptor.domain, descriptor.type, descriptor.protocol); LOG_DEBUG(Service, "Reused socket from pool for fd={}", fd); diff --git a/src/core/internal_network/socket_proxy.cpp b/src/core/internal_network/socket_proxy.cpp index 48455d849..b6368e4b5 100644 --- a/src/core/internal_network/socket_proxy.cpp +++ b/src/core/internal_network/socket_proxy.cpp @@ -34,24 +34,30 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) { const auto my_ip = room_member->GetFakeIpAddress(); + // If the sender (local_endpoint) is OUR IP, ignore it. + // We don't want to process our own sent packets. if (packet.local_endpoint.ip == my_ip) { return; } + // Only accept packets meant for us or actual broadcasts. if (packet.remote_endpoint.ip != my_ip && !packet.broadcast) { return; } + // PROTOCOL & PORT CHECK if (protocol != packet.protocol || local_endpoint.portno != packet.remote_endpoint.portno || closed) { stats.packets_dropped++; return; } + // BROADCAST CHECK if (!broadcast && packet.broadcast) { stats.packets_dropped++; return; } + // DECOMPRESSION & QUEUEING auto decompressed = packet; decompressed.data = Common::Compression::DecompressDataZSTD(packet.data); if (decompressed.data.empty() && !packet.data.empty()) { @@ -59,14 +65,10 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) { return; } - { - std::lock_guard guard(packets_mutex); - received_packets.push(decompressed); - stats.packets_received++; - stats.bytes_received += decompressed.data.size(); - } - // Wake up RecvFrom immediately - cv_packet_received.notify_all(); + std::lock_guard guard(packets_mutex); + received_packets.push(decompressed); + stats.packets_received++; + stats.bytes_received += decompressed.data.size(); } template @@ -153,65 +155,34 @@ std::pair ProxySocket::RecvFrom(int flags, std::span message, So ASSERT(flags == 0); ASSERT(message.size() < static_cast(std::numeric_limits::max())); - const auto timeout_ms = receive_timeout == 0 ? 5000 : receive_timeout; - - std::unique_lock lock(packets_mutex); - - // If not blocking and no packets, return immediately - if (received_packets.empty() && !blocking) { - return {-1, Errno::AGAIN}; - } - - bool signaled = cv_packet_received.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { - return !received_packets.empty() || closed; - }); - - if (closed) { - return {-1, Errno::BADF}; - } - - if (!signaled) { - return {-1, Errno::TIMEDOUT}; - } - - // Packet is ready, process it while still holding the lock - ProxyPacket& packet = received_packets.front(); - if (addr) { - addr->family = Domain::INET; - addr->ip = packet.local_endpoint.ip; - addr->portno = packet.local_endpoint.portno; - } - - bool peek = (flags & FLAG_MSG_PEEK) != 0; - std::size_t read_bytes; - std::size_t max_length = message.size(); - - if (packet.data.size() > max_length) { - read_bytes = max_length; - std::memcpy(message.data(), packet.data.data(), max_length); - - if (protocol == Protocol::UDP) { - if (!peek) { - received_packets.pop(); - } - return {-1, Errno::MSGSIZE}; - } else if (protocol == Protocol::TCP) { - if (!peek) { - std::vector numArray; - numArray.reserve(packet.data.size() - max_length); - std::copy(packet.data.begin() + max_length, packet.data.end(), std::back_inserter(numArray)); - packet.data = std::move(numArray); + // TODO (flTobi): Verify the timeout behavior and break when connection is lost + const auto timestamp = std::chrono::steady_clock::now(); + // When receive_timeout is set to zero, the socket is supposed to wait indefinitely until a + // packet arrives. In order to prevent lost packets from hanging the emulation thread, we set + // the timeout to 5s instead + const auto timeout = receive_timeout == 0 ? 5000 : receive_timeout; + while (true) { + { + std::lock_guard guard(packets_mutex); + if (received_packets.size() > 0) { + return ReceivePacket(flags, message, addr, message.size()); } } - } else { - read_bytes = packet.data.size(); - std::memcpy(message.data(), packet.data.data(), read_bytes); - if (!peek) { - received_packets.pop(); + + if (!blocking) { + return {-1, Errno::AGAIN}; + } + + std::this_thread::yield(); + + const auto time_diff = std::chrono::steady_clock::now() - timestamp; + const auto time_diff_ms = + std::chrono::duration_cast(time_diff).count(); + + if (time_diff_ms > timeout) { + return {-1, Errno::TIMEDOUT}; } } - - return {static_cast(read_bytes), Errno::SUCCESS}; } std::pair ProxySocket::ReceivePacket(int flags, std::span message, SockAddrIn* addr, @@ -336,19 +307,15 @@ std::pair ProxySocket::SendTo(u32 flags, std::span message } Errno ProxySocket::Close() { - { - std::lock_guard guard(packets_mutex); - fd = INVALID_SOCKET; - closed = true; + std::lock_guard guard(packets_mutex); + fd = INVALID_SOCKET; + closed = true; - while (!received_packets.empty()) { - received_packets.pop(); - } + // Flush any pending packets so they don't get processed after closure + while (!received_packets.empty()) { + received_packets.pop(); } - // Wake up any threads stuck in RecvFrom so they can close properly - cv_packet_received.notify_all(); - return Errno::SUCCESS; } diff --git a/src/core/internal_network/socket_proxy.h b/src/core/internal_network/socket_proxy.h index bb2bef8d1..56efc557b 100644 --- a/src/core/internal_network/socket_proxy.h +++ b/src/core/internal_network/socket_proxy.h @@ -1,11 +1,9 @@ // SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project -// SPDX-FileCopyrightText: Copyright 2026 citron Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later #pragma once #include -#include #include #include #include @@ -94,7 +92,6 @@ private: Protocol protocol; std::mutex packets_mutex; - std::condition_variable cv_packet_received; RoomNetwork& room_network;