From edb1b7e784cfe244bbb1dbf61127dddee3bf497d Mon Sep 17 00:00:00 2001 From: mvglasow Date: Wed, 14 May 2025 21:03:54 +0300 Subject: [PATCH] [traffic] Consolidate feed queue before decoding messages Signed-off-by: mvglasow --- map/traffic_manager.cpp | 53 ++++++++++++++++++++++++++++++++++++++++ map/traffic_manager.hpp | 9 +++++++ traffxml/traff_model.cpp | 14 +++++++++++ traffxml/traff_model.hpp | 4 +++ 4 files changed, 80 insertions(+) diff --git a/map/traffic_manager.cpp b/map/traffic_manager.cpp index 527ffdd2f..7e2e6182c 100644 --- a/map/traffic_manager.cpp +++ b/map/traffic_manager.cpp @@ -352,6 +352,56 @@ void TrafficManager::Push(traffxml::TraffFeed feed) m_feedQueue.push_back(feed); } +void TrafficManager::ConsolidateFeedQueue() +{ + std::lock_guard lock(m_mutex); + if (m_feedQueue.empty()) + return; + for (size_t i = m_feedQueue.size() - 1; i <= 0; i--) + for (size_t j = m_feedQueue.size() - 1; j <= 0; j--) + { + if (i == j) + continue; + for (auto it_i = m_feedQueue[i].begin(); it_i != m_feedQueue[i].end(); ) + for (auto it_j = m_feedQueue[j].end(); it_j != m_feedQueue[j].end(); ) + if (it_i->m_id == it_j->m_id) + { + // dupe, remove older + if (traffxml::operator<(it_i->m_updateTime, it_j->m_updateTime)) + { + // standard case: i has the newer one + ++it_i; + it_j = m_feedQueue[j].erase(it_j); + } + else if (traffxml::operator<(it_i->m_updateTime, it_j->m_updateTime)) + { + // j has the newer one + it_i = m_feedQueue[i].erase(it_i); + ++it_j; + } + else if (i > j) + { + // same time, but feed i was received after j, keep i + ++it_i; + it_j = m_feedQueue[j].erase(it_j); + } + else + { + // same time, but feed j was received after i, keep j + ASSERT(i != j, ()); + it_i = m_feedQueue[i].erase(it_i); + ++it_j; + } + } + } + // remove empty feeds + for (auto it = m_feedQueue.begin(); it != m_feedQueue.end(); ) + if (it->empty()) + it = m_feedQueue.erase(it); + else + ++it; +} + void TrafficManager::UpdateMessageCache(std::map & cache) { traffxml::TraffFeed feed; @@ -537,6 +587,9 @@ void TrafficManager::ThreadRoutine() } LOG(LINFO, (m_feedQueue.size(), "feed(s) in queue")); + // consolidate feed queue (remove older messages in favor of newer ones) + ConsolidateFeedQueue(); + /* * TODO call on a temp struct, then unite with m_messageCache, processing only messages with changes * (adding segments for new messages, removing segments for deleted messages, replacing segments diff --git a/map/traffic_manager.hpp b/map/traffic_manager.hpp index 33977e271..613ad3bd0 100644 --- a/map/traffic_manager.hpp +++ b/map/traffic_manager.hpp @@ -237,7 +237,16 @@ private: void Push(traffxml::TraffFeed feed); /** + * @brief Consolidates the feed queue. * + * If multiple feeds in the queue have the same message ID, only the message with the newest + * update time is kept (if two messages have the same ID and update time, the one in the feed + * with the higher index is kept); other messages with the same ID are discarded. Empty feeds + * are discarded. + */ + void ConsolidateFeedQueue(); + + /** * @brief Merges new messages from `m_feedQueue` into a message cache. * * Existing messages in `cache` will be overwritten by newer messages with the same ID in `m_feedQueue`. diff --git a/traffxml/traff_model.cpp b/traffxml/traff_model.cpp index f61b07adf..c68ed461d 100644 --- a/traffxml/traff_model.cpp +++ b/traffxml/traff_model.cpp @@ -76,6 +76,20 @@ const std::map kEventDelayMap{ // TODO Security*, Transport*, Weather* (not in enum yet) }; +bool operator< (IsoTime lhs, IsoTime rhs) +{ + std::time_t t_lhs = std::mktime(&lhs); + std::time_t t_rhs = std::mktime(&rhs); + return t_lhs < t_rhs; +} + +bool operator> (IsoTime lhs, IsoTime rhs) +{ + std::time_t t_lhs = std::mktime(&lhs); + std::time_t t_rhs = std::mktime(&rhs); + return t_lhs > t_rhs; +} + openlr::LocationReferencePoint Point::ToLrp() { openlr::LocationReferencePoint result; diff --git a/traffxml/traff_model.hpp b/traffxml/traff_model.hpp index 436f99966..faa8135ab 100644 --- a/traffxml/traff_model.hpp +++ b/traffxml/traff_model.hpp @@ -35,8 +35,12 @@ constexpr uint8_t kMaxspeedNone = 255; * Where no time zone is indicated, the timestamp shall always be interpreted as UTC. * `IsoTime` currently maps to `std::tm`, but this is not guaranteed. */ +// TODO make this a class with a private std::tm member using IsoTime = std::tm; +bool operator< (IsoTime lhs, IsoTime rhs); +bool operator> (IsoTime lhs, IsoTime rhs); + // TODO enum urgency enum class Directionality