Есть прокси-сервер, написанный на асинхронном API Boost.Asio - async_*
функции и коллбеки.
Полный код есть в этом ответе.
Схематично его можно описать так:
Цикл приема входящих соединений, установка соединения с сервером назначения:
void accept_loop() {
acceptor.async_accept(src_socket, [](auto err) {
accept_loop(); // повторяем accept
dst_socket.async_connect(dst_endpoint, [](auto err) {
proxy_loop(src_socket, dst_socket);
proxy_loop(dst_socket, src_socket);
});
});
}
Цикл передачи данных, по 2 шт. на каждое соединение:
void proxy_loop(socket src, socket dst) {
async_read(src, buf, [](auto error, auto n) {
async_write(dst, buf, [](auto error, auto n) {
proxy_loop(src, dst); // повторяем read
});
});
}
Как переписать этот сервер с использованием сопрограмм С++?
В настоящий момент нет стандартного класса Future
, который был бы совместим с сопрограммами и co_await
.
Также Boost.Asio еще не поддерживает co_await
из коробки.
Поэтому мы напишем и то и другое, всего за сто строк кода.
Начнем с универсального Future
, который можно вернуть из сопрограммы, и который можно ждать в co_await
.
template<typename T>
struct Future {
// На памяти экономить не будем,
// поэтому данные будем хранить в некотором "общем состоянии",
// результат будем копировать (или перемещать).
struct SharedState {
T value;
std::experimental::coroutine_handle<> h;
std::atomic<bool> is_ready;
};
// Поддержка использования Future как результата сопрограммы.
struct promise_type {
std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
Future<T> get_return_object() { return {s}; }
std::experimental::suspend_never initial_suspend() { return {}; }
// SharedState переживет удаление promise_type в конце работы сопрограммы
std::experimental::suspend_never final_suspend() { return {}; }
void return_value(T value) const {
s->value = std::move(value);
if (s->is_ready.exchange(true)) s->h.resume();
}
};
std::shared_ptr<SharedState> s;
// Поддержка co_await.
bool await_ready() noexcept { return false; }
bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
s->h = h;
return !s->is_ready.exchange(true);
}
T await_resume() { return std::move(s->value); }
};
Теперь пишем всё то же самое, но для случая когда сопрограмма не возвращает значений.
template<>
struct Future<void> {
struct SharedState {
std::experimental::coroutine_handle<> h;
std::atomic<bool> is_ready;
};
struct promise_type {
std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
Future<void> get_return_object() { return {s}; }
std::experimental::suspend_never initial_suspend() { return {}; }
std::experimental::suspend_never final_suspend() { return {}; }
void return_void() const {
if (s->is_ready.exchange(true)) s->h.resume();
}
};
std::shared_ptr<SharedState> s;
bool await_ready() noexcept { return false; }
bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
s->h = h;
return !s->is_ready.exchange(true);
}
void await_resume() {}
};
Это был наш Future.
Теперь пишем обертки над async_*
функциями Boost.Asio.
Мы можем использовать тот же Future::promise_type, как будто это сопрограмма.
Future<boost::system::error_code> coro_accept(boost::asio::ip::tcp::acceptor& acceptor,
boost::asio::ip::tcp::socket& socket) {
Future<boost::system::error_code>::promise_type p;
acceptor.async_accept(socket, [p](auto error) { p.return_value(error); });
return p.get_return_object();
}
Future<boost::system::error_code> coro_connect(boost::asio::ip::tcp::socket& socket,
boost::asio::ip::tcp::endpoint endpoint) {
Future<boost::system::error_code>::promise_type p;
socket.async_connect(endpoint, [p](auto error) { p.return_value(error); });
return p.get_return_object();
}
Если callback принимает больше одного параметра, то их можно сделать out-параметрами.
В С++17 можно будет использовать tuple
и structured bindings для распаковки.
template<typename Buffers>
Future<boost::system::error_code> coro_read(boost::asio::ip::tcp::socket& socket,
Buffers bufs, std::size_t& bytes_read) {
Future<boost::system::error_code>::promise_type p;
socket.async_read_some(bufs,
[p, &bytes_read](auto error, auto n) {
bytes_read = n; p.return_value(error);
});
return p.get_return_object();
}
template<typename Buffers>
Future<boost::system::error_code> coro_write_all(boost::asio::ip::tcp::socket& socket,
Buffers bufs, std::size_t& bytes_written) {
Future<boost::system::error_code>::promise_type p;
async_write(socket, bufs, boost::asio::transfer_all(),
[p, &bytes_written](auto error, auto n) {
bytes_written = n; p.return_value(error);
});
return p.get_return_object();
}
И наконец сам код сервера, те же ~50 строк что и в оригинале
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query dst_query("arrowd.name", "80");
boost::asio::ip::tcp::resolver::iterator dst_iterator = resolver.resolve(dst_query);
boost::asio::ip::tcp::endpoint dst_endpoint = *dst_iterator;
Future<void> proxy(boost::asio::ip::tcp::socket& src,
boost::asio::ip::tcp::socket& dst) {
char buf[4096];
for (;;) {
std::size_t bytes_read;
auto error = co_await coro_read(src, boost::asio::buffer(buf), bytes_read);
std::cout << "read " << bytes_read << ' ' << error << '\n';
if (error) break;
std::size_t bytes_written;
error = co_await coro_write_all(dst, boost::asio::buffer(buf, bytes_read),
bytes_written);
std::cout << "write " << bytes_written << ' ' << error << '\n';
if (error) break;
}
// Закрытие сокетов вызовет ошибку в сопрограмме которая качает в другую сторону
src.close();
dst.close();
}
Future<void> connect(boost::asio::ip::tcp::socket src) {
boost::asio::ip::tcp::socket dst(io_service);
auto error = co_await coro_connect(dst, dst_endpoint);
std::cout << "connect " << error << '\n';
if (error) co_return;
auto _ = proxy(src, dst); // Запускаем первую сопрограмму без ожидания
co_await proxy(dst, src); // Запускаем вторую и ждем
// Мы вышли из второй с какой-то ошибкой
co_await _; // Ждем завершения первой
}
Future<void> accept_loop() {
boost::asio::ip::tcp::endpoint src_endpoint(
boost::asio::ip::address_v4::loopback(), 8080); // localhost:8080
boost::asio::ip::tcp::acceptor acceptor{io_service, src_endpoint};
for (;;) {
boost::asio::ip::tcp::socket src(io_service);
auto error = co_await coro_accept(acceptor, src);
std::cout << "accept " << error << '\n';
if (error) co_return;
connect(std::move(src));
}
}
int main() {
accept_loop();
io_service.run(); // Можно запустить параллельно в нескольких потоках
}
Код стал гораздо чище - пропали коллбеки.
connect
пришлось вынести из accept_loop
в отдельную функцию, т.к. это отдельная сопрограмма.
Всё что явно выделялось в динамической памяти теперь выглядит как локальные переменные. При этом оно всеравно живет в динамической памяти, в coroutine-state. Это позволяет передавать сокеты в proxy
по ссылке - их время жизни привязно к connect
.
В данном коде отсутствует обработка исключений.
Для поддержки исключений, Future
должно уметь перебрасывать исключения при помощи std::exception_ptr
.
Виртуальный выделенный сервер (VDS) становится отличным выбором
Решил реализовать на своём сайте поисковикСделал через ajax в js
Изменяю размер окна браузера, шапка двигаетсяКак сделать, чтобы заголовок не перемещался?
Я дописываю стили к другому сайтуУ меня свой normalize, у сайта свой (особенно проблема в свойстве box-sizing)
перестал минифицировать CSS файлыВместо этого выдает код страницы https://cssminifier