Skip to content

Commit d2465e9

Browse files
committed
add io_uring_context::open_listening_socket
* opens a socket on a specified port * exposes a _Stream_ of `async_read_write_file` to handle accept(s)
1 parent c0922e4 commit d2465e9

File tree

6 files changed

+226
-4
lines changed

6 files changed

+226
-4
lines changed

include/unifex/file_concepts.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
#include <unifex/tag_invoke.hpp>
1919

20-
#include <unifex/io_concepts.hpp>
21-
2220
#include <unifex/filesystem.hpp>
2321

2422
#include <unifex/detail/prologue.hpp>

include/unifex/linux/io_epoll_context.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <unifex/detail/atomic_intrusive_queue.hpp>
2222
#include <unifex/detail/intrusive_heap.hpp>
2323
#include <unifex/detail/intrusive_queue.hpp>
24+
#include <unifex/io_concepts.hpp>
2425
#include <unifex/pipe_concepts.hpp>
2526
#include <unifex/get_stop_token.hpp>
2627
#include <unifex/manual_lifetime.hpp>

include/unifex/linux/io_uring_context.hpp

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
#include <unifex/detail/atomic_intrusive_queue.hpp>
2222
#include <unifex/detail/intrusive_heap.hpp>
2323
#include <unifex/detail/intrusive_queue.hpp>
24+
#include <unifex/defer.hpp>
2425
#include <unifex/file_concepts.hpp>
2526
#include <unifex/filesystem.hpp>
27+
#include <unifex/io_concepts.hpp>
28+
#include <unifex/just_done.hpp>
29+
#include <unifex/let_value_with.hpp>
30+
#include <unifex/socket_concepts.hpp>
2631
#include <unifex/get_stop_token.hpp>
2732
#include <unifex/manual_lifetime.hpp>
2833
#include <unifex/receiver_concepts.hpp>
@@ -43,7 +48,10 @@
4348

4449
#include UNIFEX_LIBURING_HEADER
4550

51+
#include <netinet/in.h>
52+
#include <sys/socket.h>
4653
#include <sys/uio.h>
54+
#include <unistd.h>
4755

4856
#include <unifex/detail/prologue.hpp>
4957

