From cf57942a0b0fd19ebb23d2c6b5ff831b70fc682f Mon Sep 17 00:00:00 2001 From: mvglasow Date: Thu, 15 May 2025 23:54:21 +0300 Subject: [PATCH] [traffic] Allow decoding to be interrupted after each message Message deduplication currently disabled Signed-off-by: mvglasow --- map/traffic_manager.cpp | 163 ++++++++++++++++++++++++---------------- map/traffic_manager.hpp | 23 ++++-- 2 files changed, 113 insertions(+), 73 deletions(-) diff --git a/map/traffic_manager.cpp b/map/traffic_manager.cpp index 9e33135f2..d8a016a90 100644 --- a/map/traffic_manager.cpp +++ b/map/traffic_manager.cpp @@ -25,6 +25,9 @@ using namespace std::chrono; namespace { +/** + * Poll interval for traffic data + */ auto constexpr kUpdateInterval = minutes(1); auto constexpr kOutdatedDataTimeout = minutes(5) + kUpdateInterval; auto constexpr kNetworkErrorTimeout = minutes(20); @@ -337,11 +340,15 @@ bool TrafficManager::Poll() std::lock_guard lock(m_mutex); m_feedQueue.push_back(feed); } + m_lastResponseTime = steady_clock::now(); + m_isPollNeeded = false; return true; } else { LOG(LWARNING, ("An error occurred parsing the TraFF feed")); + // TODO should we really reset m_isPollNeeded here? + m_isPollNeeded = false; return false; } } @@ -350,6 +357,7 @@ void TrafficManager::Push(traffxml::TraffFeed feed) { std::lock_guard lock(m_mutex); m_feedQueue.push_back(feed); + // TODO should we update m_lastResponseTime? } void TrafficManager::ConsolidateFeedQueue() @@ -402,36 +410,6 @@ void TrafficManager::ConsolidateFeedQueue() ++it; } -void TrafficManager::UpdateMessageCache(std::map & cache) -{ - traffxml::TraffFeed feed; - // Thread-safe iteration over m_feedQueue, releasing the mutex during the loop - while (true) - { - { - std::lock_guard lock(m_mutex); - if (!m_feedQueue.empty()) - { - feed = m_feedQueue.front(); - m_feedQueue.erase(m_feedQueue.begin()); - } - else - break; - } - - for (auto message : feed) - { - LOG(LINFO, (" message:", message)); - auto it = cache.find(message.m_id); - bool process = (it == cache.end()); - if (!process) - process = (it->second.m_updateTime < message.m_updateTime); - if (process) - cache.insert_or_assign(message.m_id, message); - } - } -} - void TrafficManager::InitializeDataSources(std::vector & dataSources) { /* @@ -558,6 +536,46 @@ void TrafficManager::DecodeMessage(traffxml::TraffMessage & message) } } +void TrafficManager::DecodeFirstMessage() +{ + traffxml::TraffMessage message; + { + // Lock the mutex while iterating over the feed queue + std::lock_guard lock(m_mutex); + // remove empty feeds from the beginning of the queue + while (!m_feedQueue.empty() && m_feedQueue.front().empty()) + m_feedQueue.erase(m_feedQueue.begin()); + // if we have no more feeds, return (nothing to do) + if (m_feedQueue.empty()) + return; + // retrieve the first message from the first feed, remove it from the feed + std::swap(message, m_feedQueue.front().front()); + m_feedQueue.front().erase(m_feedQueue.front().begin()); + // if the feed has no more messages, erase it (eager erase, as an empty queue is used as a condition later) + if (m_feedQueue.front().empty()) + m_feedQueue.erase(m_feedQueue.begin()); + } + + // check if message is actually newer + auto it = m_messageCache.find(message.m_id); + bool process = (it == m_messageCache.end()); + if (!process) + process = (it->second.m_updateTime < message.m_updateTime); + if (!process) + { + LOG(LINFO, ("message", message.m_id, "is already in cache, skipping")); + return; + } + + LOG(LINFO, (" ", message.m_id, ":", message)); + DecodeMessage(message); + // store message in cache + //m_messageCache.insert_or_assign(message.m_id, message); + // store message coloring in AllMwmColoring + // TODO trigger full cache processing if segments were removed or traffic has eased + traffxml::MergeMultiMwmColoring(message.m_decoded, m_allMwmColoring); +} + void TrafficManager::ThreadRoutine() { std::vector mwms; @@ -565,9 +583,9 @@ void TrafficManager::ThreadRoutine() { // TODO clean out expired messages - // poll is always needed, unless a new subscription or a subscription change returns a feed - m_isPollNeeded = true; + LOG(LINFO, ("start loop, active MWMs changed:", m_activeMwmsChanged, ", poll needed:", m_isPollNeeded)); + // this is a no-op if active MWMs have not changed if (!SetSubscriptionArea()) { LOG(LWARNING, ("SetSubscriptionArea failed.")); @@ -576,7 +594,10 @@ void TrafficManager::ThreadRoutine() LOG(LWARNING, ("No subscription, no traffic data will be retrieved.")); } - // fetch traffic data if subscribed, unless this has already happened in the previous step + /* + * Fetch traffic data if needed and we have a subscription. + * m_isPollNeeded may be set by WaitForRequest() and set/unset by SetSubscriptionArea(). + */ if (m_isPollNeeded && IsSubscribed()) { if (!Poll()) @@ -590,31 +611,11 @@ void TrafficManager::ThreadRoutine() // consolidate feed queue (remove older messages in favor of newer ones) ConsolidateFeedQueue(); - /* - * TODO call on a temp struct, then unite with m_messageCache, processing only messages with changes - * (adding segments for new messages, removing segments for deleted messages, replacing segments - * for updated messages) and leaving all other segments untouched - */ - UpdateMessageCache(m_messageCache); - LOG(LINFO, (m_messageCache.size(), "message(s) in cache")); - - /* - * Map between MWM IDs and their colorings. - */ - //TODO should we use std::map> ? - // TODO store mwm/segment/speed group map with each message, build allMwmColoring on the fly - std::map allMwmColoring; - for (auto [id, message] : m_messageCache) - { - LOG(LINFO, (" ", id, ":", message)); - DecodeMessage(message); - // store message coloring in AllMwmColoring - // TODO do this in a later pass...? - traffxml::MergeMultiMwmColoring(message.m_decoded, allMwmColoring); - } + // decode one message and add it to the cache + DecodeFirstMessage(); // set new coloring for MWMs - OnTrafficDataUpdate(allMwmColoring); + OnTrafficDataUpdate(m_allMwmColoring); // TODO no longer needed #ifdef traffic_dead_code @@ -657,20 +658,44 @@ bool TrafficManager::WaitForRequest(std::vector & mwms) { std::unique_lock lock(m_mutex); - bool const timeout = !m_condition.wait_for(lock, kUpdateInterval, [this] - { - return !m_isRunning || !m_requestedMwms.empty(); - }); - + /* + * if we got terminated, return false immediately + * (don’t wait until sleep, we might not get much sleep if we’re busy processing a long feed) + */ if (!m_isRunning) return false; - if (timeout) - RequestTrafficData(); + // if we have feeds in the queue, return immediately + if (!m_feedQueue.empty()) + { + LOG(LINFO, ("feed queue not empty, returning immediately")); + return true; + } - if (!m_requestedMwms.empty()) - mwms.swap(m_requestedMwms); + // if update interval has elapsed, return immediately + auto const currentTime = steady_clock::now(); + auto const passedSeconds = currentTime - m_lastResponseTime; + if (passedSeconds >= kUpdateInterval) + { + LOG(LINFO, ("last response was", passedSeconds, "ago, returning immediately")); + m_isPollNeeded = true; + return true; + } + LOG(LINFO, ("nothing to do for now, waiting for timeout or notification")); + bool const timeout = !m_condition.wait_for(lock, kUpdateInterval, [this] + { + return !m_isRunning || m_activeMwmsChanged; + }); + + // check again if we got terminated while waiting (or woken up because we got terminated) + if (!m_isRunning) + return false; + + // this works as long as wait timeout is at least equal to the poll interval + m_isPollNeeded |= timeout; + + LOG(LINFO, ("timeout:", timeout, "active MWMs changed:", m_activeMwmsChanged)); return true; } @@ -754,6 +779,14 @@ void TrafficManager::OnTrafficRequestFailed(traffic::TrafficInfo && info) void TrafficManager::OnTrafficDataUpdate(std::map & trafficCache) { + /* + * TODO do not update coloring on every single pass (i.e. after every single message). + * Coloring needs to be updated when the queue is empty (i.e. we have processed all messages for now) + * and should be updated periodically while larger numbers of messages are being processed. + * The interval is TBD and we may want to choose different intervals for the graphics engine + * and the routing engine: 10–20 seconds seems reasonable for graphics, for the routing engine the + * interval should not be shorter than the time needed for route recalculation. + */ /* * Much of this code is copied and pasted together from old MWM code, with some minor adaptations: * diff --git a/map/traffic_manager.hpp b/map/traffic_manager.hpp index 613ad3bd0..3975044e0 100644 --- a/map/traffic_manager.hpp +++ b/map/traffic_manager.hpp @@ -246,14 +246,6 @@ private: */ void ConsolidateFeedQueue(); - /** - * @brief Merges new messages from `m_feedQueue` into a message cache. - * - * Existing messages in `cache` will be overwritten by newer messages with the same ID in `m_feedQueue`. - * @param cache The message cache. - */ - void UpdateMessageCache(std::map & cache); - /** * @brief Initializes the data sources for an OpenLR decoder. * @@ -261,6 +253,11 @@ private: */ void InitializeDataSources(std::vector &dataSources); + /** + * @brief Removes the first message from the first feed and decodes it. + */ + void DecodeFirstMessage(); + /** * @brief Decodes a single message to its segments and their speed groups. * @@ -489,6 +486,11 @@ private: */ threads::SimpleThread m_thread; + /** + * @brief When the last response was received. + */ + std::chrono::time_point m_lastResponseTime; + /** * @brief Whether active MWMs have changed since the last request. */ @@ -530,6 +532,11 @@ private: * Used to decode TraFF locations into road segments on the map. */ openlr::OpenLRDecoder m_openLrDecoder; + + /** + * @brief Map between MWM IDs and their colorings. + */ + std::map m_allMwmColoring; }; extern std::string DebugPrint(TrafficManager::TrafficState state);