К основному контенту

Кроссплатформенный многопоточный TCP/IP сервер на C++

Схема организации многопоточности

Введение

В данной статье мы рассмотрим простую реализацию TCP сервера и клиента на языке программирования C++ с использованием WinSocket2 для Windows и Posix Socket для Unix-подобных операционных систем. Весь исходный код представлен в этом репозитории GitHub


Реализация

Для начала рассмотрим общие определения как для серверной части, так и для клиентской части:

general.h

#ifndef GENERAL_H
#define GENERAL_H

#ifdef _WIN32
#else
#define SD_BOTH 0
#endif


#include <cstdint>
#include <cstring>
#include <cinttypes>
#include <malloc.h> 

// IP 127.0.0.1
uint32_t LOCALHOST_IP = 0x0100007f;

// Код состояния сокета
enum class SocketStatus : uint8_t {
  connected = 0,
  err_socket_init = 1,
  err_socket_bind = 2,
  err_socket_connect = 3,
  disconnected = 4
};

// Буффер данных
struct DataBuffer {
  int size = 0;
  void* data_ptr = nullptr;

  DataBuffer() = default;
  DataBuffer(int size, void* data_ptr) : size(size), data_ptr(data_ptr) {}
  DataBuffer(const DataBuffer& other) : size(other.size), data_ptr(malloc(size)) {memcpy(data_ptr, other.data_ptr, size);}
  DataBuffer(DataBuffer&& other) : size(other.size), data_ptr(other.data_ptr) {other.data_ptr = nullptr;}
  ~DataBuffer() {if(data_ptr) free(data_ptr); data_ptr = nullptr;}

  bool isEmpty() {return !data_ptr || !size;}
  operator bool() {return data_ptr && size;}
};

// Тип сокета
enum class SocketType : uint8_t {
  client_socket = 0,
  server_socket = 1
};

// Базовый класс TCP клиента
class TcpClientBase {
public:
  typedef SocketStatus status;
  virtual ~TcpClientBase() {};
  virtual status disconnect() = 0;
  virtual status getStatus() const = 0;
  virtual bool sendData(const void* buffer, const size_t size) const = 0;
  virtual DataBuffer loadData() = 0;
  virtual uint32_t getHost() const = 0;
  virtual uint16_t getPort() const = 0;
  virtual SocketType getType() const = 0;
};

#endif // GENERAL_H 

Теперь можно перейти непосредственно к реализации TCP сервера. Для начала рассмотрим заголовочный файл сервера:

TcpServer.h


#ifndef TCPSERVER_H
#define TCPSERVER_H

#include <functional>
#include <list>

#include <thread>
#include <mutex>
#include <shared_mutex>

#ifdef _WIN32 // Windows NT

#include <WinSock2.h>
#include <mstcpip.h>

#else // *nix

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#endif

#include "general.h"

#ifdef _WIN32 // Windows NT
typedef int SockLen_t;
typedef SOCKADDR_IN SocketAddr_in;
typedef SOCKET Socket;
typedef u_long ka_prop_t;
#else // POSIX
typedef socklen_t SockLen_t;
typedef struct sockaddr_in SocketAddr_in;
typedef int Socket;
typedef int ka_prop_t;
#endif

// Конфигурация Keep-Alive соединения
struct KeepAliveConfig{
  ka_prop_t ka_idle = 120;
  ka_prop_t ka_intvl = 3;
  ka_prop_t ka_cnt = 5;
};

// Класс Tcp сервера
struct TcpServer {
  // Класс клиента сервера (реализация определена ниже)
  struct Client;
  // Тип обработчик данных клиента
  typedef std::function<void(DataBuffer, Client&)> handler_function_t;
  // Тип обработчика подключения/отсоединения клиента
  typedef std::function<void(Client&)> con_handler_function_t;

  // Коды статуса сервера
  enum class status : uint8_t {
    up = 0,
    err_socket_init = 1,
    err_socket_bind = 2,
    err_scoket_keep_alive = 3,
    err_socket_listening = 4,
    close = 5
  };

private:
  // Сокет сервера
  Socket serv_socket;
  // Порт сервера
  uint16_t port;
  // Код статуса
  status _status = status::close;
  // Обработчик данных от клиента
  handler_function_t handler;
  // Обработчик подключения клиента
  con_handler_function_t connect_hndl = [](Client&){};
  // Обработчик отсоединения клиента
  con_handler_function_t disconnect_hndl = [](Client&){};
  // Поток-обработчик подключений
  std::thread accept_handler_thread;
  // Поток ожидания данных
  std::thread data_waiter_thread;
  // Тип итератора клиента
  typedef std::list<std::unique_ptr<Client>>::iterator ClientIterator;

