Как использовать сопрограммы С++ с Boost.Asio?

521
10 августа 2017, 23:51

Есть прокси-сервер, написанный на асинхронном API Boost.Asio - async_* функции и коллбеки.

Полный код есть в этом ответе.
Схематично его можно описать так:

Цикл приема входящих соединений, установка соединения с сервером назначения:

void accept_loop() {
   acceptor.async_accept(src_socket, [](auto err) {
     accept_loop();  // повторяем accept
     dst_socket.async_connect(dst_endpoint, [](auto err) {
       proxy_loop(src_socket, dst_socket);
       proxy_loop(dst_socket, src_socket);
     });
  });
}

Цикл передачи данных, по 2 шт. на каждое соединение:

void proxy_loop(socket src, socket dst) {
  async_read(src, buf, [](auto error, auto n) {
    async_write(dst, buf, [](auto error, auto n) {
      proxy_loop(src, dst);  // повторяем read
    });
  });
}

Как переписать этот сервер с использованием сопрограмм С++?

Answer 1

В настоящий момент нет стандартного класса Future, который был бы совместим с сопрограммами и co_await.
Также Boost.Asio еще не поддерживает co_await из коробки.

Поэтому мы напишем и то и другое, всего за сто строк кода.

Начнем с универсального Future, который можно вернуть из сопрограммы, и который можно ждать в co_await.

template<typename T>
struct Future {
  // На памяти экономить не будем,
  // поэтому данные будем хранить в некотором "общем состоянии",
  // результат будем копировать (или перемещать).
  struct SharedState {
    T value;
    std::experimental::coroutine_handle<> h;
    std::atomic<bool> is_ready;
  };
  // Поддержка использования Future как результата сопрограммы.
  struct promise_type {
    std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
    Future<T> get_return_object() { return {s}; }
    std::experimental::suspend_never initial_suspend() { return {}; }
    // SharedState переживет удаление promise_type в конце работы сопрограммы
    std::experimental::suspend_never final_suspend() { return {}; }
    void return_value(T value) const {
      s->value = std::move(value);
      if (s->is_ready.exchange(true)) s->h.resume();
    }
  };
  std::shared_ptr<SharedState> s;
  // Поддержка co_await.
  bool await_ready() noexcept { return false; }
  bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
    s->h = h;
    return !s->is_ready.exchange(true);
  }
  T await_resume() { return std::move(s->value); }
};

Теперь пишем всё то же самое, но для случая когда сопрограмма не возвращает значений.

template<>
struct Future<void> {
  struct SharedState {
    std::experimental::coroutine_handle<> h;
    std::atomic<bool> is_ready;
  };
  struct promise_type {
    std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
    Future<void> get_return_object() { return {s}; }
    std::experimental::suspend_never initial_suspend() { return {}; }
    std::experimental::suspend_never final_suspend() { return {}; }
    void return_void() const {
      if (s->is_ready.exchange(true)) s->h.resume();
    }
  };
  std::shared_ptr<SharedState> s;
  bool await_ready() noexcept { return false; }
  bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
    s->h = h;
    return !s->is_ready.exchange(true);
  }
  void await_resume() {}
};

Это был наш Future.

Теперь пишем обертки над async_* функциями Boost.Asio.

Мы можем использовать тот же Future::promise_type, как будто это сопрограмма.

Future<boost::system::error_code> coro_accept(boost::asio::ip::tcp::acceptor& acceptor,
                                              boost::asio::ip::tcp::socket& socket) {
  Future<boost::system::error_code>::promise_type p;
  acceptor.async_accept(socket, [p](auto error) { p.return_value(error); });
  return p.get_return_object();
}
Future<boost::system::error_code> coro_connect(boost::asio::ip::tcp::socket& socket,
                                               boost::asio::ip::tcp::endpoint endpoint) {
  Future<boost::system::error_code>::promise_type p;
  socket.async_connect(endpoint, [p](auto error) { p.return_value(error); });
  return p.get_return_object();
}

Если callback принимает больше одного параметра, то их можно сделать out-параметрами.
В С++17 можно будет использовать tuple и structured bindings для распаковки.

