1 #ifndef UVW_STREAM_INCLUDE_H
2 #define UVW_STREAM_INCLUDE_H
11 #include "request.hpp"
65 explicit DataEvent(std::unique_ptr<
char[]> buf, std::size_t len) noexcept;
67 std::unique_ptr<char[]>
data;
75 struct ConnectReq final:
public Request<ConnectReq, uv_connect_t> {
76 using Request::Request;
78 template<
typename F,
typename... Args>
79 void connect(F &&f, Args&&... args) {
80 invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
85 struct ShutdownReq final:
public Request<ShutdownReq, uv_shutdown_t> {
86 using Request::Request;
88 void shutdown(uv_stream_t *handle);
92 template<
typename Deleter>
93 class WriteReq final:
public Request<WriteReq<Deleter>, uv_write_t> {
94 using ConstructorAccess =
typename Request<WriteReq<Deleter>, uv_write_t>::ConstructorAccess;
97 WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<
char[], Deleter> dt,
unsigned int len)
98 : Request<WriteReq<Deleter>, uv_write_t>{ca, std::move(loop)},
100 buf{uv_buf_init(
data.get(), len)}
103 void write(uv_stream_t *handle) {
104 this->invoke(&uv_write, this->get(), handle, &buf, 1, &this->
template defaultCallback<WriteEvent>);
107 void write(uv_stream_t *handle, uv_stream_t *send) {
108 this->invoke(&uv_write2, this->get(), handle, &buf, 1, send, &this->
template defaultCallback<WriteEvent>);
112 std::unique_ptr<char[], Deleter>
data;
127 template<
typename T,
typename U>
129 static constexpr
unsigned int DEFAULT_BACKLOG = 128;
131 static void readCallback(uv_stream_t *handle, ssize_t nread,
const uv_buf_t *buf) {
132 T &ref = *(
static_cast<T*
>(handle->data));
134 std::unique_ptr<char[]>
data{buf->base};
140 if(nread == UV_EOF) {
143 }
else if(nread > 0) {
145 ref.publish(
DataEvent{std::move(
data),
static_cast<std::size_t
>(nread)});
146 }
else if(nread < 0) {
152 static void listenCallback(uv_stream_t *handle,
int status) {
153 T &ref = *(
static_cast<T*
>(handle->data));
154 if(status) { ref.publish(
ErrorEvent{status}); }
175 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
179 auto shutdown = this->
loop().template resource<details::ShutdownReq>();
180 shutdown->template once<ErrorEvent>(listener);
181 shutdown->template once<ShutdownEvent>(listener);
182 shutdown->shutdown(this->
template get<uv_stream_t>());
195 void listen(
int backlog = DEFAULT_BACKLOG) {
196 this->invoke(&uv_listen, this->
template get<uv_stream_t>(), backlog, &listenCallback);
220 this->invoke(&uv_accept, this->
template get<uv_stream_t>(), this->
template get<uv_stream_t>(ref));
231 this->invoke(&uv_read_start, this->
template get<uv_stream_t>(), &this->allocCallback, &readCallback);
240 this->invoke(&uv_read_stop, this->
template get<uv_stream_t>());
255 template<
typename Deleter>
256 void write(std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
257 auto req = this->
loop().template resource<details::WriteReq<Deleter>>(std::move(
data), len);
258 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
262 req->template once<ErrorEvent>(listener);
263 req->template once<WriteEvent>(listener);
264 req->write(this->
template get<uv_stream_t>());
280 auto req = this->
loop().template resource<details::WriteReq<void(*)(
char *)>>(std::unique_ptr<
char[],
void(*)(
char *)>{
data, [](
char *) {}}, len);
281 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
285 req->template once<ErrorEvent>(listener);
286 req->template once<WriteEvent>(listener);
287 req->write(this->
template get<uv_stream_t>());
309 template<
typename S,
typename Deleter>
310 void write(S &send, std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
311 auto req = this->
loop().template resource<details::WriteReq<Deleter>>(std::move(
data), len);
312 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
316 req->template once<ErrorEvent>(listener);
317 req->template once<WriteEvent>(listener);
318 req->write(this->
template get<uv_stream_t>(), this->
template get<uv_stream_t>(send));
342 auto req = this->
loop().template resource<details::WriteReq<void(*)(
char *)>>(std::unique_ptr<
char[],
void(*)(
char *)>{
data, [](
char *) {}}, len);
343 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
347 req->template once<ErrorEvent>(listener);
348 req->template once<WriteEvent>(listener);
349 req->write(this->
template get<uv_stream_t>(), this->
template get<uv_stream_t>(send));
364 uv_buf_t bufs[] = { uv_buf_init(
data.get(), len) };
365 auto bw = uv_try_write(this->
template get<uv_stream_t>(), bufs, 1);
386 template<
typename V,
typename W>
388 uv_buf_t bufs[] = { uv_buf_init(
data.get(), len) };
389 auto bw = uv_try_write2(this->
template get<uv_stream_t>(), bufs, 1, send.
raw());
411 uv_buf_t bufs[] = { uv_buf_init(
data, len) };
412 auto bw = uv_try_write(this->
template get<uv_stream_t>(), bufs, 1);
433 template<
typename V,
typename W>
435 uv_buf_t bufs[] = { uv_buf_init(
data, len) };
436 auto bw = uv_try_write2(this->
template get<uv_stream_t>(), bufs, 1, send.
raw());
451 return (uv_is_readable(this->
template get<uv_stream_t>()) == 1);
459 return (uv_is_writable(this->
template get<uv_stream_t>()) == 1);
478 return (0 == uv_stream_set_blocking(this->
template get<uv_stream_t>(), enable));
486 return uv_stream_get_write_queue_size(this->
template get<uv_stream_t>());
495 #include "stream.cpp"
std::shared_ptr< R > data() const
Gets user-defined data. uvw never uses this field itself.
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
void write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
void read()
Starts reading data from an incoming stream.
void shutdown()
Shuts down the outgoing (write) side of a duplex stream.
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
bool writable() const noexcept
Checks if the stream is writable.
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
int tryWrite(char *data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
size_t writeQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
bool readable() const noexcept
Checks if the stream is readable.
void stop()
Stops reading data from the stream.
void write(char *data, unsigned int len)
Writes data to the stream.
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
void accept(S &ref)
Accepts incoming connections.
int tryWrite(std::unique_ptr< char[]> data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
void write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Loop & loop() const noexcept
Gets the loop from which the resource originated.
const U * raw() const noexcept
Gets the underlying raw data structure.
std::unique_ptr< char[]> data