Skip to content

Commit ccdfe4a

Browse files
committed
add io_uring_context::open_listening_socket
* opens a socket on a specified port * exposes a _Stream_ of `async_read_write_file` to handle accept(s)
1 parent 6d87373 commit ccdfe4a

File tree

6 files changed

+233
-4
lines changed

6 files changed

+233
-4
lines changed

include/unifex/file_concepts.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
#include <unifex/tag_invoke.hpp>
1919

20-
#include <unifex/io_concepts.hpp>
21-
2220
#include <unifex/filesystem.hpp>
2321

2422
#include <unifex/detail/prologue.hpp>

include/unifex/linux/io_epoll_context.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <unifex/detail/atomic_intrusive_queue.hpp>
2222
#include <unifex/detail/intrusive_heap.hpp>
2323
#include <unifex/detail/intrusive_queue.hpp>
24+
#include <unifex/io_concepts.hpp>
2425
#include <unifex/pipe_concepts.hpp>
2526
#include <unifex/get_stop_token.hpp>
2627
#include <unifex/manual_lifetime.hpp>

include/unifex/linux/io_uring_context.hpp

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
#include <unifex/detail/atomic_intrusive_queue.hpp>
2222
#include <unifex/detail/intrusive_heap.hpp>
2323
#include <unifex/detail/intrusive_queue.hpp>
24+
#include <unifex/defer.hpp>
2425
#include <unifex/file_concepts.hpp>
2526
#include <unifex/filesystem.hpp>
27+
#include <unifex/io_concepts.hpp>
28+
#include <unifex/just_done.hpp>
29+
#include <unifex/let_value_with.hpp>
30+
#include <unifex/socket_concepts.hpp>
2631
#include <unifex/get_stop_token.hpp>
2732
#include <unifex/manual_lifetime.hpp>
2833
#include <unifex/receiver_concepts.hpp>
@@ -43,7 +48,10 @@
4348

4449
#include UNIFEX_LIBURING_HEADER
4550

51+
#include <netinet/in.h>
52+
#include <sys/socket.h>
4653
#include <sys/uio.h>
54+
#include <unistd.h>
4755

4856
#include <unifex/detail/prologue.hpp>
4957

