[traffxml] Make sources pluggable

Signed-off-by: mvglasow <michael -at- vonglasow.com>
This commit is contained in:
mvglasow
2025-07-03 23:22:17 +03:00
parent f32493faaa
commit 6656c7e441
7 changed files with 658 additions and 160 deletions

View File

@@ -439,78 +439,31 @@ void TrafficManager::UpdateViewport(ScreenBase const & screen)
UpdateActiveMwms(screen.ClipRect(), m_lastDrapeMwmsByRect, m_activeDrapeMwms);
}
std::string TrafficManager::GetMwmFilters(std::set<MwmSet::MwmId> & mwms)
{
std::vector<m2::RectD> rects;
for (auto mwmId : mwms)
rects.push_back(mwmId.GetInfo()->m_bordersRect);
return traffxml::FiltersToXml(rects);
}
// TODO make this work with multiple sources (e.g. Android)
bool TrafficManager::Subscribe(std::set<MwmSet::MwmId> & mwms)
{
// TODO what if were subscribed already?
std::string filterList = GetMwmFilters(mwms);
// TODO
LOG(LINFO, ("Would subscribe to:\n", filterList));
m_subscriptionId = "placeholder_subscription_id";
m_isPollNeeded = true; // would be false if we got a feed here
return true;
}
// TODO make this work with multiple sources (e.g. Android)
bool TrafficManager::ChangeSubscription(std::set<MwmSet::MwmId> & mwms)
{
if (!IsSubscribed())
return false;
std::string filterList = GetMwmFilters(mwms);
// TODO
LOG(LINFO, ("Would change subscription", m_subscriptionId, "to:\n", filterList));
m_isPollNeeded = true; // would be false if we got a feed here
return true;
}
bool TrafficManager::SetSubscriptionArea()
void TrafficManager::SubscribeOrChangeSubscription()
{
std::set<MwmSet::MwmId> activeMwms;
if (!IsSubscribed())
if (m_activeMwmsChanged)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_activeMwmsChanged = false;
UniteActiveMwms(activeMwms);
}
if (!Subscribe(activeMwms))
return false;
}
else if (m_activeMwmsChanged)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_activeMwmsChanged = false;
UniteActiveMwms(activeMwms);
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
source->SubscribeOrChangeSubscription(activeMwms);
}
if (!ChangeSubscription(activeMwms))
return false;
}
return true;
}
// TODO make this work with multiple sources (e.g. Android)
void TrafficManager::Unsubscribe()
{
if (!IsSubscribed())
return;
// TODO
LOG(LINFO, ("Would unsubscribe from", m_subscriptionId));
m_subscriptionId.clear();
}
bool TrafficManager::IsSubscribed()
{
return !m_subscriptionId.empty();
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
source->Unsubscribe();
}
bool TrafficManager::RestoreCache()
@@ -563,51 +516,42 @@ bool TrafficManager::RestoreCache()
return false;
}
// TODO make this work with multiple sources (e.g. Android)
// TODO deal with subscriptions rejected by the server (delete, resubscribe)
bool TrafficManager::Poll()
void TrafficManager::Poll()
{
// TODO
//std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany.xml");
std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany-bidir.xml");
//std::string fileName("test_data/traff/LT-A1-Vezaiciai-Endriejavas.xml");
traffxml::LocalStorage storage(fileName);
pugi::xml_document document;
auto const load_result = storage.Load(document);
if (!load_result)
return false;
std::setlocale(LC_ALL, "en_US.UTF-8");
traffxml::TraffFeed feed;
if (traffxml::ParseTraff(document, std::nullopt /* dataSource */, feed))
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_feedQueue.push_back(feed);
}
m_lastResponseTime = steady_clock::now();
m_isPollNeeded = false;
return true;
}
else
{
LOG(LWARNING, ("An error occurred parsing the TraFF feed"));
// TODO should we really reset m_isPollNeeded here?
m_isPollNeeded = false;
return false;
}
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
for (auto & source : m_trafficSources)
if (source->IsPollNeeded())
source->Poll();
}
void TrafficManager::Push(traffxml::TraffFeed feed)
void TrafficManager::ReceiveFeed(traffxml::TraffFeed feed)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_feedQueue.push_back(feed);
// TODO should we update m_lastResponseTime?
}
m_condition.notify_one();
}
void TrafficManager::RegisterSource(std::unique_ptr<traffxml::TraffSource> source)
{
std::set<MwmSet::MwmId> activeMwms;
{
std::lock_guard<std::mutex> lock(m_mutex);
UniteActiveMwms(activeMwms);
}
source->SubscribeOrChangeSubscription(activeMwms);
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
m_trafficSources.push_back(std::move(source));
}
m_isPollNeeded = true;
}
void TrafficManager::PurgeExpiredMessages()
{
PurgeExpiredMessagesImpl();
@@ -759,25 +703,17 @@ void TrafficManager::ThreadRoutine()
LOG(LINFO, ("active MWMs changed:", m_activeMwmsChanged, ", poll needed:", m_isPollNeeded));
// this is a no-op if active MWMs have not changed
if (!SetSubscriptionArea())
{
LOG(LWARNING, ("SetSubscriptionArea failed."));
if (!IsSubscribed())
// do not skip out of the loop, we may need to process pushed feeds
LOG(LWARNING, ("No subscription, no traffic data will be retrieved."));
}
SubscribeOrChangeSubscription();
/*
* Fetch traffic data if needed and we have a subscription.
* m_isPollNeeded may be set by WaitForRequest() and set/unset by SetSubscriptionArea().
*/
if (m_isPollNeeded && IsSubscribed())
* Poll sources if needed.
* m_isPollNeeded may be set by WaitForRequest() and set/unset by SubscribeOrChangeSubscription().
*/
if (m_isPollNeeded)
{
if (!Poll())
{
LOG(LWARNING, ("Poll failed."));
// TODO set failed status somewhere and retry
}
m_lastResponseTime = steady_clock::now();
m_isPollNeeded = false;
Poll();
}
}
LOG(LINFO, (m_feedQueue.size(), "feed(s) in queue"));
@@ -1152,6 +1088,12 @@ void TrafficManager::OnTrafficDataResponse(traffic::TrafficInfo && info)
}
#endif
void TrafficManager::GetActiveMwms(std::set<MwmSet::MwmId> & activeMwms)
{
std::lock_guard<std::mutex> lock(m_mutex);
UniteActiveMwms(activeMwms);
}
void TrafficManager::UniteActiveMwms(std::set<MwmSet::MwmId> & activeMwms) const
{
activeMwms.insert(m_activeDrapeMwms.cbegin(), m_activeDrapeMwms.cend());