From a4e4c9e2f0b1147b47e3ba0cb11a341ff64e099c Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Thu, 23 Apr 2020 16:36:30 +0200 Subject: [PATCH 1/3] Add InfluxDB 2.x optional backend (#193) * Add CURL to deps list * Add URL parsing * First working version * Clean up and docs --- CMakeLists.txt | 14 ++++++-- README.md | 3 +- src/MonitoringFactory.cxx | 36 +++++++++++++++++++++ src/Transports/HTTP.cxx | 68 +++++++++++++++++++++++++++++++++++++++ src/Transports/HTTP.h | 54 +++++++++++++++++++++++++++++++ src/UriParser/UriParser.h | 4 +-- test/testInfluxDb.cxx | 6 +++- 7 files changed, 178 insertions(+), 7 deletions(-) create mode 100644 src/Transports/HTTP.cxx create mode 100644 src/Transports/HTTP.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f73d9aea9..9e47f6818 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem) find_package(Git QUIET) find_package(ApMon MODULE) +find_package(CURL MODULE) find_package(RdKafka CONFIG) #################################### @@ -105,11 +106,12 @@ add_library(Monitoring SHARED src/Exceptions/MonitoringException.cxx $<$:src/Backends/ApMonBackend.cxx> $<$:src/Transports/Kafka.cxx> + $<$:src/Transports/HTTP.cxx> ) target_include_directories(Monitoring - PUBLIC - $ + PUBLIC + $ $ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src @@ -127,6 +129,7 @@ target_link_libraries(Monitoring pthread $<$:ApMon::ApMon> $<$:RdKafka::rdkafka++> + $<$:CURL::libcurl> ) # Handle ApMon optional dependency @@ -138,6 +141,10 @@ if(RdKafka_FOUND) message(STATUS " Compiling Kafka transport") endif() +if(CURL_FOUND) + message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend") +endif() + # Detect operating system if (UNIX AND NOT APPLE) message(STATUS "Detected Linux: Process monitor enabled") @@ -155,6 +162,7 @@ target_compile_definitions(Monitoring $<$:O2_MONITORING_OS_LINUX> $<$:O2_MONITORING_WITH_APPMON> $<$:O2_MONITORING_WITH_KAFKA> + $<$:O2_MONITORING_WITH_CURL> ) # Use C++17 @@ -217,7 +225,7 @@ foreach (test ${TEST_SRCS}) add_executable(${test_name} ${test}) target_link_libraries(${test_name} - PRIVATE + PRIVATE Monitoring Boost::unit_test_framework Boost::filesystem ) add_test(NAME ${test_name} COMMAND ${test_name}) diff --git a/README.md b/README.md index 0267821e1..646f41752 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ See the table below to find `URI`s for supported backends: | InfluxDB | Unix socket | `influxdb-unix` | - | `info` | | InfluxDB | StdOut | `influxdb-stdout` | - | `info` | | InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` | +| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` | | ApMon | UDP | `apmon` | - | `info` | | StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` | @@ -62,7 +63,7 @@ A metric consist of 5 parameters: | Parameter name | Type | Required | Default | | -------------- |:--------------------------------:|:--------:| -----------------------:| | name | string | yes | - | -| values | map<string, int/double/string/uint64_t> | no/1 | - | +| values | map<string, int/double/string/uint64_t> | no/1 | - | | timestamp | time_point<system_clock> | no | current time | | verbosity | Enum (Debug/Info/Prod) | no | Verbosity::Info | | tags | map | no | host and process names | diff --git a/src/MonitoringFactory.cxx b/src/MonitoringFactory.cxx index c22b327b8..83b1b2a00 100644 --- a/src/MonitoringFactory.cxx +++ b/src/MonitoringFactory.cxx @@ -36,6 +36,10 @@ #include "Transports/Kafka.h" #endif +#ifdef O2_MONITORING_WITH_CURL +#include "Transports/HTTP.h" +#endif + namespace o2 { /// ALICE O2 Monitoring system @@ -56,6 +60,37 @@ std::unique_ptr getStdOut(http::url uri) } } +/// Extracts token from header add sets it as addition HTTP header +/// http://localhost:9999/?org=YOUR_ORG&bucket=YOUR_BUCKET&token=AUTH_TOKEN +/// -> +/// http://localhost:9999/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET +/// --header "Authorization: Token YOURAUTHTOKEN" +std::unique_ptr getInfluxDbv2(http::url uri) +{ +#ifdef O2_MONITORING_WITH_CURL + std::string tokenLabel = "token="; + std::string path = "/api/v2/write"; + std::string query = uri.search; + + auto tokenStart = query.find(tokenLabel); + auto tokenEnd = query.find('&', tokenStart); + if (tokenEnd == std::string::npos) { + tokenEnd = query.length(); + } + std::string token = query.substr(tokenStart + tokenLabel.length(), tokenEnd-(tokenStart + tokenLabel.length())); + // make sure ampersand is removed + if (tokenEnd < query.length() && query.at(tokenEnd) == '&') tokenEnd++; + if (tokenStart > 0 && query.at(tokenStart-1) == '&') tokenStart--; + query.erase(tokenStart, tokenEnd - tokenStart); + + auto transport = std::make_unique("http://" + uri.host + ':' + std::to_string(uri.port) + path + '?' + query); + transport->addHeader("Authorization: Token " + token); + return std::make_unique(std::move(transport)); +#else + throw std::runtime_error("HTTP transport is not enabled"); +#endif +} + std::unique_ptr getInfluxDb(http::url uri) { auto const position = uri.protocol.find_last_of('-'); @@ -129,6 +164,7 @@ std::unique_ptr MonitoringFactory::GetBackend(std::string& url) {"influxdb-unix", getInfluxDb}, {"influxdb-stdout", getInfluxDb}, {"influxdb-kafka", getInfluxDb}, + {"influxdbv2", getInfluxDbv2}, {"apmon", getApMon}, {"no-op", getNoop} }; diff --git a/src/Transports/HTTP.cxx b/src/Transports/HTTP.cxx new file mode 100644 index 000000000..6d86a2b7c --- /dev/null +++ b/src/Transports/HTTP.cxx @@ -0,0 +1,68 @@ +/// +/// \file HTTP.cxx +/// \author Adam Wegrzynek +/// + +#include "HTTP.h" +#include "../MonLogger.h" +#include "../Exceptions/MonitoringException.h" +#include + +namespace o2 +{ +/// ALICE O2 Monitoring system +namespace monitoring +{ +/// Monitoring transports +namespace transports +{ + +HTTP::HTTP(const std::string& url) +{ + mHeaders = NULL; + mCurl = curl_easy_init(); + curl_easy_setopt(mCurl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(mCurl, CURLOPT_SSL_VERIFYPEER, 0); + curl_easy_setopt(mCurl, CURLOPT_CONNECTTIMEOUT, 10); + curl_easy_setopt(mCurl, CURLOPT_TIMEOUT, 10); + curl_easy_setopt(mCurl, CURLOPT_POST, 1); + curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPIDLE, 120L); + curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPINTVL, 60L); + FILE *devnull = fopen("/dev/null", "w+"); + curl_easy_setopt(mCurl, CURLOPT_WRITEDATA, devnull); + + MonLogger::Get() << "HTTP transport initialized (" << url << ")" << MonLogger::End(); +} + +HTTP::~HTTP() +{ + curl_slist_free_all(mHeaders); + curl_easy_cleanup(mCurl); + curl_global_cleanup(); +} + +void HTTP::addHeader(const std::string& header) +{ + mHeaders = curl_slist_append(mHeaders, header.c_str()); + curl_easy_setopt(mCurl, CURLOPT_HTTPHEADER, mHeaders); +} + +void HTTP::send(std::string&& post) +{ + CURLcode response; + long responseCode; + curl_easy_setopt(mCurl, CURLOPT_POSTFIELDS, post.c_str()); + curl_easy_setopt(mCurl, CURLOPT_POSTFIELDSIZE, (long) post.length()); + response = curl_easy_perform(mCurl); + curl_easy_getinfo(mCurl, CURLINFO_RESPONSE_CODE, &responseCode); + if (response != CURLE_OK) { + MonLogger::Get() << "HTTP Tranport " << curl_easy_strerror(response) << MonLogger::End(); + } + if (responseCode < 200 || responseCode > 206) { + MonLogger::Get() << "HTTP Transport: Response code : " << std::to_string(responseCode) << MonLogger::End(); + } +} + +} // namespace transports +} // namespace monitoring +} // namespace o2 diff --git a/src/Transports/HTTP.h b/src/Transports/HTTP.h new file mode 100644 index 000000000..bd9b9b4e9 --- /dev/null +++ b/src/Transports/HTTP.h @@ -0,0 +1,54 @@ +/// +/// \file HTTP.h +/// \author Adam Wegrzynek +/// + +#ifndef ALICEO2_MONITORING_TRANSPORTS_HTTP_H +#define ALICEO2_MONITORING_TRANSPORTS_HTTP_H + +#include "TransportInterface.h" + +#include +#include + +namespace o2 +{ +/// ALICE O2 Monitoring system +namespace monitoring +{ +/// Monitoring transports +namespace transports +{ + +/// \brief HTTP POST transport +/// +/// Allows to push string formatted metrics as HTTP POST requests via cURL +class HTTP : public TransportInterface +{ + public: + /// Constructor + /// \param url URL of HTTP server endpoint + HTTP(const std::string& url); + + /// Destructor + ~HTTP(); + + /// Sends metric via HTTP POST + /// \param post r-value reference string formatted metric + void send(std::string&& post); + + /// Adds custom HTTP header + void addHeader(const std::string& header); + private: + /// CURL pointers + CURL *mCurl; + + /// HTTP headers struct + struct curl_slist *mHeaders; +}; + +} // namespace transports +} // namespace monitoring +} // namespace o2 + +#endif // ALICEO2_MONITORING_TRANSPORTS_HTTP_H diff --git a/src/UriParser/UriParser.h b/src/UriParser/UriParser.h index 00b199d1f..75fea158b 100644 --- a/src/UriParser/UriParser.h +++ b/src/UriParser/UriParser.h @@ -29,7 +29,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. namespace http { struct url { - std::string protocol, user, password, host, path, search; + std::string protocol, user, password, host, path, search, url; int port; }; @@ -89,7 +89,7 @@ static inline url ParseHttpUrl(std::string& in) { url ret; ret.port = -1; - + ret.url = in; ret.protocol = ExtractProtocol(in); ret.search = ExtractSearch(in); ret.path = ExtractPath(in); diff --git a/test/testInfluxDb.cxx b/test/testInfluxDb.cxx index 6ae8c3e61..d34a43c5d 100644 --- a/test/testInfluxDb.cxx +++ b/test/testInfluxDb.cxx @@ -19,7 +19,6 @@ namespace monitoring { namespace Test { - BOOST_AUTO_TEST_CASE(simplySendMetric) { auto monitoring = MonitoringFactory::Get("influxdb-udp://localhost:1000"); @@ -32,6 +31,11 @@ BOOST_AUTO_TEST_CASE(simplySendMetric2) monitoring->send(Metric{10, "myCrazyMetric"}); } +BOOST_AUTO_TEST_CASE(InfluxDbv2) +{ + auto monitoring = MonitoringFactory::Get("influxdbv2://localhost:9999?org=cern&bucket=test&token=TOKEN"); +} + } // namespace Test } // namespace monitoring } // namespace o2 From fbeb3ca8df75801d23230cd3cd2073b5cac93e83 Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Thu, 28 May 2020 15:40:31 +0200 Subject: [PATCH 2/3] [OMON-350] Add DbFiller utility (#201) --- CMakeLists.txt | 4 ++- examples/8-DbFiller.cxx | 57 +++++++++++++++++++++++++++++++++++++ include/Monitoring/Metric.h | 2 +- src/Metric.cxx | 2 +- 4 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 examples/8-DbFiller.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e47f6818..666b7ddb1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -181,6 +181,7 @@ set(EXAMPLES examples/5-Benchmark.cxx examples/6-Increment.cxx examples/7-InternalBenchamrk.cxx + examples/8-DbFiller.cxx examples/10-Buffering.cxx ) @@ -195,6 +196,7 @@ foreach (example ${EXAMPLES}) endforeach() set_target_properties(5-Benchmark PROPERTIES OUTPUT_NAME "o2-monitoring-benchmark") +set_target_properties(8-DbFiller PROPERTIES OUTPUT_NAME "o2-monitoring-dbfiller") #################################### # Tests @@ -238,7 +240,7 @@ endforeach() #################################### # Install library -install(TARGETS Monitoring 5-Benchmark +install(TARGETS Monitoring 5-Benchmark 8-DbFiller EXPORT MonitoringTargets LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/examples/8-DbFiller.cxx b/examples/8-DbFiller.cxx new file mode 100644 index 000000000..4370333ae --- /dev/null +++ b/examples/8-DbFiller.cxx @@ -0,0 +1,57 @@ +/// +/// \file 8-DbFiller.cxx +/// \author Adam Wegrzynek +/// + +#include "Monitoring/MonitoringFactory.h" +#include +#include + +using o2::monitoring::Metric; +using namespace o2::monitoring; + +int main(int argc, char* argv[]) +{ + std::srand(std::time(nullptr)); + + std::random_device rd; + std::mt19937 mt(rd()); + + std::uniform_real_distribution doubleDist(1.0, 100.0); + std::uniform_int_distribution<> intDist(1, 100); + + boost::program_options::options_description desc("Allowed options"); + desc.add_options() + ("url", boost::program_options::value()->required(), "URL to monitoring backend (or list of comma seperated URLs)") + ("measurements", boost::program_options::value()->default_value(1), "Number of different measurements") + ("flps", boost::program_options::value()->default_value(1), "Number of FLPs") + ("since", boost::program_options::value()->default_value(60), "Start filling since (s)") + ("interval", boost::program_options::value()->default_value(1), "Interval between metrics (s)"); + + boost::program_options::variables_map vm; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); + boost::program_options::notify(vm); + + auto monitoring = MonitoringFactory::Get(vm["url"].as()); + monitoring->addGlobalTag(tags::Key::Subsystem, tags::Value::QC); + + auto now = Metric::getCurrentTimestamp(); + auto interval = std::chrono::seconds(vm["interval"].as()); + auto since = now - std::chrono::seconds(vm["since"].as()); + + for (;;) { + since = since + interval; + for (int i = 1; i <= vm["measurements"].as(); i++) { + for (int k = 1; k <= vm["flps"].as(); k++) { + auto metric = Metric{"metric" + std::to_string(i), Metric::DefaultVerbosity, since} + .addValue(doubleDist(mt), "double") + .addValue(intDist(mt), "int") + .addValue(std::rand() % 2, "onOff") + .addTag(tags::Key::FLP, k); + monitoring->send(std::move(metric)); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + } + if (since > now) break; + } +} diff --git a/include/Monitoring/Metric.h b/include/Monitoring/Metric.h index 5979edd90..c3eda139b 100644 --- a/include/Monitoring/Metric.h +++ b/include/Monitoring/Metric.h @@ -59,7 +59,7 @@ class Metric /// Constructor that does not require any value to be specified, .addValue needs to be used /// \param name metric name - Metric(const std::string& name, Verbosity verbosity = Metric::DefaultVerbosity); + Metric(const std::string& name, Verbosity verbosity = Metric::DefaultVerbosity, const std::chrono::time_point& timestamp = Metric::getCurrentTimestamp()); /// Adds additional int value to metric /// \param value diff --git a/src/Metric.cxx b/src/Metric.cxx index 3775881ac..710cfa459 100644 --- a/src/Metric.cxx +++ b/src/Metric.cxx @@ -87,7 +87,7 @@ Metric&& Metric::addValue(const std::variant return std::move(*this); } -Metric::Metric(const std::string& name, Verbosity verbosity) : mName(name), mTimestamp(Metric::getCurrentTimestamp()), mVerbosity(verbosity) +Metric::Metric(const std::string& name, Verbosity verbosity, const std::chrono::time_point& timestamp) : mName(name), mTimestamp(timestamp), mVerbosity(verbosity) { overwriteVerbosity(); } From c82cc77963bda6c9be75695001fdf5e8fbe4b596 Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Thu, 28 May 2020 15:55:36 +0200 Subject: [PATCH 3/3] Improve benchmark --- examples/5-Benchmark.cxx | 69 ++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/examples/5-Benchmark.cxx b/examples/5-Benchmark.cxx index 274504803..e717c5423 100644 --- a/examples/5-Benchmark.cxx +++ b/examples/5-Benchmark.cxx @@ -12,41 +12,34 @@ using namespace o2::monitoring; int main(int argc, char* argv[]) { - int sleep = 1000000; - int count = 1; - int measurements = 1; - int flps = 1; - std::srand(std::time(nullptr)); - std::random_device rd; std::mt19937 mt(rd()); - std::uniform_real_distribution doubleDist(1.0, 100.0); std::uniform_int_distribution<> intDist(1, 100); boost::program_options::options_description desc("Allowed options"); - desc.add_options()("sleep", boost::program_options::value(), "Thread sleep in microseconds")("url", boost::program_options::value()->required(), "URL to monitoring backend (or list of comma seperated URLs)")("id", boost::program_options::value(), "Instance ID")("count", boost::program_options::value(), "Number of loop cycles")("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement")("latency", boost::program_options::bool_switch()->default_value(false), "Sends timestamp as a value")("monitor", boost::program_options::bool_switch()->default_value(false), "Enabled process monitor")("buffer", boost::program_options::value(), "Creates buffr of given size")("measurements", boost::program_options::value(), "Number of different measurements")("flps", boost::program_options::value(), "Number of FLPs (tags)"); + desc.add_options() + ("sleep", boost::program_options::value()->default_value(1000000), "Thread sleep in microseconds") + ("url", boost::program_options::value()->required(), "URL to monitoring backend (or list of comma seperated URLs)") + ("count", boost::program_options::value()->default_value(1), "Number of loop cycles") + ("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement") + ("latency", boost::program_options::bool_switch()->default_value(false), "Sends timestamp as a value") + ("monitor", boost::program_options::bool_switch()->default_value(false), "Enabled process monitor") + ("buffer", boost::program_options::value(), "Creates buffr of given size") + ("measurements", boost::program_options::value()->default_value(1), "Number of different measurements") + ("flps", boost::program_options::value()->default_value(1), "Number of FLPs") + ("crus", boost::program_options::value()->default_value(1), "Number of CRUss (optional)"); boost::program_options::variables_map vm; boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); boost::program_options::notify(vm); - if (vm.count("flps")) { - flps = vm["flps"].as(); - } - - if (vm.count("sleep")) { - sleep = vm["sleep"].as(); - } - - if (vm.count("count")) { - count = vm["count"].as(); - } - - if (vm.count("measurements")) { - measurements = vm["measurements"].as(); - } + int flps = vm["flps"].as(); + int crus = vm["crus"].as(); + int sleep = vm["sleep"].as(); + int count = vm["count"].as(); + int measurements = vm["measurements"].as(); auto monitoring = MonitoringFactory::Get(vm["url"].as()); if (vm["monitor"].as()) { @@ -55,12 +48,18 @@ int main(int argc, char* argv[]) if (vm["multiple"].as()) { for (int j = 1; j <= count; j++) { for (int i = 1; i <= measurements; i++) { - monitoring->send(Metric{"measurement" + std::to_string(i)} - .addValue(doubleDist(mt), "doubleMetric") - .addValue(intDist(mt), "intMetric") - .addValue(std::rand() % 2, "onOffMetric") - ); - std::this_thread::sleep_for(std::chrono::microseconds(sleep)); + for (int k = 1; k <= flps; k++) { + for (int l = 1; l <= crus; l++) { + monitoring->send(Metric{"measurement" + std::to_string(i)} + .addValue(doubleDist(mt), "double_metric") + .addValue(intDist(mt), "int_metric") + .addValue(std::rand() % 2, "on_off_metric") + .addTag(tags::Key::FLP, k) + .addTag(tags::Key::CRU, l) + ); + } + std::this_thread::sleep_for(std::chrono::microseconds(sleep)); + } } if (!vm.count("count")) j--; @@ -84,12 +83,14 @@ int main(int argc, char* argv[]) for (int j = 1; j <= count; j++) { for (int i = 1; i <= measurements; i++) { for (int k = 1; k <= flps; k++) { - monitoring->send(Metric{doubleDist(mt), "doubleMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k)); - monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k)); - monitoring->send(Metric{std::rand() % 2, "onOffMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k)); - std::this_thread::sleep_for(std::chrono::microseconds(10)); + for (int l = 1; l <= crus; l++) { + monitoring->send(Metric{doubleDist(mt), "doubleMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l)); + monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l)); + monitoring->send(Metric{std::rand() % 2, "onOffMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::CRU, l)); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + std::this_thread::sleep_for(std::chrono::microseconds(sleep)); } - std::this_thread::sleep_for(std::chrono::microseconds(sleep)); } if (!vm.count("count")) j--;