template<typename Buffers>
Future<boost::system::error_code> coro_read(boost::asio::ip::tcp::socket& socket,
                                            Buffers bufs, std::size_t& bytes_read) {
  Future<boost::system::error_code>::promise_type p;
  socket.async_read_some(bufs,
    [p, &bytes_read](auto error, auto n) {
      bytes_read = n; p.return_value(error);
    });
  return p.get_return_object();
}
template<typename Buffers>
Future<boost::system::error_code> coro_write_all(boost::asio::ip::tcp::socket& socket,
                                                 Buffers bufs, std::size_t& bytes_written) {
  Future<boost::system::error_code>::promise_type p;
  async_write(socket, bufs, boost::asio::transfer_all(), 
    [p, &bytes_written](auto error, auto n) {
      bytes_written = n; p.return_value(error);
    });
  return p.get_return_object();
}

И наконец сам код сервера, те же ~50 строк что и в оригинале

boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query dst_query("arrowd.name", "80");
boost::asio::ip::tcp::resolver::iterator dst_iterator = resolver.resolve(dst_query);
boost::asio::ip::tcp::endpoint dst_endpoint = *dst_iterator;
Future<void> proxy(boost::asio::ip::tcp::socket& src,
                   boost::asio::ip::tcp::socket& dst) {
  char buf[4096];
  for (;;) {
    std::size_t bytes_read;
    auto error = co_await coro_read(src, boost::asio::buffer(buf), bytes_read);
    std::cout << "read " << bytes_read << ' ' << error << '\n';
    if (error) break;
    std::size_t bytes_written;
    error = co_await coro_write_all(dst, boost::asio::buffer(buf, bytes_read),
                                    bytes_written);
    std::cout << "write " << bytes_written << ' ' << error << '\n';
    if (error) break;
  }
  // Закрытие сокетов вызовет ошибку в сопрограмме которая качает в другую сторону
  src.close();
  dst.close();
}
Future<void> connect(boost::asio::ip::tcp::socket src) {
  boost::asio::ip::tcp::socket dst(io_service);
  auto error = co_await coro_connect(dst, dst_endpoint);
  std::cout << "connect " << error << '\n';
  if (error) co_return;
  auto _ = proxy(src, dst);  // Запускаем первую сопрограмму без ожидания
  co_await proxy(dst, src);  // Запускаем вторую и ждем
                             // Мы вышли из второй с какой-то ошибкой
  co_await _;                // Ждем завершения первой
}
Future<void> accept_loop() {
  boost::asio::ip::tcp::endpoint src_endpoint(
      boost::asio::ip::address_v4::loopback(), 8080);  // localhost:8080
  boost::asio::ip::tcp::acceptor acceptor{io_service, src_endpoint};
  for (;;) {
    boost::asio::ip::tcp::socket src(io_service);
    auto error = co_await coro_accept(acceptor, src);
    std::cout << "accept " << error << '\n';
    if (error) co_return;
    connect(std::move(src));
  }
}
int main() {
  accept_loop();
  io_service.run();  // Можно запустить параллельно в нескольких потоках
}

Код стал гораздо чище - пропали коллбеки.
connect пришлось вынести из accept_loop в отдельную функцию, т.к. это отдельная сопрограмма.

Всё что явно выделялось в динамической памяти теперь выглядит как локальные переменные. При этом оно всеравно живет в динамической памяти, в coroutine-state. Это позволяет передавать сокеты в proxy по ссылке - их время жизни привязно к connect.

В данном коде отсутствует обработка исключений.
Для поддержки исключений, Future должно уметь перебрасывать исключения при помощи std::exception_ptr.

READ ALSO
Не передаются данные методом POST через ajax

Не передаются данные методом POST через ajax

Решил реализовать на своём сайте поисковикСделал через ajax в js

426
как сделать чтобы header не двигался

как сделать чтобы header не двигался

Изменяю размер окна браузера, шапка двигаетсяКак сделать, чтобы заголовок не перемещался?

386
Можно ли как-нибудь написать стили только к определенным CSS селекторам (.css файлу), если у них нет общего селектора?

Можно ли как-нибудь написать стили только к определенным CSS селекторам (.css файлу), если у них нет общего селектора?

Я дописываю стили к другому сайтуУ меня свой normalize, у сайта свой (особенно проблема в свойстве box-sizing)

390
sublimetext3: поломался плагин minifier для CSS

sublimetext3: поломался плагин minifier для CSS

перестал минифицировать CSS файлыВместо этого выдает код страницы https://cssminifier

440