Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ class connection
fu2::unique_function<void(std::string_view message,
error internal_error,
uint32_t db_error)> _error_cb = nullptr;
void handle_error(std::string_view message = {},
error internal_error = error::system,
uint32_t db_error = 0) noexcept;

// need to capture watcher, so movable functions preferred
fu2::unique_function<void(int mode) noexcept> _socket_watcher_request_cb;
fu2::unique_function<void()> _on_notify_request;
Expand All @@ -106,13 +102,17 @@ class connection
fu2::unique_function<void()> _connected_cb;
fu2::unique_function<void()> _disconnected_cb;

protected:
void handle_error(std::string_view message = {},
error internal_error = error::system,
uint32_t db_error = 0) noexcept;
static std::function<void(connection*)> _on_construct_global_cb;
static std::function<void(connection*)> _on_destruct_global_cb;
fu2::unique_function<void()> _on_destruct_cb;

public:
connection(std::string_view connection_string = {});
~connection();
virtual ~connection();

void open(int delay = 0);
void close(bool call_disconnect_handler = true, bool reconnect_soon = false) noexcept;
Expand Down
114 changes: 114 additions & 0 deletions connector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include "connector.h"

#include "proto.h"

namespace tnt
{

Connector::Connector()
: iproto_writer(*(connection*)this)
{
on_response(bind(&Connector::OnResponse, this, placeholders::_1));
on_closed(bind(&Connector::OnClosed, this));
on_opened(bind(&Connector::OnOpened, this));
}

void Connector::OnResponse(wtf_buffer &buf)
{
isProcessingReply_ = true;
mp_reader bunch(buf);
while (mp_reader r = bunch.iproto_message())
{
try
{
auto encoded_header = r.read<mp_map_reader>();
Header header;
encoded_header[tnt::header_field::SYNC] >> header.sync;
encoded_header[tnt::header_field::CODE] >> header.errCode;
header.errCode &= 0x7fff;
auto handler = handlers_.find(header.sync);
if (handler == handlers_.end())
handle_error("unexpected response");
else
{
auto encoded_body = r.read<mp_map_reader>();
handler->second.handler_(header, encoded_body, (void*)&handler->second.userData_);
//handlers_.erase(header.sync);
handlers_.erase(handler);
}
}
catch(const mp_reader_error &e)
{
handle_error(e.what());
}
catch(const exception &e)
{
handle_error(e.what());
}
}
input_processed();
isProcessingReply_ = false;
if (isNeedsClose_)
{
connection::close(true, isNeedsReconnect_);
isNeedsClose_ = false;
}
}

void Connector::AddOnOpened(SimpleEventCallbak cb_)
{
onOpenedHandlers_.push_back(cb_);
}

void Connector::AddOnClosed(SimpleEventCallbak cb_)
{
onClosedHandlers_.push_back(cb_);
}

void Connector::OnOpened()
{
isConnected_ = true;
for (auto& it : onOpenedHandlers_)
it();
}

void Connector::OnClosed()
{
isConnected_ = false;
wtf_buffer buff;
mp_writer writer(buff);
writer.begin_map(1);
writer<<int(tnt::response_field::ERROR)<<"disconected";
writer.finalize();
Header header;
header.errCode = 77;
mp_reader reader(buff);
auto body = reader.read<mp_map_reader>();
for (auto& it : handlers_)
{
mp_map_reader body2 = body;
header.sync = it.first;
it.second.handler_(header, body2, &it.second.userData_);
}
handlers_.clear();
for (auto& it : onClosedHandlers_)
it();
}

void Connector::close(bool reconnect_soon) noexcept
{
if (isProcessingReply_)
{
isNeedsClose_ = true;
isNeedsReconnect_ = reconnect_soon;
}
else
connection::close(true, reconnect_soon);
}

bool Connector::IsConnected()
{
return isConnected_;
}

}
126 changes: 126 additions & 0 deletions connector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#pragma once

#include <string_view>
#include <unordered_map>

#include "mp_writer.h"
#include "mp_reader.h"
#include "proto.h"
#include "connection.h"

using namespace std;

