[traffic] Consolidate feed queue before decoding messages

Signed-off-by: mvglasow <michael -at- vonglasow.com>
This commit is contained in:
mvglasow
2025-05-14 21:03:54 +03:00
parent 53e80b9283
commit edb1b7e784
4 changed files with 80 additions and 0 deletions

View File

@@ -352,6 +352,56 @@ void TrafficManager::Push(traffxml::TraffFeed feed)
m_feedQueue.push_back(feed);
}
void TrafficManager::ConsolidateFeedQueue()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_feedQueue.empty())
return;
for (size_t i = m_feedQueue.size() - 1; i <= 0; i--)
for (size_t j = m_feedQueue.size() - 1; j <= 0; j--)
{
if (i == j)
continue;
for (auto it_i = m_feedQueue[i].begin(); it_i != m_feedQueue[i].end(); )
for (auto it_j = m_feedQueue[j].end(); it_j != m_feedQueue[j].end(); )
if (it_i->m_id == it_j->m_id)
{
// dupe, remove older
if (traffxml::operator<(it_i->m_updateTime, it_j->m_updateTime))
{
// standard case: i has the newer one
++it_i;
it_j = m_feedQueue[j].erase(it_j);
}
else if (traffxml::operator<(it_i->m_updateTime, it_j->m_updateTime))
{
// j has the newer one
it_i = m_feedQueue[i].erase(it_i);
++it_j;
}
else if (i > j)
{
// same time, but feed i was received after j, keep i
++it_i;
it_j = m_feedQueue[j].erase(it_j);
}
else
{
// same time, but feed j was received after i, keep j
ASSERT(i != j, ());
it_i = m_feedQueue[i].erase(it_i);
++it_j;
}
}
}
// remove empty feeds
for (auto it = m_feedQueue.begin(); it != m_feedQueue.end(); )
if (it->empty())
it = m_feedQueue.erase(it);
else
++it;
}
void TrafficManager::UpdateMessageCache(std::map<std::string, traffxml::TraffMessage> & cache)
{
traffxml::TraffFeed feed;
@@ -537,6 +587,9 @@ void TrafficManager::ThreadRoutine()
}
LOG(LINFO, (m_feedQueue.size(), "feed(s) in queue"));
// consolidate feed queue (remove older messages in favor of newer ones)
ConsolidateFeedQueue();
/*
* TODO call on a temp struct, then unite with m_messageCache, processing only messages with changes
* (adding segments for new messages, removing segments for deleted messages, replacing segments