Skip to content

Commit

Permalink
数据转发逻辑使用双缓冲并发读写,避免串行读写可能导致效率的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackarain committed Nov 18, 2024
1 parent 2abe005 commit 06d4bda
Showing 1 changed file with 43 additions and 18 deletions.
61 changes: 43 additions & 18 deletions proxy/include/proxy/proxy_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2867,40 +2867,65 @@ R"x*x*x(<html>
template<typename S1, typename S2>
net::awaitable<void> transfer(S1& from, S2& to, size_t& bytes_transferred)
{
std::vector<char> data(1024 * 1024, 0);
boost::system::error_code ec;
bytes_transferred = 0;

stream_rate_limit(from, m_option.tcp_rate_limit_);
stream_rate_limit(to, m_option.tcp_rate_limit_);

stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));

constexpr auto buf_size = 512 * 1024;

std::unique_ptr<char, decltype(&std::free)> buf0((char*)std::malloc(buf_size), &std::free);
std::unique_ptr<char, decltype(&std::free)> buf1((char*)std::malloc(buf_size), &std::free);

// 分别使用主从缓冲指针用于并发读写.
auto primary_buf = buf0.get();
auto secondary_buf = buf1.get();

// 首先邓读取第一个数据作为预备, 以用于后面的交替读写逻辑.
boost::system::error_code ec;
auto bytes = co_await from.async_read_some(net::buffer(primary_buf, buf_size), net_awaitable[ec]);
if (ec || m_abort)
{
if (bytes > 0)
co_await net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec]);

to.shutdown(net::socket_base::shutdown_send, ec);
co_return;
}

for (; !m_abort;)
{
stream_expires_after(to, std::chrono::seconds(m_option.tcp_timeout_));
stream_expires_after(from, std::chrono::seconds(m_option.tcp_timeout_));

auto bytes = co_await from.async_read_some(
net::buffer(data), net_awaitable[ec]);
if (ec || m_abort)
{
if (bytes > 0)
co_await net::async_write(to,
net::buffer(data, bytes), net_awaitable[ec]);
// 并发读写.
auto [write_bytes, read_bytes] =
co_await(
net::async_write(to,
net::buffer(primary_buf, bytes), net_awaitable[ec])
&&
from.async_read_some(
net::buffer(secondary_buf, buf_size), net_awaitable[ec])
);

to.shutdown(net::socket_base::shutdown_send, ec);
co_return;
}
// 交换主从缓冲区.
std::swap(primary_buf, secondary_buf);

stream_expires_after(to, std::chrono::seconds(m_option.tcp_timeout_));
bytes = read_bytes;
bytes_transferred += bytes;

co_await net::async_write(to,
net::buffer(data, bytes), net_awaitable[ec]);
if (ec || m_abort)
// 如果 async_write 失败, 则也无需要再读取数据, 如果
// async_read_some 失败, 则也无数据可用于写, 所以无论哪一种情况
// 都可以直接退出.
if (ec)
{
to.shutdown(net::socket_base::shutdown_send, ec);
from.shutdown(net::socket_base::shutdown_receive, ec);
co_return;
}

bytes_transferred += bytes;
}
}

Expand Down

0 comments on commit 06d4bda

Please sign in to comment.