@@ -62,6 +70,8 @@ class io_uring_context {
6270
class async_read_write_file;
6371
class async_write_only_file;
6472
class scheduler;
73+
class accept_sender;
74+
class accept_stream;
6575

6676
io_uring_context();
6777

@@ -925,6 +935,10 @@ class io_uring_context::scheduler {
925935
tag_t<open_file_write_only>,
926936
scheduler s,
927937
const filesystem::path& path);
938+
friend accept_stream tag_invoke(
939+
tag_t<open_listening_socket>,
940+
scheduler s,
941+
port_t port);
928942

929943
friend bool operator==(scheduler a, scheduler b) noexcept {
930944
return a.context_ == b.context_;
@@ -942,6 +956,171 @@ inline io_uring_context::scheduler io_uring_context::get_scheduler() noexcept {
942956
return scheduler{*this};
943957
}
944958

959+
class io_uring_context::accept_sender {
960+
using offset_t = std::int64_t;
961+
962+
template <typename Receiver>
963+
class operation : private completion_base {
964+
friend io_uring_context;
965+
966+
public:
967+
template <typename Receiver2>
968+
explicit operation(const accept_sender& sender, Receiver2&& r) noexcept(
969+
std::is_nothrow_constructible_v<Receiver, Receiver2>)
970+
: context_(sender.context_)
971+
, fd_(sender.fd_)
972+
, receiver_((Receiver2 &&) r) {}
973+
974+
void start() noexcept {
975+
if (!context_.is_running_on_io_thread()) {
976+
this->execute_ = &operation::on_schedule_complete;
977+
context_.schedule_remote(this);
978+
} else {
979+
start_io();
980+
}
981+
}
982+
983+
private:
984+
static void on_schedule_complete(operation_base* op) noexcept {
985+
static_cast<operation*>(op)->start_io();
986+
}
987+
988+
void start_io() noexcept {
989+
UNIFEX_ASSERT(context_.is_running_on_io_thread());
990+
991+
auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
992+
sqe.opcode = IORING_OP_ACCEPT;
993+
sqe.accept_flags = SOCK_NONBLOCK;
994+
sqe.fd = fd_;
995+
996+
sqe.user_data = reinterpret_cast<std::uintptr_t>(
997+
static_cast<completion_base*>(this));
998+
999+
this->execute_ = &operation::on_accept;
1000+
};
1001+
1002+
if (!context_.try_submit_io(populateSqe)) {
1003+
this->execute_ = &operation::on_schedule_complete;
1004+
context_.schedule_pending_io(this);
1005+
}
1006+
}
1007+
1008+
static void on_accept(operation_base* op) noexcept {
1009+
auto& self = *static_cast<operation*>(op);
1010+
if (self.result_ >= 0) {
1011+
if constexpr (noexcept(unifex::set_value(
1012+
std::move(self.receiver_), async_read_write_file{self.context_, self.result_}))) {
1013+
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
1014+
} else {
1015+
UNIFEX_TRY {
1016+
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
1017+
}
1018+
UNIFEX_CATCH(...) {
1019+
unifex::set_error(
1020+
std::move(self.receiver_), std::current_exception());
1021+
}
1022+
}
1023+
} else if (self.result_ == -ECANCELED) {
1024+
unifex::set_done(std::move(self.receiver_));
1025+
} else {
1026+
unifex::set_error(
1027+
std::move(self.receiver_),
1028+
std::error_code{-self.result_, std::system_category()});
1029+
}
1030+
}
1031+
1032+
io_uring_context& context_;
1033+
int fd_;
1034+
Receiver receiver_;
1035+
};
1036+
1037+
public:
1038+
// Produces number of bytes read.
1039+
template <
1040+
template <typename...>
1041+
class Variant,
1042+
template <typename...>
1043+
class Tuple>
1044+
using value_types = Variant<Tuple<ssize_t>>;
1045+
1046+
// Note: Only case it might complete with exception_ptr is if the
1047+
// receiver's set_value() exits with an exception.
1048+
template <template <typename...> class Variant>
1049+
using error_types = Variant<std::error_code, std::exception_ptr>;
1050+
1051+
static constexpr bool sends_done = true;
1052+
1053+
explicit accept_sender(io_uring_context& context, int fd) noexcept
1054+
: context_(context)
1055+
, fd_(fd) {}
1056+
1057+
template <typename Receiver>
1058+
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) && {
1059+
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
1060+
}
1061+
1062+
private:
1063+
io_uring_context& context_;
1064+
int fd_;
1065+
};
1066+
1067+
class io_uring_context::accept_stream {
1068+
public:
1069+
using offset_t = std::int64_t;
1070+
1071+
explicit accept_stream(io_uring_context& context, std::int32_t port) noexcept
1072+
: context_(context)
1073+
, port_(port) {}
1074+
1075+
auto next() noexcept {
1076+
return let_value_with(
1077+
[this]() noexcept {
1078+
if (!fd_.valid()) {
1079+
open_socket();
1080+
}
1081+
return fd_.get();
1082+
},
1083+
[this](auto fd) noexcept { return accept_sender{context_, fd}; });
1084+
}
1085+
1086+
auto cleanup() noexcept {
1087+
return defer([this]() noexcept {
1088+
if (fd_.valid()) {
1089+
fd_.close();
1090+
}
1091+
return just_done();
1092+
});
1093+
}
1094+
1095+
private:
1096+
friend scheduler;
1097+
1098+
io_uring_context& context_;
1099+
std::int32_t port_;
1100+
safe_file_descriptor fd_;
1101+
1102+
void open_socket() noexcept {
1103+
// both IPv4 and IPv6
1104+
sockaddr_in6 addr;
1105+
fd_ = safe_file_descriptor{socket(
1106+
AF_INET6, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP)};
1107+
std::int32_t val = 1;
1108+
[[maybe_unused]] int ret;
1109+
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
1110+
UNIFEX_ASSERT(ret != -1);
1111+
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
1112+
UNIFEX_ASSERT(ret != -1);
1113+
1114+
addr.sin6_family = AF_INET6;
1115+
addr.sin6_port = htons(port_);
1116+
addr.sin6_addr = in6addr_any;
1117+
ret = bind(fd_.get(), (const struct sockaddr*)&addr, sizeof(addr));
1118+
UNIFEX_ASSERT(ret != -1);
1119+
ret = listen(fd_.get(), 128);
1120+
UNIFEX_ASSERT(ret != -1);
1121+
}
1122+
};
1123+
9451124
} // namespace linuxos
9461125
} // namespace unifex
9471126

include/unifex/pipe_concepts.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
#include <unifex/tag_invoke.hpp>
1919

20-
#include <unifex/io_concepts.hpp>
21-
2220
#include <unifex/detail/prologue.hpp>
2321

2422
namespace unifex {

include/unifex/socket_concepts.hpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
5+
* (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://llvm.org/LICENSE.txt
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <unifex/tag_invoke.hpp>
19+
20+
#include <unifex/detail/prologue.hpp>
21+
22+
namespace unifex {
23+
namespace _socket {
24+
using port_t = std::uint16_t;
25+
26+
inline constexpr struct open_listening_socket_cpo final {
27+
template <typename Scheduler>
28+
constexpr auto operator()(Scheduler&& sched, port_t port) const noexcept(
29+
is_nothrow_tag_invocable_v<open_listening_socket_cpo, Scheduler, port_t>)
30+
-> tag_invoke_result_t<open_listening_socket_cpo, Scheduler, port_t> {
31+
return tag_invoke(*this, static_cast<Scheduler&&>(sched), port);
32+
}
33+
} open_listening_socket{};
34+
} // namespace _socket
35+
36+
using _socket::open_listening_socket;
37+
using _socket::port_t;
38+
} // namespace unifex
39+
40+
#include <unifex/detail/epilogue.hpp>

source/linux/io_uring_context.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,12 @@ io_uring_context::async_read_write_file tag_invoke(
835835
return io_uring_context::async_read_write_file{*scheduler.context_, result};
836836
}
837837

838+
io_uring_context::accept_stream tag_invoke(
839+
tag_t<open_listening_socket>,
840+
io_uring_context::scheduler scheduler,
841+
port_t port) {
842+
return io_uring_context::accept_stream{*scheduler.context_, port};
843+
}
838844
} // namespace unifex::linuxos
839845

840846
#endif // UNIFEX_NO_LIBURING

0 commit comments

Comments
 (0)