  // Keep-Alive конфигурация
  KeepAliveConfig ka_conf;

  // Список клиентов
  std::list<std::unique_ptr<Client>> client_list;
  // Мьютекс для синзронизации потоков подключения и ожидания данных
  std::mutex client_mutex; 

  // Для систем Windows так же требуется
  // структура определяющая версию WinSocket
#ifdef _WIN32 // Windows NT
  WSAData w_data;
#endif

  // Включить Keep-Alive для сокета
  bool enableKeepAlive(Socket socket);
  // Метод обработчика подключений
  void handlingAcceptLoop();
  // Метод ожидания данных
  void waitingDataLoop();


public:
  // Упрощённый конструктор с указанием:
  // * порта
  // * обработчика данных
  // * конфигурации Keep-Alive
  TcpServer(const uint16_t port,
            handler_function_t handler,
            KeepAliveConfig ka_conf = {});
  // Конструктор с указанием:
  // * порта
  // * обработчика данных
  // * обработчика подключений
  // * обработчика отключений
  // * конфигурации Keep-Alive
  TcpServer(const uint16_t port,
            handler_function_t handler,
            con_handler_function_t connect_hndl,
            con_handler_function_t disconnect_hndl,
            KeepAliveConfig ka_conf = {});

  // Деструктор
  ~TcpServer();

  // Заменить обработчик данных
  void setHandler(handler_function_t handler);
  // Getter порта
  uint16_t getPort() const;
  // Setter порта
  uint16_t setPort(const uint16_t port);
  // Getter кода статуса сервера
  status getStatus() const {return _status;}
  // Метод запуска сервера
  status start();
  // Метод остановки сервера
  void stop();
  // Метод для входа присоединения циклических потоков сервера
  void joinLoop();

  // Исходящее подключение от сервера к другому серверу
  bool connectTo(uint32_t host, uint16_t port, con_handler_function_t connect_hndl);

  // Отправить данные всем клиентам сервера
  void sendData(const void* buffer, const size_t size);
  // Отправить данные клиенту по порту и хосту
  bool sendDataBy(uint32_t host, uint16_t port, const void* buffer, const size_t size);
  // Отключить клиента по порту и хосту
  bool disconnectBy(uint32_t host, uint16_t port);
  // Отключить всех клиентов
  void disconnectAll();
};


// Класс клиента (со стороны сервера)
struct TcpServer::Client : public TcpClientBase {
  friend struct TcpServer;

  // Мьютекс для синхронизации обработки данныз
  std::mutex access_mtx;
  // Адрес клиента
  SocketAddr_in address;
  // Сокет слиента
  Socket socket;
  // Код статуса клиента
  status _status = status::connected;


public:
  // Конструктор с указанием:
  // * сокета клиента
  // * адреса клиента
  Client(Socket socket, SocketAddr_in address);
  // Деструктор
  virtual ~Client() override;
  // Getter хоста
  virtual uint32_t getHost() const override;
  // Getter порта
  virtual uint16_t getPort() const override;
  // Getter кода статуса подключения
  virtual status getStatus() const override {return _status;}
  // Отключить клиента
  virtual status disconnect() override;
  // Получить данные от клиента
  virtual DataBuffer loadData() override;
  // Отправить данные клиенту
  virtual bool sendData(const void* buffer, const size_t size) const override;
  // Определить "сторону" клиента
  virtual SocketType getType() const override {return SocketType::server_socket;}
};

#endif // TCPSERVER_H


Теперь разберём непосредственно саму реализацию сервера:

TcpServer.cpp


#include "../include/TcpServer.h"
#include <chrono>
#include <cstring>
#include <mutex>

#ifdef _WIN32
// Макросы для выражений зависимых от OS
#define WIN(exp) exp
#define NIX(exp)

