[traffic] Implement HttpTraffSource

Signed-off-by: mvglasow <michael -at- vonglasow.com>
This commit is contained in:
mvglasow
2025-07-20 13:14:23 +03:00
parent a20d1453e0
commit 3f58c6ee20
8 changed files with 549 additions and 8 deletions

View File

@@ -399,7 +399,10 @@ Framework::Framework(FrameworkParams const & params, bool loadMaps)
* MockTraffSource for debugging purposes.
* TODO Replace with a real source, parametrized and conditionally loaded, once we have one.
*/
traffxml::MockTraffSource::Create(m_trafficManager);
//traffxml::MockTraffSource::Create(m_trafficManager);
// For testing purposes, HttpTraffSource pointing to a hardcoded local instance
traffxml::HttpTraffSource::Create(m_trafficManager, "http://traff:8080/subscription-manager");
}
Framework::~Framework()

View File

@@ -497,7 +497,8 @@ void TrafficManager::RegisterSource(std::unique_ptr<traffxml::TraffSource> sourc
UniteActiveMwms(activeMwms);
}
source->SubscribeOrChangeSubscription(activeMwms);
if (!activeMwms.empty())
source->SubscribeOrChangeSubscription(activeMwms);
{
std::lock_guard<std::mutex> lock(m_trafficSourceMutex);
@@ -699,7 +700,6 @@ void TrafficManager::ThreadRoutine()
if (hasUpdates)
OnTrafficDataUpdate();
}
// Calling Unsubscribe() from the worker thread on exit makes thread synchronization easier
Unsubscribe();
}

View File

