Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming - Chris & Klemens
A "man-in-the-middle" proxy scenario
-
A "man-in-the-middle" proxy is a networking component that intercepts communications between a client and a server.
-
It sits between the client and the server and relays messages between them.
-
The term "man-in-the-middle" refers to the fact that the proxy sits in the middle of the communication path, allowing it to monitor, intercept, and potentially modify the data being exchanged.
-
In the context of the discussion, the "man-in-the-middle proxy" is specifically designed to forward bytes of data between a client and a remote server.
-
When a client initiates a connection to the proxy, the proxy establishes a connection to the remote server on behalf of the client. Once both connections are established, the proxy acts as an intermediary, passing data between the client and the server bidirectionally.
-
The proxy's work is a sequence of asynchronous operations: accept, connect, read, and write.

Implementation details: start from main
// The listen function implements an asynchronous accept loop,
// waiting for new connections and handling them as they arrive.
void listen(tcp::acceptor& acceptor, tcp::endpoint target) {
// Utilizes a lambda function as the callback for async_accept,
// which is triggered when a new connection is successfully
// established.
acceptor.async_accept(
[&acceptor, target](std::error_code error, tcp::socket client) {
if (!error) {
// Upon a successful connection, the callback initializes a proxy
// object (handle the forwarding of data between the client and
// the target server) for each new client connection.
std::make_shared<proxy>(std::move(client))->connect_to_server(target);
}
// recursive chain of accept, ensuring the server continuously
// listens for new connections.
listen(acceptor, target);
});
}
int main(int argc, char* argv[]) {
try {
if (argc != 5) {
std::cerr << "Usage: proxy";
std::cerr << " <listen_address> <listen_port>";
std::cerr << " <target_address> <target_port>\n";
return 1;
}
// Central to Asio's operation, handling the asynchronous event loop.
asio::io_context ctx;
auto listen_endpoint =
*tcp::resolver(ctx).resolve(
argv[1], argv[2], tcp::resolver::passive);
auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);
tcp::acceptor acceptor(ctx, listen_endpoint);
listen(acceptor, target_endpoint);
ctx.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
Implementation details: The classic recursive handling
// the object that represents an individual connection from a client
class proxy : public std::enable_shared_from_this<proxy> {
public:
proxy(tcp::socket client)
: client_(std::move(client)), server_(client_.get_executor()) {}
void connect_to_server(tcp::endpoint target) {
auto self = shared_from_this();
// Initiates an asynchronous connection to the target server endpoint.
// Upon successful connection, the callback starts reading from both the
// client and the server simultaneously.
server_.async_connect(target, [self](std::error_code error) {
if (!error) {
self->read_from_client();
self->read_from_server();
}
});
}
private:
void stop() {
client_.close();
server_.close();
}
void read_from_client() { // <---- 1
auto self = shared_from_this();
client_.async_read_some(buffer(data_from_client_),
[self](std::error_code error, std::size_t n) {
if (!error) {
self->write_to_server(n); // <---- 2
} else {
self->stop();
}
});
}
void write_to_server(std::size_t n) { // <---- 3
auto self = shared_from_this();
async_write(server_, buffer(data_from_client_, n),
[self](std::error_code ec, std::size_t /*n*/) {
if (!ec) {
// After writing, it loops back to reading from the client,
// maintaining a continuous data transfer loop from the
// client to the server.
self->read_from_client(); // <---- 4 (back to 1)
} else {
self->stop();
}
});
}
// ---------------------------------------------------------------------------
void read_from_server() { // <---- a
auto self = shared_from_this();
server_.async_read_some(asio::buffer(data_from_server_),
[self](std::error_code error, std::size_t n) {
if (!error) {
self->write_to_client(n); // <---- b
} else {
self->stop();
}
});
}
void write_to_client(std::size_t n) { // <---- c
auto self = shared_from_this();
async_write(client_, buffer(data_from_server_, n),
[self](std::error_code ec, std::size_t /*n*/) {
if (!ec) {
// After sending, it loops back to reading from the server,
// thus maintaining a continuous data transfer loop from the
// server to the client.
self->read_from_server(); // <---- d (back to a)
} else {
self->stop();
}
});
}
tcp::socket client_;
tcp::socket server_;
std::array<char, 1024> data_from_client_;
std::array<char, 1024> data_from_server_;
};
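Why every callback above captures self can be seen in isolation. The following is a minimal, standard-library-only sketch: toy_loop, session, and run_demo are hypothetical names, and toy_loop merely queues callbacks as a stand-in for asio::io_context. None of this is Asio's API.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an event loop: stores callbacks and runs them later.
struct toy_loop {
    std::queue<std::function<void()>> pending;
    void post(std::function<void()> f) { pending.push(std::move(f)); }
    void run() {
        while (!pending.empty()) {
            auto f = std::move(pending.front());
            pending.pop();
            f();  // f (and the shared_ptr it captured) is destroyed after this pass
        }
    }
};

class session : public std::enable_shared_from_this<session> {
public:
    session(toy_loop& loop, int& counter) : loop_(loop), counter_(counter) {}

    // Each step re-arms the next one, like read -> write -> read in the proxy.
    void start(int rounds) {
        assert(rounds >= 0);
        if (rounds == 0) return;          // chain ends, nothing owns us afterwards
        auto self = shared_from_this();   // the pending callback co-owns the session
        loop_.post([self, rounds] {
            ++self->counter_;
            self->start(rounds - 1);
        });
    }

private:
    toy_loop& loop_;
    int& counter_;
};

// Returns true if the session outlived its creator while work was pending
// and was destroyed once the callback chain finished.
inline bool run_demo(int rounds, int& steps) {
    toy_loop loop;
    std::weak_ptr<session> watch;
    {
        auto s = std::make_shared<session>(loop, steps);
        watch = s;
        s->start(rounds);
    }  // the creator's reference is gone here
    const bool alive_while_pending = !watch.expired();
    loop.run();
    return alive_while_pending && watch.expired();
}
```

Calling run_demo(3, steps) runs three chained callbacks. The session stays alive only because each queued callback holds a shared_ptr to it, which is the same reason the proxy object survives between asynchronous operations.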
The evolution of Asio
Old Asynchronous API Specification
// Asynchronous API specification (old)
template <typename CompletionHandler>
void foo(/* ... */, CompletionHandler&& handler);
// Completion Signature:
void(A1 /*, ... */);
- Completion Handler Model:
- Originally, Asio's asynchronous operations were designed to accept a completion handler, typically a lambda or function pointer, which defined the actions to take upon completion.
- The signature of this model explicitly indicates the type of the completion handler and the parameters it expects upon the operation's completion.
New Asynchronous API Specification
// Asynchronous API specification (new)
template <typename CompletionToken>
DEDUCED foo(/* ... */, CompletionToken&& token);
// DEDUCED based on the CompletionToken passed in
- Completion Token Model:
- The modern approach introduces a more generic interface that accepts a CompletionToken, offering greater flexibility.
- The return type (DEDUCED) of asynchronous operations is deduced based on the type of completion token provided, allowing for different types of interaction patterns and return values.

Role of async_result Trait
- Customization and Flexibility:
- At the heart of this new model is the async_result trait, serving as a customization point that adapts the asynchronous operation to the specific needs dictated by the completion token.
- This trait takes into account the signature of the operation, its implementation, and the user-supplied completion token to tailor the operation's behavior and return type.
Implications of the Transition
-
Enhanced Usability and Customization:
- This evolution allows users to specify not just the actions to be taken upon completion but also how the completion is reported (e.g., through futures, coroutines, or traditional callbacks).
- It supports a wider array of asynchronous programming paradigms within the same framework, making Asio more adaptable to different programming needs and styles.
-
Backward Compatibility and Simplicity:
- Despite the introduction of completion tokens, the system retains backward compatibility with the traditional callback model. When a callback is used as a completion token, the behavior remains essentially unchanged from the user's perspective.
- This ensures that existing codebases can gradually adopt the new model without requiring immediate, extensive refactoring.
How does a CompletionToken actually differ from a CompletionHandler?
Completion Handler
- Definition: A Completion Handler is a callback function (or function object) that is called upon the completion of an asynchronous operation. It directly handles the result of the operation.
- Behavior: Traditionally, when an asynchronous operation completes, the Completion Handler is invoked with the operation's outcome, such as data read from a socket or an error code.
- Example Use: Passing a lambda function directly to an asynchronous operation function as a callback to handle the operation's result.
Completion Token
- Definition: A Completion Token is a more abstract concept than a Completion Handler. It represents a way of specifying how the completion of an asynchronous operation should be handled but is not itself a handler of the operation's result.
- Flexibility: The Completion Token mechanism allows for different models of handling the completion of asynchronous operations, such as futures, coroutines, or custom handling mechanisms provided by third-party libraries.
- Transformation: When an asynchronous operation is initiated with a Completion Token, the token is transformed (via the async_result trait mechanism) into an appropriate handler and a return type for the operation, which might be a future, a suspension point in a coroutine, or another form of result object.
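The transformation step can be modelled in a few lines of standard C++. This is a toy sketch of the idea only, not Asio's implementation: toy_async_result, use_box, result_box, and async_answer are hypothetical names, the operation completes immediately instead of asynchronously, and a shared box stands in for std::future to keep the example free of threads.

```cpp
#include <cassert>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>

// Hypothetical token: asks the operation to return a result box instead of
// invoking a callback. It plays the role that use_future plays in Asio.
struct use_box_t {};
inline constexpr use_box_t use_box{};

using result_box = std::shared_ptr<std::optional<int>>;

// Primary template: the token already is a handler, so hand it straight to
// the operation and return nothing (the classic callback behaviour).
template <typename Token>
struct toy_async_result {
    template <typename Initiation>
    static void initiate(Initiation&& start, Token handler) {
        std::forward<Initiation>(start)(std::move(handler));
    }
};

// Specialization for the token: synthesize a handler that stores the result
// and return the box the caller can inspect afterwards.
template <>
struct toy_async_result<use_box_t> {
    template <typename Initiation>
    static result_box initiate(Initiation&& start, use_box_t) {
        auto box = std::make_shared<std::optional<int>>();
        std::forward<Initiation>(start)([box](int value) { *box = value; });
        return box;
    }
};

// The operation itself. Its return type is deduced from the token, and its
// body does not know which completion style the caller chose.
template <typename Token>
auto async_answer(Token&& token) {
    auto start = [](auto handler) { handler(42); };  // completes at once with 42
    return toy_async_result<std::decay_t<Token>>::initiate(
        start, std::forward<Token>(token));
}
```

With a lambda as the token, async_answer returns void and the lambda receives 42. With use_box as the token, the same function returns a result_box holding 42. The operation's body is identical in both cases, which is the point of the token model.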
Practical Examples and Implications
- Futures:
- Using use_future as a Completion Token changes the behavior of an asynchronous operation to return a std::future, which can be used to wait for the operation's completion and obtain its result synchronously.
- Example:
auto f = socket.async_read_some(buffer, use_future);
// passing use_future as the completion token
auto f = foo(/* ... */, use_future);
// the power of the completion "token": the call above resolves to
future<A1> foo(/* ... */, const use_future_t& token);
// so the returned future lets us write:
future<size_t> f =
socket.async_read_some(buffer, use_future);
// ...
size_t n = f.get();
- Coroutines:
- Completion Tokens can integrate with coroutine mechanisms, allowing asynchronous operations to be awaited with co_await, thus blending asynchronous operations into coroutine control flow seamlessly.
- Example: Integration with C++20 coroutines using use_awaitable.
// the token mechanism even lets us use stackful coroutines
void foo(yield_context yield) {
size_t n = socket.async_read_some(buffer, yield);
//...
}
// or even C++20 coroutine
awaitable<void> foo() {
size_t n =
co_await socket.async_read_some(buffer, use_awaitable);
// ...
}
- Third-Party Libraries:
- Libraries like Boost.Coroutine and Boost.Fiber can define their own Completion Tokens, like yield_context for stackful coroutines or specific tokens for fibers, enabling asynchronous operations to suspend and resume coroutines or fiber tasks.
// or even with third party libs
void foo() {
size_t n = socket.async_read_some(
buffer, boost::fibers::asio::yield);
// ...
}
Advantages of Completion Token over Completion Handler
- Versatility: Allows the same asynchronous operation to be used in different execution models (callbacks, futures, coroutines) without changing the operation's implementation.
- Simplification: Offers a cleaner, more readable syntax for asynchronous programming by abstracting away the boilerplate code associated with setting up completion handlers.
- Integration: Facilitates easier integration with new C++ language features (like coroutines) and third-party libraries by providing a customizable interface for handling operation completions.
1. From old style to new coroutine style
Overview of Changes
-
Incorporation of Coroutines:
- Introduction of co_await, co_return, and co_spawn to manage asynchronous operations, replacing callback chains with a more linear and readable coroutine-based flow.
-
Shared State Management:
- Use of a shared state (proxy_state_ptr) to manage sockets for both client-to-server and server-to-client data transfer within the coroutine context.
-
Error Handling:
- Exception-based error handling within coroutines to manage socket closures and other errors, simplifying control flow compared to traditional callback error handling.
-
Asynchronous Loops:
- Implementation of continuous read-write loops for both directions of proxy data transfer using straightforward loop constructs (for(;;)) within coroutines, enhancing readability and maintainability.
-
Executor Management:
- Utilization of executors from sockets to ensure that coroutine spawns (co_spawn) and asynchronous operations are executed within the appropriate context, maintaining the single-threaded model of the example.
Key Concepts and Functions
-
co_await and use_awaitable: Used to suspend the current coroutine until the awaited asynchronous operation completes, seamlessly integrating asynchronous operations into the coroutine's flow.
-
co_spawn: Launches a new coroutine, effectively starting a new asynchronous operation chain. In this example, it's used to run the client-to-server and server-to-client data transfers concurrently.
-
awaitable<void>: The return type of functions that are coroutines, indicating that they perform asynchronous operations but do not return a value.
-
detached: A special completion token indicating that the result of the coroutine is to be ignored, suitable for fire-and-forget operations where the outcome does not need to be captured. It's effectively an empty lambda as completion token.
Advantages of Using Coroutines
-
Simplified Control Flow: Coroutines allow asynchronous code to be written in a synchronous style, reducing complexity and improving readability.
-
Error Handling: The use of exceptions for error handling in coroutines simplifies the control flow, especially in comparison to traditional callback-based error handling, where errors need to be passed through the callback chain.
-
Concurrency and Parallelism: With co_spawn and executors, multiple coroutines can be run concurrently, handling multiple client connections efficiently without blocking the main thread.
// Conversion of Callback-Based Code to Awaitable Functions
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target) {
for (;;) {
// The newly accepted client socket is the result of the co_await expression.
auto client = co_await acceptor.async_accept(use_awaitable);
auto ex = client.get_executor();
co_spawn(ex, proxy(std::move(client), target), detached);
}
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
// The proxy function handles an individual client connection. The two
// transfer coroutines need shared state, so it is held in a shared_ptr.
auto state = std::make_shared<proxy_state>(std::move(client));
co_await state->server.async_connect(target, use_awaitable);
auto ex = state->client.get_executor();
// spawning new chains of asynchronous operations using the co_spawn function
co_spawn(ex, client_to_server(state), detached);
co_await server_to_client(state);
}
// The shared state used in the proxy function
struct proxy_state {
proxy_state(tcp::socket client) : client(std::move(client)) {}
tcp::socket client;
tcp::socket server{client.get_executor()};
};
using proxy_state_ptr = std::shared_ptr<proxy_state>;
awaitable<void> client_to_server(proxy_state_ptr state) {
try {
std::array<char, 1024> data;
for (;;) {
auto n =
co_await state->client.async_read_some(buffer(data), use_awaitable);
co_await async_write(state->server, buffer(data, n), use_awaitable);
}
} catch (const std::exception& e) {
state->client.close();
state->server.close();
}
}
awaitable<void> server_to_client(proxy_state_ptr state) {
try {
std::array<char, 1024> data;
for (;;) {
auto n =
co_await state->server.async_read_some(buffer(data), use_awaitable);
co_await async_write(state->client, buffer(data, n), use_awaitable);
}
} catch (const std::exception& e) {
state->client.close();
state->server.close();
}
}
int main(int argc, char* argv[]) {
try {
if (argc != 5) {
std::cerr << "Usage: proxy";
std::cerr << " <listen_address> <listen_port>";
std::cerr << " <target_address> <target_port>\n";
return 1;
}
asio::io_context ctx;
auto listen_endpoint =
*tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);
auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);
tcp::acceptor acceptor(ctx, listen_endpoint);
// Everything above in main is the same as before; this is the line that changed!
co_spawn(ctx, listen(acceptor, target_endpoint), detached);
ctx.run();
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
}
2. How to handle errors without throwing?
// Converts the results of asynchronous operations into a tuple containing an
// error code and the operation result, which can then be handled without
// throwing exceptions.
constexpr auto use_nothrow_awaitable =
asio::experimental::as_tuple(asio::use_awaitable);
// ... (skip, same as before)
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
auto state = std::make_shared<proxy_state>(std::move(client));
auto [e] =
co_await state->server.async_connect(
target, use_nothrow_awaitable); // <-- change!
if (!e) {
auto ex = state->client.get_executor();
co_spawn(ex, client_to_server(state), detached);
co_await server_to_client(state);
}
}
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target) {
for (;;) {
auto [e, client] = co_await acceptor.async_accept(
use_nothrow_awaitable); // <-- change!
if (e) break;
auto ex = client.get_executor();
co_spawn(ex, proxy(std::move(client), target), detached);
}
}
awaitable<void> client_to_server(proxy_state_ptr state) {
std::array<char, 1024> data;
for (;;) {
auto [e1, n1] = co_await state->client.async_read_some(
buffer(data), use_nothrow_awaitable); // <-- change!
if (e1) break;
auto [e2, n2] = co_await async_write(state->server, buffer(data, n1),
use_nothrow_awaitable); // <-- change!
if (e2) break;
}
state->client.close();
state->server.close();
}
awaitable<void> server_to_client(proxy_state_ptr state) {
std::array<char, 1024> data;
for (;;) {
auto [e1, n1] = co_await state->server.async_read_some(
buffer(data), use_nothrow_awaitable); // <-- change!
if (e1) break;
auto [e2, n2] = co_await async_write(state->client, buffer(data, n1),
use_nothrow_awaitable); // <-- change!
if (e2) break;
}
state->client.close();
state->server.close();
}
Key Changes
-
Adapting ASIO Completion Token:
- A new completion token (use_nothrow_awaitable) is defined by applying the as_tuple adapter to use_awaitable, converting the outcome of asynchronous operations into a tuple format, thus bypassing the default exception-throwing behavior for errors.
-
Structured Bindings for Result Handling:
- Structured bindings are utilized to unpack the tuple and separately handle the error code and the operation's result. This approach provides clear and concise syntax for handling operation outcomes directly within the coroutine.
-
Error Handling without Exceptions:
- Error checks are performed directly on the error code component of the tuple, allowing for explicit control flow decisions based on the success or failure of an operation, without relying on exception handling.
Advantages of No-Throw Awaitables
-
Improved Error Handling Clarity: Directly handling error codes makes the control flow regarding error handling more explicit and easier to follow, reducing the overhead and potential complexity associated with exceptions.
-
Performance Considerations: Avoiding exceptions can lead to performance improvements, especially in contexts where exceptions might be expensive or in high-performance applications where every cycle counts.
-
Simplified Control Flow: Eliminating exceptions from the equation simplifies the control flow, especially in error handling, making the code more straightforward to reason about.
Implementing No-Throw Awaitables
-
Each coroutine that performs an asynchronous operation with use_nothrow_awaitable needs to adjust its await expressions to expect tuples. This change necessitates reviewing and updating each coroutine to correctly unpack and handle these tuples.
-
Exiting loops or coroutines upon encountering errors requires explicit checks on the error codes, shifting the responsibility for error detection and handling to the developer, away from automatic exception-based mechanisms.
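The unpack-and-check loop shape can be tried without Asio. In the sketch below, toy_read_some is a hypothetical synchronous stand-in for an as_tuple-adapted async_read_some, and relay_all is a hypothetical helper with the same loop structure as client_to_server. The error value used to signal end of input (connection_reset) is an arbitrary choice for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <system_error>
#include <tuple>

using read_result = std::tuple<std::error_code, std::size_t>;

// Copies up to `cap` bytes of `source`, starting at `offset`, into `out`.
// Reports exhaustion through the error code instead of throwing.
inline read_result toy_read_some(std::string_view source, std::size_t offset,
                                 char* out, std::size_t cap) {
    assert(out != nullptr);
    if (offset >= source.size()) {
        return read_result{std::make_error_code(std::errc::connection_reset),
                           std::size_t{0}};
    }
    const std::size_t n = std::min(cap, source.size() - offset);
    std::copy_n(source.data() + offset, n, out);
    return read_result{std::error_code{}, n};
}

// Same loop shape as the no-throw transfer coroutines: unpack the tuple,
// test the error code, and break out explicitly.
inline std::string relay_all(std::string_view source) {
    std::string sink;
    char data[4];  // deliberately tiny so several reads are needed
    std::size_t offset = 0;
    for (;;) {
        auto [ec, n] = toy_read_some(source, offset, data, sizeof data);
        if (ec) break;  // this check replaces the try/catch of the previous version
        sink.append(data, n);
        offset += n;
    }
    return sink;
}
```

relay_all("hello, proxy") performs three successful reads and leaves the loop on the fourth, when the error code is set. No exception is involved at any point.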
3. Remove inactive connections from the server?
struct proxy_state {
proxy_state(tcp::socket client) : client(std::move(client)) {}
tcp::socket client;
tcp::socket server{client.get_executor()};
steady_clock::time_point deadline; // <--- added deadline!
};
using proxy_state_ptr = std::shared_ptr<proxy_state>;
awaitable<void> client_to_server(proxy_state_ptr state) {
std::array<char, 1024> data;
for (;;) {
// Added at the top of each read-write cycle: pushes the deadline out to at
// least 5 seconds from now, so the connection counts as active whenever the
// loop is making progress.
state->deadline =
std::max(state->deadline, steady_clock::now() + 5s); // <---added deadline
auto [e1, n1] = co_await state->client.async_read_some(
buffer(data), use_nothrow_awaitable);
if (e1) break;
auto [e2, n2] = co_await async_write(state->server, buffer(data, n1),
use_nothrow_awaitable);
if (e2) break;
}
state->client.close();
state->server.close();
}
awaitable<void> server_to_client(proxy_state_ptr state) {
std::array<char, 1024> data;
for (;;) {
state->deadline =
std::max(state->deadline, steady_clock::now() + 5s); // <---added deadline
auto [e1, n1] = co_await state->server.async_read_some(
buffer(data), use_nothrow_awaitable);
if (e1) break;
auto [e2, n2] = co_await async_write(state->client, buffer(data, n1),
use_nothrow_awaitable);
if (e2) break;
}
state->client.close();
state->server.close();
}
// Added new coroutine
awaitable<void> watchdog(proxy_state_ptr state) {
asio::steady_timer timer(state->client.get_executor());
auto now = steady_clock::now();
while (state->deadline > now) {
timer.expires_at(state->deadline);
co_await timer.async_wait(use_nothrow_awaitable);
now = steady_clock::now();
}
// Once the deadline stops being extended, the loop above exits and we close
// the connection!
state->client.close();
state->server.close();
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
auto state = std::make_shared<proxy_state>(std::move(client));
auto [e] =
co_await state->server.async_connect(target, use_nothrow_awaitable);
if (!e) {
auto ex = state->client.get_executor();
// This sequence initiates the client-to-server and server-to-client data
// transfer operations and the watchdog coroutine to monitor the
// connection's activity.
co_spawn(ex, client_to_server(state), detached);
co_spawn(ex, server_to_client(state), detached); // <-- change!
co_await watchdog(state); // <-- added!
}
}
The code above implements an inactivity timeout for the proxy using Asio and C++20 coroutines: a connection is closed automatically once it has been idle for a set duration (5 seconds here), which frees the resources held by dead connections. The steps involved are:
Adding a Timeout Mechanism
-
Deadline Tracking:
- A deadline member is added to the proxy_state struct to track the latest time by which activity is expected on a connection.
- The deadline is pushed out to at least 5 seconds from now at the start of every read-write cycle in either direction, so each completed cycle restarts the timeout countdown.
-
Implementing a Watchdog Coroutine:
- A new coroutine, watchdog, is introduced to monitor the connection's activity against the deadline.
- It utilizes an asio::steady_timer to asynchronously wait until the next deadline.
- If the deadline is reached without any activity (i.e., no reset of the deadline by ongoing operations), the connection is deemed inactive, and both client and server sockets are closed.
-
Integration with Proxy Operations:
- The watchdog coroutine is integrated into the main proxy coroutine alongside the existing client-to-server and server-to-client data transfer coroutines.
- This integration ensures that the watchdog runs concurrently with data transfer operations, monitoring the connection for inactivity.
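How the deadline update and the watchdog loop interact can be traced without any networking. The sketch below is a simulation under stated assumptions: touch and watchdog_fires_at are hypothetical helpers, time is a scripted list of seconds rather than a real clock, and each activity entry stands for one transfer loop iteration starting.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

using sim_time = std::chrono::steady_clock::time_point;
using std::chrono::seconds;

// Same update rule as the transfer loops: the deadline never moves backwards.
inline void touch(sim_time& deadline, sim_time now, seconds idle_limit) {
    deadline = std::max(deadline, now + idle_limit);
}

// Replays the watchdog loop against activity times given in seconds since the
// start (ascending order). Each pass "sleeps" until the current deadline, as
// timer.async_wait would, then checks whether the deadline moved meanwhile.
// Returns the second at which the watchdog gives up.
inline std::chrono::seconds::rep watchdog_fires_at(
    const std::vector<int>& activity, seconds idle_limit) {
    assert(std::is_sorted(activity.begin(), activity.end()));
    const sim_time start{};  // simulated epoch, no real clock involved
    sim_time deadline = start;
    sim_time now = start;
    std::size_t next = 0;
    touch(deadline, now, idle_limit);  // first transfer iteration at second 0
    while (deadline > now) {
        now = deadline;  // the timer expires
        // Apply every activity that happened while the watchdog was waiting.
        while (next < activity.size() &&
               start + seconds(activity[next]) < now) {
            touch(deadline, start + seconds(activity[next]), idle_limit);
            ++next;
        }
    }
    return std::chrono::duration_cast<seconds>(now - start).count();
}
```

With activity at seconds 3 and 7 and a 5 second limit, the watchdog wakes at second 5, sees the deadline moved to 8, wakes again at 8, sees it moved to 12, and finally gives up at second 12. With no activity at all it gives up at second 5.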
4. Avoid prematurely closing connections that may still have data to transfer
// --------------------------------------------------------
// Follow the numbered comments to understand the changes.
// --------------------------------------------------------
awaitable<void> watchdog(proxy_state_ptr state) {
asio::steady_timer timer(state->client.get_executor());
auto now = steady_clock::now();
while (state->deadline > now) {
timer.expires_at(state->deadline);
co_await timer.async_wait(use_nothrow_awaitable);
now = steady_clock::now();
}
// <------ 1. The watchdog no longer closes the sockets when the deadline is reached.
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
auto state = std::make_shared<proxy_state>(std::move(client));
auto [e] =
co_await state->server.async_connect(target, use_nothrow_awaitable);
if (!e) {
// 3. The critical change is here!
// Since Asio 1.19, each async operation has a cancellation slot. The three
// coroutines run in parallel, each in its own chain with its own slot, and
// Asio can deliver a signal to each coroutine instructing it to cancel.
// The slot is reused for each operation in the chain. This cancellation
// mechanism is built directly into the coroutine API, so we add nothing
// explicit here. When any operand of the || expression finishes, the
// remaining ones are short-circuited: a cancel signal is sent to each of
// them and propagates down to whatever that coroutine is currently
// waiting on. (operator|| comes from
// asio::experimental::awaitable_operators.)
co_await (client_to_server(state) ||
server_to_client(state) ||
watchdog(state));
// 4. Note that co_await (...) guarantees that by the time execution reaches
// this point, one of the three coroutines above has finished.
// 2. The watchdog no longer closes anything; we close the sockets here.
state->client.close();
state->server.close();
}
}
- Introduced the concept of per-operation cancellation available in ASIO 1.19 and later.
- This feature enables a more granular control over the cancellation of asynchronous operations, preventing the loss of data due to the abrupt closing of connections.
Enhancements for Finer-Grained Cancellation
-
Cancellation Mechanism: Leveraged ASIO's per-operation cancellation feature to enable selective cancellation of operations. This mechanism relies on cancellation slots, which are associated with each asynchronous operation to facilitate cancellation signals.
-
Watchdog Adjustment: The watchdog's role is reduced to checking for inactivity against the deadline; it no longer closes the sockets. When the deadline passes, the watchdog simply completes, and the || expression then cancels the two transfer coroutines.
-
Cancellation Propagation: Cancellation signals are propagated through cancellation slots, ensuring that any ongoing asynchronous operation like async_read_some or async_write is canceled if the deadline is exceeded, enhancing the proxy server's responsiveness to inactivity.
Implementation Details
-
Logical Operators for Awaitables: Used the logical OR operator to combine awaitable operations (client_to_server, server_to_client, watchdog). This setup ensures that if any of the operations complete, the others are canceled, thus short-circuiting the combined operation.
-
Efficient Cancellation Signaling: Cancellation is implemented using a lightweight, efficient mechanism based on integer bit masks and function callbacks, minimizing overhead.
-
High-Level Interface for Cancellation: Instead of directly manipulating cancellation slots, a high-level interface integrated into awaitable coroutines is used, simplifying the implementation.
Code Modifications
-
Awaitables Combination: The combination of client_to_server, server_to_client, and watchdog operations with logical OR (||) ensures that the completion of any operation triggers the cancellation of the others, preventing unnecessary continuation of inactive connections.
-
Selective Operation Completion: Because cancellation is delivered to the pending operations instead of the watchdog closing sockets underneath them, the sockets are closed in one place only: in proxy, after the combined co_await has returned.
5. Removal of Shared State
// server_to_client and client_to_server becomes one function...
awaitable<void> transfer(tcp::socket& from, tcp::socket& to, steady_clock::time_point& deadline)
{
std::array<char, 1024> data;
for (;;)
{
deadline = std::max(deadline, steady_clock::now() + 5s);
//vvv
auto [e1, n1] = co_await from.async_read_some(buffer(data), use_nothrow_awaitable);
if (e1)
co_return;
//vvv
auto [e2, n2] = co_await async_write(to, buffer(data, n1), use_nothrow_awaitable);
if (e2)
co_return;
}
}
awaitable<void> watchdog(steady_clock::time_point& deadline)
{
// vvv
asio::steady_timer timer(co_await this_coro::executor);
auto now = steady_clock::now();
while (deadline > now)
{
timer.expires_at(deadline);
co_await timer.async_wait(use_nothrow_awaitable);
now = steady_clock::now();
}
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
tcp::socket server(client.get_executor());
steady_clock::time_point deadline{};
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e)
{
co_await (
// vvv
transfer(client, server, deadline) ||
transfer(server, client, deadline) ||
watchdog(deadline)
);
}
}
Unified Transfer Function
- The client_to_server and server_to_client functions are consolidated into a single transfer function. This function is generic enough to handle data transfer in both directions, taking references to the source and destination sockets and a reference to a deadline as parameters.
Deadline Management
- The deadline is updated within the transfer function before each read-write cycle, ensuring that the connection remains active only as long as data is being transferred within the specified timeout period.
Enhanced Watchdog Mechanism
- The watchdog coroutine is simplified to focus solely on deadline monitoring, utilizing an asio::steady_timer for this purpose. It no longer directly closes sockets but instead relies on the structured flow of coroutines to manage socket lifetimes effectively.
Removal of Shared State
- By leveraging local variables and references, the need for a shared state (proxy_state) is eliminated. This change removes the shared_ptr allocation and reference counting, and reduces the complexity of passing state between functions.
Simplified Connection and Resource Management
- Connections are now managed more cleanly, with sockets being automatically closed when they go out of scope, thus ensuring proper resource cleanup without explicitly calling close operations.
Use of this_coro::executor
- To obtain the executor within the watchdog coroutine for timer setup, this_coro::executor is used. This approach demonstrates how to access the current coroutine's executor, an essential aspect of managing coroutine lifetimes and execution contexts.
6. Separate cancellation
awaitable<void> watchdog(steady_clock::time_point& deadline) {
asio::steady_timer timer(co_await this_coro::executor);
auto now = steady_clock::now();
while (deadline > now) {
timer.expires_at(deadline);
co_await timer.async_wait(use_nothrow_awaitable);
now = steady_clock::now();
}
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
tcp::socket server(client.get_executor());
steady_clock::time_point client_to_server_deadline{};
steady_clock::time_point server_to_client_deadline{};
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e) {
// The proxy coroutine is adapted to run two pairs of transfer and watchdog
// coroutines, one pair for each direction. Logical AND (`&&`) and OR (`||`)
// operators are used to ensure that both transfer directions are managed
// concurrently but independently.
co_await ((transfer(client, server, client_to_server_deadline) ||
watchdog(client_to_server_deadline)) &&
(transfer(server, client, server_to_client_deadline) ||
watchdog(server_to_client_deadline)));
}
}
Implementing Separate Cancellations
-
Individual Deadlines: By defining separate deadline variables for client-to-server and server-to-client transfers, it's possible to monitor and manage the activity timeouts for each direction independently.
-
Separate Watchdogs: Assigning a watchdog coroutine to each transfer direction allows for the independent cancellation of each transfer based on its specific deadline. This ensures that one direction's timeout doesn't affect the other, improving the proxy's responsiveness and reliability.
Logical Operators and Coroutine Combinations
-
Combining Awaitables: Using logical AND and OR operators with awaitables allows for complex control flows where coroutines can be run concurrently, and their completions or cancellations are managed based on custom logic.
-
Result Aggregation: When combining awaitables, the return values can be aggregated using tuples (for logical AND) or variants (for logical OR), facilitating rich information return from complex asynchronous operations.
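The aggregated result of the combined expression in proxy can be modelled with plain standard-library types. This is a sketch based on my reading of Asio's awaitable operators, where two value-less awaitables joined with || yield a variant of two empty alternatives and && yields a tuple. The aliases direction_result and proxy_result and the helpers ended_by and summarize are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <variant>

// Shape of (transfer || watchdog): neither side produces a value, so only
// the index carries information (0 = transfer ended, 1 = watchdog ended).
using direction_result = std::variant<std::monostate, std::monostate>;

// Shape of (... || ...) && (... || ...): one entry per direction.
using proxy_result = std::tuple<direction_result, direction_result>;

inline std::string ended_by(const direction_result& r) {
    assert(!r.valueless_by_exception());
    return r.index() == 0 ? "transfer" : "watchdog";
}

inline std::string summarize(const proxy_result& r) {
    const auto& [client_to_server, server_to_client] = r;
    return "client->server ended by " + ended_by(client_to_server) +
           ", server->client ended by " + ended_by(server_to_client);
}
```

In the real proxy the result of the co_await is discarded, but capturing it this way would show, per direction, whether the transfer ended on its own or the watchdog timed it out.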
7. Per-operation timeout mechanism
// A new coroutine `timeout` is defined to simply wait for a specified duration
// and then complete. This coroutine is used to enforce a timeout on individual
// asynchronous operations.
awaitable<void> timeout(steady_clock::duration duration) {
asio::steady_timer timer(co_await this_coro::executor);
timer.expires_after(duration);
co_await timer.async_wait(use_nothrow_awaitable);
}
awaitable<void> transfer(tcp::socket& from, tcp::socket& to) {
std::array<char, 1024> data;
for (;;) {
// The `transfer` function integrates the `timeout` coroutine with each read
// and write operation using a logical OR (`||`) combined with the
// operation's awaitable. This setup allows either the operation to complete
// or the timeout to elapse, whichever comes first.
auto result1 =
co_await (from.async_read_some(buffer(data), use_nothrow_awaitable) ||
timeout(5s));
// The result of the combined operation and timeout is captured in a
// `std::variant`, which is then inspected to determine whether the
// operation completed successfully or if it timed out.
if (result1.index() == 1) co_return; // timed out
auto [e1, n1] = std::get<0>(result1);
if (e1) break;
auto result2 =
co_await (async_write(to, buffer(data, n1), use_nothrow_awaitable) ||
timeout(1s));
if (result2.index() == 1) co_return; // timed out
auto [e2, n2] = std::get<0>(result2);
if (e2) break;
}
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target) {
tcp::socket server(client.get_executor());
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e) {
// the deadline and the watchdog are no longer needed
co_await (transfer(client, server) || transfer(server, client));
}
}
Per-Operation Timeout Implementation
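- Handling Timeout Results: each combined await yields a std::variant. index() == 1 means the timeout coroutine finished first, and transfer returns; otherwise std::get<0> yields the (error code, byte count) tuple, which is checked as before.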
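The inspection logic used in transfer can be exercised on its own with standard-library types. In this sketch, io_result and io_or_timeout are hypothetical aliases that mirror the shapes described above, and describe is a hypothetical helper. No Asio call is involved.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <system_error>
#include <tuple>
#include <variant>

// Shape of one as_tuple-adapted I/O result: (error, bytes transferred).
using io_result = std::tuple<std::error_code, std::size_t>;

// Shape of (io_operation || timeout): alternative 0 holds the I/O result,
// alternative 1 is an empty placeholder meaning the timeout finished first.
using io_or_timeout = std::variant<io_result, std::monostate>;

inline std::string describe(const io_or_timeout& r) {
    assert(!r.valueless_by_exception());
    if (r.index() == 1) return "timed out";
    const auto& [ec, n] = std::get<0>(r);
    if (ec) return "failed";
    return "ok, " + std::to_string(n) + " bytes";
}
```

The order of the checks matters: the variant index is tested first, because the tuple exists only when the I/O operation completed before the timeout.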
Performance Considerations
-
Overhead of Timers: While this per-operation timeout approach provides fine-grained control over operation durations, it introduces additional overhead due to the management of timers for each operation. This may not be ideal for high-performance scenarios where minimizing overhead is crucial.
-
High-Performance Alternatives: For applications requiring high performance, a less granular timeout mechanism (such as the previously discussed watchdog approach) may be more appropriate, as it reduces the number of timers in use and their associated overhead.