// Конвертировать WinSocket код ошибки в Posix код ошибки
inline int convertError() {
    switch (WSAGetLastError()) {
    case 0:
        return 0;
    case WSAEINTR:
        return EINTR;
    case WSAEINVAL:
        return EINVAL;
    case WSA_INVALID_HANDLE:
        return EBADF;
    case WSA_NOT_ENOUGH_MEMORY:
        return ENOMEM;
    case WSA_INVALID_PARAMETER:
        return EINVAL;
    case WSAENAMETOOLONG:
        return ENAMETOOLONG;
    case WSAENOTEMPTY:
        return ENOTEMPTY;
    case WSAEWOULDBLOCK:
        return EAGAIN;
    case WSAEINPROGRESS:
        return EINPROGRESS;
    case WSAEALREADY:
        return EALREADY;
    case WSAENOTSOCK:
        return ENOTSOCK;
    case WSAEDESTADDRREQ:
        return EDESTADDRREQ;
    case WSAEMSGSIZE:
        return EMSGSIZE;
    case WSAEPROTOTYPE:
        return EPROTOTYPE;
    case WSAENOPROTOOPT:
        return ENOPROTOOPT;
    case WSAEPROTONOSUPPORT:
        return EPROTONOSUPPORT;
    case WSAEOPNOTSUPP:
        return EOPNOTSUPP;
    case WSAEAFNOSUPPORT:
        return EAFNOSUPPORT;
    case WSAEADDRINUSE:
        return EADDRINUSE;
    case WSAEADDRNOTAVAIL:
        return EADDRNOTAVAIL;
    case WSAENETDOWN:
        return ENETDOWN;
    case WSAENETUNREACH:
        return ENETUNREACH;
    case WSAENETRESET:
        return ENETRESET;
    case WSAECONNABORTED:
        return ECONNABORTED;
    case WSAECONNRESET:
        return ECONNRESET;
    case WSAENOBUFS:
        return ENOBUFS;
    case WSAEISCONN:
        return EISCONN;
    case WSAENOTCONN:
        return ENOTCONN;
    case WSAETIMEDOUT:
        return ETIMEDOUT;
    case WSAECONNREFUSED:
        return ECONNREFUSED;
    case WSAELOOP:
        return ELOOP;
    case WSAEHOSTUNREACH:
        return EHOSTUNREACH;
    default:
        return EIO;
    }
}


#else
// Макросы для выражений зависимых от OS
#define WIN(exp)
#define NIX(exp) exp
#endif

// Реализация конструктора сервера с указанием
// * порта
// * обработчика данных
// * Keep-Alive конфигурации
TcpServer::TcpServer(const uint16_t port,
                     handler_function_t handler,
                     KeepAliveConfig ka_conf)
  : TcpServer(port, handler, [](Client&){}, [](Client&){}, ka_conf) {}

// Реализация конструктора сервера с указанием
// * порта
// * обработчика данных
// * обработчика подключения
// * обработчика отключения
// * Keep-Alive конфигурации
TcpServer::TcpServer(const uint16_t port,
                     handler_function_t handler,
                     con_handler_function_t connect_hndl,
                     con_handler_function_t disconnect_hndl,
                     KeepAliveConfig ka_conf)
  : port(port), handler(handler), connect_hndl(connect_hndl), disconnect_hndl(disconnect_hndl), ka_conf(ka_conf) {}

// Деструктор сервера
// автоматически закрывает сокет сервера 
TcpServer::~TcpServer() {
  if(_status == status::up)
    stop();
	WIN(WSACleanup());
}

// Setter обработчика данных
void TcpServer::setHandler(TcpServer::handler_function_t handler) {this->handler = handler;}

// Getter порта
uint16_t TcpServer::getPort() const {return port;}
// Setter порта
uint16_t TcpServer::setPort( const uint16_t port) {
	this->port = port;
	start();
	return port;
}