@@ -456,6 +456,23 @@ std::string DebugPrint(EventType eventType)
UNREACHABLE();
}
std::string DebugPrint(ResponseStatus status)
{
switch (status)
{
case ResponseStatus::Ok: return "Ok";
case ResponseStatus::InvalidOperation: return "InvalidOperation";
case ResponseStatus::SubscriptionRejected: return "SubscriptionRejected";
case ResponseStatus::NotCovered: return "NotCovered";
case ResponseStatus::PartiallyCovered: return "PartiallyCovered";
case ResponseStatus::SubscriptionUnknown: return "SubscriptionUnknown";
case ResponseStatus::PushRejected: return "PushRejected";
case ResponseStatus::InternalError: return "InternalError";
case ResponseStatus::Invalid: return "Invalid";
}
UNREACHABLE();
}
std::string DebugPrint(TrafficImpact impact)
{
std::ostringstream os;

View File

@@ -223,6 +223,76 @@ enum class EventType
// TODO Security*, Transport*, Weather*
};
enum class ResponseStatus
{
/**
* The operation was successful.
*/
Ok,
/**
* The source rejected the operation as invalid
*
* This may happen when a nonexistent operation is attempted, or an operation is attempted with
* incomplete or otherwise invalid data.
*
* @note This corresponds to TraFF status `INVALID` but was renamed here.
* `ResponseStatus::Invalid` refers to a different kind of error.
*/
InvalidOperation,
/**
* The source rejected the subscription, e.g. because the filtered region is too large.
*/
SubscriptionRejected,
/**
* The source does not supply data for the requested area; the request has failed.
*/
NotCovered,
/**
* The source supplies data only for a subset of the requested area; the request was successful
* (i.e. the subscription was created or changed as requested) but the consumer should be prepared
* to receive incomplete data.
*/
PartiallyCovered,
/**
* An operation (change, push, pull) was attempted on a subscription which the recipient did not
* recognize. On transport channels which support stable identifiers for both communication
* parties, this is also used if a consumer attempts an operation on a subscription created by
* another consumer.
*/
SubscriptionUnknown,
/**
* The aggregator does not accept unsolicited push requests from the sensor. Reserved for future
* versions and not used as of TraFF 0.8.
*/
PushRejected,
/**
* An internal error prevented the recipient of the request from fulfilling it.
*
* This is either translated directly from `INTERNAL_ERROR` returned from the source, or may be
* inferred from errors on the transport channel (e.g. HTTP errors).
*/
InternalError,
/**
* An unrecognized status code.
*
* This is used for all situations where we got a response from the source, with no indication of
* an error, but could not obtain a known status code from it (e.g. XML failed to parse, did not
* contain a status code, or contained an unknown status code).
*
* @note Not to be confused with TraFF status `INVALID`, which maps to
* `ResponseStatus::InvalidOperation`.
*/
Invalid
};
/**
* @brief Represents the impact of one or more traffic events.
*
@@ -436,6 +506,42 @@ using TraffFeed = std::vector<TraffMessage>;
* the full filter list.
*/
/**
* @brief Encapsulates the response to a TraFF request.
*/
struct TraffResponse
{
/**
* @brief The response status for the request which triggered the response.
*/
ResponseStatus m_status = ResponseStatus::Invalid;
/**
* @brief The subscription ID which the source has assigned to the subscriber.
*
* This attribute is how the source communicates the subscription ID to a subscriber. Required for
* responses to a subscription request; some transport channels may require it for every
* subscription-related operation; forbidden otherwise.
*/
std::string m_subscriptionId;
/**
* @brief The time in seconds after which the source will consider the subscription invalid if no
* activity occurs.
*
* Required for responses to a subscription request on some transport channels, optional on other
* channels, forbidden for other requests.
*
* If not used, the value is zero.
*/
uint32_t m_timeout = 0;
/**
* @brief A feed of traffic messages sent as part of the response.
*/
std::optional<TraffFeed> m_feed;
};
/**
* @brief Merges the contents of one `MultiMwmColoring` into another.
*
@@ -457,6 +563,7 @@ std::string DebugPrint(Ramps ramps);
std::string DebugPrint(RoadClass roadClass);
std::string DebugPrint(EventClass eventClass);
std::string DebugPrint(EventType eventType);
std::string DebugPrint(ResponseStatus status);
std::string DebugPrint(TrafficImpact impact);
std::string DebugPrint(Point point);
std::string DebugPrint(TraffLocation location);

View File

@@ -303,6 +303,41 @@ std::optional<IsoTime> OptionalTimeFromXml(pugi::xml_attribute const & attribute
return result;
}
/**
* @brief Retrieves a response status from an attribute.
*
* @param attribute The XML attribute to retrieve.
* @param status Receives the status retrieved.
* @return `true` on success, `false` if the attribute is not set or set to an empty string.
*/
bool ResponseStatusFromXml(pugi::xml_attribute const & attribute, ResponseStatus & status)
{
std::string statusString;
if (!StringFromXml(attribute, statusString))
return false;
if (statusString == "OK")
status = ResponseStatus::Ok;
else if (statusString == "INVALID")
status = ResponseStatus::InvalidOperation;
else if (statusString == "SUBSCRIPTION_REJECTED")
status = ResponseStatus::SubscriptionRejected;
else if (statusString == "NOT_COVERED")
status = ResponseStatus::NotCovered;
else if (statusString == "PARTIALLY_COVERED")
status = ResponseStatus::PartiallyCovered;
else if (statusString == "SUBSCRIPTION_UNKNOWN")
status = ResponseStatus::SubscriptionUnknown;
else if (statusString == "PUSH_REJECTED")
status = ResponseStatus::PushRejected;
else if (statusString == "INTERNAL_ERROR")
status = ResponseStatus::InternalError;
else
status = ResponseStatus::Invalid;
return true;
}
/**
* @brief Retrieves a boolean value from an attribute.
* @param attribute The XML attribute to retrieve.
@@ -1057,14 +1092,21 @@ void MessageToXml(TraffMessage const & message, pugi::xml_node node)
}
}
bool ParseTraff(pugi::xml_document const & document,
std::optional<std::reference_wrapper<const DataSource>> dataSource,
TraffFeed & feed)
/**
* @brief Retrieves a TraFF feed from an XML element.
* @param node The XML element to retrieve (`feed`).
* @param dataSource The data source for coloring, see `ParseTraff()`.
* @param feed Receives the feed.
* @return `true` on success, `false` if the node does not exist or does not contain valid message data.
*/
bool FeedFromXml(pugi::xml_node const & node,
std::optional<std::reference_wrapper<const DataSource>> dataSource,
TraffFeed & feed)
{
bool result = false;
// Select all messages elements that are direct children of the root.
auto const messages = document.document_element().select_nodes("./message");
// Select all messages elements that are direct children of the node.
auto const messages = node.select_nodes("./message");
if (messages.empty())
return true;
@@ -1085,6 +1127,13 @@ bool ParseTraff(pugi::xml_document const & document,
return result;
}
bool ParseTraff(pugi::xml_document const & document,
std::optional<std::reference_wrapper<const DataSource>> dataSource,
TraffFeed & feed)
{
return FeedFromXml(document.document_element(), dataSource, feed);
}
void GenerateTraff(TraffFeed const & feed, pugi::xml_document & document)
{
auto root = document.append_child("feed");
@@ -1117,4 +1166,39 @@ std::string FiltersToXml(std::vector<m2::RectD> & bboxRects)
mercator::XToLon(rect.maxX()));
return os.str();
}
TraffResponse ParseResponse(std::string const & responseXml)
{
TraffResponse result;
pugi::xml_document responseDocument;
if (!responseDocument.load_string(responseXml.c_str()))
return result;
auto const responseElement = responseDocument.document_element();
std::string responseElementName(responseElement.name());
if (responseElementName != "response")
return result;
if (!ResponseStatusFromXml(responseElement.attribute("status"), result.m_status))
return result;
StringFromXml(responseElement.attribute("subscription_id"), result.m_subscriptionId);
IntegerFromXml(responseElement.attribute("timeout"), result.m_timeout);
LOG(LDEBUG, ("Response, status:", result.m_status, "subscription ID:", result.m_subscriptionId, "timeout:", result.m_timeout));
if (responseElement.child("feed"))
{
TraffFeed feed;
FeedFromXml(responseElement.child("feed"), std::nullopt /* dataSource */, feed);
LOG(LDEBUG, ("Feed received, number of messages:", feed.size()));
result.m_feed = std::move(feed);
}
else
LOG(LDEBUG, ("No feed in response"));
return result;
}
} // namespace openlr

