This repository has been archived by the owner on Jun 30, 2024. It is now read-only.
TerminalSocket.h
//
// Created by liuzikai on 3/11/21.
//
#ifndef META_VISION_SOLAIS_TERMINALSOCKET_H
#define META_VISION_SOLAIS_TERMINALSOCKET_H
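#include <sys/types.h>  // ssize_t
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include <google/protobuf/message.h>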
namespace meta {
/**
* TerminalSocketBase provides a bidirectional TCP socket to send and receive several types of data, each labelled
* with a NUL-terminated string.
*
* This class is a base class (not instantiable from outside) for TerminalSocketServer and TerminalSocketClient (below).
* Only one concurrent TCP connection is allowed (see TerminalSocketServer::startAccept() below).
*
* Boost asio is used for the low-level TCP socket. The main motivation is the requirement for asynchronous socket
* operation. Sockets are slow. If processing results are to be sent instantly, we don't want the socket operations
* to take too much time. In this module, the sending functions are async. They return immediately after accepting the
* data to be sent. A separate thread (ioThread) is launched as this class instantiates, which processes the actual I/O
* operations (io_context.run()). Received data is processed and delivered through four callback functions.
*
* The class accepts a running io_context from outside.
*
* Async operations need careful memory management. All resources used must be alive when the operations are
* actually performed, and there must not be any memory leak after the operations are performed or cancelled.
* shared_ptr is used frequently in this class to help manage the memory.
*
* Cleaning up sockets also requires careful handling. Boost does well at cleaning up the socket. No manual shutdown or
* close is performed on the socket objects in this class. Instead, we carefully manage the life cycles of socket
* instances and let their destructors do the clean-up. When a socket instance is destroyed, queued async work is
* aborted, which is handled in all async callback handlers.
*
* The part that needs the most careful attention is the disconnection callback. Once the thread pool is created (by
* creating a work_guard in ioThread), there seems to be no easy way to call io_context.run() without making the
* current thread a worker thread (i.e. io_context.run() doesn't return). Therefore, we can't ask for the
* disconnection to be handled immediately. When a new connection is started and replaces the one currently alive,
* the original callback must not be replaced by the new one before it is called, which is hard to guarantee
* if the callback is stored as a member variable. The current solution is to let async_recv carry the
* callback function. It is either triggered when the socket is disconnected (in handleRecv), or passed on to the next
* async_recv (in handleRecv) when the next async receive starts. While a socket is alive, there should be exactly one
* async_recv in the queue that carries the callback. When the socket is disconnected, the callback is
* triggered and no further async_recv is queued. Therefore the callback is triggered exactly once.
* handleSend() does nothing with the callback and simply ignores the disconnection.
*/
class TerminalSocketBase {
protected:
/**
* Types of packages that can be sent and received.
*/
enum PackageType {
SINGLE_STRING,
SINGLE_INT,
BYTES,
LIST_OF_STRINGS,
PACKAGE_TYPE_COUNT
};
/**
* Package structure:
* 1-byte PREAMBLE (defined below)
* uint8_t package type (enum PackageType)
* NUL-terminated string for name
* uint32_t content size
* bytes
*/
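/*
* Illustrative sketch, not part of the original header: one way a package with the layout above could be
* serialized into a byte buffer. The helper name encodePackage is hypothetical, and the little-endian byte order of
* the size field is an assumption; this header does not state which byte order the implementation uses.
*
```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: builds PREAMBLE | type | name + NUL | uint32 size | content.
std::vector<uint8_t> encodePackage(uint8_t type, const std::string &name,
                                   const uint8_t *content, uint32_t size) {
    constexpr uint8_t PREAMBLE = 0xCE;  // same value as TerminalSocketBase::PREAMBLE
    std::vector<uint8_t> buf;
    buf.reserve(2 + name.size() + 1 + 4 + size);
    buf.push_back(PREAMBLE);
    buf.push_back(type);
    buf.insert(buf.end(), name.begin(), name.end());
    buf.push_back('\0');  // the name is NUL-terminated
    // Assumption: the size field is written least significant byte first.
    for (int shift = 0; shift < 32; shift += 8) {
        buf.push_back(static_cast<uint8_t>((size >> shift) & 0xFF));
    }
    if (size > 0) {
        buf.insert(buf.end(), content, content + size);
    }
    return buf;
}
```
*
* A name-only package (nullptr content, size 0) then consists of the preamble, the type, the name with its
* terminator, and four zero bytes for the size.
*/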
public:
bool connected() const { return (socket != nullptr && socket->is_open() && !socketDisconnected); }
/**
* Async send a single string.
* @param name Name of the package.
* @param s The string to send. Doesn't need to be alive after this call (copied for async operation).
* @return Whether the operation succeeded.
*/
bool sendSingleString(const std::string &name, const std::string &s);
/**
* Async send a single integer.
* @param name Name of the package.
* @param n The integer to send.
* @return Whether the operation succeeded.
*/
bool sendSingleInt(const std::string &name, int32_t n);
/**
* Async send bytes. Can be used to send only a name (nullptr data and size 0).
* @param name Name of the package.
* @param data The start of the data. Doesn't need to be alive after this call (copied for async operation).
* @param size The number of bytes to send.
* @return Whether the operation succeeded.
*/
bool sendBytes(const std::string &name, uint8_t *data = nullptr, size_t size = 0);
/**
* Async send a protobuf message as a bytes package.
* @param name Name of the package.
* @param message A protobuf message.
* @return Whether the operation succeeded.
*/
bool sendBytes(const std::string &name, const google::protobuf::Message &message);
/**
* Async send a list of strings.
* @param name Name of the package.
* @param list The list of strings to send. Doesn't need to be alive after this call (copied for async operation).
* @return Whether the operation succeeded.
*/
bool sendListOfStrings(const std::string &name, const std::vector<std::string> &list);
/**
* Callback function type for arrival of a single string. The name and the string are read-only and not persistent
* (need to be copied if they are to be used later).
*/
using SingleStringCallback = std::function<void(std::string_view name, std::string_view s)>;
/**
* Callback function type for arrival of a single integer. The name is read-only and not persistent (need to be
* copied if it is to be used later).
*/
using SingleIntCallback = std::function<void(std::string_view name, int32_t n)>;
/**
* Callback function type for arrival of bytes. The name and the data are read-only and not persistent (need to be
* copied if they are to be used later).
*/
using BytesCallback = std::function<void(std::string_view name, const uint8_t *buf, size_t size)>;
/**
* Callback function type for arrival of a list of strings. The name and the strings are read-only and not
* persistent (need to be copied if they are to be used later).
*/
using ListOfStringsCallback = std::function<void(std::string_view name, const std::vector<const char *> &list)>;
/**
* Set callback functions for arrivals of data.
* @param singleString Callback for arrival of a single string. Can be nullptr.
* @param singleInt Callback for arrival of a single integer. Can be nullptr.
* @param bytes Callback for arrival of bytes. Can be nullptr.
* @param listOfStrings Callback for a list of strings. Can be nullptr.
*/
void setCallbacks(SingleStringCallback singleString = nullptr,
SingleIntCallback singleInt = nullptr,
BytesCallback bytes = nullptr,
ListOfStringsCallback listOfStrings = nullptr) {
singleStringCallBack = std::move(singleString);
singleIntCallBack = std::move(singleInt);
bytesCallBack = std::move(bytes);
listOfStringsCallBack = std::move(listOfStrings);
}
/**
* Get the number of bytes sent/received since the last clear, and reset both counters.
* @return {sent bytes, received bytes}
*/
std::pair<unsigned, unsigned> getAndClearStats();
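/*
* Illustrative sketch, not part of the original header: a read-and-reset of two atomic counters, as
* getAndClearStats() suggests. TrafficStats is a hypothetical stand-in for the uploadBytes and downloadBytes members,
* and the use of exchange(0) is an assumption; the actual implementation is not shown here.
*
```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical stand-in for the byte counters kept by TerminalSocketBase.
struct TrafficStats {
    std::atomic<unsigned> uploadBytes{0};
    std::atomic<unsigned> downloadBytes{0};

    // exchange(0) reads and resets one counter in a single atomic step, so bytes
    // counted by the I/O thread between a separate load and store cannot be lost.
    // The two counters are still read one after the other, not as one snapshot.
    std::pair<unsigned, unsigned> getAndClear() {
        unsigned sent = uploadBytes.exchange(0);
        unsigned received = downloadBytes.exchange(0);
        return {sent, received};
    }
};
```
*/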
protected:
/**
* Initialize a TerminalSocketBase. This constructor is set as protected to avoid instantiation from outside.
* @param ioContext A running io_context object.
*/
explicit TerminalSocketBase(boost::asio::io_context &ioContext);
static constexpr uint8_t PREAMBLE = 0xCE;
/**
* Set a working socket and start receiving on it.
* @param newSocket A socket that is already set up on ioContext.
* @param disconnectCallback Callback function when disconnected.
*/
template<class T>
void setupSocket(std::shared_ptr<boost::asio::ip::tcp::socket> newSocket, std::function<void(T *)> disconnectCallback);
/**
* Close the socket. disconnectCallback may be triggered by handleRecv if the recv cycle is still running.
*/
void closeSocket();
boost::asio::io_context &ioContext;
std::atomic<unsigned> uploadBytes = 0;
std::atomic<unsigned> downloadBytes = 0;
private:
// ================================ Socket and IO Context ================================
std::shared_ptr<boost::asio::ip::tcp::socket> socket = nullptr;
std::atomic<bool> socketDisconnected = false;
// ================================ Sending ================================
void handleSend(std::shared_ptr<std::vector<uint8_t>> buf, const boost::system::error_code &error, size_t numBytes);
static std::shared_ptr<std::vector<uint8_t>> allocateBuffer(PackageType type, const std::string &name, size_t contentSize);
static void emplaceInt32(std::vector<uint8_t> &buf, int32_t n);
// ================================ Receiving ================================
SingleStringCallback singleStringCallBack = nullptr;
SingleIntCallback singleIntCallBack = nullptr;
BytesCallback bytesCallBack = nullptr;
ListOfStringsCallback listOfStringsCallBack = nullptr;
enum ReceiverState {
RECV_PREAMBLE,
RECV_PACKAGE_TYPE,
RECV_NAME,
RECV_SIZE,
RECV_CONTENT
};
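/*
* Illustrative sketch, not part of the original header: the receiver states above suggest a byte-driven state
* machine. The standalone PackageParser below is a hypothetical reconstruction under two assumptions that this header
* does not confirm: the size field is little-endian, and bytes arriving before a preamble are skipped. The real
* handleRecv works on recvBuf with offsets and may differ.
*
```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical parser that walks through the five receiver states one byte at a time.
struct PackageParser {
    enum State { RECV_PREAMBLE, RECV_PACKAGE_TYPE, RECV_NAME, RECV_SIZE, RECV_CONTENT };

    State state = RECV_PREAMBLE;
    uint8_t type = 0;
    std::string name;
    uint32_t size = 0;
    int sizeBytesRead = 0;
    std::vector<uint8_t> content;

    // Feed one byte; returns true when a complete package has just been parsed.
    bool feed(uint8_t b) {
        switch (state) {
            case RECV_PREAMBLE:
                if (b == 0xCE) {  // assumption: anything before the preamble is dropped
                    name.clear();
                    content.clear();
                    size = 0;
                    sizeBytesRead = 0;
                    state = RECV_PACKAGE_TYPE;
                }
                return false;
            case RECV_PACKAGE_TYPE:
                type = b;
                state = RECV_NAME;
                return false;
            case RECV_NAME:
                if (b == '\0') state = RECV_SIZE;  // NUL ends the name
                else name.push_back(static_cast<char>(b));
                return false;
            case RECV_SIZE:
                size |= static_cast<uint32_t>(b) << (8 * sizeBytesRead);  // assumed little-endian
                if (++sizeBytesRead < 4) return false;
                if (size == 0) {  // name-only package, nothing more to read
                    state = RECV_PREAMBLE;
                    return true;
                }
                state = RECV_CONTENT;
                return false;
            case RECV_CONTENT:
                content.push_back(b);
                if (content.size() < size) return false;
                state = RECV_PREAMBLE;
                return true;
        }
        return false;
    }
};
```
*/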
static constexpr size_t RECV_BUFFER_SIZE = 0x10000;
ReceiverState recvState = RECV_PREAMBLE;
std::vector<uint8_t> recvBuf; // use std::vector to ensure the support of large packages
ssize_t recvOffset = 0;
ssize_t recvRemainingBytes = 0;
PackageType recvCurrentPackageType = PACKAGE_TYPE_COUNT;
ssize_t recvNameStart = -1;
ssize_t recvContentSize = -1;
ssize_t recvContentStart = -1;
template<class T>
void
handleRecv(const boost::system::error_code &error, size_t numBytes, std::function<void(T *)> disconnectCallback);
static int32_t decodeInt32(const uint8_t *start);
void handlePackage() const;
};
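/*
* Illustrative sketch, not part of the original header: the disconnection-callback scheme described in the
* TerminalSocketBase comment, reduced to plain C++ without Boost. FakeSocket and this free-standing handleRecv are
* hypothetical stand-ins; they only model the idea that each pending receive carries the callback, which is either
* fired on disconnection or handed on to the next receive.
*
```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for a socket with queued asynchronous receives.
struct FakeSocket {
    std::queue<std::function<void(bool)>> pending;  // argument: true if disconnected

    void asyncRecv(std::function<void(bool)> handler) { pending.push(std::move(handler)); }

    // Complete the oldest pending receive, roughly what io_context.run() would do.
    void complete(bool disconnected) {
        std::function<void(bool)> handler = std::move(pending.front());
        pending.pop();
        handler(disconnected);
    }
};

void handleRecv(FakeSocket &socket, bool disconnected, std::function<void()> disconnectCallback) {
    if (disconnected) {
        if (disconnectCallback) disconnectCallback();  // fired here and nowhere else
        return;                                        // no further receive is queued
    }
    // Hand the callback on to the next receive instead of storing it in a member,
    // so a later connection cannot overwrite it before it has been called.
    socket.asyncRecv([&socket, cb = std::move(disconnectCallback)](bool d) mutable {
        handleRecv(socket, d, std::move(cb));
    });
}
```
*
* In this model the first call, handleRecv(socket, false, callback), plays the role of setupSocket: it queues the
* first receive, and exactly one pending receive holds the callback for as long as the socket stays connected.
*/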
/**
* Derived class to act as the TCP server (listener).
*/
class TerminalSocketServer : public TerminalSocketBase {
public:
using ServerDisconnectionCallback = std::function<void(TerminalSocketServer *)>;
/**
* Create a server listening on the given port. The server will not accept incoming connections until startAccept()
* is called. A server can connect to at most one client concurrently.
* @param ioContext A running io_context object.
* @param port The port to listen on.
* @param disconnectionCallback Callback function when the socket is disconnected.
*/
explicit TerminalSocketServer(boost::asio::io_context &ioContext,
int port, ServerDisconnectionCallback disconnectionCallback = nullptr);
/**
* Start accepting incoming connections. A server can connect to at most one client concurrently. If this function
* is called when there is already a connection and another request comes in, the previous connection is disconnected.
*
* To restart acceptance after disconnection, call this function in the disconnect callback.
*/
void startAccept();
/**
* Disconnect the socket.
*/
void disconnect();
protected:
int port;
boost::asio::ip::tcp::acceptor acceptor;
ServerDisconnectionCallback disconnectionCallback;
void handleAccept(std::shared_ptr<boost::asio::ip::tcp::socket> socket, const boost::system::error_code &error);
};
/**
* Derived class to act as the TCP client (connector).
*/
class TerminalSocketClient : public TerminalSocketBase {
public:
using ClientDisconnectionCallback = std::function<void(TerminalSocketClient *)>;
/**
* Create a client.
*/
explicit TerminalSocketClient(boost::asio::io_context &ioContext,
ClientDisconnectionCallback disconnectionCallback = nullptr)
: TerminalSocketBase(ioContext),
resolver(ioContext),
disconnectionCallback(std::move(disconnectionCallback)) {}
/**
* Try to connect to a server. This function is sync.
* @param server The server IP or name (will be resolved).
* @param port The server port.
* @return Whether the connection succeeded.
*/
bool connect(const std::string &server, const std::string &port);
/**
* Disconnect the socket.
*/
void disconnect();
protected:
boost::asio::ip::tcp::resolver resolver;
ClientDisconnectionCallback disconnectionCallback;
};
}
#endif //META_VISION_SOLAIS_TERMINALSOCKET_H