diff --git a/map/framework.cpp b/map/framework.cpp index 6315099d6..ccb72ac32 100644 --- a/map/framework.cpp +++ b/map/framework.cpp @@ -399,7 +399,10 @@ Framework::Framework(FrameworkParams const & params, bool loadMaps) * MockTraffSource for debugging purposes. * TODO Replace with a real source, parametrized and conditionally loaded, once we have one. */ - traffxml::MockTraffSource::Create(m_trafficManager); + //traffxml::MockTraffSource::Create(m_trafficManager); + + // For testing purposes, HttpTraffSource pointing to a hardcoded local instance + traffxml::HttpTraffSource::Create(m_trafficManager, "http://traff:8080/subscription-manager"); } Framework::~Framework() diff --git a/map/traffic_manager.cpp b/map/traffic_manager.cpp index faa669bbf..980350ffb 100644 --- a/map/traffic_manager.cpp +++ b/map/traffic_manager.cpp @@ -497,7 +497,8 @@ void TrafficManager::RegisterSource(std::unique_ptr sourc UniteActiveMwms(activeMwms); } - source->SubscribeOrChangeSubscription(activeMwms); + if (!activeMwms.empty()) + source->SubscribeOrChangeSubscription(activeMwms); { std::lock_guard lock(m_trafficSourceMutex); @@ -699,7 +700,6 @@ void TrafficManager::ThreadRoutine() if (hasUpdates) OnTrafficDataUpdate(); } - // Calling Unsubscribe() from the worker thread on exit makes thread synchronization easier Unsubscribe(); } diff --git a/traffxml/traff_model.cpp b/traffxml/traff_model.cpp index 028f720b5..0add21815 100644 --- a/traffxml/traff_model.cpp +++ b/traffxml/traff_model.cpp @@ -456,6 +456,23 @@ std::string DebugPrint(EventType eventType) UNREACHABLE(); } +std::string DebugPrint(ResponseStatus status) +{ + switch (status) + { + case ResponseStatus::Ok: return "Ok"; + case ResponseStatus::InvalidOperation: return "InvalidOperation"; + case ResponseStatus::SubscriptionRejected: return "SubscriptionRejected"; + case ResponseStatus::NotCovered: return "NotCovered"; + case ResponseStatus::PartiallyCovered: return "PartiallyCovered"; + case ResponseStatus::SubscriptionUnknown: return "SubscriptionUnknown"; + case ResponseStatus::PushRejected: return "PushRejected"; + case ResponseStatus::InternalError: return "InternalError"; + case ResponseStatus::Invalid: return "Invalid"; + } + UNREACHABLE(); +} + std::string DebugPrint(TrafficImpact impact) { std::ostringstream os; diff --git a/traffxml/traff_model.hpp b/traffxml/traff_model.hpp index f7d44531c..cf5d7724f 100644 --- a/traffxml/traff_model.hpp +++ b/traffxml/traff_model.hpp @@ -223,6 +223,76 @@ enum class EventType // TODO Security*, Transport*, Weather* }; +enum class ResponseStatus +{ + /** + * The operation was successful. + */ + Ok, + + /** + * The source rejected the operation as invalid + * + * This may happen when a nonexistent operation is attempted, or an operation is attempted with + * incomplete or otherwise invalid data. + * + * @note This corresponds to TraFF status `INVALID` but was renamed here. + * `ResponseStatus::Invalid` refers to a different kind of error. + */ + InvalidOperation, + + /** + * The source rejected the subscription, e.g. because the filtered region is too large. + */ + SubscriptionRejected, + + /** + * The source does not supply data for the requested area; the request has failed. + */ + NotCovered, + + /** + * The source supplies data only for a subset of the requested area; the request was successful + * (i.e. the subscription was created or changed as requested) but the consumer should be prepared + * to receive incomplete data. + */ + PartiallyCovered, + + /** + * An operation (change, push, pull) was attempted on a subscription which the recipient did not + * recognize. On transport channels which support stable identifiers for both communication + * parties, this is also used if a consumer attempts an operation on a subscription created by + * another consumer. + */ + SubscriptionUnknown, + + /** + * The aggregator does not accept unsolicited push requests from the sensor. Reserved for future + * versions and not used as of TraFF 0.8. + */ + PushRejected, + + /** + * An internal error prevented the recipient of the request from fulfilling it. + * + * This is either translated directly from `INTERNAL_ERROR` returned from the source, or may be + * inferred from errors on the transport channel (e.g. HTTP errors). + */ + InternalError, + + /** + * An unrecognized status code. + * + * This is used for all situations where we got a response from the source, with no indication of + * an error, but could not obtain a known status code from it (e.g. XML failed to parse, did not + * contain a status code, or contained an unknown status code). + * + * @note Not to be confused with TraFF status `INVALID`, which maps to + * `ResponseStatus::InvalidOperation`. + */ + Invalid +}; + /** * @brief Represents the impact of one or more traffic events. * @@ -436,6 +506,42 @@ using TraffFeed = std::vector; * the full filter list. */ +/** + * @brief Encapsulates the response to a TraFF request. + */ +struct TraffResponse +{ + /** + * @brief The response status for the request which triggered the response. + */ + ResponseStatus m_status = ResponseStatus::Invalid; + + /** + * @brief The subscription ID which the source has assigned to the subscriber. + * + * This attribute is how the source communicates the subscription ID to a subscriber. Required for + * responses to a subscription request; some transport channels may require it for every + * subscription-related operation; forbidden otherwise. + */ + std::string m_subscriptionId; + + /** + * @brief The time in seconds after which the source will consider the subscription invalid if no + * activity occurs. + * + * Required for responses to a subscription request on some transport channels, optional on other + * channels, forbidden for other requests. + * + * If not used, the value is zero. + */ + uint32_t m_timeout = 0; + + /** + * @brief A feed of traffic messages sent as part of the response. + */ + std::optional m_feed; +}; + /** * @brief Merges the contents of one `MultiMwmColoring` into another. * @@ -457,6 +563,7 @@ std::string DebugPrint(Ramps ramps); std::string DebugPrint(RoadClass roadClass); std::string DebugPrint(EventClass eventClass); std::string DebugPrint(EventType eventType); +std::string DebugPrint(ResponseStatus status); std::string DebugPrint(TrafficImpact impact); std::string DebugPrint(Point point); std::string DebugPrint(TraffLocation location); diff --git a/traffxml/traff_model_xml.cpp b/traffxml/traff_model_xml.cpp index 2466b5e65..93fcd0a0b 100644 --- a/traffxml/traff_model_xml.cpp +++ b/traffxml/traff_model_xml.cpp @@ -303,6 +303,41 @@ std::optional OptionalTimeFromXml(pugi::xml_attribute const & attribute return result; } +/** + * @brief Retrieves a response status from an attribute. + * + * @param attribute The XML attribute to retrieve. + * @param status Receives the status retrieved. + * @return `true` on success, `false` if the attribute is not set or set to an empty string. + */ +bool ResponseStatusFromXml(pugi::xml_attribute const & attribute, ResponseStatus & status) +{ + std::string statusString; + if (!StringFromXml(attribute, statusString)) + return false; + + if (statusString == "OK") + status = ResponseStatus::Ok; + else if (statusString == "INVALID") + status = ResponseStatus::InvalidOperation; + else if (statusString == "SUBSCRIPTION_REJECTED") + status = ResponseStatus::SubscriptionRejected; + else if (statusString == "NOT_COVERED") + status = ResponseStatus::NotCovered; + else if (statusString == "PARTIALLY_COVERED") + status = ResponseStatus::PartiallyCovered; + else if (statusString == "SUBSCRIPTION_UNKNOWN") + status = ResponseStatus::SubscriptionUnknown; + else if (statusString == "PUSH_REJECTED") + status = ResponseStatus::PushRejected; + else if (statusString == "INTERNAL_ERROR") + status = ResponseStatus::InternalError; + else + status = ResponseStatus::Invalid; + + return true; +} + /** * @brief Retrieves a boolean value from an attribute. * @param attribute The XML attribute to retrieve. @@ -1057,14 +1092,21 @@ void MessageToXml(TraffMessage const & message, pugi::xml_node node) } } -bool ParseTraff(pugi::xml_document const & document, - std::optional> dataSource, - TraffFeed & feed) +/** + * @brief Retrieves a TraFF feed from an XML element. + * @param node The XML element to retrieve (`feed`). + * @param dataSource The data source for coloring, see `ParseTraff()`. + * @param feed Receives the feed. + * @return `true` on success, `false` if the node does not exist or does not contain valid message data. + */ +bool FeedFromXml(pugi::xml_node const & node, + std::optional> dataSource, + TraffFeed & feed) { bool result = false; - // Select all messages elements that are direct children of the root. - auto const messages = document.document_element().select_nodes("./message"); + // Select all messages elements that are direct children of the node. + auto const messages = node.select_nodes("./message"); if (messages.empty()) return true; @@ -1085,6 +1127,13 @@ bool ParseTraff(pugi::xml_document const & document, return result; } +bool ParseTraff(pugi::xml_document const & document, + std::optional> dataSource, + TraffFeed & feed) +{ + return FeedFromXml(document.document_element(), dataSource, feed); +} + void GenerateTraff(TraffFeed const & feed, pugi::xml_document & document) { auto root = document.append_child("feed"); @@ -1117,4 +1166,39 @@ std::string FiltersToXml(std::vector & bboxRects) mercator::XToLon(rect.maxX())); return os.str(); } + +TraffResponse ParseResponse(std::string const & responseXml) +{ + TraffResponse result; + pugi::xml_document responseDocument; + if (!responseDocument.load_string(responseXml.c_str())) + return result; + + auto const responseElement = responseDocument.document_element(); + std::string responseElementName(responseElement.name()); + + if (responseElementName != "response") + return result; + + if (!ResponseStatusFromXml(responseElement.attribute("status"), result.m_status)) + return result; + + StringFromXml(responseElement.attribute("subscription_id"), result.m_subscriptionId); + + IntegerFromXml(responseElement.attribute("timeout"), result.m_timeout); + + LOG(LDEBUG, ("Response, status:", result.m_status, "subscription ID:", result.m_subscriptionId, "timeout:", result.m_timeout)); + + if (responseElement.child("feed")) + { + TraffFeed feed; + FeedFromXml(responseElement.child("feed"), std::nullopt /* dataSource */, feed); + LOG(LDEBUG, ("Feed received, number of messages:", feed.size())); + result.m_feed = std::move(feed); + } + else + LOG(LDEBUG, ("No feed in response")); + + return result; +} } // namespace openlr diff --git a/traffxml/traff_model_xml.hpp b/traffxml/traff_model_xml.hpp index ad2ef1371..68c5bb8e7 100644 --- a/traffxml/traff_model_xml.hpp +++ b/traffxml/traff_model_xml.hpp @@ -105,4 +105,17 @@ void GenerateTraff(std::map const & message * @return A string of XML `filter` elements. */ std::string FiltersToXml(std::vector & bboxRects); + +/** + * @brief Parses the response to a TraFF request. + * + * The response must comply with TraFF 0.8. The root element must be `response`. + * + * If a parsing error occurs, the response returned will have its `m_status` member set to + * `ResponseStatus::Invalid`. + * + * @param responseXml The response, as a string in XML format. + * @return The parsed response. + */ +TraffResponse ParseResponse(std::string const & responseXml); } // namespace traffxml diff --git a/traffxml/traff_source.cpp b/traffxml/traff_source.cpp index 502d3cc95..de829fca8 100644 --- a/traffxml/traff_source.cpp +++ b/traffxml/traff_source.cpp @@ -5,6 +5,10 @@ #include "geometry/rect2d.hpp" +#include "platform/platform.hpp" + +#include +#include #include namespace traffxml { @@ -102,4 +106,202 @@ void MockTraffSource::Poll() m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval; } } + +TraffResponse HttpPost(std::string const & url, std::string data) +{ + platform::HttpClient request(url); + request.SetBodyData(data, "application/xml"); + + if (!request.RunHttpRequest() || request.ErrorCode() != 200) + { + TraffResponse result; + result.m_status = ResponseStatus::InternalError; + return result; + } + + LOG(LDEBUG, ("Got response, status", request.ErrorCode())); + + TraffResponse result = ParseResponse(request.ServerResponse()); + return result; +} + +void HttpTraffSource::Create(TraffSourceManager & manager, std::string const & url) +{ + std::unique_ptr source = std::unique_ptr(new HttpTraffSource(manager, url)); + manager.RegisterSource(std::move(source)); +} + +HttpTraffSource::HttpTraffSource(TraffSourceManager & manager, std::string const & url) + : TraffSource(manager) + , m_url(url) +{} + +void HttpTraffSource::Subscribe(std::set & mwms) +{ + std::string data = "\n\n" + + GetMwmFilters(mwms) + + "\n" + + ""; + LOG(LDEBUG, ("Sending request:\n", data)); + + threads::SimpleThread thread([this, data]() { + // TODO sometimes the request gets sent (and processed) twice + TraffResponse response = HttpPost(m_url, data); + OnSubscribeResponse(response); + return; + }); + thread.detach(); +} + +void HttpTraffSource::OnFeedReceived(TraffFeed & feed) +{ + m_lastResponseTime = std::chrono::steady_clock::now(); + m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval; + m_lastAvailability = Availability::IsAvailable; + m_manager.ReceiveFeed(feed); +} + +void HttpTraffSource::OnSubscribeResponse(TraffResponse & response) +{ + if (response.m_status == ResponseStatus::Ok + || response.m_status == ResponseStatus::PartiallyCovered) + { + if (response.m_subscriptionId.empty()) + LOG(LWARNING, ("Server replied with", response.m_status, "but subscription ID is empty; ignoring")); + else + { + { + std::lock_guard lock(m_mutex); + m_subscriptionId = response.m_subscriptionId; + // TODO timeout + } + if (response.m_feed && !response.m_feed.value().empty()) + OnFeedReceived(response.m_feed.value()); + else + Poll(); + } + } + else + LOG(LWARNING, ("Subscribe request failed:", response.m_status)); +} + +void HttpTraffSource::ChangeSubscription(std::set & mwms) +{ + std::string data = "\n" + + "\n" + + GetMwmFilters(mwms) + + "\n" + + ""; + LOG(LDEBUG, ("Sending request:\n", data)); + + threads::SimpleThread thread([this, data]() { + TraffResponse response = HttpPost(m_url, data); + OnChangeSubscriptionResponse(response); + return; + }); + thread.detach(); +} + +void HttpTraffSource::OnChangeSubscriptionResponse(TraffResponse & response) +{ + if (response.m_status == ResponseStatus::Ok + || response.m_status == ResponseStatus::PartiallyCovered) + { + if (response.m_feed && !response.m_feed.value().empty()) + OnFeedReceived(response.m_feed.value()); + else + Poll(); + } + else if (response.m_status == ResponseStatus::SubscriptionUnknown) + { + LOG(LWARNING, ("Change Subscription returned", response.m_status, " – removing subscription", m_subscriptionId)); + { + std::lock_guard lock(m_mutex); + m_subscriptionId.clear(); + } + } + else + LOG(LWARNING, ("Change Subscription request failed:", response.m_status)); +} + +void HttpTraffSource::Unsubscribe() +{ + std::string data; + { + std::lock_guard lock(m_mutex); + + if (m_subscriptionId.empty()) + return; + data = ""; + } + + LOG(LDEBUG, ("Sending request:\n", data)); + + threads::SimpleThread thread([this, data]() { + TraffResponse response = HttpPost(m_url, data); + OnUnsubscribeResponse(response); + return; + }); + thread.detach(); +} + +void HttpTraffSource::OnUnsubscribeResponse(TraffResponse & response) +{ + if (response.m_status != ResponseStatus::Ok + && response.m_status != ResponseStatus::SubscriptionUnknown) + { + LOG(LWARNING, ("Unsubscribe returned", response.m_status, " – removing subscription")); + } + { + std::lock_guard lock(m_mutex); + m_subscriptionId.clear(); + } +} + +bool HttpTraffSource::IsPollNeeded() +{ + // TODO revisit logic + return m_nextRequestTime.load() <= std::chrono::steady_clock::now(); +} + +void HttpTraffSource::Poll() +{ + std::string data; + { + std::lock_guard lock(m_mutex); + + if (m_subscriptionId.empty()) + return; + data = ""; + } + + LOG(LDEBUG, ("Sending request:\n", data)); + + threads::SimpleThread thread([this, data]() { + // TODO sometimes the request gets sent (and processed) twice + TraffResponse response = HttpPost(m_url, data); + OnPollResponse(response); + return; + }); + thread.detach(); +} + +void HttpTraffSource::OnPollResponse(TraffResponse & response) +{ + if (response.m_status == ResponseStatus::Ok) + { + if (response.m_feed && !response.m_feed.value().empty()) + OnFeedReceived(response.m_feed.value()); + } + else if (response.m_status == ResponseStatus::SubscriptionUnknown) + { + LOG(LWARNING, ("Poll returned", response.m_status, " – removing subscription", m_subscriptionId)); + { + std::lock_guard lock(m_mutex); + m_subscriptionId.clear(); + } + } + else + LOG(LWARNING, ("Poll returned", response.m_status)); +} } // namespace traffxml diff --git a/traffxml/traff_source.hpp b/traffxml/traff_source.hpp index 8497c6a5f..840d1b5d9 100644 --- a/traffxml/traff_source.hpp +++ b/traffxml/traff_source.hpp @@ -2,8 +2,12 @@ #include "traffxml/traff_model.hpp" +#include "base/thread.hpp" + #include "indexer/mwm_set.hpp" +#include "platform/http_client.hpp" + #include #include #include @@ -423,4 +427,115 @@ private: */ static auto constexpr m_updateInterval = std::chrono::minutes(5); }; + +/** + * @brief A TraFF source backed by a HTTP[S] server. + */ +class HttpTraffSource : public TraffSource +{ +public: + /** + * @brief Creates a new `HttpTraffSource` instance and registers it with the traffic manager. + * + * @param manager The traffic manager to register the new instance with + */ + static void Create(TraffSourceManager & manager, std::string const & url); + + /** + * @brief Subscribes to a traffic service. + * + * @param mwms The MWMs for which data is needed. + */ + virtual void Subscribe(std::set & mwms) override; + + /** + * @brief Changes an existing traffic subscription. + * + * @param mwms The new set of MWMs for which data is needed. + */ + virtual void ChangeSubscription(std::set & mwms) override; + + /** + * @brief Unsubscribes from a traffic service we are subscribed to. + */ + virtual void Unsubscribe() override; + + /** + * @brief Whether this source should be polled. + * + * Prior to calling `Poll()` on a source, the caller should always first call `IsPollNeeded()` and + * poll the source only if the result is true. + * + * @todo Document how the result is calculated. For example: + * This implementation uses `m_nextRequestTime` to determine when the next poll is due. When a + * feed is received, `m_nextRequestTime` is set to a point in time 5 minutes in the future. As + * long as `m_nextRequestTime` is in the future, this method returns false. + * + * @return true if the source should be polled, false if not. + */ + virtual bool IsPollNeeded() override; + + /** + * @brief Polls the traffic service for updates. + */ + virtual void Poll() override; + +protected: + /** + * @brief Constructs a new `HttpTraffSource`. + * @param manager The `TrafficSourceManager` instance to register the source with. + * @param url The URL for the TraFF service API. + */ + HttpTraffSource(TraffSourceManager & manager, std::string const & url); + +private: + /** + * @brief Processes a TraFF feed. + * @param feed The feed. + */ + void OnFeedReceived(TraffFeed & feed); + + /** + * @brief Processes the response to a subscribe request. + * @param response The response to the subscribe operation. + */ + void OnSubscribeResponse(TraffResponse & response); + + /** + * @brief Processes the response to a change subscription request. + * @param response The response to the change subscription operation. + */ + void OnChangeSubscriptionResponse(TraffResponse & response); + + /** + * @brief Processes the response to an unsubscribe request. + * @param response The response to the unsubscribe operation. + */ + void OnUnsubscribeResponse(TraffResponse & response); + + /** + * @brief Processes the response to a poll request. + * @param response The response to the poll operation. + */ + void OnPollResponse(TraffResponse & response); + + /** + * @brief Event loop for the worker thread. + * + * This method runs an event loop, which blocks until woken up. When woken up, it processes the + * request (subscribe, change subscription, poll or unsubscribe) and its result, then blocks + * again until woken up for the next request. + */ + void ThreadRoutine(); + + /** + * @brief The update interval, 5 minutes. + */ + static auto constexpr m_updateInterval = std::chrono::minutes(5); + + /** + * @brief The URL for the TraFF service. + */ + const std::string m_url; +}; } // namespace traffxml