@@ -62,6 +70,8 @@ class io_uring_context {
6270
class async_read_write_file;
6371
class async_write_only_file;
6472
class scheduler;
73+
class accept_sender;
74+
class accept_stream;
6575

6676
io_uring_context();
6777

@@ -925,6 +935,10 @@ class io_uring_context::scheduler {
925935
tag_t<open_file_write_only>,
926936
scheduler s,
927937
const filesystem::path& path);
938+
friend accept_stream tag_invoke(
939+
tag_t<open_listening_socket>,
940+
scheduler s,
941+
port_t port);
928942

929943
friend bool operator==(scheduler a, scheduler b) noexcept {
930944
return a.context_ == b.context_;
@@ -942,6 +956,178 @@ inline io_uring_context::scheduler io_uring_context::get_scheduler() noexcept {
942956
return scheduler{*this};
943957
}
944958

959+
class io_uring_context::accept_sender {
960+
using offset_t = std::int64_t;
961+
962+
template <typename Receiver>
963+
class operation : private completion_base {
964+
friend io_uring_context;
965+
966+
public:
967+
template <typename Receiver2>
968+
explicit operation(const accept_sender& sender, Receiver2&& r) noexcept(
969+
std::is_nothrow_constructible_v<Receiver, Receiver2>)
970+
: context_(sender.context_)
971+
, fd_(sender.fd_)
972+
, receiver_((Receiver2 &&) r) {}
973+
974+
operation(operation&&) = delete;
975+
976+
void start() noexcept {
977+
if (!context_.is_running_on_io_thread()) {
978+
this->execute_ = &operation::on_schedule_complete;
979+
context_.schedule_remote(this);
980+
} else {
981+
start_io();
982+
}
983+
}
984+
985+
private:
986+
static void on_schedule_complete(operation_base* op) noexcept {
987+
static_cast<operation*>(op)->start_io();
988+
}
989+
990+
void start_io() noexcept {
991+
UNIFEX_ASSERT(context_.is_running_on_io_thread());
992+
993+
auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
994+
sqe.opcode = IORING_OP_ACCEPT;
995+
sqe.accept_flags = SOCK_NONBLOCK;
996+
sqe.fd = fd_;
997+
998+
sqe.user_data = reinterpret_cast<std::uintptr_t>(
999+
static_cast<completion_base*>(this));
1000+
1001+
this->execute_ = &operation::on_accept;
1002+
};
1003+
1004+
if (!context_.try_submit_io(populateSqe)) {
1005+
this->execute_ = &operation::on_schedule_complete;
1006+
context_.schedule_pending_io(this);
1007+
}
1008+
}
1009+
1010+
static void on_accept(operation_base* op) noexcept {
1011+
auto& self = *static_cast<operation*>(op);
1012+
if (self.result_ >= 0) {
1013+
if constexpr (noexcept(unifex::set_value(
1014+
std::move(self.receiver_), async_read_write_file{self.context_, self.result_}))) {
1015+
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
1016+
} else {
1017+
UNIFEX_TRY {
1018+
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
1019+
}
1020+
UNIFEX_CATCH(...) {
1021+
unifex::set_error(
1022+
std::move(self.receiver_), std::current_exception());
1023+
}
1024+
}
1025+
} else if (self.result_ == -ECANCELED) {
1026+
unifex::set_done(std::move(self.receiver_));
1027+
} else {
1028+
unifex::set_error(
1029+
std::move(self.receiver_),
1030+
std::error_code{-self.result_, std::system_category()});
1031+
}
1032+
}
1033+
1034+
io_uring_context& context_;
1035+
int fd_;
1036+
Receiver receiver_;
1037+
};
1038+
1039+
public:
1040+
// Produces an open read-write file corresponding to the accepted connection.
1041+
template <
1042+
template <typename...>
1043+
class Variant,
1044+
template <typename...>
1045+
class Tuple>
1046+
using value_types = Variant<Tuple<async_read_write_file>>;
1047+
1048+
// Note: Only case it might complete with exception_ptr is if the
1049+
// receiver's set_value() exits with an exception.
1050+
template <template <typename...> class Variant>
1051+
using error_types = Variant<std::error_code, std::exception_ptr>;
1052+
1053+
static constexpr bool sends_done = true;
1054+
static constexpr blocking_kind blocking = blocking_kind::never;
1055+
// always completes on the io_uring context
1056+
static constexpr bool is_always_scheduler_affine = false;
1057+
1058+
explicit accept_sender(io_uring_context& context, int fd) noexcept
1059+
: context_(context)
1060+
, fd_(fd) {}
1061+
1062+
template <typename Receiver>
1063+
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) && {
1064+
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
1065+
}
1066+
1067+
private:
1068+
io_uring_context& context_;
1069+
int fd_;
1070+
};
1071+
1072+
class io_uring_context::accept_stream {
1073+
public:
1074+
using offset_t = std::int64_t;
1075+
1076+
explicit accept_stream(io_uring_context& context, port_t port) noexcept
1077+
: context_(context)
1078+
, port_(port) {}
1079+
1080+
auto next() noexcept {
1081+
return let_value_with(
1082+
[this]() noexcept {
1083+
// TODO move to operation.start_io()
1084+
if (!fd_.valid()) {
1085+
open_socket();
1086+
}
1087+
return fd_.get();
1088+
},
1089+
[this](auto fd) noexcept { return accept_sender{context_, fd}; });
1090+
}
1091+
1092+
auto cleanup() noexcept {
1093+
return defer([this]() noexcept {
1094+
if (fd_.valid()) {
1095+
fd_.close();
1096+
}
1097+
return just_done();
1098+
});
1099+
}
1100+
1101+
private:
1102+
friend scheduler;
1103+
1104+
io_uring_context& context_;
1105+
port_t port_;
1106+
safe_file_descriptor fd_;
1107+
1108+
// TODO should this run on the io_context? If so, why?
1109+
void open_socket() noexcept {
1110+
// both IPv4 and IPv6
1111+
sockaddr_in6 addr;
1112+
fd_ = safe_file_descriptor{socket(
1113+
AF_INET6, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP)};
1114+
std::int32_t val = 1;
1115+
[[maybe_unused]] int ret;
1116+
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
1117+
UNIFEX_ASSERT(ret != -1);
1118+
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
1119+
UNIFEX_ASSERT(ret != -1);
1120+
1121+
addr.sin6_family = AF_INET6;
1122+
addr.sin6_port = htons(port_);
1123+
addr.sin6_addr = in6addr_any;
1124+
ret = bind(fd_.get(), (const struct sockaddr*)&addr, sizeof(addr));
1125+
UNIFEX_ASSERT(ret != -1);
1126+
ret = listen(fd_.get(), 128);
1127+
UNIFEX_ASSERT(ret != -1);
1128+
}
1129+
};
1130+
9451131
} // namespace linuxos
9461132
} // namespace unifex
9471133

include/unifex/pipe_concepts.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
#include <unifex/tag_invoke.hpp>
1919

20-
#include <unifex/io_concepts.hpp>
21-
2220
#include <unifex/detail/prologue.hpp>
2321

2422
namespace unifex {

include/unifex/socket_concepts.hpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
5+
* (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://llvm.org/LICENSE.txt
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <unifex/tag_invoke.hpp>
19+
20+
#include <unifex/detail/prologue.hpp>
21+
22+
namespace unifex {
23+
namespace _socket {
24+
using port_t = std::uint16_t;
25+
26+
inline constexpr struct open_listening_socket_cpo final {
27+
template <typename Scheduler>
28+
constexpr auto operator()(Scheduler&& sched, port_t port) const noexcept(
29+
is_nothrow_tag_invocable_v<open_listening_socket_cpo, Scheduler, port_t>)
30+
-> tag_invoke_result_t<open_listening_socket_cpo, Scheduler, port_t> {
31+
return tag_invoke(*this, static_cast<Scheduler&&>(sched), port);
32+
}
33+
} open_listening_socket{};
34+
} // namespace _socket
35+
36+
using _socket::open_listening_socket;
37+
using _socket::port_t;
38+
} // namespace unifex
39+
40+
#include <unifex/detail/epilogue.hpp>

source/linux/io_uring_context.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,12 @@ io_uring_context::async_read_write_file tag_invoke(
835835
return io_uring_context::async_read_write_file{*scheduler.context_, result};
836836
}
837837

838+
io_uring_context::accept_stream tag_invoke(
839+
tag_t<open_listening_socket>,
840+
io_uring_context::scheduler scheduler,
841+
port_t port) {
842+
return io_uring_context::accept_stream{*scheduler.context_, port};
843+
}
838844
} // namespace unifex::linuxos
839845

840846
#endif // UNIFEX_NO_LIBURING

0 commit comments

Comments
 (0)