Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/unifex/allocate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unifex/sender_concepts.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/bind_back.hpp>
#include <unifex/blocking.hpp>

#include <memory>
#include <type_traits>
Expand Down Expand Up @@ -105,6 +106,10 @@ namespace _alloc {
static_cast<Self&&>(s).sender_, (Receiver &&) r};
}

friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return blocking(self.sender_);
}

Sender sender_;
};
} // namespace _alloc
Expand Down
51 changes: 48 additions & 3 deletions include/unifex/any_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ struct _with<CPOs...>::any_scheduler {
return _sender{this};
}

type_index type() const noexcept {
return _get_type_index(impl_);
}

friend _equal_to_fn;
friend bool operator==(const any_scheduler& left, const any_scheduler& right) noexcept {
return _equal_to(left.impl_, right);
Expand All @@ -210,15 +214,44 @@ struct _with<CPOs...>::any_scheduler {
any_scheduler_impl<CPOs...> impl_;
};

template <typename... ReceiverCPOs>
using any_scheduler_ref_impl = any_ref_t<_schedule_and_connect<ReceiverCPOs...>>;
template <typename... CPOs>
using any_scheduler_ref_impl =
any_ref_t<
_schedule_and_connect<CPOs...>,
_get_type_index,
overload<bool(const this_&, const any_scheduler_ref<CPOs...>&) noexcept>(_equal_to)>;

#if defined(__GLIBCXX__)
template <typename>
inline constexpr bool _is_tuple = false;

template <typename... Ts>
inline constexpr bool _is_tuple<std::tuple<Ts...>> = true;

template <typename... Ts>
inline constexpr bool _is_tuple<std::tuple<Ts...> const> = true;
#endif

template <typename... CPOs>
struct _with<CPOs...>::any_scheduler_ref {
#if !defined(__GLIBCXX__)
template (typename Scheduler)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND scheduler<Scheduler>)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND
scheduler<Scheduler>)
/* implicit */ any_scheduler_ref(Scheduler& sched) noexcept
: impl_(sched) {}
#else
// Under-constrained implicit tuple converting constructor from a
// single argument doesn't exclude instances of the tuple type
// itself, so it is considered for copy/move constructors, leading
// to constraint recursion with the any_scheduler_ref constructor
// below.
template (typename Scheduler)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND
(!_is_tuple<Scheduler>) AND scheduler<Scheduler>)
/* implicit */ any_scheduler_ref(Scheduler& sched) noexcept
: impl_(sched) {}
#endif

struct _sender {
template <template <class...> class Variant, template <class...> class Tuple>
Expand Down Expand Up @@ -257,14 +290,26 @@ struct _with<CPOs...>::any_scheduler_ref {
return _sender{this};
}

type_index type() const noexcept {
return _get_type_index(impl_);
}

// Shallow equality comparison by default, for regularity:
friend bool operator==(const any_scheduler_ref& left, const any_scheduler_ref& right) noexcept {
return left.impl_ == right.impl_;
}
friend bool operator!=(const any_scheduler_ref& left, const any_scheduler_ref& right) noexcept {
return !(left == right);
}

// Deep equality comparison:
friend _equal_to_fn;
bool equal_to(const any_scheduler_ref& that) const noexcept {
return _equal_to(impl_, that);
}

private:

any_scheduler_ref_impl<CPOs...> impl_;
};

Expand Down
2 changes: 1 addition & 1 deletion include/unifex/async_trace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ namespace _async_trace {
return operation<Receiver>{(Receiver &&) r};
}

friend blocking_kind tag_invoke(tag_t<blocking>, const sender&) noexcept {
friend auto tag_invoke(tag_t<blocking>, const sender&) noexcept {
return blocking_kind::always_inline;
}
};
Expand Down
45 changes: 38 additions & 7 deletions include/unifex/at_coroutine_exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
#include <unifex/tag_invoke.hpp>
#include <unifex/await_transform.hpp>
#include <unifex/continuations.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/unstoppable_token.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/inline_scheduler.hpp>
#include <unifex/any_scheduler.hpp>
#include <unifex/blocking.hpp>

