diff --git a/connection.h b/connection.h index ecdd203..76c8708 100644 --- a/connection.h +++ b/connection.h @@ -94,10 +94,6 @@ class connection fu2::unique_function _error_cb = nullptr; - void handle_error(std::string_view message = {}, - error internal_error = error::system, - uint32_t db_error = 0) noexcept; - // need to capture watcher, so movable functions preferred fu2::unique_function _socket_watcher_request_cb; fu2::unique_function _on_notify_request; @@ -106,13 +102,17 @@ class connection fu2::unique_function _connected_cb; fu2::unique_function _disconnected_cb; +protected: + void handle_error(std::string_view message = {}, + error internal_error = error::system, + uint32_t db_error = 0) noexcept; static std::function _on_construct_global_cb; static std::function _on_destruct_global_cb; fu2::unique_function _on_destruct_cb; public: connection(std::string_view connection_string = {}); - ~connection(); + virtual ~connection(); void open(int delay = 0); void close(bool call_disconnect_handler = true, bool reconnect_soon = false) noexcept; diff --git a/connector.cpp b/connector.cpp new file mode 100644 index 0000000..5215d10 --- /dev/null +++ b/connector.cpp @@ -0,0 +1,114 @@ +#include "connector.h" + +#include "proto.h" + +namespace tnt +{ + + Connector::Connector() + : iproto_writer(*(connection*)this) + { + on_response(bind(&Connector::OnResponse, this, placeholders::_1)); + on_closed(bind(&Connector::OnClosed, this)); + on_opened(bind(&Connector::OnOpened, this)); + } + + void Connector::OnResponse(wtf_buffer &buf) + { + isProcessingReply_ = true; + mp_reader bunch(buf); + while (mp_reader r = bunch.iproto_message()) + { + try + { + auto encoded_header = r.read(); + Header header; + encoded_header[tnt::header_field::SYNC] >> header.sync; + encoded_header[tnt::header_field::CODE] >> header.errCode; + header.errCode &= 0x7fff; + auto handler = handlers_.find(header.sync); + if (handler == handlers_.end()) + handle_error("unexpected response"); + else + { + auto encoded_body = r.read(); + handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_); + //handlers_.erase(header.sync); + handlers_.erase(handler); + } + } + catch(const mp_reader_error &e) + { + handle_error(e.what()); + } + catch(const exception &e) + { + handle_error(e.what()); + } + } + input_processed(); + isProcessingReply_ = false; + if (isNeedsClose_) + { + connection::close(true, isNeedsReconnect_); + isNeedsClose_ = false; + } + } + + void Connector::AddOnOpened(SimpleEventCallbak cb_) + { + onOpenedHandlers_.push_back(cb_); + } + + void Connector::AddOnClosed(SimpleEventCallbak cb_) + { + onClosedHandlers_.push_back(cb_); + } + + void Connector::OnOpened() + { + isConnected_ = true; + for (auto& it : onOpenedHandlers_) + it(); + } + + void Connector::OnClosed() + { + isConnected_ = false; + wtf_buffer buff; + mp_writer writer(buff); + writer.begin_map(1); + writer<(); + for (auto& it : handlers_) + { + mp_map_reader body2 = body; + header.sync = it.first; + it.second.handler_(header, body2, &it.second.userData_); + } + handlers_.clear(); + for (auto& it : onClosedHandlers_) + it(); + } + + void Connector::close(bool reconnect_soon) noexcept + { + if (isProcessingReply_) + { + isNeedsClose_ = true; + isNeedsReconnect_ = reconnect_soon; + } + else + connection::close(true, reconnect_soon); + } + + bool Connector::IsConnected() + { + return isConnected_; + } + +} diff --git a/connector.h b/connector.h new file mode 100644 index 0000000..4c75fb7 --- /dev/null +++ b/connector.h @@ -0,0 +1,126 @@ +#pragma once + +#include +#include + +#include "mp_writer.h" +#include "mp_reader.h" +#include "proto.h" +#include "connection.h" + +using namespace std; + +namespace tnt +{ + + class Stream + { + public: + + }; + + class Header + { + public: + uint64_t sync; + uint64_t errCode; + }; + + + //template + //void write(){}; + + class Connector : public connection, public iproto_writer + { + public: + class FuncParamTuple + { + public: + template + FuncParamTuple(Args... args) + { + //std::make_tuple(args...); + } + + }; + + typedef function OnFuncResult; + typedef function SimpleEventCallbak; + + Connector(); + + template + void Call(string_view name, OnFuncResult&& resultHundler, UserData userData = (void*)nullptr, Args... args) + { + if (isConnected_) + { + static_assert (sizeof (HandlerData::userData_) >= sizeof (userData), "User data too big."); + constexpr size_t countArgs = sizeof...(args); + begin_call(name); + begin_array(countArgs); + write(args...); + //if (countArgs) + finalize(); + finalize(); + HandlerData handler; + handler.handler_ = move(resultHundler); + *((decltype (userData)*)&handler.userData_) = move(userData); + handlers_[last_request_id()] = move(handler); + flush(); + } + else + { + wtf_buffer buff; + mp_writer writer(buff); + writer.begin_map(1); + writer<(); + resultHundler(header, body, &userData); + } + } + + void write() + {} + + template + void write(T& arg, Args... args) + { + *this< handlers_; + vector onOpenedHandlers_; + vector onClosedHandlers_; + bool isConnected_ = false; + + void OnResponse(wtf_buffer &buf); + void OnOpened(); + void OnClosed(); + + bool isNeedsClose_ = false; + bool isNeedsReconnect_ = false; + bool isProcessingReply_ = false; + + }; + + +} diff --git a/cs_parser.cpp b/cs_parser.cpp index 8f98d8e..7fe7c82 100644 --- a/cs_parser.cpp +++ b/cs_parser.cpp @@ -34,12 +34,14 @@ cs_parts parse_cs(string_view connection_string) noexcept return false; // ensure the chunk contains valid port number -#if __has_include() +/* + * #if __has_include() uint16_t tmp_port; if (auto [ptr, err] = from_chars(chunk.data(), chunk.data() + chunk.size(), &tmp_port); ptr == chunk.back() && tmp_port && tmp_port <= 0xffff) res.port = chunk; #else +*/ // to guarantee terminating null character char tmp_chunk[6] = {0}; copy(chunk.begin(), chunk.end(), begin(tmp_chunk)); @@ -47,7 +49,7 @@ cs_parts parse_cs(string_view connection_string) noexcept auto tmp_port = strtoul(tmp_chunk, &parse_end, 10); if (!*parse_end && tmp_port && tmp_port <= 0xffff) res.port = chunk; -#endif +//#endif return !res.port.empty(); }; diff --git a/mp_reader.cpp b/mp_reader.cpp index 65e58de..e7610cf 100644 --- a/mp_reader.cpp +++ b/mp_reader.cpp @@ -319,3 +319,34 @@ size_t mp_array_reader::cardinality() const noexcept { return _cardinality; } + +const char* to_string(mp_type type) +{ + switch (type) + { + case MP_NIL: + return "nil"; + case MP_UINT: + return "uint"; + case MP_INT: + return "int"; + case MP_STR: + return "string"; + case MP_BIN: + return "binary"; + case MP_ARRAY: + return "array"; + case MP_MAP: + return "map"; + case MP_BOOL: + return "bool"; + case MP_FLOAT: + return "float"; + case MP_DOUBLE: + return "double"; + case MP_EXT: + return "ext"; + default: + return "unkown"; + } +} diff --git a/mp_reader.h b/mp_reader.h index 099c017..803a06f 100644 --- a/mp_reader.h +++ b/mp_reader.h @@ -6,8 +6,10 @@ #include #include #include +#include #include #include +#include #include "msgpuck/msgpuck.h" class wtf_buffer; @@ -17,6 +19,8 @@ class mp_reader; std::string hex_dump(const char *begin, const char *end, const char *pos = nullptr); +const char* to_string(mp_type type); + /// messagepack parsing error class mp_reader_error : public std::runtime_error { @@ -123,6 +127,12 @@ class mp_reader } } +/* + template + mp_reader& operator>> (std::map& val); + template + mp_reader& operator>> (std::vector& val); +*/ template mp_reader& operator>> (std::vector &val); @@ -326,6 +336,32 @@ class mp_array_reader : public mp_reader size_t _cardinality = 0; }; +/* +template +mp_reader& mp_reader::operator>> (std::vector& val) +{ + auto vector = array(); + val.resize(vector.cardinality()); + for (size_t i = 0; i < vector.cardinality(); ++i) + vector >> val[i]; + return *this; +} + +template +mp_reader& mp_reader::operator>> (std::map& val) +{ + auto map = this->map(); + for (size_t i = 0; i < map.cardinality(); ++i) + { + K k; + V v; + map >> k >> v; + val[k] = std::move(v); + } + return *this; +} +*/ + template mp_reader& mp_reader::operator>> (std::vector &val) { diff --git a/mp_writer.cpp b/mp_writer.cpp index bd51de7..918c5a5 100644 --- a/mp_writer.cpp +++ b/mp_writer.cpp @@ -95,7 +95,7 @@ void mp_writer::finalize() throw runtime_error("odd number of map items"); actual_cardinality = c.items_count / 2; // map cardinality - if (actual_cardinality == c.max_cardinality) + if (actual_cardinality * 2 == c.max_cardinality) return; // get current header size @@ -127,6 +127,7 @@ void mp_writer::finalize() default: throw runtime_error("previously not implemented container cardinality"); } + } mp_writer& mp_writer::operator<<(const string_view &val) diff --git a/mp_writer.h b/mp_writer.h index 09cec66..3ab6ec5 100644 --- a/mp_writer.h +++ b/mp_writer.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include "msgpuck/msgpuck.h" #include "wtf_buffer.h" @@ -48,7 +50,27 @@ class mp_writer *this << val.value(); return *this; } +/* + template + mp_writer& operator<< (const std::vector &val) noexcept + { + begin_array(val.size()); + for( auto& it : val ) + *this< + mp_writer& operator<< (const std::map& val) noexcept + { + begin_map(val.size()); + for( auto& it : val ) + *this << it.first << it.second; + finalize(); + return *this; + } +*/ template && sizeof(T) < 16>> mp_writer& operator<< (const T &val) noexcept { @@ -68,6 +90,46 @@ class mp_writer return *this; } + /* + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + { + TupleWriter::out_tuple(stream, tuple); + *stream << std::get(tuple); + } + }; + + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + { + *stream << std::get<0>(tuple); + } + }; + + template + class TupleWriter + { + public: + static void out_tuple(mp_writer *stream, const Tuple &tuple) + {} + }; + + template + mp_writer& operator<<(const std::tuple &value) + { + begin_array(std::tuple_size>::value); + TupleWriter, sizeof...(Args)>::out_tuple(this, value); + finalize(); + return *this; + } + */ + template mp_writer& operator<< (const std::tuple &val) { diff --git a/third_party/msgpuck b/third_party/msgpuck index 084f256..a1594a7 160000 --- a/third_party/msgpuck +++ b/third_party/msgpuck @@ -1 +1 @@ -Subproject commit 084f25677af32ff980421deae928fec6e91f7aa3 +Subproject commit a1594a7a5326c34cc5d9fecb5d710a2b2ca7f437 diff --git a/tntevloop.cpp b/tntevloop.cpp new file mode 100644 index 0000000..c84918d --- /dev/null +++ b/tntevloop.cpp @@ -0,0 +1,108 @@ +#include "tntevloop.h" + +#include +#include + +using namespace std; + +namespace tnt +{ + + TntEvLoop::TntEvLoop() + { + AddOnOpened(bind(&TntEvLoop::OnConnected, this)); + } + + TntEvLoop::~TntEvLoop() + { + isStoped_ = true; + connection::close(false, false); + } + + void TntEvLoop::Attach(struct ev_loop* loop) + { + loop_ = loop; + + socketWatcher_.data = this; + ev_init(&socketWatcher_, OnSocketEvent_); + socketWatcher_.events = 0; + + on_socket_watcher_request([this](int mode)noexcept{this->OnSocketWatcherRequest(mode);}); + + ev_async_init(&asyncNotifier_, OnAsyncNotifier_); + asyncNotifier_.data = this; + ev_async_start(loop, &asyncNotifier_); + + ev_timer_init(&timer_, OnTimer_, 0, 1); + timer_.data = this; + ev_timer_start(loop, &timer_); + + on_notify_request(bind(ev_async_send, loop, &asyncNotifier_)); + + } + + void TntEvLoop::OnSocketWatcherRequest(int mode) noexcept + { + if (isStoped_) + return; + int events = (mode & tnt::socket_state::read ? EV_READ : EV_NONE); + events |= (mode & tnt::socket_state::write ? EV_WRITE : EV_NONE); + + if ((socketWatcher_.events & (EV_READ | EV_WRITE)) != events) + { + if (ev_is_active(&socketWatcher_)) + ev_io_stop(loop_, &socketWatcher_); + ev_io_set(&socketWatcher_, socket_handle(), events); + if (events) + ev_io_start(loop_, &socketWatcher_); + } + + } + + void TntEvLoop::OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents) + { + ((TntEvLoop*)w->data)->OnSocketEvent(loop, w, revents); + } + + void TntEvLoop::OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents) + { +// if (isStoped_) +// return; + (void)loop;(void)w; + if (revents & EV_ERROR) + connection::handle_error("EV_ERROR soket state received"); + + if (revents & EV_WRITE) + connection::write(); + if (revents & EV_READ) + connection::read(); + } + + void TntEvLoop::OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents) + { + ((TntEvLoop*)w->data)->OnAsyncNotifier(loop, w, revents); + } + + void TntEvLoop::OnAsyncNotifier(struct ev_loop* loop, ev_async* w, int revents) + { + (void)loop;(void)w;(void)revents; + acquire_notifications(); + } + + void TntEvLoop::OnConnected() noexcept + { + } + + void TntEvLoop::OnTimer_(struct ev_loop* loop, ev_timer* w, int revents) + { + ((TntEvLoop*)w->data)->OnTimer(loop, w, revents); + } + + void TntEvLoop::OnTimer(struct ev_loop*, ev_timer*, int) + { + tick_1sec(); + } + + + +} diff --git a/tntevloop.h b/tntevloop.h new file mode 100644 index 0000000..fd41e25 --- /dev/null +++ b/tntevloop.h @@ -0,0 +1,38 @@ +#pragma once + + +#include +#include "connector.h" + +namespace tnt +{ + + class TntEvLoop : public Connector + { + public: + TntEvLoop(); + ~TntEvLoop(); + void Attach(struct ev_loop* loop); + + protected: + struct ev_loop* loop_; + bool isStoped_ = false; + + ev_io socketWatcher_; + static void OnSocketEvent_(struct ev_loop* loop, ev_io* w, int revents); + void OnSocketEvent(struct ev_loop* loop, ev_io* w, int revents); + + ev_async asyncNotifier_; + static void OnAsyncNotifier_(struct ev_loop* loop, ev_async* w, int revents); + void OnAsyncNotifier(struct ev_loop* loop, ev_async* w, int revents); + + ev_timer timer_; + static void OnTimer_(struct ev_loop* loop, ev_timer* w, int revents); + void OnTimer(struct ev_loop* loop, ev_timer* w, int revents); + + void OnSocketWatcherRequest(int mode) noexcept; + void OnConnected() noexcept; + + }; + +}