// Реализация запуска сервера
TcpServer::status TcpServer::start() {
  int flag;
  // Если сервер запущен, то отключаем его
  if(_status == status::up) stop();

  // Для Windows указываем версию WinSocket
  WIN(if(WSAStartup(MAKEWORD(2, 2), &w_data) == 0) {})
  
  // Задаём адрес сервера
  SocketAddr_in address;
  // INADDR_ANY - любой IP адрес
  address.sin_addr
      WIN(.S_un.S_addr)NIX(.s_addr) = INADDR_ANY;
  // Задаём порт сервера
  address.sin_port = htons(port);
  // Семейство сети AF_INET - IPv4 (AF_INET6 - IPv6)
  address.sin_family = AF_INET;

  // Создаём TCP сокет
  if((serv_socket = socket(AF_INET, SOCK_STREAM, 0)) WIN(== INVALID_SOCKET)NIX(== -1))
     return _status = status::err_socket_init;

  
  flag = true;
  // Устанавливаем параметр сокета SO_REUSEADDR в true (подробнее https://it.wikireading.ru/7093)
  if((setsockopt(serv_socket, SOL_SOCKET, SO_REUSEADDR, WIN((char*))&flag, sizeof(flag)) == -1) ||
     // Привязываем к сокету адрес и порт
     (bind(serv_socket, (struct sockaddr*)&address, sizeof(address)) WIN(== SOCKET_ERROR)NIX(< 0)))
     return _status = status::err_socket_bind;

  // Активируем ожидание фходящих соединений
  if(listen(serv_socket, SOMAXCONN) WIN(== SOCKET_ERROR)NIX(< 0))
    return _status = status::err_socket_listening;
  
  _status = status::up;
  // Запускаем поток ожидания соединений
  accept_handler_thread = std::thread([this]{handlingAcceptLoop();});
  // Запускаем поток ожидания данных
  data_waiter_thread = std::thread([this]{waitingDataLoop();});
  return _status;
}

// Реализация остановки сервера
void TcpServer::stop() {
  _status = status::close;
  // Закрываем сокет
  WIN(closesocket)NIX(close)(serv_socket);
  // Ожидаем завершения потоков
  joinLoop();
  // Вычищаем список клиентов
  client_list.clear();
}

// "Вхождение" в потоки ожидания
void TcpServer::joinLoop() {accept_handler_thread.join(); data_waiter_thread.join();}

// Создание подключение со стороны сервера
// (подключение аналогично клиентоскому, но обрабатывается
// тем же обработчиком, что и входящие соединения)
bool TcpServer::connectTo(uint32_t host, uint16_t port, con_handler_function_t connect_hndl) {
  Socket client_socket;
  SocketAddr_in address;
  // Создание TCP сокета
  if((client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) WIN(== INVALID_SOCKET) NIX(< 0)) return false;

  new(&address) SocketAddr_in;
  address.sin_family = AF_INET;
  address.sin_addr.s_addr = host;
  WIN(address.sin_addr.S_un.S_addr = host;)
  NIX(address.sin_addr.s_addr = host;)

  address.sin_port = htons(port);

  // Установка соединения
  if(connect(client_socket, (sockaddr *)&address, sizeof(address))
     WIN(== SOCKET_ERROR)NIX(!= 0)
     ) {
    WIN(closesocket(client_socket);)NIX(close(client_socket);)
    return false;
  }

  // Активация Keep-Alive
  if(!enableKeepAlive(client_socket)) {
    shutdown(client_socket, 0);
    WIN(closesocket)NIX(close)(client_socket)
  }

  std::unique_ptr<Client> client(new Client(client_socket, address));
  // Запуск обработчика подключения
  connect_hndl(*client);
  // Добавление клиента в список клиентов
  client_mutex.lock();
  client_list.emplace_back(std::move(client));
  client_mutex.unlock();
  return true;
}

// Отправка данных всем клиентам
void TcpServer::sendData(const void* buffer, const size_t size) {
  for(std::unique_ptr<Client>& client : client_list)
    client->sendData(buffer, size);
}

// Отправка данных по конкретному хосту и порту
bool TcpServer::sendDataBy(uint32_t host, uint16_t port, const void* buffer, const size_t size) {
  bool data_is_sended = false;
  for(std::unique_ptr<Client>& client : client_list)
    if(client->getHost() == host &&
       client->getPort() == port) {
      client->sendData(buffer, size);
      data_is_sended = true;
    }
  return data_is_sended;
}

// Отключение клиента по конкретному хосту и порту
bool TcpServer::disconnectBy(uint32_t host, uint16_t port) {
  bool client_is_disconnected = false;
  for(std::unique_ptr<Client>& client : client_list)
    if(client->getHost() == host &&
       client->getPort() == port) {
      client->disconnect();
      client_is_disconnected = true;
    }
  return client_is_disconnected;
}