#if UNIFEX_NO_COROUTINES
# error "Coroutine support is required to use this header"
Expand Down Expand Up @@ -115,11 +119,19 @@ struct _cleanup_promise_base {
}
#endif

friend unstoppable_token tag_invoke(tag_t<get_stop_token>, const _cleanup_promise_base&) noexcept {
friend unstoppable_token
tag_invoke(tag_t<get_stop_token>, const _cleanup_promise_base&) noexcept {
return unstoppable_token{};
}

friend any_scheduler
tag_invoke(tag_t<get_scheduler>, const _cleanup_promise_base& p) noexcept {
return p.sched_;
}

inline static constexpr inline_scheduler _default_scheduler{};
continuation_handle<> continuation_{};
any_scheduler sched_{_default_scheduler};
bool isUnhandledDone_{false};
};

Expand All @@ -145,6 +157,15 @@ struct _die_on_done_rec {
UNIFEX_ASSERT(!"A cleanup action tried to cancel. Calling terminate...");
std::terminate();
}

template(typename CPO)
(requires is_receiver_query_cpo_v<CPO> AND
is_callable_v<CPO, const Receiver&>)
friend auto tag_invoke(CPO cpo, const type& p)
noexcept(is_nothrow_callable_v<CPO, const Receiver&>)
-> callable_result_t<CPO, const Receiver&> {
return cpo(p.rec_);
}
};
};

Expand Down Expand Up @@ -177,7 +198,7 @@ struct _die_on_done {
_die_on_done_rec_t<Receiver>{(Receiver&&) rec});
}

Sender sender_;
UNIFEX_NO_UNIQUE_ADDRESS Sender sender_;
};
};

Expand Down Expand Up @@ -229,7 +250,7 @@ struct _cleanup_promise : _cleanup_promise_base {
return unifex::await_transform(*this, _die_on_done_fn{}((Value&&) value));
}

std::tuple<Ts&...> args_;
UNIFEX_NO_UNIQUE_ADDRESS std::tuple<Ts&...> args_;
};

template <typename... Ts>
Expand All @@ -251,14 +272,24 @@ struct [[nodiscard]] _cleanup_task {
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
bool await_suspend_impl_(Promise& parent) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent.promise(), continuation_);
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
return false;
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}

std::tuple<Ts&...> await_resume() noexcept {
return std::exchange(continuation_, {}).promise().args_;
return std::move(std::exchange(continuation_, {}).promise().args_);
}

friend constexpr auto tag_invoke(tag_t<blocking>, const _cleanup_task&) noexcept {
return blocking_kind::always_inline;
}