namespace tnt
{

class Stream
{
public:

};

class Header
{
public:
uint64_t sync;
uint64_t errCode;
};


//template<typename... Args>
//void write(){};

class Connector : public connection, public iproto_writer
{
public:
class FuncParamTuple
{
public:
template<typename... Args>
FuncParamTuple(Args... args)
{
//std::make_tuple(args...);
}

};

typedef function<void(const Header& header, const mp_map_reader& body, void* userData)> OnFuncResult;
typedef function<void()> SimpleEventCallbak;

Connector();

template<typename UserData = void*, typename... Args>
void Call(string_view name, OnFuncResult&& resultHundler, UserData userData = (void*)nullptr, Args... args)
{
if (isConnected_)
{
static_assert (sizeof (HandlerData::userData_) >= sizeof (userData), "User data too big.");
constexpr size_t countArgs = sizeof...(args);
begin_call(name);
begin_array(countArgs);
write(args...);
//if (countArgs)
finalize();
finalize();
HandlerData handler;
handler.handler_ = move(resultHundler);
*((decltype (userData)*)&handler.userData_) = move(userData);
handlers_[last_request_id()] = move(handler);
flush();
}
else
{
wtf_buffer buff;
mp_writer writer(buff);
writer.begin_map(1);
writer<<int(tnt::response_field::ERROR)<<"disconected";
writer.finalize();
Header header;
header.errCode = 77;
header.sync = 0;
mp_reader reader(buff);
auto body = reader.read<mp_map_reader>();
resultHundler(header, body, &userData);
}
}

void write()
{}

template<typename T, typename... Args>
void write(T& arg, Args... args)
{
*this<<arg;
write(args...);
}

void AddOnOpened(SimpleEventCallbak cb_);
void AddOnClosed(SimpleEventCallbak cb_);
void close(bool reconnect_soon = false) noexcept;
bool IsConnected();

protected:
class HandlerData
{
public:
OnFuncResult handler_;
uint64_t userData_[2];
};

unordered_map<uint64_t, HandlerData> handlers_;
vector<SimpleEventCallbak> onOpenedHandlers_;
vector<SimpleEventCallbak> onClosedHandlers_;
bool isConnected_ = false;

void OnResponse(wtf_buffer &buf);
void OnOpened();
void OnClosed();

bool isNeedsClose_ = false;
bool isNeedsReconnect_ = false;
bool isProcessingReply_ = false;

};


}
6 changes: 4 additions & 2 deletions cs_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ cs_parts parse_cs(string_view connection_string) noexcept
return false;

// ensure the chunk contains valid port number
#if __has_include(<charconv>)
/*
* #if __has_include(<charconv>)
uint16_t tmp_port;
if (auto [ptr, err] = from_chars(chunk.data(), chunk.data() + chunk.size(), &tmp_port);
ptr == chunk.back() && tmp_port && tmp_port <= 0xffff)
res.port = chunk;
#else
*/
// to guarantee terminating null character
char tmp_chunk[6] = {0};
copy(chunk.begin(), chunk.end(), begin(tmp_chunk));
char *parse_end;
auto tmp_port = strtoul(tmp_chunk, &parse_end, 10);
if (!*parse_end && tmp_port && tmp_port <= 0xffff)
res.port = chunk;
#endif
//#endif

return !res.port.empty();
};
Expand Down
31 changes: 31 additions & 0 deletions mp_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,34 @@ size_t mp_array_reader::cardinality() const noexcept
{
return _cardinality;
}

const char* to_string(mp_type type)
{
switch (type)
{
case MP_NIL:
return "nil";
case MP_UINT:
return "uint";
case MP_INT:
return "int";
case MP_STR:
return "string";
case MP_BIN:
return "binary";
case MP_ARRAY:
return "array";
case MP_MAP:
return "map";
case MP_BOOL:
return "bool";
case MP_FLOAT:
return "float";
case MP_DOUBLE:
return "double";
case MP_EXT:
return "ext";
default:
return "unkown";
}
}
36 changes: 36 additions & 0 deletions mp_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#include <optional>
#include <vector>
#include <map>
#include <string>
#include <memory>
#include <functional>
#include <limits>
#include "msgpuck/msgpuck.h"

class wtf_buffer;
Expand All @@ -17,6 +19,8 @@ class mp_reader;

std::string hex_dump(const char *begin, const char *end, const char *pos = nullptr);

const char* to_string(mp_type type);

/// messagepack parsing error
class mp_reader_error : public std::runtime_error
{
Expand Down Expand Up @@ -123,6 +127,12 @@ class mp_reader
}
}

/*
template <typename K,typename V>
mp_reader& operator>> (std::map<K,V>& val);
template <typename T>
mp_reader& operator>> (std::vector<T>& val);
*/
template <typename T>
mp_reader& operator>> (std::vector<T> &val);

Expand Down Expand Up @@ -326,6 +336,32 @@ class mp_array_reader : public mp_reader
size_t _cardinality = 0;
};

/*
template <typename T>
mp_reader& mp_reader::operator>> (std::vector<T>& val)
{
auto vector = array();
val.resize(vector.cardinality());
for (size_t i = 0; i < vector.cardinality(); ++i)
vector >> val[i];
return *this;
}

template <typename K,typename V>
mp_reader& mp_reader::operator>> (std::map<K,V>& val)
{
auto map = this->map();
for (size_t i = 0; i < map.cardinality(); ++i)
{
K k;
V v;
map >> k >> v;
val[k] = std::move(v);
}
return *this;
}
*/

template <typename T>
mp_reader& mp_reader::operator>> (std::vector<T> &val)
{
Expand Down
Loading