diff --git a/map/framework.cpp b/map/framework.cpp index f5f5d2b9b..6315099d6 100644 --- a/map/framework.cpp +++ b/map/framework.cpp @@ -23,6 +23,8 @@ #include "storage/routing_helpers.hpp" #include "storage/storage_helpers.hpp" +#include "traffxml/traff_source.hpp" + #include "drape_frontend/color_constants.hpp" #include "drape_frontend/gps_track_point.hpp" #include "drape_frontend/visual_params.hpp" @@ -392,6 +394,12 @@ Framework::Framework(FrameworkParams const & params, bool loadMaps) LoadMapsSync(); m_trafficManager.SetEnabled(LoadTrafficEnabled()); + + /* + * MockTraffSource for debugging purposes. + * TODO Replace with a real source, parametrized and conditionally loaded, once we have one. + */ + traffxml::MockTraffSource::Create(m_trafficManager); } Framework::~Framework() diff --git a/map/traffic_manager.cpp b/map/traffic_manager.cpp index 8b6051d50..121fa0b17 100644 --- a/map/traffic_manager.cpp +++ b/map/traffic_manager.cpp @@ -439,78 +439,31 @@ void TrafficManager::UpdateViewport(ScreenBase const & screen) UpdateActiveMwms(screen.ClipRect(), m_lastDrapeMwmsByRect, m_activeDrapeMwms); } -std::string TrafficManager::GetMwmFilters(std::set & mwms) -{ - std::vector rects; - for (auto mwmId : mwms) - rects.push_back(mwmId.GetInfo()->m_bordersRect); - return traffxml::FiltersToXml(rects); -} - -// TODO make this work with multiple sources (e.g. Android) -bool TrafficManager::Subscribe(std::set & mwms) -{ - // TODO what if we’re subscribed already? - std::string filterList = GetMwmFilters(mwms); - // TODO - LOG(LINFO, ("Would subscribe to:\n", filterList)); - m_subscriptionId = "placeholder_subscription_id"; - m_isPollNeeded = true; // would be false if we got a feed here - return true; -} - -// TODO make this work with multiple sources (e.g. Android) -bool TrafficManager::ChangeSubscription(std::set & mwms) -{ - if (!IsSubscribed()) - return false; - std::string filterList = GetMwmFilters(mwms); - // TODO - LOG(LINFO, ("Would change subscription", m_subscriptionId, "to:\n", filterList)); - m_isPollNeeded = true; // would be false if we got a feed here - return true; -} - -bool TrafficManager::SetSubscriptionArea() +void TrafficManager::SubscribeOrChangeSubscription() { std::set activeMwms; - if (!IsSubscribed()) + if (m_activeMwmsChanged) { { std::lock_guard lock(m_mutex); m_activeMwmsChanged = false; UniteActiveMwms(activeMwms); } - if (!Subscribe(activeMwms)) - return false; - } - else if (m_activeMwmsChanged) - { + { - std::lock_guard lock(m_mutex); - m_activeMwmsChanged = false; - UniteActiveMwms(activeMwms); + std::lock_guard lock(m_trafficSourceMutex); + for (auto & source : m_trafficSources) + source->SubscribeOrChangeSubscription(activeMwms); } - if (!ChangeSubscription(activeMwms)) - return false; } - return true; } -// TODO make this work with multiple sources (e.g. Android) void TrafficManager::Unsubscribe() { - if (!IsSubscribed()) - return; - // TODO - LOG(LINFO, ("Would unsubscribe from", m_subscriptionId)); - m_subscriptionId.clear(); -} - -bool TrafficManager::IsSubscribed() -{ - return !m_subscriptionId.empty(); + std::lock_guard lock(m_trafficSourceMutex); + for (auto & source : m_trafficSources) + source->Unsubscribe(); } bool TrafficManager::RestoreCache() @@ -563,51 +516,42 @@ bool TrafficManager::RestoreCache() return false; } -// TODO make this work with multiple sources (e.g. Android) -// TODO deal with subscriptions rejected by the server (delete, resubscribe) -bool TrafficManager::Poll() +void TrafficManager::Poll() { - // TODO - //std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany.xml"); - std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany-bidir.xml"); - //std::string fileName("test_data/traff/LT-A1-Vezaiciai-Endriejavas.xml"); - traffxml::LocalStorage storage(fileName); - pugi::xml_document document; - auto const load_result = storage.Load(document); - if (!load_result) - return false; - - std::setlocale(LC_ALL, "en_US.UTF-8"); - traffxml::TraffFeed feed; - if (traffxml::ParseTraff(document, std::nullopt /* dataSource */, feed)) - { - { - std::lock_guard lock(m_mutex); - m_feedQueue.push_back(feed); - } - m_lastResponseTime = steady_clock::now(); - m_isPollNeeded = false; - return true; - } - else - { - LOG(LWARNING, ("An error occurred parsing the TraFF feed")); - // TODO should we really reset m_isPollNeeded here? - m_isPollNeeded = false; - return false; - } + std::lock_guard lock(m_trafficSourceMutex); + for (auto & source : m_trafficSources) + if (source->IsPollNeeded()) + source->Poll(); } -void TrafficManager::Push(traffxml::TraffFeed feed) +void TrafficManager::ReceiveFeed(traffxml::TraffFeed feed) { { std::lock_guard lock(m_mutex); m_feedQueue.push_back(feed); - // TODO should we update m_lastResponseTime? } m_condition.notify_one(); } +void TrafficManager::RegisterSource(std::unique_ptr source) +{ + std::set activeMwms; + + { + std::lock_guard lock(m_mutex); + UniteActiveMwms(activeMwms); + } + + source->SubscribeOrChangeSubscription(activeMwms); + + { + std::lock_guard lock(m_trafficSourceMutex); + m_trafficSources.push_back(std::move(source)); + } + + m_isPollNeeded = true; +} + void TrafficManager::PurgeExpiredMessages() { PurgeExpiredMessagesImpl(); @@ -759,25 +703,17 @@ void TrafficManager::ThreadRoutine() LOG(LINFO, ("active MWMs changed:", m_activeMwmsChanged, ", poll needed:", m_isPollNeeded)); // this is a no-op if active MWMs have not changed - if (!SetSubscriptionArea()) - { - LOG(LWARNING, ("SetSubscriptionArea failed.")); - if (!IsSubscribed()) - // do not skip out of the loop, we may need to process pushed feeds - LOG(LWARNING, ("No subscription, no traffic data will be retrieved.")); - } + SubscribeOrChangeSubscription(); /* - * Fetch traffic data if needed and we have a subscription. - * m_isPollNeeded may be set by WaitForRequest() and set/unset by SetSubscriptionArea(). - */ - if (m_isPollNeeded && IsSubscribed()) + * Poll sources if needed. + * m_isPollNeeded may be set by WaitForRequest() and set/unset by SubscribeOrChangeSubscription(). + */ + if (m_isPollNeeded) { - if (!Poll()) - { - LOG(LWARNING, ("Poll failed.")); - // TODO set failed status somewhere and retry - } + m_lastResponseTime = steady_clock::now(); + m_isPollNeeded = false; + Poll(); } } LOG(LINFO, (m_feedQueue.size(), "feed(s) in queue")); @@ -1152,6 +1088,12 @@ void TrafficManager::OnTrafficDataResponse(traffic::TrafficInfo && info) } #endif +void TrafficManager::GetActiveMwms(std::set & activeMwms) +{ + std::lock_guard lock(m_mutex); + UniteActiveMwms(activeMwms); +} + void TrafficManager::UniteActiveMwms(std::set & activeMwms) const { activeMwms.insert(m_activeDrapeMwms.cbegin(), m_activeDrapeMwms.cend()); diff --git a/map/traffic_manager.hpp b/map/traffic_manager.hpp index 44ab7fa62..50256815b 100644 --- a/map/traffic_manager.hpp +++ b/map/traffic_manager.hpp @@ -15,6 +15,7 @@ #include "traffxml/traff_decoder.hpp" #include "traffxml/traff_model.hpp" +#include "traffxml/traff_source.hpp" #include "traffxml/traff_storage.hpp" #include "geometry/point2d.hpp" @@ -36,7 +37,7 @@ #include #include -class TrafficManager final +class TrafficManager final : public traffxml::TraffSourceManager { public: using CountryInfoGetterFn = std::function; @@ -234,15 +235,36 @@ public: void SetTestMode(); /** - * @brief Processes a traffic feed received through a push operation. + * @brief Processes a traffic feed. * - * Push is safe to call from any thread. + * The feed may be a result of a pull operation, or received through a push operation. + * (Push operations are not supported by all sources.) * - * Push operations are not supported on all platforms. + * This method is safe to call from any thread. * * @param feed The traffic feed. */ - void Push(traffxml::TraffFeed feed); + virtual void ReceiveFeed(traffxml::TraffFeed feed) override; + + /** + * @brief Registers a `TraffSource`. + * @param source The source. + */ + virtual void RegisterSource(std::unique_ptr source) override; + + /** + * @brief Retrieves all currently active MWMs. + * + * This method retrieves all MWMs in the viewport, within a certain distance of the current + * position (if there is a valid position) or part of the route (if any), and stores them in + * `activeMwms`. + * + * This method locks `m_mutex` and is therefore safe to call from any thread. Callers which + * already hold `m_mutex` can use the private `UniteActiveMwms()` method instead. + * + * @param activeMwms Retrieves the list of active MWMs. + */ + virtual void GetActiveMwms(std::set & activeMwms) override; /** * @brief Purges expired messages from the cache. @@ -332,50 +354,21 @@ private: }; /** - * @brief Returns a TraFF filter list for a set of MWMs. + * @brief Ensures every TraFF source has a subscription covering all currently active MWMs. * - * @param mwms The MWMs for which a filter list is to be created. - * @return A `filter_list` in XML format. + * This method cycles through all TraFF sources in `m_trafficSources` and calls + * `SubscribeOrChangeSubscription()` on each of them. */ - std::string GetMwmFilters(std::set & mwms); + void SubscribeOrChangeSubscription(); /** - * @brief Subscribes to a traffic service. + * @brief Unsubscribes from all traffic services we are subscribed to. * - * @param mwms The MWMs for which data is needed. - * @return true on success, false on failure. - */ - bool Subscribe(std::set & mwms); - - /** - * @brief Changes an existing traffic subscription. - * - * @param mwms The new set of MWMs for which data is needed. - * @return true on success, false on failure. - */ - bool ChangeSubscription(std::set & mwms); - - /** - * @brief Ensures we have a subscription covering all currently active MWMs. - * - * This method subscribes to a traffic service if not already subscribed, or changes the existing - * subscription otherwise. - * - * @return true on success, false on failure. - */ - bool SetSubscriptionArea(); - - /** - * @brief Unsubscribes from a traffic service we are subscribed to. + * This method cycles through all TraFF sources in `m_trafficSources` and calls `Unsubscribe()` + * on each of them. */ void Unsubscribe(); - /** - * @brief Whether we are currently subscribed to a traffic service. - * @return - */ - bool IsSubscribed(); - /** * @brief Restores the message cache from file storage. * @@ -393,11 +386,12 @@ private: bool RestoreCache(); /** - * @brief Polls the traffic service for updates. + * @brief Polls all traffic services for updates. * - * @return true on success, false on failure. + * This method cycles through all TraFF sources in `m_trafficSources` and calls `IsPollNeeded()` + * on each of them. If this method returns true, it then calls `Poll()` on the source. */ - bool Poll(); + void Poll(); /** * @brief Purges expired messages from the cache. @@ -568,6 +562,18 @@ private: void OnChangeRoutingSessionState(routing::SessionState previous, routing::SessionState current); + /** + * @brief Retrieves all currently active MWMs. + * + * This method retrieves all MWMs in the viewport, within a certain distance of the current + * position (if there is a valid position) or part of the route (if any), and stores them in + * `activeMwms`. + * + * The caller must hold `m_mutex` prior to calling this method. `GetActiveMwms()` is available + * as a convenience wrapper which locks `m_mutex`, calls this method and releases it. + * + * @param activeMwms Retrieves the list of active MWMs. + */ void UniteActiveMwms(std::set & activeMwms) const; void Pause(); @@ -634,6 +640,13 @@ private: std::map m_mwmCache; + /** + * @brief The TraFF sources from which we get traffic information. + * + * Threads must lock `m_trafficSourceMutex` prior to accessing this member. + */ + std::vector> m_trafficSources; + bool m_isRunning; std::condition_variable m_condition; @@ -691,9 +704,18 @@ private: * @brief Mutex for access to shared members. * * Threads which access shared members (see documentation) must lock this mutex while doing so. + * + * @note To access `m_trafficSource`, lock `m_trafficSourceMutex`, not this mutex. */ std::mutex m_mutex; + /** + * @brief Mutex for access to `m_trafficSources`. + * + * Threads which access `m_trafficSources` must lock this mutex while doing so. + */ + std::mutex m_trafficSourceMutex; + /** * @brief Worker thread which fetches traffic updates. */ @@ -724,18 +746,11 @@ private: */ bool m_activeMwmsChanged = false; - /** - * @brief The subscription ID received from the traffic server. - * - * An empty subscription ID means no subscription. - */ - std::string m_subscriptionId; - /** * @brief Whether a poll operation is needed. * - * Used in the worker thread. A poll operation is needed unless a subscription (or subscription - * change) operation was performed before and a feed was received as part of it. + * Used in the worker thread to indicate we need to poll all sources. The poll operation may still + * be inhibited for individual sources. */ bool m_isPollNeeded; diff --git a/traffxml/CMakeLists.txt b/traffxml/CMakeLists.txt index 93b4937d3..834f73918 100644 --- a/traffxml/CMakeLists.txt +++ b/traffxml/CMakeLists.txt @@ -7,6 +7,8 @@ set(SRC traff_model.hpp traff_model_xml.cpp traff_model_xml.hpp + traff_source.cpp + traff_source.hpp traff_storage.cpp traff_storage.hpp ) diff --git a/traffxml/traff_assessment_tool/mainwindow.cpp b/traffxml/traff_assessment_tool/mainwindow.cpp index ef2440139..3d551a6b5 100644 --- a/traffxml/traff_assessment_tool/mainwindow.cpp +++ b/traffxml/traff_assessment_tool/mainwindow.cpp @@ -376,7 +376,7 @@ void MainWindow::OnOpenTrafficSample() shiftedFeed.push_back(message); } LOG(LINFO, ("TraFF data parsed successfully, pushing")); - m_framework.GetTrafficManager().Push(shiftedFeed); + m_framework.GetTrafficManager().ReceiveFeed(shiftedFeed); LOG(LINFO, ("Push completed")); } else diff --git a/traffxml/traff_source.cpp b/traffxml/traff_source.cpp new file mode 100644 index 000000000..502d3cc95 --- /dev/null +++ b/traffxml/traff_source.cpp @@ -0,0 +1,105 @@ +#include "traffxml/traff_source.hpp" + +#include "traffxml/traff_model_xml.hpp" +#include "traffxml/traff_storage.hpp" + +#include "geometry/rect2d.hpp" + +#include + +namespace traffxml { +TraffSource::TraffSource(TraffSourceManager & manager) + : m_manager(manager) +{} + +void TraffSource::SubscribeOrChangeSubscription(std::set & mwms) +{ + std::lock_guard lock(m_mutex); + if (!IsSubscribed()) + Subscribe(mwms); + else + ChangeSubscription(mwms); +} + +std::string TraffSource::GetMwmFilters(std::set & mwms) +{ + std::vector rects; + for (auto mwmId : mwms) + rects.push_back(mwmId.GetInfo()->m_bordersRect); + return traffxml::FiltersToXml(rects); +} + +void MockTraffSource::Create(TraffSourceManager & manager) +{ + std::unique_ptr source = std::unique_ptr(new MockTraffSource(manager)); + manager.RegisterSource(std::move(source)); +} + +MockTraffSource::MockTraffSource(TraffSourceManager & manager) + : TraffSource(manager) +{} + +void MockTraffSource::Subscribe(std::set & mwms) +{ + std::string filterList = GetMwmFilters(mwms); + LOG(LINFO, ("Would subscribe to:\n", filterList)); + m_subscriptionId = "placeholder_subscription_id"; + m_nextRequestTime = std::chrono::steady_clock::now(); // would be in the future if we got a feed here +} + +void MockTraffSource::ChangeSubscription(std::set & mwms) +{ + if (!IsSubscribed()) + return; + std::string filterList = GetMwmFilters(mwms); + LOG(LINFO, ("Would change subscription", m_subscriptionId, "to:\n", filterList)); + m_nextRequestTime = std::chrono::steady_clock::now(); // would be in the future if we got a feed here +} + +void MockTraffSource::Unsubscribe() +{ + std::lock_guard lock(m_mutex); + if (!IsSubscribed()) + return; + LOG(LINFO, ("Would unsubscribe from", m_subscriptionId)); + m_subscriptionId.clear(); +} + +bool MockTraffSource::IsPollNeeded() +{ + return m_nextRequestTime.load() <= std::chrono::steady_clock::now(); +} + +void MockTraffSource::Poll() +{ + //std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany.xml"); + std::string fileName("test_data/traff/PL-A18-Krzyzowa-Lipiany-bidir.xml"); + //std::string fileName("test_data/traff/LT-A1-Vezaiciai-Endriejavas.xml"); + traffxml::LocalStorage storage(fileName); + pugi::xml_document document; + auto const load_result = storage.Load(document); + if (!load_result) + return; + + m_lastRequestTime = std::chrono::steady_clock::now(); + std::setlocale(LC_ALL, "en_US.UTF-8"); + traffxml::TraffFeed feed; + if (traffxml::ParseTraff(document, std::nullopt /* dataSource */, feed)) + { + m_lastResponseTime = std::chrono::steady_clock::now(); + m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval; + m_lastAvailability = Availability::IsAvailable; + m_manager.ReceiveFeed(feed); + } + else + { + LOG(LWARNING, ("An error occurred parsing the TraFF feed")); + m_lastAvailability = Availability::Error; + /* + * TODO how should we deal with future requests? + * Static files usually don’t change. + */ + m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval; + } +} +} // namespace traffxml diff --git a/traffxml/traff_source.hpp b/traffxml/traff_source.hpp new file mode 100644 index 000000000..8497c6a5f --- /dev/null +++ b/traffxml/traff_source.hpp @@ -0,0 +1,426 @@ +#pragma once + +#include "traffxml/traff_model.hpp" + +#include "indexer/mwm_set.hpp" + +#include +#include +#include + +namespace traffxml +{ +class TraffSource; + +/** + * @brief Abstract class which manages TraFF sources. + * + * `TraffSource` and its subclasses register with `TraffSourceManager` upon creation. The + * `TraffSourceManager` calls `TraffSource` methods to manage its subscription and poll for + * messages, and exposes a method to deliver message feeds. + */ +class TraffSourceManager +{ +public: + /** + * @brief Retrieves all currently active MWMs. + * + * This method retrieves all MWMs for which traffic data is needed (viewport, current position + * and route) and stores them in `activeMwms`. + * + * Implementations must ensure thread safety, so that this method can be called from any thread. + * + * @param activeMwms Retrieves the list of active MWMs. + */ + virtual void GetActiveMwms(std::set & activeMwms) = 0; + + /** + * @brief Processes a traffic feed. + * + * The feed may be a result of a pull operation, or received through a push operation. + * (Push operations are not supported by all sources.) + * + * This method is safe to call from any thread. + * + * @param feed The traffic feed. + */ + virtual void ReceiveFeed(traffxml::TraffFeed feed) = 0; + + /** + * @brief Registers a `TraffSource`. + * @param source The source. + */ + virtual void RegisterSource(std::unique_ptr source) = 0; +}; + +/** + * @brief Abstract base class for TraFF sources. + * + * Subclasses encapsulate various forms of TraFF sources. The base class provides methods for + * subscription management, message retrieval and service status. + * + * Any `TraffSource` method may call `TrafficManager` methods exposed through the + * `TraffSourceManager` interface. The traffic manager must therefore ensure there is no conflict + * between thread-synchronization mechanisms held when calling a `TraffSource` method and those + * which may get requested when that method calls a `TraffSourceManager` method. + * + * Each subclass should implement a non-public constructor (private if the subclass is final, + * protected otherwise) and a public factory method. The factory method takes the same arguments + * as the constructor, creates an instance wrapped in a `std::unique_ptr` and registers it with + * the `TraffSourceManager`. It can be implemented as follows: + * ``` + * void SomeTraffSource::Create(TraffSourceManager & manager, SomeOtherArg & otherArg) + * { + * std::unique_ptr source = std::unique_ptr(new SomeTraffSource(manager, otherArg)); + * manager.RegisterSource(std::move(source)); + * } + * ``` + * + * Each subclass must provide implementations for `Subscribe()`, `ChangeSubscription()`, + * `Unsubscribe()`, `IsPollNeeded()` and `Poll()`. + * + * Most of these methods can be called from any thread, including the UI thread (see documentation + * of individual methods for details). This has two implications: + * + * Subclasses must ensure thread safety for methods they implement, in particular regarding access + * to shared members. This can be done by locking `m_mutex`. + * + * Also, methods should not block or perform lengthy operations. Network operations must be + * delegated to a separate thread (attempting a network operation on the UI thread will cause the + * application to be killed on Android). + * + * This class provides various protected members which subclasses can build upon. These include a + * reference to the `TraffSourceManager`, a mutex for thread-safe access, a subscription ID, + * timestamps for the last request and response, as well as for the next request, a retry count + * for failed operations, and an indication of a pending request. + */ +class TraffSource +{ +public: + /** + * @brief Whether traffic data is available. + * + * The default value upon creating a new instance should be `Unknown`. After that, the value + * should be changed based on the result of the last TraFF operation, as detailed below: + * + * `OK` changes the status from `Unknown`, or any error which would be resolved by the last + * operation, to `IsAvailable`. + * + * `INVALID` indicates a condition which should be treated as a bug, either in the source or its + * backend. It should generate a log entry, and changes the status to `Error`. + * + * `SUBSCRIPTION_REJECTED` changes the status to `SubscriptionRejected`. + * + * `NOT_COVERED` changes the status to `NotCovered`. + * + * `PARTIALLY_COVERED` has the same effect as `OK`. + * + * `SUBSCRIPTION_UNKNOWN` should be handled by clearing the subscription ID and resubscribing, + * then setting the status based on the result of the new subscription. + * + * `INTERNAL_ERROR` changes the status to `Error`. + * + * If the source does not seem to be connected to a valid backend (e.g. if a HTTP source responds + * with an HTTP error), the status should be changed to `Error`. + * + * If a TraFF `GET_CAPABILITIES` request returns a minimum version higher than supported by this + * application, the status should be changed to `ExpiredApp` and no further requests to the source + * should be attempted. + * + * @todo Should `PARTIALLY_COVERED`, or `GET_CAPABILITIES` reporting a target version higher than + * supported, be stored in the class instance? + */ + enum class Availability + { + /** + * The source is working normally. + * This status is reached after the first request was made, if it is successful. + */ + IsAvailable, + /** + * The source, or its backend, rejected the subscription. + * This may happen for various reasons, possibly because the requested area was too large. + * An existing subscription ID (if any) remains valid, but no poll operations should be + * attempted until the subscription is changed successfully. + */ + SubscriptionRejected, + /** + * The requested area is not covered by the source. + * An existing subscription ID (if any) remains valid, but poll operations will not return any + * messages until the subscription is changed successfully. + */ + NotCovered, + /** + * The source has reported an internal error, has reported an invalid request or returned + * invalid data. + * The failed operation should be retried at a resonably chosen interval. After the source + * resumes normal operation, previously issued subscription IDs may no longer be valid (in which + * case the caller should attempt to resubscribe) and/or messages may be repeated. + */ + Error, + /** The app does not support the minimum TraFF version required by the source. */ + ExpiredApp, + /** No request was made yet. */ + Unknown + }; + + /** + * @brief Ensures we have a subscription covering the MWMs indicated. + * + * This method subscribes to a traffic service if not already subscribed, or changes the existing + * subscription otherwise. + * + * The default implementation acquires the mutex before running the following code: + * + * ``` + * if (!IsSubscribed()) + * Subscribe(mwms); + * else + * ChangeSubscription(mwms); + * ``` + * + * Therefore, `IsSubscribed()`, `Subscribe()` and `ChangeSubscription()` need not (and should not) + * acquire the mutex on their own. + * + * @param mwms The new set of MWMs for which data is needed. + */ + virtual void SubscribeOrChangeSubscription(std::set & mwms); + + /** + * @brief Whether this source should be polled. + * + * Prior to calling `Poll()` on a source, the caller should always first call `IsPollNeeded()` and + * poll the source only if the result is true. + * + * It is up to the source to decide when to return true or false. Typically a source would return + * false if another request is still pending, a predefined poll interval has not yet elapsed since + * the previous successful response, during the retry interval following an error, or if an error + * is not recoverable (such as `ExpiredApp`). In all other case it would return true. + * + * This method is only called from the `TrafficManager` worker thread. + * + * @return true if the source should be polled, false if not. + */ + virtual bool IsPollNeeded() = 0; + + /** + * @brief Polls the traffic service for updates. + * + * For sources which reliably push data, this implementation may do nothing. + * + * It is up to the caller to call `IsPollNeeded()` prior to calling this function, and use its + * result to decide whether or not to poll, or to force a poll operation. + * + * Sources should handle cases in which the backend responds with `SUBSCRIPTION_UNKNOWN`, usually + * by deleting the subscription ID and resubscribing to the set of active MWMs. The set of active + * MWMs can be retrieved by calling `m_manager.GetActiveMwms()`. + * + * This method is only called from the `TrafficManager` worker thread. + */ + virtual void Poll() = 0; + + /** + * @brief Unsubscribes from a traffic service we are subscribed to. + * + * Unsubscribing without being subscribed is a no-op. + */ + virtual void Unsubscribe() = 0; + +protected: + /** + * @brief Constructs a new `TraffSource`. + * @param manager The `TrafficSourceManager` instance to register the source with. + */ + TraffSource(TraffSourceManager & manager); + + /** + * @brief Returns a TraFF filter list for a set of MWMs. + * + * @param mwms The MWMs for which a filter list is to be created. + * @return A `filter_list` in XML format. + */ + static std::string GetMwmFilters(std::set & mwms); + + /** + * @brief Subscribes to a traffic service. + * + * If the default implementation of `SubscribeOrChangeSubscription()` is used, `m_mutex` is + * acquired before this method is called, and implementations do not need to (and should not) + * acquire it again. Any other calls to this method must be protected by acquiring `m_mutex`. + * + * @param mwms The MWMs for which data is needed. + */ + virtual void Subscribe(std::set & mwms) = 0; + + /** + * @brief Changes an existing traffic subscription. + * + * If the default implementation of `SubscribeOrChangeSubscription()` is used, `m_mutex` is + * acquired before this method is called, and implementations do not need to (and should not) + * acquire it again. Any other calls to this method must be protected by acquiring `m_mutex`. + * + * Sources should handle cases in which the backend responds with `SUBSCRIPTION_UNKNOWN`, usually + * by deleting the subscription ID and resubscribing to `mwms`. Asynchronous implementations, in + * which `mwms` may no longer be available when the operation completes, can retrieve the set of + * active MWMs can be retrieved by calling `m_manager.GetActiveMwms()`. + * + * @param mwms The new set of MWMs for which data is needed. + */ + virtual void ChangeSubscription(std::set & mwms) = 0; + + /** + * @brief Whether we are currently subscribed to a traffic service. + * + * If the default implementation of `SubscribeOrChangeSubscription()` is used, `m_mutex` is + * acquired before this method is called, and implementations do not need to (and should not) + * acquire it again. Any other calls to this method must be protected by acquiring `m_mutex`. + * + * @return true if subscribed, false if not. + */ + virtual bool IsSubscribed() { return !m_subscriptionId.empty(); } + + TraffSourceManager & m_manager; + + /** + * @brief Mutex for access to shared members. + * + * Any access to members shared between threads must be protected by obtaining this mutex first. + */ + std::mutex m_mutex; + + /** + * @brief The subscription ID received from the backend. + * + * An empty subscription ID means no subscription. + */ + std::string m_subscriptionId; + + /** + * @brief When the last update request occurred. + * + * This timestamp is the basis for determining whether an update is needed. + * + * It is initially in the past. Subclasses that use it should update it whenever a request is made. + */ + std::atomic> m_lastRequestTime; + + /** + * @brief When the last response was received. + * + * This timestamp is the basis for determining whether a network request timed out, or if data is + * outdated. + * + * It is initially in the past. Subclasses that use it should update it whenever a response to a + * request is received. + */ + std::atomic> m_lastResponseTime; + + /** + * @brief When the next request should be made. + * + * This timestamp is initiated to current time and updated when a request is made, or a response + * is received. + * + * It is initially in the present. Subclasses that use it should update it on every request or + * response, setting it a defined timespan into the future. + */ + std::atomic> m_nextRequestTime = std::chrono::steady_clock::now(); + + /** + * @brief The number of failed traffic requests for this source. + * + * Reset when a request is successful. + */ + std::atomic m_retriesCount = 0; + + /** + * @brief Whether a request is currently pending for this source. + * + * Set to `true` when a request is scheduled, reverted to `false` when a response is received or + * the request fails. + */ + std::atomic m_isWaitingForResponse = false; + + /** + * @brief The last reported availability of the traffic source. + * + * See the documentation of `Availability` for possible values and their meanings. + * + * Availability is `Unknown` until a result for the first request (positive or negative) has been + * received. Subclasses must update this value, ensuring it always correctly reflects the status + * of the source. + */ + std::atomic m_lastAvailability = Availability::Unknown; + +private: + DISALLOW_COPY(TraffSource); +}; + +/** + * @brief A mock TraFF source. + * + * This source will accept any and all subscription requests and return a static subscription ID. + * Polling will return a static set of messages. + */ +class MockTraffSource : public TraffSource +{ +public: + /** + * @brief Creates a new `MockTraffSource` instance and registers it with the traffic manager. + * + * @param manager The traffic manager to register the new instance with + */ + static void Create(TraffSourceManager & manager); + + /** + * @brief Subscribes to a traffic service. + * + * @param mwms The MWMs for which data is needed. + */ + virtual void Subscribe(std::set & mwms) override; + + /** + * @brief Changes an existing traffic subscription. + * + * @param mwms The new set of MWMs for which data is needed. + */ + virtual void ChangeSubscription(std::set & mwms) override; + + /** + * @brief Unsubscribes from a traffic service we are subscribed to. + */ + virtual void Unsubscribe() override; + + /** + * @brief Whether this source should be polled. + * + * Prior to calling `Poll()` on a source, the caller should always first call `IsPollNeeded()` and + * poll the source only if the result is true. + * + * This implementation uses `m_nextRequestTime` to determine when the next poll is due. When a + * feed is received, `m_nextRequestTime` is set to a point in time 5 minutes in the future. As + * long as `m_nextRequestTime` is in the future, this method returns false. + * + * @return true if the source should be polled, false if not. + */ + virtual bool IsPollNeeded() override; + + /** + * @brief Polls the traffic service for updates. + */ + virtual void Poll() override; + +protected: + /** + * @brief Constructs a new `MockTraffSource`. + * @param manager The `TrafficSourceManager` instance to register the source with. + */ + MockTraffSource(TraffSourceManager & manager); + +private: + /** + * @brief The update interval, 5 minutes. + */ + static auto constexpr m_updateInterval = std::chrono::minutes(5); +}; +} // namespace traffxml