private:
Expand All @@ -275,7 +306,7 @@ namespace _at_coroutine_exit {
public:
template (typename Action, typename... Ts)
(requires callable<std::decay_t<Action>, std::decay_t<Ts>...>)
_cleanup_task<Ts...> operator()(Action&& action, Ts&&... ts) const {
_cleanup_task<std::decay_t<Ts>...> operator()(Action&& action, Ts&&... ts) const {
return _fn::at_coroutine_exit((Action&&) action, (Ts&&) ts...);
}
} at_coroutine_exit{};
Expand Down
17 changes: 10 additions & 7 deletions include/unifex/await_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ struct _awaitable_base<Promise, Value>::type {
struct _rec {
public:
explicit _rec(_expected<Value>* result, coro::coroutine_handle<Promise> continuation) noexcept
: result_(result)
, continuation_(continuation)
: result_(result)
, continuation_(continuation)
{}

_rec(_rec&& r) noexcept
: result_(std::exchange(r.result_, nullptr))
, continuation_(std::exchange(r.continuation_, nullptr))
: result_(std::exchange(r.result_, nullptr))
, continuation_(std::exchange(r.continuation_, nullptr))
{}

template(class... Us)
Expand Down Expand Up @@ -185,13 +185,16 @@ struct _awaitable<Promise, Sender>::type
template <typename Promise, typename Sender>
using _as_awaitable = typename _awaitable<Promise, Sender>::type;

inline const struct _fn {
struct _fn {
// Call custom implementation if present.
template(typename Promise, typename Value)
(requires tag_invocable<_fn, Promise&, Value>)
auto operator()(Promise& promise, Value&& value) const
noexcept(is_nothrow_tag_invocable_v<_fn, Promise&, Value>)
-> tag_invoke_result_t<_fn, Promise&, Value> {
static_assert(detail::_awaitable<tag_invoke_result_t<_fn, Promise&, Value>>,
"The return type of a customization of unifex::await_transform() "
"must satisfy the awaitable concept.");
return unifex::tag_invoke(_fn{}, promise, (Value&&)value);
}

Expand All @@ -218,7 +221,7 @@ inline const struct _fn {
return (Value&&) value;
}
}
} await_transform {};
};

} // namespace _await_tfx

Expand All @@ -231,7 +234,7 @@ inline const struct _fn {
//
// Coroutine promise_types can implement their .await_transform() methods to
// forward to this customisation point to enable use of type customisations.
using _await_tfx::await_transform;
inline constexpr _await_tfx::_fn await_transform {};

} // namespace unifex

Expand Down
75 changes: 67 additions & 8 deletions include/unifex/blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

namespace unifex {

enum class blocking_kind {
namespace _block {
enum class _enum {
// No guarantees about the timing and context on which the receiver will
// be called.
maybe,
maybe = 0,

// Always completes asynchronously.
// Guarantees that the receiver will not be called on the current thread
Expand All @@ -44,8 +45,46 @@ enum class blocking_kind {
always_inline
};

namespace _blocking {
inline const struct _fn {
struct blocking_kind {
template <_enum Kind>
using constant = std::integral_constant<_enum, Kind>;

blocking_kind() = default;

constexpr blocking_kind(_enum kind) noexcept
: value(kind)
{}

template <_enum Kind>
constexpr blocking_kind(constant<Kind>) noexcept
: value(Kind)
{}

constexpr operator _enum() const noexcept {
return value;
}

constexpr _enum operator()() const noexcept {
return value;
}

friend constexpr bool operator==(blocking_kind a, blocking_kind b) noexcept {
return a.value == b.value;
}

friend constexpr bool operator!=(blocking_kind a, blocking_kind b) noexcept {
return a.value != b.value;
}

static constexpr constant<_enum::maybe> maybe {};
static constexpr constant<_enum::never> never {};
static constexpr constant<_enum::always> always {};
static constexpr constant<_enum::always_inline> always_inline {};

_enum value{};
};

struct _fn {
template(typename Sender)
(requires tag_invocable<_fn, const Sender&>)
constexpr auto operator()(const Sender& s) const
Expand All @@ -55,12 +94,32 @@ inline const struct _fn {
}
template(typename Sender)
(requires (!tag_invocable<_fn, const Sender&>))
constexpr blocking_kind operator()(const Sender&) const noexcept {
constexpr auto operator()(const Sender&) const noexcept {
return blocking_kind::maybe;
}
};

namespace _cfn {
template <_enum Kind>
static constexpr auto _kind(blocking_kind::constant<Kind> kind) noexcept {
return kind;
}
static constexpr auto _kind(blocking_kind) noexcept {
return blocking_kind::maybe;
}
} blocking{};
} // namespace _blocking
using _blocking::blocking;

template <typename T>
constexpr auto cblocking() noexcept {
using blocking_t = remove_cvref_t<decltype(_fn{}(UNIFEX_DECLVAL(T&)))>;
return _cfn::_kind(blocking_t{});
}
}

} // namespace _block

inline constexpr _block::_fn blocking {};
using _block::_cfn::cblocking;
using _block::blocking_kind;

} // namespace unifex

Expand Down
Loading