// Отключение всех клиентов
void TcpServer::disconnectAll() {
  for(std::unique_ptr<Client>& client : client_list)
    client->disconnect();
}

// Цикл обработки входящих подключений
// (исполняется в отдельном потоке)
void TcpServer::handlingAcceptLoop() {
  SockLen_t addrlen = sizeof(SocketAddr_in);
  // Пока сервер запущен
  while (_status == status::up) {
    SocketAddr_in client_addr;
    // Принятеи новго подключения (блокирующи вызов)
    if (Socket client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, &addrlen);
        client_socket WIN(!= 0)NIX(>= 0) && _status == status::up) {
      // Если получен сокет с ошибкой продолжить ожидание
      if(client_socket == WIN(INVALID_SOCKET)NIX(-1)) continue;

      // Активировать Keep-Alive для клиента
      if(!enableKeepAlive(client_socket)) {
        shutdown(client_socket, 0);
        WIN(closesocket)NIX(close)(client_socket);
      }
      
      std::unique_ptr<Client> client(new Client(client_socket, client_addr));
      // Запустить обработчик подключений
      connect_hndl(*client);
      // Добавить клиента в список клиентов
      client_mutex.lock();
      client_list.emplace_back(std::move(client));
      client_mutex.unlock();
    }
  }

}

// Цикл ожидания данных
void TcpServer::waitingDataLoop() {
  using namespace std::chrono_literals;
  while (true) {
    client_mutex.lock();
    // Перебрать всех клиентов
    for(auto it = client_list.begin(), end = client_list.end(); it != end; ++it) {
      auto& client = *it;
      // Если unique_ptr содержит объект клиента
      if(client){
        if(DataBuffer data = client->loadData(); data.size) {
          // При наличии данных запустить обработку входящих данных в отдельном потоке
          std::thread([this, _data = std::move(data), &client]{
            client->access_mtx.lock();
            handler(_data, *client);
            client->access_mtx.unlock();
          }).detach();
        } else if(client->_status == SocketStatus::disconnected) {
          // При отключении клиента запустить обработку в отдельном потоке
          std::thread([this, &client, it]{
          	// Извлечь объект клиента из unique_ptr в списке
            client->access_mtx.lock();
            Client* pointer = client.release();
            client = nullptr;
            pointer->access_mtx.unlock();
            // Запуск обработчика отключения
            disconnect_hndl(*pointer);
            // Удалить элемент клиента из списка
            client_list.erase(it);
            // Удалить объект клиента
            delete pointer;
          }).detach();
        }
      }
    }
    client_mutex.unlock();
    // Ожидание 50 млисекунд так как в данном потоке
    // не содержится блокирующих вызовов и данный
    // цикл сильно повышает загруженность CPU
    std::this_thread::sleep_for(50ms);
  }
}

// Функция запуска и конфигурации Keep-Alive для сокета
bool TcpServer::enableKeepAlive(Socket socket) {
  int flag = 1;
#ifdef _WIN32
  tcp_keepalive ka {1, ka_conf.ka_idle * 1000, ka_conf.ka_intvl * 1000};
  if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, (const char *) &flag, sizeof(flag)) != 0) return false;
  unsigned long numBytesReturned = 0;
  if(WSAIoctl(socket, SIO_KEEPALIVE_VALS, &ka, sizeof (ka), nullptr, 0, &numBytesReturned, 0, nullptr) != 0) return false;
#else //POSIX
  if(setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPIDLE, &ka_conf.ka_idle, sizeof(ka_conf.ka_idle)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPINTVL, &ka_conf.ka_intvl, sizeof(ka_conf.ka_intvl)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPCNT, &ka_conf.ka_cnt, sizeof(ka_conf.ka_cnt)) == -1) return false;
#endif
  return true;
}

С реализацией сервера определились, но теперь остаётся реализация объекта клиента со стороны сервера:

TcpServerClient.cpp


#include "../include/TcpServer.h"

#ifdef _WIN32
// Макросы для выражений зависимых от OS
#define WIN(exp) exp
#define NIX(exp)