View File

@@ -105,4 +105,17 @@ void GenerateTraff(std::map<std::string, traffxml::TraffMessage> const & message
* @return A string of XML `filter` elements.
*/
std::string FiltersToXml(std::vector<m2::RectD> & bboxRects);
/**
* @brief Parses the response to a TraFF request.
*
* The response must comply with TraFF 0.8. The root element must be `response`.
*
* If a parsing error occurs, the response returned will have its `m_status` member set to
* `ResponseStatus::Invalid`.
*
* @param responseXml The response, as a string in XML format.
* @return The parsed response.
*/
TraffResponse ParseResponse(std::string const & responseXml);
} // namespace traffxml

View File

@@ -5,6 +5,10 @@
#include "geometry/rect2d.hpp"
#include "platform/platform.hpp"
#include <functional>
#include <thread>
#include <vector>
namespace traffxml {
@@ -102,4 +106,202 @@ void MockTraffSource::Poll()
m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval;
}
}
TraffResponse HttpPost(std::string const & url, std::string data)
{
platform::HttpClient request(url);
request.SetBodyData(data, "application/xml");
if (!request.RunHttpRequest() || request.ErrorCode() != 200)
{
TraffResponse result;
result.m_status = ResponseStatus::InternalError;
return result;
}
LOG(LDEBUG, ("Got response, status", request.ErrorCode()));
TraffResponse result = ParseResponse(request.ServerResponse());
return result;
}
void HttpTraffSource::Create(TraffSourceManager & manager, std::string const & url)
{
std::unique_ptr<HttpTraffSource> source = std::unique_ptr<HttpTraffSource>(new HttpTraffSource(manager, url));
manager.RegisterSource(std::move(source));
}
HttpTraffSource::HttpTraffSource(TraffSourceManager & manager, std::string const & url)
: TraffSource(manager)
, m_url(url)
{}
void HttpTraffSource::Subscribe(std::set<MwmSet::MwmId> & mwms)
{
std::string data = "<request operation=\"SUBSCRIBE\">\n<filter_list>\n"
+ GetMwmFilters(mwms)
+ "</filter_list>\n"
+ "</request>";
LOG(LDEBUG, ("Sending request:\n", data));
threads::SimpleThread thread([this, data]() {
// TODO sometimes the request gets sent (and processed) twice
TraffResponse response = HttpPost(m_url, data);
OnSubscribeResponse(response);
return;
});
thread.detach();
}
void HttpTraffSource::OnFeedReceived(TraffFeed & feed)
{
m_lastResponseTime = std::chrono::steady_clock::now();
m_nextRequestTime = std::chrono::steady_clock::now() + m_updateInterval;
m_lastAvailability = Availability::IsAvailable;
m_manager.ReceiveFeed(feed);
}
void HttpTraffSource::OnSubscribeResponse(TraffResponse & response)
{
if (response.m_status == ResponseStatus::Ok
|| response.m_status == ResponseStatus::PartiallyCovered)
{
if (response.m_subscriptionId.empty())
LOG(LWARNING, ("Server replied with", response.m_status, "but subscription ID is empty; ignoring"));
else
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_subscriptionId = response.m_subscriptionId;
// TODO timeout
}
if (response.m_feed && !response.m_feed.value().empty())
OnFeedReceived(response.m_feed.value());
else
Poll();
}
}
else
LOG(LWARNING, ("Subscribe request failed:", response.m_status));
}
void HttpTraffSource::ChangeSubscription(std::set<MwmSet::MwmId> & mwms)
{
std::string data = "<request operation=\"SUBSCRIPTION_CHANGE\" subscription_id=\"" + m_subscriptionId + "\">\n"
+ "<filter_list>\n"
+ GetMwmFilters(mwms)
+ "</filter_list>\n"
+ "</request>";
LOG(LDEBUG, ("Sending request:\n", data));
threads::SimpleThread thread([this, data]() {
TraffResponse response = HttpPost(m_url, data);
OnChangeSubscriptionResponse(response);
return;
});
thread.detach();
}
void HttpTraffSource::OnChangeSubscriptionResponse(TraffResponse & response)
{
if (response.m_status == ResponseStatus::Ok
|| response.m_status == ResponseStatus::PartiallyCovered)
{
if (response.m_feed && !response.m_feed.value().empty())
OnFeedReceived(response.m_feed.value());
else
Poll();
}
else if (response.m_status == ResponseStatus::SubscriptionUnknown)
{
LOG(LWARNING, ("Change Subscription returned", response.m_status, " removing subscription", m_subscriptionId));
{
std::lock_guard<std::mutex> lock(m_mutex);
m_subscriptionId.clear();
}
}
else
LOG(LWARNING, ("Change Subscription request failed:", response.m_status));
}
void HttpTraffSource::Unsubscribe()
{
std::string data;
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_subscriptionId.empty())
return;
data = "<request operation=\"UNSUBSCRIBE\" subscription_id=\"" + m_subscriptionId + "\"/>";
}
LOG(LDEBUG, ("Sending request:\n", data));
threads::SimpleThread thread([this, data]() {
TraffResponse response = HttpPost(m_url, data);
OnUnsubscribeResponse(response);
return;
});
thread.detach();
}
void HttpTraffSource::OnUnsubscribeResponse(TraffResponse & response)
{
if (response.m_status != ResponseStatus::Ok
&& response.m_status != ResponseStatus::SubscriptionUnknown)
{
LOG(LWARNING, ("Unsubscribe returned", response.m_status, " removing subscription"));
}
{
std::lock_guard<std::mutex> lock(m_mutex);
m_subscriptionId.clear();
}
}
bool HttpTraffSource::IsPollNeeded()
{
// TODO revisit logic
return m_nextRequestTime.load() <= std::chrono::steady_clock::now();
}
void HttpTraffSource::Poll()
{
std::string data;
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_subscriptionId.empty())
return;
data = "<request operation=\"POLL\" subscription_id=\"" + m_subscriptionId + "\"/>";
}
LOG(LDEBUG, ("Sending request:\n", data));
threads::SimpleThread thread([this, data]() {
// TODO sometimes the request gets sent (and processed) twice
TraffResponse response = HttpPost(m_url, data);
OnPollResponse(response);
return;
});
thread.detach();
}
void HttpTraffSource::OnPollResponse(TraffResponse & response)
{
if (response.m_status == ResponseStatus::Ok)
{
if (response.m_feed && !response.m_feed.value().empty())
OnFeedReceived(response.m_feed.value());
}
else if (response.m_status == ResponseStatus::SubscriptionUnknown)
{
LOG(LWARNING, ("Poll returned", response.m_status, " removing subscription", m_subscriptionId));
{
std::lock_guard<std::mutex> lock(m_mutex);
m_subscriptionId.clear();
}
}
else
LOG(LWARNING, ("Poll returned", response.m_status));
}
} // namespace traffxml

