Files
comaps/map/traffic_manager.cpp
mvglasow d03b47bee0 [traffic] Refactoring
Signed-off-by: mvglasow <michael -at- vonglasow.com>
2025-07-28 00:33:25 +03:00

1304 lines
37 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "map/traffic_manager.hpp"
#include "routing/routing_helpers.hpp"
#include "drape_frontend/drape_engine.hpp"
#include "drape_frontend/visual_params.hpp"
#include "indexer/ftypes_matcher.hpp"
#include "indexer/scales.hpp"
#include "geometry/mercator.hpp"
#include "platform/platform.hpp"
#include "traffxml/traff_model_xml.hpp"
using namespace std::chrono;
namespace
{
/**
* Poll interval for traffic data
*/
auto constexpr kUpdateInterval = minutes(1);
/**
* Purge interval for expired traffic messages
*/
auto constexpr kPurgeInterval = minutes(1);
auto constexpr kOutdatedDataTimeout = minutes(5) + kUpdateInterval;
auto constexpr kNetworkErrorTimeout = minutes(20);
auto constexpr kMaxRetriesCount = 5;
/**
* Interval at which the Drape engine gets traffic updates while messages are being processed.
*/
auto constexpr kDrapeUpdateInterval = seconds(10);
/**
* Interval at which the traffic observer gets traffic updates while messages are being processed.
*/
auto constexpr kObserverUpdateInterval = minutes(1);
/**
* Interval at which the message cache file is updated while messages are being processed.
*/
auto constexpr kStorageUpdateInterval = minutes(1);
/**
* File name at which traffic data is persisted.
*/
auto constexpr kTrafficXMLFileName = "traffic.xml";
} // namespace
// TODO no longer needed
#ifdef traffic_dead_code
TrafficManager::CacheEntry::CacheEntry()
: m_isLoaded(false)
// TODO no longer needed
#ifdef traffic_dead_code
, m_dataSize(0)
#endif
, m_retriesCount(0)
, m_isWaitingForResponse(false)
, m_lastAvailability(traffic::TrafficInfo::Availability::Unknown)
{}
TrafficManager::CacheEntry::CacheEntry(time_point<steady_clock> const & requestTime)
: m_isLoaded(false)
// TODO no longer needed
#ifdef traffic_dead_code
, m_dataSize(0)
#endif
, m_lastActiveTime(requestTime)
, m_lastRequestTime(requestTime)
, m_retriesCount(0)
, m_isWaitingForResponse(true)
, m_lastAvailability(traffic::TrafficInfo::Availability::Unknown)
{}
#endif
TrafficManager::TrafficManager(DataSource & dataSource, CountryInfoGetterFn countryInfoGetter,
const CountryParentNameGetterFn &countryParentNameGetter,
GetMwmsByRectFn const & getMwmsByRectFn, size_t maxCacheSizeBytes,
routing::RoutingSession & routingSession)
: m_dataSource(dataSource)
, m_countryInfoGetterFn(countryInfoGetter)
, m_countryParentNameGetterFn(countryParentNameGetter)
, m_getMwmsByRectFn(getMwmsByRectFn)
, m_routingSession(routingSession)
, m_currentDataVersion(0)
, m_state(TrafficState::Disabled)
// TODO no longer needed
#ifdef traffic_dead_code
, m_maxCacheSizeBytes(maxCacheSizeBytes)
#endif
, m_isRunning(true)
, m_isPaused(false)
, m_thread(&TrafficManager::ThreadRoutine, this)
{
CHECK(m_getMwmsByRectFn != nullptr, ());
GetPlatform().RunTask(Platform::Thread::Gui, [this]() {
m_routingSession.SetChangeSessionStateCallback([this](routing::SessionState previous, routing::SessionState current) {
OnChangeRoutingSessionState(previous, current);
});
});
}
TrafficManager::~TrafficManager()
{
#ifdef DEBUG
{
std::lock_guard<std::mutex> lock(m_mutex);
ASSERT(!m_isRunning, ());
}
#endif
}
void TrafficManager::Teardown()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
m_isRunning = false;
}
m_condition.notify_one();
m_thread.join();
}
std::map<std::string, traffxml::TraffMessage> TrafficManager::GetMessageCache()
{
std::lock_guard<std::mutex> lock(m_mutex);
return m_messageCache;
}
TrafficManager::TrafficState TrafficManager::GetState() const
{
return m_state;
}
void TrafficManager::SetStateListener(TrafficStateChangedFn const & onStateChangedFn)
{
m_onStateChangedFn = onStateChangedFn;
}
void TrafficManager::SetEnabled(bool enabled)
{
/*
* Whether to notify interested parties that traffic data has been updated.
* This is based on the return value of RestoreCache().
*/
bool notifyUpdate = false;
{
std::lock_guard<std::mutex> lock(m_mutex);
if (enabled == IsEnabled())
return;
if (enabled)
{
if (!m_traffDecoder)
// deferred decoder initialization (requires maps to be loaded)
m_traffDecoder = make_unique<traffxml::DefaultTraffDecoder>(m_dataSource, m_countryInfoGetterFn,
m_countryParentNameGetterFn, m_messageCache);
if (!m_storage && !IsTestMode())
{
m_storage = make_unique<traffxml::LocalStorage>(kTrafficXMLFileName);
notifyUpdate = RestoreCache();
m_lastStorageUpdate = steady_clock::now();
}
}
ChangeState(enabled ? TrafficState::Enabled : TrafficState::Disabled);
}
m_drapeEngine.SafeCall(&df::DrapeEngine::EnableTraffic, enabled);
if (enabled)
{
if (notifyUpdate)
OnTrafficDataUpdate();
else
RecalculateSubscription();
m_canSetMode = false;
}
else
m_routingSession.OnTrafficInfoClear();
}
void TrafficManager::Clear()
{
// TODO what should we do about subscriptions?
{
std::lock_guard<std::mutex> lock(m_mutex);
LOG(LINFO, ("Messages in cache:", m_messageCache.size()));
LOG(LINFO, ("Feeds in queue:", m_feedQueue.size()));
LOG(LINFO, ("MWMs with coloring:", m_allMwmColoring.size()));
LOG(LINFO, ("Clearing..."));
// TODO no longer needed
#ifdef traffic_dead_code
m_currentCacheSizeBytes = 0;
#endif
m_messageCache.clear();
m_feedQueue.clear();
// TODO no longer needed
#ifdef traffic_dead_code
m_mwmCache.clear();
#endif
// TODO figure out which of the ones below we still need
m_lastDrapeMwmsByRect.clear();
m_lastPositionMwmsByRect.clear();
#ifdef traffic_dead_code
m_requestedMwms.clear();
m_trafficETags.clear();
#endif
LOG(LINFO, ("Messages in cache:", m_messageCache.size()));
LOG(LINFO, ("Feeds in queue:", m_feedQueue.size()));
LOG(LINFO, ("MWMs with coloring:", m_allMwmColoring.size()));
}
OnTrafficDataUpdate();
}
void TrafficManager::SetDrapeEngine(ref_ptr<df::DrapeEngine> engine)
{
m_drapeEngine.Set(engine);
}
void TrafficManager::SetCurrentDataVersion(int64_t dataVersion)
{
m_currentDataVersion = dataVersion;
}
void TrafficManager::OnMwmDeregistered(platform::LocalCountryFile const & countryFile)
{
if (!IsEnabled())
return;
// TODO no longer needed
#ifdef traffic_dead_code
{
std::lock_guard<std::mutex> lock(m_mutex);
MwmSet::MwmId mwmId;
for (auto const & cacheEntry : m_mwmCache)
{
if (cacheEntry.first.IsDeregistered(countryFile))
{
mwmId = cacheEntry.first;
break;
}
}
ClearCache(mwmId);
}
#endif
}
void TrafficManager::OnDestroySurface()
{
Pause();
}
void TrafficManager::OnRecoverSurface()
{
Resume();
}
void TrafficManager::OnChangeRoutingSessionState(routing::SessionState previous, routing::SessionState current)
{
// TODO assert were running on the UI thread
LOG(LINFO, ("Routing session state changed from", previous, "to", current));
LOG(LINFO, ("Running on thread", std::this_thread::get_id()));
/*
* Filter based on session state (see routing_callbacks.hpp for states and transitions).
*
* GetAllRegions() seems to get fresh data during build (presumably also rebuild), which will be
* available on the next state transition (to RouteNotStarted or OnRoute) and remain unchanged
* until the next route (re)build. Therefore, calling GetAllRegions() when new state is one of
* RouteNotStarted, OnRoute or RouteNoFollowing, and clearing MWMs when the new state is
* NoValidRoute, and ignoring all other transitions, should work for our purposes.
* There may be cases where we first calculate the route, then subscribe to the regions and only
* then get notified about a traffic problem on the route, requiring us to recalculate.
*/
std::set<std::string> mwmNames;
if (current == routing::SessionState::RouteNotStarted
|| current == routing::SessionState::OnRoute
|| current == routing::SessionState::RouteNoFollowing)
/*
* GetAllRegions() may block when run for the first time. This should happen during routing, so
* the call here would always return cached results without blocking, causing no problems on the
* UI thread.
*
* Note that this method is called before the state is updated internally, thus `GetAllRegions()`
* and any other functions would still have `previous` as their state.
*/
m_routingSession.GetAllRegions(mwmNames);
else if (current == routing::SessionState::NoValidRoute)
mwmNames.clear();
else
// for all other states, keep current set
return;
LOG(LINFO, ("Router MWMs:", mwmNames));
std::set<MwmSet::MwmId> mwms;
for (auto const & mwmName : mwmNames)
{
MwmSet::MwmId mwmId = m_dataSource.GetMwmIdByCountryFile(platform::CountryFile(mwmName));
if (mwmId.IsAlive())
mwms.insert(mwmId);
}
LOG(LINFO, ("MWM set:", mwms));
{
std::lock_guard<std::mutex> lock(m_mutex);
if (mwms != m_activeRoutingMwms)
{
m_activeMwmsChanged = true;
std::swap(mwms, m_activeRoutingMwms);
if ((m_activeDrapeMwms.empty() && m_activePositionMwms.empty() && m_activeRoutingMwms.empty())
|| !IsEnabled() || IsInvalidState() || m_isPaused)
return;
m_condition.notify_one();
}
}
}
void TrafficManager::RecalculateSubscription()
{
if (!IsEnabled())
return;
if (m_currentModelView.second)
UpdateViewport(m_currentModelView.first);
if (m_currentPosition.second)
UpdateMyPosition(m_currentPosition.first);
// TODO update routing if needed
}
void TrafficManager::Invalidate(MwmSet::MwmId const & mwmId)
{
auto const mwmRect = mwmId.GetInfo()->m_bordersRect; // m2::RectD
traffxml::TraffFeed invalidated;
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto it = m_messageCache.begin(); it != m_messageCache.end(); )
{
if (!it->second.m_location)
continue;
bool isInvalid = false;
// invalidate if decoded location uses a previous version of the MWM
for (auto const & [decodedMwmId, coloring] : it->second.m_decoded)
if (decodedMwmId.GetInfo()->GetCountryName() == mwmId.GetInfo()->GetCountryName())
isInvalid = true;
// invalidate if bounding rect of reference points intersects with bounding rect of MWM
if (!isInvalid)
{
m2::RectD locationRect;
for (auto const & point : {it->second.m_location.value().m_from,
it->second.m_location.value().m_via,
it->second.m_location.value().m_at,
it->second.m_location.value().m_to})
if (point)
locationRect.Add(mercator::FromLatLon(point.value().m_coordinates));
isInvalid = locationRect.IsIntersect(mwmRect);
}
if (isInvalid)
{
traffxml::TraffMessage message(it->second);
message.m_decoded.clear();
invalidated.push_back(message);
it = m_messageCache.erase(it);
}
else
++it;
}
if (!invalidated.empty())
{
m_feedQueue.insert(m_feedQueue.begin(), invalidated);
m_condition.notify_one();
}
}
}
void TrafficManager::UpdateActiveMwms(m2::RectD const & rect,
std::vector<MwmSet::MwmId> & lastMwmsByRect,
std::set<MwmSet::MwmId> & activeMwms)
{
auto mwms = m_getMwmsByRectFn(rect);
if (lastMwmsByRect == mwms)
return;
lastMwmsByRect = mwms;
{
std::lock_guard<std::mutex> lock(m_mutex);
m_activeMwmsChanged = true;
activeMwms.clear();
for (auto const & mwm : mwms)
{
if (mwm.IsAlive())
activeMwms.insert(mwm);
}
if ((m_activeDrapeMwms.empty() && m_activePositionMwms.empty() && m_activeRoutingMwms.empty())
|| !IsEnabled() || IsInvalidState() || m_isPaused)
return;
m_condition.notify_one();
}
}
void TrafficManager::UpdateMyPosition(MyPosition const & myPosition)
{
// Side of square around |myPosition|. Every mwm which is covered by the square
// will get traffic info.
double const kSquareSideM = 5000.0;
m_currentPosition = {myPosition, true /* initialized */};
if (!IsEnabled() || IsInvalidState() || m_isPaused)
return;
m2::RectD const rect =
mercator::RectByCenterXYAndSizeInMeters(myPosition.m_position, kSquareSideM / 2.0);
// Request traffic.
UpdateActiveMwms(rect, m_lastPositionMwmsByRect, m_activePositionMwms);
}
void TrafficManager::UpdateViewport(ScreenBase const & screen)
{
m_currentModelView = {screen, true /* initialized */};
if (!IsEnabled() || IsInvalidState() || m_isPaused)
return;
if (df::GetZoomLevel(screen.GetScale()) < df::kRoadClass0ZoomLevel)
return;
// Request traffic.
UpdateActiveMwms(screen.ClipRect(), m_lastDrapeMwmsByRect, m_activeDrapeMwms);
}
void TrafficManager::SubscribeOrChangeSubscription()
{
std::set<MwmSet::MwmId> activeMwms;
if (m_activeMwmsChanged)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_activeMwmsChanged = false;
UniteActiveMwms(activeMwms);
}
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
source->SubscribeOrChangeSubscription(activeMwms);
}
}
}
void TrafficManager::Unsubscribe()
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
source->Unsubscribe();
}
bool TrafficManager::RestoreCache()
{
ASSERT(m_storage, ("m_storage cannot be null"));
pugi::xml_document document;
if (!m_storage->Load(document))
{
LOG(LWARNING, ("Failed to reload cache from storage"));
return false;
}
traffxml::TraffFeed feedIn;
traffxml::TraffFeed feedOut;
bool hasDecoded = false;
bool hasUndecoded = false;
if (traffxml::ParseTraff(document, std::nullopt /* dataSource */, feedIn))
{
while (!feedIn.empty())
{
traffxml::TraffMessage message;
std::swap(message, feedIn.front());
feedIn.erase(feedIn.begin());
if (!message.IsExpired(traffxml::IsoTime::Now()))
{
if (!message.m_decoded.empty())
{
hasDecoded = true;
// store message in cache
m_messageCache.insert_or_assign(message.m_id, message);
}
else
{
hasUndecoded = true;
// message needs decoding, prepare to enqueue
feedOut.push_back(message);
}
}
}
if (!feedOut.empty())
m_feedQueue.insert(m_feedQueue.begin(), feedOut);
// update notification is callers responsibility
return hasDecoded && !hasUndecoded;
}
else
{
LOG(LWARNING, ("An error occurred parsing the cache file"));
}
return false;
}
void TrafficManager::Poll()
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
if (source->IsPollNeeded())
source->Poll();
}
void TrafficManager::ReceiveFeed(traffxml::TraffFeed feed)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_feedQueue.push_back(feed);
}
m_condition.notify_one();
}
void TrafficManager::RegisterSource(std::unique_ptr<traffxml::TraffSource> source)
{
std::set<MwmSet::MwmId> activeMwms;
{
std::lock_guard<std::mutex> lock(m_mutex);
UniteActiveMwms(activeMwms);
}
source->SubscribeOrChangeSubscription(activeMwms);
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
m_trafficSources.push_back(std::move(source));
}
m_isPollNeeded = true;
}
void TrafficManager::PurgeExpiredMessages()
{
PurgeExpiredMessagesImpl();
OnTrafficDataUpdate();
}
bool TrafficManager::PurgeExpiredMessagesImpl()
{
std::lock_guard<std::mutex> lock(m_mutex);
bool result = false;
LOG(LINFO, ("before:", m_messageCache.size(), "message(s)"));
traffxml::IsoTime now = traffxml::IsoTime::Now();
for (auto it = m_messageCache.begin(); it != m_messageCache.end(); )
{
if (it->second.IsExpired(now))
{
it = m_messageCache.erase(it);
result = true;
}
else
++it;
}
LOG(LINFO, ("after:", m_messageCache.size(), "message(s)"));
return result;
}
void TrafficManager::ConsolidateFeedQueue()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_feedQueue.empty())
return;
for (size_t i = m_feedQueue.size() - 1; i <= 0; i--)
for (size_t j = m_feedQueue.size() - 1; j <= 0; j--)
{
if (i == j)
continue;
for (auto it_i = m_feedQueue[i].begin(); it_i != m_feedQueue[i].end(); )
for (auto it_j = m_feedQueue[j].end(); it_j != m_feedQueue[j].end(); )
if (it_i->m_id == it_j->m_id)
{
// dupe, remove older
if (it_i->m_updateTime < it_j->m_updateTime)
{
// standard case: i has the newer one
++it_i;
it_j = m_feedQueue[j].erase(it_j);
}
else if (it_i->m_updateTime < it_j->m_updateTime)
{
// j has the newer one
it_i = m_feedQueue[i].erase(it_i);
++it_j;
}
else if (i > j)
{
// same time, but feed i was received after j, keep i
++it_i;
it_j = m_feedQueue[j].erase(it_j);
}
else
{
// same time, but feed j was received after i, keep j
ASSERT(i != j, ());
it_i = m_feedQueue[i].erase(it_i);
++it_j;
}
}
}
// remove empty feeds
for (auto it = m_feedQueue.begin(); it != m_feedQueue.end(); )
if (it->empty())
it = m_feedQueue.erase(it);
else
++it;
}
void TrafficManager::DecodeFirstMessage()
{
traffxml::TraffMessage message;
{
// Lock the mutex while iterating over the feed queue
std::lock_guard<std::mutex> lock(m_mutex);
// remove empty feeds from the beginning of the queue
while (!m_feedQueue.empty() && m_feedQueue.front().empty())
m_feedQueue.erase(m_feedQueue.begin());
// if we have no more feeds, return (nothing to do)
if (m_feedQueue.empty())
return;
// retrieve the first message from the first feed, remove it from the feed
std::swap(message, m_feedQueue.front().front());
m_feedQueue.front().erase(m_feedQueue.front().begin());
// if the feed has no more messages, erase it (eager erase, as an empty queue is used as a condition later)
if (m_feedQueue.front().empty())
m_feedQueue.erase(m_feedQueue.begin());
}
{
std::lock_guard<std::mutex> lock(m_mutex);
// check if message is actually newer
auto it = m_messageCache.find(message.m_id);
bool process = (it == m_messageCache.end());
if (!process)
process = (it->second.m_updateTime < message.m_updateTime);
if (!process)
{
LOG(LINFO, ("message", message.m_id, "is already in cache, skipping"));
return;
}
}
LOG(LINFO, (" ", message.m_id, ":", message));
m_traffDecoder->DecodeMessage(message);
{
std::lock_guard<std::mutex> lock(m_mutex);
// store message in cache
m_messageCache.insert_or_assign(message.m_id, message);
}
/*
* TODO detect if we can do a quick update:
* - new message which does not replace any existing message
* - coloring “wins” over replaced message:
* - contains all the segments of the previous message (always true when location is the same)
* - speed groups are the same or lower as in previous message (always true when all members of
* traffic impact are unchanged or have worsened) for this purpose, closure is considered
* lower than any other speed group
* In this case, run:
* traffxml::MergeMultiMwmColoring(message.m_decoded, m_allMwmColoring);
* Otherwise, set a flag indicating we need to process coloring in full.
*/
}
void TrafficManager::ThreadRoutine()
{
// initially, treat last purge and drape/observer update as having just happened
auto lastPurged = steady_clock::now();
m_lastDrapeUpdate = steady_clock::now();
m_lastObserverUpdate = steady_clock::now();
while (WaitForRequest())
{
if (!IsEnabled())
continue;
/*
* Whether to call OnTrafficDataUpdate() at the end of the current round.
* The logic may fail to catch cases in which the first message in queue replaces another
* message without changing coloring. This would usually occur in a larger feed, where other
* messages would likely require an announcement, making this a minor issue. A single round
* (after a timeout) with no messages expired and an empty queue would not trigger an update.
*/
bool hasUpdates = false;
if (!IsTestMode())
{
if (steady_clock::now() - lastPurged >= kPurgeInterval)
{
lastPurged == steady_clock::now();
hasUpdates |= PurgeExpiredMessagesImpl();
}
LOG(LINFO, ("active MWMs changed:", m_activeMwmsChanged, ", poll needed:", m_isPollNeeded));
// this is a no-op if active MWMs have not changed
SubscribeOrChangeSubscription();
/*
* Poll sources if needed.
* m_isPollNeeded may be set by WaitForRequest() and set/unset by SubscribeOrChangeSubscription().
*/
if (m_isPollNeeded)
{
m_lastResponseTime = steady_clock::now();
m_isPollNeeded = false;
Poll();
}
}
LOG(LINFO, (m_feedQueue.size(), "feed(s) in queue"));
// consolidate feed queue (remove older messages in favor of newer ones)
ConsolidateFeedQueue();
hasUpdates |= !m_feedQueue.empty();
// decode one message and add it to the cache
DecodeFirstMessage();
// set new coloring for MWMs
// `m_mutex` is obtained inside the method, no need to do it here
// TODO drop the argument, use class member inside method
if (hasUpdates)
OnTrafficDataUpdate();
// TODO no longer needed
#ifdef traffic_dead_code
for (auto const & mwm : mwms)
{
if (!mwm.IsAlive())
continue;
traffic::TrafficInfo info(mwm, m_currentDataVersion);
std::string tag;
{
std::lock_guard<std::mutex> lock(m_mutex);
tag = m_trafficETags[mwm];
}
if (info.ReceiveTrafficData(tag))
{
OnTrafficDataResponse(std::move(info));
}
else
{
LOG(LWARNING, ("Traffic request failed. Mwm =", mwm));
OnTrafficRequestFailed(std::move(info));
}
{
std::lock_guard<std::mutex> lock(m_mutex);
m_trafficETags[mwm] = tag;
}
}
mwms.clear();
#endif
}
// Calling Unsubscribe() from the worker thread on exit makes thread synchronization easier
Unsubscribe();
}
bool TrafficManager::WaitForRequest()
{
std::unique_lock<std::mutex> lock(m_mutex);
/*
* if we got terminated, return false immediately
* (dont wait until sleep, we might not get much sleep if were busy processing a long feed)
*/
if (!m_isRunning)
return false;
if (IsEnabled())
{
// if we have feeds in the queue, return immediately
if (!m_feedQueue.empty())
{
LOG(LINFO, ("feed queue not empty, returning immediately"));
return true;
}
if (!IsTestMode())
{
// if update interval has elapsed, return immediately
auto const currentTime = steady_clock::now();
auto const passedSeconds = currentTime - m_lastResponseTime;
if (passedSeconds >= kUpdateInterval)
{
LOG(LINFO, ("last response was", passedSeconds, "ago, returning immediately"));
m_isPollNeeded = true;
return true;
}
}
}
LOG(LINFO, ("nothing to do for now, waiting for timeout or notification"));
bool const timeout = !m_condition.wait_for(lock, kUpdateInterval, [this]
{
// return true for any condition we want to process immediately
return !m_isRunning || (m_activeMwmsChanged && !IsTestMode()) || !m_feedQueue.empty();
});
// check again if we got terminated while waiting (or woken up because we got terminated)
if (!m_isRunning)
return false;
// this works as long as wait timeout is at least equal to the poll interval
if (IsEnabled())
m_isPollNeeded |= timeout;
LOG(LINFO, ("timeout:", timeout, "active MWMs changed:", m_activeMwmsChanged, "test mode:", IsTestMode()));
return true;
}
// TODO no longer needed
#ifdef traffic_dead_code
void TrafficManager::RequestTrafficSubscription()
{
if ((m_activeDrapeMwms.empty() && m_activePositionMwms.empty() && m_activeRoutingMwms.empty())
|| !IsEnabled() || IsInvalidState() || m_isPaused)
{
return;
}
ForEachActiveMwm([this](MwmSet::MwmId const & mwmId) {
ASSERT(mwmId.IsAlive(), ());
// TODO no longer needed
#ifdef traffic_dead_code
m_mwmCache.insert(mwmId);
#endif
});
m_condition.notify_one();
// TODO no longer needed
#ifdef traffic_dead_code
UpdateState();
#endif
}
void TrafficManager::OnTrafficRequestFailed(traffic::TrafficInfo && info)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto it = m_mwmCache.find(info.GetMwmId());
if (it == m_mwmCache.end())
return;
it->second.m_isWaitingForResponse = false;
it->second.m_lastAvailability = info.GetAvailability();
if (info.GetAvailability() == traffic::TrafficInfo::Availability::Unknown &&
!it->second.m_isLoaded)
{
if (m_activeDrapeMwms.find(info.GetMwmId()) != m_activeDrapeMwms.cend() ||
m_activeRoutingMwms.find(info.GetMwmId()) != m_activeRoutingMwms.cend())
{
if (it->second.m_retriesCount < kMaxRetriesCount)
RequestTrafficData(info.GetMwmId(), true /* force */);
++it->second.m_retriesCount;
}
else
{
it->second.m_retriesCount = 0;
}
}
UpdateState();
}
#endif
void TrafficManager::OnTrafficDataUpdate()
{
bool feedQueueEmpty = false;
{
std::lock_guard<std::mutex> lock(m_mutex);
feedQueueEmpty = m_feedQueue.empty();
}
// Whether to notify the Drape engine of the update.
bool notifyDrape = (feedQueueEmpty);
// Whether to notify the observer of the update.
bool notifyObserver = (feedQueueEmpty);
// Whether to update the cache file.
bool updateStorage = (feedQueueEmpty);
if (!feedQueueEmpty)
{
auto const currentTime = steady_clock::now();
auto const drapeAge = currentTime - m_lastDrapeUpdate;
auto const observerAge = currentTime - m_lastObserverUpdate;
auto const storageAge = currentTime - m_lastStorageUpdate;
notifyDrape = (drapeAge >= kDrapeUpdateInterval);
notifyObserver = (observerAge >= kObserverUpdateInterval);
updateStorage = (storageAge >= kStorageUpdateInterval);
}
if (!m_storage || IsTestMode())
updateStorage = false;
if (updateStorage)
{
std::lock_guard<std::mutex> lock(m_mutex);
pugi::xml_document document;
traffxml::GenerateTraff(m_messageCache, document);
if (!m_storage->Save(document))
LOG(LWARNING, ("Storing message cache to file failed."));
m_lastStorageUpdate = steady_clock::now();
}
if (!notifyDrape && !notifyObserver)
return;
LOG(LINFO, ("Announcing traffic update, notifyDrape:", notifyDrape, "notifyObserver:", notifyObserver));
/*
* TODO introduce a flag to indicate we need to fully reprocess coloring, skip if it is false.
* The flag would get set when messages get deleted (including any clear/purge operations),
* or when a new message is added without indicating a simplified update in `DecodeFirstMessage()`.
* When we reprocess coloring in full (the block below), reset this flag.
*/
{
std::lock_guard<std::mutex> lock(m_mutex);
m_allMwmColoring.clear();
for (const auto & [id, message] : m_messageCache)
traffxml::MergeMultiMwmColoring(message.m_decoded, m_allMwmColoring);
}
/*
* Much of this code is copied and pasted together from old MWM code, with some minor adaptations:
*
* ForEachActiveMwm and the assertion (not the rest of the body) is from RequestTrafficData()
* (now RequestTrafficSubscription()), modification: cycle over all MWMs (active or not).
* trafficCache lookup is original code.
* TrafficInfo construction is taken fron ThreadRoutine(), with modifications (different constructor).
* The remainder of the loop is from OnTrafficDataResponse(traffic::TrafficInfo &&), with some modifications
* (removed CacheEntry logic; deciding whether to notify a component and managing timestamps is original code).
* Existing coloring deletion (if there is no new coloring) is original code.
*/
ForEachMwm([this, notifyDrape, notifyObserver](std::shared_ptr<MwmInfo> info) {
std::lock_guard<std::mutex> lock(m_mutex);
if (info->GetCountryName().starts_with(WORLD_FILE_NAME))
return;
MwmSet::MwmId const mwmId(info);
ASSERT(mwmId.IsAlive(), ());
auto tcit = m_allMwmColoring.find(mwmId);
if (tcit != m_allMwmColoring.end())
{
traffic::TrafficInfo::Coloring coloring = tcit->second;
LOG(LINFO, ("Setting new coloring for", mwmId, "with", coloring.size(), "entries"));
traffic::TrafficInfo info(mwmId, std::move(coloring));
// TODO no longer needed
#ifdef traffic_dead_code
UpdateState();
#endif
if (notifyDrape)
{
/*
* TODO calling ClearTrafficCache before UpdateTraffic is a workaround for a bug in the
* Drape engine: some segments found in the old coloring but not in the new one may get
* left behind. This was not a problem for MapsWithMe as the set of segments never
* changed, but is an issue wherever the segment set is dynamic. Workaround is to clear
* before sending an update. Ultimately, the processing logic for UpdateTraffic needs to
* be fixed, but the code is hard to read (involves multiple messages getting thrown back
* and forth between threads).
*/
m_drapeEngine.SafeCall(&df::DrapeEngine::ClearTrafficCache,
static_cast<MwmSet::MwmId const &>(mwmId));
m_drapeEngine.SafeCall(&df::DrapeEngine::UpdateTraffic,
static_cast<traffic::TrafficInfo const &>(info));
m_lastDrapeUpdate = steady_clock::now();
}
if (notifyObserver)
{
// Update traffic colors for routing.
m_routingSession.OnTrafficInfoAdded(std::move(info));
m_lastObserverUpdate = steady_clock::now();
}
}
else
{
if (notifyDrape)
{
m_drapeEngine.SafeCall(&df::DrapeEngine::ClearTrafficCache,
static_cast<MwmSet::MwmId const &>(mwmId));
m_lastDrapeUpdate = steady_clock::now();
}
if (notifyObserver)
{
// Update traffic colors for routing.
m_routingSession.OnTrafficInfoRemoved(mwmId);
m_lastObserverUpdate = steady_clock::now();
}
}
});
}
// TODO no longer needed
#ifdef traffic_dead_code
void TrafficManager::OnTrafficDataResponse(traffic::TrafficInfo && info)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
auto it = m_mwmCache.find(info.GetMwmId());
if (it == m_mwmCache.end())
return;
it->second.m_isLoaded = true;
it->second.m_lastResponseTime = steady_clock::now();
it->second.m_isWaitingForResponse = false;
it->second.m_lastAvailability = info.GetAvailability();
if (!info.GetColoring().empty())
{
// Update cache.
size_t constexpr kElementSize = sizeof(traffic::TrafficInfo::RoadSegmentId) + sizeof(traffic::SpeedGroup);
size_t const dataSize = info.GetColoring().size() * kElementSize;
m_currentCacheSizeBytes += (dataSize - it->second.m_dataSize);
it->second.m_dataSize = dataSize;
ShrinkCacheToAllowableSize();
}
UpdateState();
}
if (!info.GetColoring().empty())
{
m_drapeEngine.SafeCall(&df::DrapeEngine::UpdateTraffic,
static_cast<traffic::TrafficInfo const &>(info));
// Update traffic colors for routing.
m_observer.OnTrafficInfoAdded(std::move(info));
}
}
#endif
void TrafficManager::GetActiveMwms(std::set<MwmSet::MwmId> & activeMwms)
{
std::lock_guard<std::mutex> lock(m_mutex);
UniteActiveMwms(activeMwms);
}
void TrafficManager::UniteActiveMwms(std::set<MwmSet::MwmId> & activeMwms) const
{
activeMwms.insert(m_activeDrapeMwms.cbegin(), m_activeDrapeMwms.cend());
activeMwms.insert(m_activePositionMwms.cbegin(), m_activePositionMwms.cend());
activeMwms.insert(m_activeRoutingMwms.cbegin(), m_activeRoutingMwms.cend());
}
// TODO no longer needed
#ifdef traffic_dead_code
void TrafficManager::ShrinkCacheToAllowableSize()
{
// Calculating number of different active mwms.
std::set<MwmSet::MwmId> activeMwms;
UniteActiveMwms(activeMwms);
size_t const numActiveMwms = activeMwms.size();
if (m_currentCacheSizeBytes > m_maxCacheSizeBytes && m_mwmCache.size() > numActiveMwms)
{
std::multimap<time_point<steady_clock>, MwmSet::MwmId> seenTimings;
for (auto const & mwmInfo : m_mwmCache)
seenTimings.insert(std::make_pair(mwmInfo.second.m_lastActiveTime, mwmInfo.first));
auto itSeen = seenTimings.begin();
while (m_currentCacheSizeBytes > m_maxCacheSizeBytes && m_mwmCache.size() > numActiveMwms)
{
ClearCache(itSeen->second);
++itSeen;
}
}
}
void TrafficManager::ClearCache(MwmSet::MwmId const & mwmId)
{
auto const it = m_mwmCache.find(mwmId);
if (it == m_mwmCache.end())
return;
if (it->second.m_isLoaded)
{
// TODO no longer needed
#ifdef traffic_dead_code
ASSERT_GREATER_OR_EQUAL(m_currentCacheSizeBytes, it->second.m_dataSize, ());
m_currentCacheSizeBytes -= it->second.m_dataSize;
#endif
m_drapeEngine.SafeCall(&df::DrapeEngine::ClearTrafficCache, mwmId);
GetPlatform().RunTask(Platform::Thread::Gui, [this, mwmId]()
{
m_observer.OnTrafficInfoRemoved(mwmId);
});
}
m_mwmCache.erase(it);
m_trafficETags.erase(mwmId);
m_activeDrapeMwms.erase(mwmId);
m_activeRoutingMwms.erase(mwmId);
m_lastDrapeMwmsByRect.clear();
m_lastRoutingMwmsByRect.clear();
}
#endif
bool TrafficManager::IsEnabled() const
{
return m_state != TrafficState::Disabled;
}
bool TrafficManager::IsInvalidState() const
{
return m_state == TrafficState::NetworkError;
}
// TODO no longer needed
#ifdef traffic_dead_code
void TrafficManager::UpdateState()
{
if (!IsEnabled() || IsInvalidState())
return;
auto const currentTime = steady_clock::now();
auto maxPassedTime = steady_clock::duration::zero();
bool waiting = false;
bool networkError = false;
bool expiredApp = false;
bool expiredData = false;
bool noData = false;
for (MwmSet::MwmId const & mwmId : m_activeDrapeMwms)
{
auto it = m_mwmCache.find(mwmId);
ASSERT(it != m_mwmCache.end(), ());
if (it->second.m_isWaitingForResponse)
{
waiting = true;
}
else
{
expiredApp |= it->second.m_lastAvailability == traffic::TrafficInfo::Availability::ExpiredApp;
expiredData |= it->second.m_lastAvailability == traffic::TrafficInfo::Availability::ExpiredData;
noData |= it->second.m_lastAvailability == traffic::TrafficInfo::Availability::NoData;
if (it->second.m_isLoaded)
{
auto const timeSinceLastResponse = currentTime - it->second.m_lastResponseTime;
if (timeSinceLastResponse > maxPassedTime)
maxPassedTime = timeSinceLastResponse;
}
else if (it->second.m_retriesCount >= kMaxRetriesCount)
{
networkError = true;
}
}
}
if (networkError || maxPassedTime >= kNetworkErrorTimeout)
ChangeState(TrafficState::NetworkError);
else if (waiting)
ChangeState(TrafficState::WaitingData);
else if (expiredApp)
ChangeState(TrafficState::ExpiredApp);
else if (expiredData)
ChangeState(TrafficState::ExpiredData);
else if (noData)
ChangeState(TrafficState::NoData);
else if (maxPassedTime >= kOutdatedDataTimeout)
ChangeState(TrafficState::Outdated);
else
ChangeState(TrafficState::Enabled);
}
#endif
void TrafficManager::ChangeState(TrafficState newState)
{
if (m_state == newState)
return;
m_state = newState;
GetPlatform().RunTask(Platform::Thread::Gui, [this, newState]()
{
if (m_onStateChangedFn != nullptr)
m_onStateChangedFn(newState);
});
}
void TrafficManager::OnEnterForeground()
{
Resume();
}
void TrafficManager::OnEnterBackground()
{
Pause();
}
void TrafficManager::Pause()
{
m_isPaused = true;
}
void TrafficManager::Resume()
{
if (!m_isPaused)
return;
m_isPaused = false;
RecalculateSubscription();
}
void TrafficManager::SetSimplifiedColorScheme(bool simplified)
{
m_hasSimplifiedColorScheme = simplified;
m_drapeEngine.SafeCall(&df::DrapeEngine::SetSimplifiedTrafficColors, simplified);
}
void TrafficManager::SetTestMode()
{
if (!m_canSetMode)
{
LOG(LWARNING, ("Mode cannot be set once the traffic manager has been enabled"));
return;
}
m_mode = Mode::Test;
}
std::string DebugPrint(TrafficManager::TrafficState state)
{
switch (state)
{
case TrafficManager::TrafficState::Disabled:
return "Disabled";
case TrafficManager::TrafficState::Enabled:
return "Enabled";
case TrafficManager::TrafficState::WaitingData:
return "WaitingData";
case TrafficManager::TrafficState::Outdated:
return "Outdated";
case TrafficManager::TrafficState::NoData:
return "NoData";
case TrafficManager::TrafficState::NetworkError:
return "NetworkError";
case TrafficManager::TrafficState::ExpiredData:
return "ExpiredData";
case TrafficManager::TrafficState::ExpiredApp:
return "ExpiredApp";
default:
ASSERT(false, ("Unknown state"));
}
return "Unknown";
}