// Конвертировать WinSocket код ошибки в Posix код ошибки
inline int convertError() {
    switch (WSAGetLastError()) {
    case 0:
        return 0;
    case WSAEINTR:
        return EINTR;
    case WSAEINVAL:
        return EINVAL;
    case WSA_INVALID_HANDLE:
        return EBADF;
    case WSA_NOT_ENOUGH_MEMORY:
        return ENOMEM;
    case WSA_INVALID_PARAMETER:
        return EINVAL;
    case WSAENAMETOOLONG:
        return ENAMETOOLONG;
    case WSAENOTEMPTY:
        return ENOTEMPTY;
    case WSAEWOULDBLOCK:
        return EAGAIN;
    case WSAEINPROGRESS:
        return EINPROGRESS;
    case WSAEALREADY:
        return EALREADY;
    case WSAENOTSOCK:
        return ENOTSOCK;
    case WSAEDESTADDRREQ:
        return EDESTADDRREQ;
    case WSAEMSGSIZE:
        return EMSGSIZE;
    case WSAEPROTOTYPE:
        return EPROTOTYPE;
    case WSAENOPROTOOPT:
        return ENOPROTOOPT;
    case WSAEPROTONOSUPPORT:
        return EPROTONOSUPPORT;
    case WSAEOPNOTSUPP:
        return EOPNOTSUPP;
    case WSAEAFNOSUPPORT:
        return EAFNOSUPPORT;
    case WSAEADDRINUSE:
        return EADDRINUSE;
    case WSAEADDRNOTAVAIL:
        return EADDRNOTAVAIL;
    case WSAENETDOWN:
        return ENETDOWN;
    case WSAENETUNREACH:
        return ENETUNREACH;
    case WSAENETRESET:
        return ENETRESET;
    case WSAECONNABORTED:
        return ECONNABORTED;
    case WSAECONNRESET:
        return ECONNRESET;
    case WSAENOBUFS:
        return ENOBUFS;
    case WSAEISCONN:
        return EISCONN;
    case WSAENOTCONN:
        return ENOTCONN;
    case WSAETIMEDOUT:
        return ETIMEDOUT;
    case WSAECONNREFUSED:
        return ECONNREFUSED;
    case WSAELOOP:
        return ELOOP;
    case WSAEHOSTUNREACH:
        return EHOSTUNREACH;
    default:
        return EIO;
    }
}


#else
// Макросы для выражений зависимых от OS
#define WIN(exp)
#define NIX(exp) exp
#endif



#include <iostream>

// Реализация загрузки данных
DataBuffer TcpServer::Client::loadData() {
  // Если клиент не подключён вернуть пустой буффер
  if(_status != SocketStatus::connected) return DataBuffer();
  using namespace std::chrono_literals;
  DataBuffer buffer;
  int err;

  // Чтение длинный данных в неблокирующем режиме
  // MSG_DONTWAIT - Unix флаг неблокирующего режима для recv
  // FIONBIO - Windows-флаг неблокирующего режима для ioctlsocket
  WIN(if(u_long t = true; SOCKET_ERROR == ioctlsocket(socket, FIONBIO, &t)) return DataBuffer();)
  int answ = recv(socket, (char*)&buffer.size, sizeof (buffer.size), NIX(MSG_DONTWAIT)WIN(0));

  // Обработка отключения
  if(!answ) {
    disconnect();
    return DataBuffer();
  } else if(answ == -1) {
    // Чтение кода ошибки
    WIN(
      err = convertError();
      if(!err) {
        SockLen_t len = sizeof (err);
        getsockopt (socket, SOL_SOCKET, SO_ERROR, WIN((char*))&err, &len);
      }
    )NIX(
      SockLen_t len = sizeof (err);
      getsockopt (socket, SOL_SOCKET, SO_ERROR, WIN((char*))&err, &len);
      if(!err) err = errno;
    )
    
    // Отключение неблокирующего режима для Windows
    WIN(if(u_long t = false; SOCKET_ERROR == ioctlsocket(socket, FIONBIO, &t)) return DataBuffer();) 
    
    // Обработка ошибки при наличии
    switch (err) {
      case 0: break;
        // Keep alive timeout
      case ETIMEDOUT:
      case ECONNRESET:
      case EPIPE:
        disconnect();
        [[fallthrough]];
        // No data
      case EAGAIN: return DataBuffer();
      default:
        disconnect();
        std::cerr << "Unhandled error!\n"
                    << "Code: " << err << " Err: " << std::strerror(err) << '\n';
      return DataBuffer();
    }
  }

  // Если прочитанный размер нулевой, то вернуть пустой буффер
  if(!buffer.size) return DataBuffer();
  // Если размер не нулевой выделить буффер в куче для чтения данных
  buffer.data_ptr = (char*)malloc(buffer.size);
  // Чтение данных в блокирующем режиме
  recv(socket, (char*)buffer.data_ptr, buffer.size, 0);
  // Возврат буффера с прочитанными данными
  return buffer;
}