View File

@@ -2,8 +2,12 @@
#include "traffxml/traff_model.hpp"
#include "base/thread.hpp"
#include "indexer/mwm_set.hpp"
#include "platform/http_client.hpp"
#include <chrono>
#include <set>
#include <string>
@@ -423,4 +427,115 @@ private:
*/
static auto constexpr m_updateInterval = std::chrono::minutes(5);
};
/**
* @brief A TraFF source backed by a HTTP[S] server.
*/
class HttpTraffSource : public TraffSource
{
public:
/**
* @brief Creates a new `HttpTraffSource` instance and registers it with the traffic manager.
*
* @param manager The traffic manager to register the new instance with
*/
static void Create(TraffSourceManager & manager, std::string const & url);
/**
* @brief Subscribes to a traffic service.
*
* @param mwms The MWMs for which data is needed.
*/
virtual void Subscribe(std::set<MwmSet::MwmId> & mwms) override;
/**
* @brief Changes an existing traffic subscription.
*
* @param mwms The new set of MWMs for which data is needed.
*/
virtual void ChangeSubscription(std::set<MwmSet::MwmId> & mwms) override;
/**
* @brief Unsubscribes from a traffic service we are subscribed to.
*/
virtual void Unsubscribe() override;
/**
* @brief Whether this source should be polled.
*
* Prior to calling `Poll()` on a source, the caller should always first call `IsPollNeeded()` and
* poll the source only if the result is true.
*
* @todo Document how the result is calculated. For example:
* This implementation uses `m_nextRequestTime` to determine when the next poll is due. When a
* feed is received, `m_nextRequestTime` is set to a point in time 5 minutes in the future. As
* long as `m_nextRequestTime` is in the future, this method returns false.
*
* @return true if the source should be polled, false if not.
*/
virtual bool IsPollNeeded() override;
/**
* @brief Polls the traffic service for updates.
*/
virtual void Poll() override;
protected:
/**
* @brief Constructs a new `HttpTraffSource`.
* @param manager The `TrafficSourceManager` instance to register the source with.
* @param url The URL for the TraFF service API.
*/
HttpTraffSource(TraffSourceManager & manager, std::string const & url);
private:
/**
* @brief Processes a TraFF feed.
* @param feed The feed.
*/
void OnFeedReceived(TraffFeed & feed);
/**
* @brief Processes the response to a subscribe request.
* @param response The response to the subscribe operation.
*/
void OnSubscribeResponse(TraffResponse & response);
/**
* @brief Processes the response to a change subscription request.
* @param response The response to the change subscription operation.
*/
void OnChangeSubscriptionResponse(TraffResponse & response);
/**
* @brief Processes the response to an unsubscribe request.
* @param response The response to the unsubscribe operation.
*/
void OnUnsubscribeResponse(TraffResponse & response);
/**
* @brief Processes the response to a poll request.
* @param response The response to the poll operation.
*/
void OnPollResponse(TraffResponse & response);
/**
* @brief Event loop for the worker thread.
*
* This method runs an event loop, which blocks until woken up. When woken up, it processes the
* request (subscribe, change subscription, poll or unsubscribe) and its result, then blocks
* again until woken up for the next request.
*/
void ThreadRoutine();
/**
* @brief The update interval, 5 minutes.
*/
static auto constexpr m_updateInterval = std::chrono::minutes(5);
/**
* @brief The URL for the TraFF service.
*/
const std::string m_url;
};
} // namespace traffxml