[traffic] Allow decoding to be interrupted after each message

Message deduplication currently disabled

Signed-off-by: mvglasow <michael -at- vonglasow.com>
This commit is contained in:
mvglasow
2025-05-15 23:54:21 +03:00
parent 3a713c477a
commit cf57942a0b
2 changed files with 113 additions and 73 deletions

View File

@@ -25,6 +25,9 @@ using namespace std::chrono;
namespace namespace
{ {
/**
* Poll interval for traffic data
*/
auto constexpr kUpdateInterval = minutes(1); auto constexpr kUpdateInterval = minutes(1);
auto constexpr kOutdatedDataTimeout = minutes(5) + kUpdateInterval; auto constexpr kOutdatedDataTimeout = minutes(5) + kUpdateInterval;
auto constexpr kNetworkErrorTimeout = minutes(20); auto constexpr kNetworkErrorTimeout = minutes(20);
@@ -337,11 +340,15 @@ bool TrafficManager::Poll()
std::lock_guard<std::mutex> lock(m_mutex); std::lock_guard<std::mutex> lock(m_mutex);
m_feedQueue.push_back(feed); m_feedQueue.push_back(feed);
} }
m_lastResponseTime = steady_clock::now();
m_isPollNeeded = false;
return true; return true;
} }
else else
{ {
LOG(LWARNING, ("An error occurred parsing the TraFF feed")); LOG(LWARNING, ("An error occurred parsing the TraFF feed"));
// TODO should we really reset m_isPollNeeded here?
m_isPollNeeded = false;
return false; return false;
} }
} }
@@ -350,6 +357,7 @@ void TrafficManager::Push(traffxml::TraffFeed feed)
{ {
std::lock_guard<std::mutex> lock(m_mutex); std::lock_guard<std::mutex> lock(m_mutex);
m_feedQueue.push_back(feed); m_feedQueue.push_back(feed);
// TODO should we update m_lastResponseTime?
} }
void TrafficManager::ConsolidateFeedQueue() void TrafficManager::ConsolidateFeedQueue()
@@ -402,36 +410,6 @@ void TrafficManager::ConsolidateFeedQueue()
++it; ++it;
} }
void TrafficManager::UpdateMessageCache(std::map<std::string, traffxml::TraffMessage> & cache)
{
traffxml::TraffFeed feed;
// Thread-safe iteration over m_feedQueue, releasing the mutex during the loop
while (true)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_feedQueue.empty())
{
feed = m_feedQueue.front();
m_feedQueue.erase(m_feedQueue.begin());
}
else
break;
}
for (auto message : feed)
{
LOG(LINFO, (" message:", message));
auto it = cache.find(message.m_id);
bool process = (it == cache.end());
if (!process)
process = (it->second.m_updateTime < message.m_updateTime);
if (process)
cache.insert_or_assign(message.m_id, message);
}
}
}
void TrafficManager::InitializeDataSources(std::vector<FrozenDataSource> & dataSources) void TrafficManager::InitializeDataSources(std::vector<FrozenDataSource> & dataSources)
{ {
/* /*
@@ -558,6 +536,46 @@ void TrafficManager::DecodeMessage(traffxml::TraffMessage & message)
} }
} }
void TrafficManager::DecodeFirstMessage()
{
traffxml::TraffMessage message;
{
// Lock the mutex while iterating over the feed queue
std::lock_guard<std::mutex> lock(m_mutex);
// remove empty feeds from the beginning of the queue
while (!m_feedQueue.empty() && m_feedQueue.front().empty())
m_feedQueue.erase(m_feedQueue.begin());
// if we have no more feeds, return (nothing to do)
if (m_feedQueue.empty())
return;
// retrieve the first message from the first feed, remove it from the feed
std::swap(message, m_feedQueue.front().front());
m_feedQueue.front().erase(m_feedQueue.front().begin());
// if the feed has no more messages, erase it (eager erase, as an empty queue is used as a condition later)
if (m_feedQueue.front().empty())
m_feedQueue.erase(m_feedQueue.begin());
}
// check if message is actually newer
auto it = m_messageCache.find(message.m_id);
bool process = (it == m_messageCache.end());
if (!process)
process = (it->second.m_updateTime < message.m_updateTime);
if (!process)
{
LOG(LINFO, ("message", message.m_id, "is already in cache, skipping"));
return;
}
LOG(LINFO, (" ", message.m_id, ":", message));
DecodeMessage(message);
// store message in cache
//m_messageCache.insert_or_assign(message.m_id, message);
// store message coloring in AllMwmColoring
// TODO trigger full cache processing if segments were removed or traffic has eased
traffxml::MergeMultiMwmColoring(message.m_decoded, m_allMwmColoring);
}
void TrafficManager::ThreadRoutine() void TrafficManager::ThreadRoutine()
{ {
std::vector<MwmSet::MwmId> mwms; std::vector<MwmSet::MwmId> mwms;
@@ -565,9 +583,9 @@ void TrafficManager::ThreadRoutine()
{ {
// TODO clean out expired messages // TODO clean out expired messages
// poll is always needed, unless a new subscription or a subscription change returns a feed LOG(LINFO, ("start loop, active MWMs changed:", m_activeMwmsChanged, ", poll needed:", m_isPollNeeded));
m_isPollNeeded = true;
// this is a no-op if active MWMs have not changed
if (!SetSubscriptionArea()) if (!SetSubscriptionArea())
{ {
LOG(LWARNING, ("SetSubscriptionArea failed.")); LOG(LWARNING, ("SetSubscriptionArea failed."));
@@ -576,7 +594,10 @@ void TrafficManager::ThreadRoutine()
LOG(LWARNING, ("No subscription, no traffic data will be retrieved.")); LOG(LWARNING, ("No subscription, no traffic data will be retrieved."));
} }
// fetch traffic data if subscribed, unless this has already happened in the previous step /*
* Fetch traffic data if needed and we have a subscription.
* m_isPollNeeded may be set by WaitForRequest() and set/unset by SetSubscriptionArea().
*/
if (m_isPollNeeded && IsSubscribed()) if (m_isPollNeeded && IsSubscribed())
{ {
if (!Poll()) if (!Poll())
@@ -590,31 +611,11 @@ void TrafficManager::ThreadRoutine()
// consolidate feed queue (remove older messages in favor of newer ones) // consolidate feed queue (remove older messages in favor of newer ones)
ConsolidateFeedQueue(); ConsolidateFeedQueue();
/* // decode one message and add it to the cache
* TODO call on a temp struct, then unite with m_messageCache, processing only messages with changes DecodeFirstMessage();
* (adding segments for new messages, removing segments for deleted messages, replacing segments
* for updated messages) and leaving all other segments untouched
*/
UpdateMessageCache(m_messageCache);
LOG(LINFO, (m_messageCache.size(), "message(s) in cache"));
/*
* Map between MWM IDs and their colorings.
*/
//TODO should we use std::map<MwmSet::MwmId, std::shared_ptr<const traffic::TrafficInfo::Coloring>> ?
// TODO store mwm/segment/speed group map with each message, build allMwmColoring on the fly
std::map<MwmSet::MwmId, traffic::TrafficInfo::Coloring> allMwmColoring;
for (auto [id, message] : m_messageCache)
{
LOG(LINFO, (" ", id, ":", message));
DecodeMessage(message);
// store message coloring in AllMwmColoring
// TODO do this in a later pass...?
traffxml::MergeMultiMwmColoring(message.m_decoded, allMwmColoring);
}
// set new coloring for MWMs // set new coloring for MWMs
OnTrafficDataUpdate(allMwmColoring); OnTrafficDataUpdate(m_allMwmColoring);
// TODO no longer needed // TODO no longer needed
#ifdef traffic_dead_code #ifdef traffic_dead_code
@@ -657,20 +658,44 @@ bool TrafficManager::WaitForRequest(std::vector<MwmSet::MwmId> & mwms)
{ {
std::unique_lock<std::mutex> lock(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
bool const timeout = !m_condition.wait_for(lock, kUpdateInterval, [this] /*
{ * if we got terminated, return false immediately
return !m_isRunning || !m_requestedMwms.empty(); * (dont wait until sleep, we might not get much sleep if were busy processing a long feed)
}); */
if (!m_isRunning) if (!m_isRunning)
return false; return false;
if (timeout) // if we have feeds in the queue, return immediately
RequestTrafficData(); if (!m_feedQueue.empty())
{
LOG(LINFO, ("feed queue not empty, returning immediately"));
return true;
}
if (!m_requestedMwms.empty()) // if update interval has elapsed, return immediately
mwms.swap(m_requestedMwms); auto const currentTime = steady_clock::now();
auto const passedSeconds = currentTime - m_lastResponseTime;
if (passedSeconds >= kUpdateInterval)
{
LOG(LINFO, ("last response was", passedSeconds, "ago, returning immediately"));
m_isPollNeeded = true;
return true;
}
LOG(LINFO, ("nothing to do for now, waiting for timeout or notification"));
bool const timeout = !m_condition.wait_for(lock, kUpdateInterval, [this]
{
return !m_isRunning || m_activeMwmsChanged;
});
// check again if we got terminated while waiting (or woken up because we got terminated)
if (!m_isRunning)
return false;
// this works as long as wait timeout is at least equal to the poll interval
m_isPollNeeded |= timeout;
LOG(LINFO, ("timeout:", timeout, "active MWMs changed:", m_activeMwmsChanged));
return true; return true;
} }
@@ -754,6 +779,14 @@ void TrafficManager::OnTrafficRequestFailed(traffic::TrafficInfo && info)
void TrafficManager::OnTrafficDataUpdate(std::map<MwmSet::MwmId, traffic::TrafficInfo::Coloring> & trafficCache) void TrafficManager::OnTrafficDataUpdate(std::map<MwmSet::MwmId, traffic::TrafficInfo::Coloring> & trafficCache)
{ {
/*
* TODO do not update coloring on every single pass (i.e. after every single message).
* Coloring needs to be updated when the queue is empty (i.e. we have processed all messages for now)
* and should be updated periodically while larger numbers of messages are being processed.
* The interval is TBD and we may want to choose different intervals for the graphics engine
* and the routing engine: 1020 seconds seems reasonable for graphics, for the routing engine the
* interval should not be shorter than the time needed for route recalculation.
*/
/* /*
* Much of this code is copied and pasted together from old MWM code, with some minor adaptations: * Much of this code is copied and pasted together from old MWM code, with some minor adaptations:
* *

View File

@@ -246,14 +246,6 @@ private:
*/ */
void ConsolidateFeedQueue(); void ConsolidateFeedQueue();
/**
* @brief Merges new messages from `m_feedQueue` into a message cache.
*
* Existing messages in `cache` will be overwritten by newer messages with the same ID in `m_feedQueue`.
* @param cache The message cache.
*/
void UpdateMessageCache(std::map<std::string, traffxml::TraffMessage> & cache);
/** /**
* @brief Initializes the data sources for an OpenLR decoder. * @brief Initializes the data sources for an OpenLR decoder.
* *
@@ -261,6 +253,11 @@ private:
*/ */
void InitializeDataSources(std::vector<FrozenDataSource> &dataSources); void InitializeDataSources(std::vector<FrozenDataSource> &dataSources);
/**
* @brief Removes the first message from the first feed and decodes it.
*/
void DecodeFirstMessage();
/** /**
* @brief Decodes a single message to its segments and their speed groups. * @brief Decodes a single message to its segments and their speed groups.
* *
@@ -489,6 +486,11 @@ private:
*/ */
threads::SimpleThread m_thread; threads::SimpleThread m_thread;
/**
* @brief When the last response was received.
*/
std::chrono::time_point<std::chrono::steady_clock> m_lastResponseTime;
/** /**
* @brief Whether active MWMs have changed since the last request. * @brief Whether active MWMs have changed since the last request.
*/ */
@@ -530,6 +532,11 @@ private:
* Used to decode TraFF locations into road segments on the map. * Used to decode TraFF locations into road segments on the map.
*/ */
openlr::OpenLRDecoder m_openLrDecoder; openlr::OpenLRDecoder m_openLrDecoder;
/**
* @brief Map between MWM IDs and their colorings.
*/
std::map<MwmSet::MwmId, traffic::TrafficInfo::Coloring> m_allMwmColoring;
}; };
extern std::string DebugPrint(TrafficManager::TrafficState state); extern std::string DebugPrint(TrafficManager::TrafficState state);