// Обработка отключения клиента
TcpClientBase::status TcpServer::Client::disconnect() {
  _status = status::disconnected;
  // Если сокет не валидный прекратить обработку
  if(socket == WIN(INVALID_SOCKET)NIX(-1)) return _status;
  // Отключение сокета
  shutdown(socket, SD_BOTH)
  // Закрытие сокета
  WIN(closesocket)NIX(close)(socket);
  // Установление в сокета не валидного значения
  socket = WIN(INVALID_SOCKET)NIX(-1);
  return _status;
}

// Отправка данных
bool TcpServer::Client::sendData(const void* buffer, const size_t size) const {
  // Если сокет закрыт вернуть false
  if(_status != SocketStatus::connected) return false;
  // Сформировать сообщение с длинной в начале
  void* send_buffer = malloc(size + sizeof (int));
  memcpy(reinterpret_cast<char*>(send_buffer) + sizeof(int), buffer, size);
  *reinterpret_cast<int*>(send_buffer) = size;
  
  // Отправить сообщение
  if(send(socket, reinterpret_cast<char*>(send_buffer), size + sizeof (int), 0) < 0) return false;
  // Вычистить буффер сообщения
  free(send_buffer);
  return true;
}

// Конструктор клиента
TcpServer::Client::Client(Socket socket, SocketAddr_in address)
  : address(address), socket(socket) {}

// Деструктор клиента с закрытием сокета
TcpServer::Client::~Client() {
  if(socket == WIN(INVALID_SOCKET)NIX(-1)) return;
  shutdown(socket, SD_BOTH);
  WIN(closesocket(socket);)
  NIX(close(socket);)
}

// Получить хост клиента
uint32_t TcpServer::Client::getHost() const {return NIX(address.sin_addr.s_addr) WIN(address.sin_addr.S_un.S_addr);}
// Получить порт клиента
uint16_t TcpServer::Client::getPort() const {return address.sin_port;}

Вывод

Как видно из реализации данный сервер ориентирован на Windows и Unix-подобные операционные системы. Данный сервер может постоянно принимать соединения и данные от клиентов, а так же ввиду активного Keep-Alive определять неактивные соединения и закрывать их. Стоит отметить что реализация хоть и проста в использоавнии:

main.cpp


#include "server/hdr/TcpServer.h"

#include <iostream>

//Parse ip to std::string
std::string getHostStr(const TcpServer::Client& client) {
uint32_t ip = client.getHost ();
return std::string() + std::to_string(int(reinterpret_cast<char*>(&ip)[0])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[1])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[2])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[3])) + ':' +
        std::to_string( client.getPort ());
}

int main() {
    //Create object of TcpServer
    TcpServer server( 8080, [](DataBuffer data, TcpServer::Client& client){
        std::cout << "("<<getHostStr(client)<<")[ " << data.size << " bytes ]: " << (char*)data.data_ptr << '\n';
        client.sendData("Hello, client!", sizeof("Hello, client!"));
        }, {1, 1, 1}); // Keep alive{idle:1s, interval: 1s, package count: 1};
    
    //Start server
    if(server.start() == TcpServer::status::up) {
        std::cout << "Server is up!" << std::endl;
        server.joinLoop(); //Joing to the client handling loop
    } else {
        std::cout << "Server start error! Error code:" << int(server.getStatus()) << std::endl;
        return -1;
    }

}

Но имеет недостатки в использовании многопоточности, так как два потока постоянно занимаются циклами обработки входящих соединений и входящих данных, а также при каждом входящем сообщении со стороны клиента, на сервере порождается отдельный поток для его обработки. Наиболее оптимальным решением данной задачи было бы использовать пулы потоков из библиотек Boost Asio или libuv.

24.01.22 UPD: На текущий момент TcpServer переписан с использование кастомной реализацией пула потоков. Работоспособность протестирована на Linux.

Комментарии