From 9776065158b70f43f497979caef62acc6ebb1760 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 15 Jul 2022 14:18:07 +0000 Subject: [PATCH 1/6] merge --- .../runtime/task/local_mode_task_submitter.cc | 1 + .../java/io/ray/api/call/ActorTaskCaller.java | 5 ++++ .../java/io/ray/api/options/CallOptions.java | 14 +++++++-- java/build-jar-multiplatform.sh | 18 ++++++----- .../io/ray/runtime/AbstractRayRuntime.java | 9 ++++-- .../java/io/ray/runtime/RayDevRuntime.java | 2 +- .../java/io/ray/runtime/RayNativeRuntime.java | 6 ++-- src/ray/common/task/task_spec.cc | 7 ++++- src/ray/common/task/task_util.h | 2 ++ src/ray/core_worker/common.h | 8 +++-- src/ray/core_worker/core_worker.cc | 26 ++++++++++++++-- src/ray/core_worker/core_worker.h | 4 ++- .../java/io_ray_runtime_RayNativeRuntime.cc | 5 ++-- .../java/io_ray_runtime_RayNativeRuntime.h | 2 +- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 5 +++- src/ray/core_worker/lib/java/jni_init.cc | 4 +++ src/ray/core_worker/lib/java/jni_utils.h | 3 ++ src/ray/core_worker/task_manager.cc | 5 ++-- src/ray/core_worker/test/core_worker_test.cc | 1 + .../test/direct_task_transport_test.cc | 1 + src/ray/gcs/test/gcs_test_util.h | 1 + src/ray/protobuf/common.proto | 30 ++++++++++--------- .../scheduling/cluster_task_manager_test.cc | 1 + 23 files changed, 116 insertions(+), 44 deletions(-) diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index c62765d53b10..fe12238018f2 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -58,6 +58,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, + -1, 1, required_resources, required_placement_resources, diff --git a/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java b/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java index 2ee68e70168d..04d8ff555fe1 100644 --- a/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java +++ b/java/api/src/main/java/io/ray/api/call/ActorTaskCaller.java @@ -28,6 +28,11 @@ public ActorTaskCaller setConcurrencyGroup(String name) { return self(); } + public ActorTaskCaller setForwardObjectToParentTask(boolean ifForward) { + builder.setForwardObjectToParentTask(ifForward); + return self(); + } + private ActorTaskCaller self() { return this; } diff --git a/java/api/src/main/java/io/ray/api/options/CallOptions.java b/java/api/src/main/java/io/ray/api/options/CallOptions.java index e646887ec09d..f1f5cf1b8922 100644 --- a/java/api/src/main/java/io/ray/api/options/CallOptions.java +++ b/java/api/src/main/java/io/ray/api/options/CallOptions.java @@ -13,6 +13,7 @@ public class CallOptions extends BaseTaskOptions { public final int bundleIndex; public final String concurrencyGroupName; private final String serializedRuntimeEnvInfo; + public final boolean forwardObjectToParentTask; private CallOptions( String name, @@ -20,13 +21,15 @@ private CallOptions( PlacementGroup group, int bundleIndex, String concurrencyGroupName, - RuntimeEnv runtimeEnv) { + RuntimeEnv runtimeEnv, + boolean forwardObjectToParentTask) { super(resources); this.name = name; this.group = group; this.bundleIndex = bundleIndex; this.concurrencyGroupName = concurrencyGroupName; this.serializedRuntimeEnvInfo = runtimeEnv == null ? "" : runtimeEnv.toJsonBytes(); + this.forwardObjectToParentTask = forwardObjectToParentTask; } /** This inner class for building CallOptions. */ @@ -38,6 +41,7 @@ public static class Builder { private int bundleIndex; private String concurrencyGroupName = ""; private RuntimeEnv runtimeEnv = null; + private boolean forwardObjectToParentTask = false; /** * Set a name for this task. @@ -98,8 +102,14 @@ public Builder setRuntimeEnv(RuntimeEnv runtimeEnv) { return this; } + public Builder setForwardObjectToParentTask(boolean ifForward) { + this.forwardObjectToParentTask = ifForward; + return this; + } + public CallOptions build() { - return new CallOptions(name, resources, group, bundleIndex, concurrencyGroupName, runtimeEnv); + return new CallOptions(name, resources, group, bundleIndex, concurrencyGroupName, + runtimeEnv, forwardObjectToParentTask); } } } diff --git a/java/build-jar-multiplatform.sh b/java/build-jar-multiplatform.sh index 417b5b556035..6f5ec70eef51 100755 --- a/java/build-jar-multiplatform.sh +++ b/java/build-jar-multiplatform.sh @@ -79,12 +79,12 @@ build_jars_multiplatform() { return fi fi - if download_jars "ray-runtime-$version.jar"; then - prepare_native - build_jars multiplatform false - else - echo "download_jars failed, skip building multiplatform jars" - fi + # if download_jars "ray-runtime-$version.jar"; then + prepare_native + build_jars linux + # else + # echo "download_jars failed, skip building multiplatform jars" + # fi } # Download darwin/windows ray-related jar from s3 @@ -124,7 +124,8 @@ download_jars() { # prepare native binaries and libraries. prepare_native() { - for os in 'darwin' 'linux'; do + # for os in 'darwin' 'linux'; do + for os in 'linux'; do cd "$JAR_BASE_DIR/$os" jar xf "ray-runtime-$version.jar" "native/$os" local native_dir="$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os" @@ -137,7 +138,8 @@ prepare_native() { # Return 0 if native bianries and libraries exist and 1 if not. native_files_exist() { local os - for os in 'darwin' 'linux'; do + # for os in 'darwin' 'linux'; do + for os in 'linux'; do native_dirs=() native_dirs+=("$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os") for native_dir in "${native_dirs[@]}"; do diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index 3ff7fbff8bff..b5c278179666 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -323,7 +323,8 @@ private ObjectRef callNormalFunction( ObjectRefImpl impl = new ObjectRefImpl<>(); /// Mapping the object id to the object ref. - List preparedReturnIds = getCurrentReturnIds(numReturns, ActorId.NIL); + List preparedReturnIds = getCurrentReturnIds(numReturns, ActorId.NIL, + options.forwardObjectToParentTask); if (rayConfig.runMode == RunMode.CLUSTER && numReturns > 0) { ObjectRefImpl.registerObjectRefImpl(preparedReturnIds.get(0), impl); } @@ -354,7 +355,9 @@ private ObjectRef callActorFunction( ObjectRefImpl impl = new ObjectRefImpl<>(); /// Mapping the object id to the object ref. - List preparedReturnIds = getCurrentReturnIds(numReturns, rayActor.getId()); + System.err.println(options.forwardObjectToParentTask); + List preparedReturnIds = getCurrentReturnIds(numReturns, rayActor.getId(), + options.forwardObjectToParentTask); if (rayConfig.runMode == RunMode.CLUSTER && numReturns > 0) { ObjectRefImpl.registerObjectRefImpl(preparedReturnIds.get(0), impl); } @@ -394,7 +397,7 @@ private BaseActorHandle createActorImpl( return actor; } - abstract List getCurrentReturnIds(int numReturns, ActorId actorId); + abstract List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward); public WorkerContext getWorkerContext() { return workerContext; diff --git a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java index 8280642bae2d..6734e8b05b67 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java @@ -93,7 +93,7 @@ public Map> getAvailableResourceIds() { } @Override - List getCurrentReturnIds(int numReturns, ActorId actorId) { + List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward) { return null; } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 4f471b890f8e..7bd523f5dea8 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -217,8 +217,8 @@ public void killActor(BaseActorHandle actor, boolean noRestart) { } @Override - List getCurrentReturnIds(int numReturns, ActorId actorId) { - List ret = nativeGetCurrentReturnIds(numReturns, actorId.getBytes()); + List getCurrentReturnIds(int numReturns, ActorId actorId, boolean ifForward) { + List ret = nativeGetCurrentReturnIds(numReturns, actorId.getBytes(), ifForward); return ret.stream().map(ObjectId::new).collect(Collectors.toList()); } @@ -291,7 +291,7 @@ private static native void nativeInitialize( private static native String nativeGetNamespace(); - private static native List nativeGetCurrentReturnIds(int numReturns, byte[] actorId); + private static native List nativeGetCurrentReturnIds(int numReturns, byte[] actorId, boolean ifForward); private static native byte[] nativeGetCurrentNodeId(); } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 4b608d9bf5c1..163ae83e0840 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -198,7 +198,12 @@ size_t TaskSpecification::NumArgs() const { return message_->args_size(); } size_t TaskSpecification::NumReturns() const { return message_->num_returns(); } ObjectID TaskSpecification::ReturnId(size_t return_index) const { - return ObjectID::FromIndex(TaskId(), return_index + 1); + auto parent_num_returns = this->message_->parent_num_returns(); + if (parent_num_returns < 0) { + return ObjectID::FromIndex(TaskId(), return_index + 1); + } else { + return ObjectID::FromIndex(ParentTaskId(), parent_num_returns + return_index + 1); + } } bool TaskSpecification::ArgByRef(size_t arg_index) const { diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 4f5305f9b5aa..32a17b984356 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -107,6 +107,7 @@ class TaskSpecBuilder { uint64_t parent_counter, const TaskID &caller_id, const rpc::Address &caller_address, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, @@ -124,6 +125,7 @@ class TaskSpecBuilder { message_->set_parent_counter(parent_counter); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); + message_->set_parent_num_returns(parent_num_returns); message_->set_num_returns(num_returns); message_->mutable_required_resources()->insert(required_resources.begin(), required_resources.end()); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 0460494f3201..5854db0f3e11 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -60,12 +60,14 @@ struct TaskOptions { int num_returns, std::unordered_map &resources, const std::string &concurrency_group_name = "", - const std::string &serialized_runtime_env_info = "{}") + const std::string &serialized_runtime_env_info = "{}", + bool use_parent_task_id = false) : name(name), num_returns(num_returns), resources(resources), concurrency_group_name(concurrency_group_name), - serialized_runtime_env_info(serialized_runtime_env_info) {} + serialized_runtime_env_info(serialized_runtime_env_info), + use_parent_task_id(use_parent_task_id) {} /// The name of this task. std::string name; @@ -79,6 +81,8 @@ struct TaskOptions { /// fields which not contained in Runtime Env, such as eager_install. /// Propagated to child actors and tasks. std::string serialized_runtime_env_info; + + bool use_parent_task_id = false; }; /// Options for actor creation tasks. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9c3cfcff9e76..ff7b0d85f729 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1530,6 +1530,7 @@ void CoreWorker::BuildCommonTaskSpec( const rpc::Address &address, const RayFunction &function, const std::vector> &args, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, @@ -1549,6 +1550,7 @@ void CoreWorker::BuildCommonTaskSpec( task_index, caller_id, address, + parent_num_returns, num_returns, required_resources, required_placement_resources, @@ -1597,6 +1599,7 @@ std::vector CoreWorker::SubmitTask( rpc_address_, function, args, + -1, task_options.num_returns, constrained_resources, required_resources, @@ -1676,6 +1679,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, rpc_address_, function, args, + -1, 1, new_resource, new_placement_resources, @@ -1872,7 +1876,13 @@ std::optional> CoreWorker::SubmitActorTask( // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; - + int parent_num_returns = -1; + if (task_options.use_parent_task_id) { + auto current_task = worker_context_.GetCurrentTask(); + parent_num_returns = current_task->NumReturns(); + } + RAY_LOG(ERROR) << parent_num_returns; + RAY_LOG(ERROR) << task_options.use_parent_task_id; // Build common task spec. TaskSpecBuilder builder; const auto next_task_index = worker_context_.GetNextTaskIndex(); @@ -1899,6 +1909,7 @@ std::optional> CoreWorker::SubmitActorTask( rpc_address_, function, args, + parent_num_returns, num_returns, task_options.resources, required_resources, @@ -3392,11 +3403,20 @@ Status CoreWorker::WaitForActorRegistered(const std::vector &ids) { } std::vector CoreWorker::GetCurrentReturnIds(int num_returns, - const ActorID &callee_actor_id) { + const ActorID &callee_actor_id, + bool use_parent_task_id) { std::vector return_ids(num_returns); const auto next_task_index = worker_context_.GetTaskIndex() + 1; TaskID task_id; - if (callee_actor_id.IsNil()) { + if (use_parent_task_id) { + task_id = worker_context_.GetCurrentTaskID(); + auto current_task = worker_context_.GetCurrentTask(); + size_t parent_task_num_returns = current_task->NumReturns(); + for (int i = 0; i < num_returns; i++) { + return_ids[i] = ObjectID::FromIndex(task_id, parent_task_num_returns + i + 1); + } + return return_ids; + } else if (callee_actor_id.IsNil()) { /// Return ids for normal task call. task_id = TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentInternalTaskId(), diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index bfd43b6dddb7..f04db9a0da6b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -673,7 +673,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Get the expected return ids of the next task. std::vector GetCurrentReturnIds(int num_returns, - const ActorID &callee_actor_id); + const ActorID &callee_actor_id, + bool use_parent_task_id); /// The following methods are handlers for the core worker's gRPC server, which follow /// a macro-generated call convention. These are executed on the io_service_ and @@ -844,6 +845,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const rpc::Address &address, const RayFunction &function, const std::vector> &args, + int parent_num_returns, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index a544847edf16..c64a21f5bb49 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -428,11 +428,12 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentNodeId(JNIEnv *env, jclass) } JNIEXPORT jobject JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentReturnIds( - JNIEnv *env, jclass, jint numReturns, jbyteArray actorIdByteArray) { + JNIEnv *env, jclass, jint numReturns, jbyteArray actorIdByteArray, jboolean ifForward) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); auto return_ids = core_worker.GetCurrentReturnIds( static_cast(numReturns), - JavaByteArrayToId(env, actorIdByteArray)); + JavaByteArrayToId(env, actorIdByteArray), + static_cast(ifForward)); return NativeIdVectorToJavaByteArrayList(env, return_ids); } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h index a7e8a885469b..36730b2f9fe5 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h @@ -101,7 +101,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetNamespace(JNIEnv *, jclass); * Signature: (I[B)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeGetCurrentReturnIds( - JNIEnv *, jclass, jint, jbyteArray); + JNIEnv *, jclass, jint, jbyteArray, jboolean); /* * Class: io_ray_runtime_RayNativeRuntime diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 68536247030c..c4d203887eb4 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -124,6 +124,7 @@ inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptio std::string concurrency_group_name = ""; std::string serialzied_runtime_env_info = ""; + bool if_forward = false; if (callOptions) { jobject java_resources = env->GetObjectField(callOptions, java_base_task_options_resources); @@ -150,8 +151,10 @@ inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptio } } + if_forward = (bool) env->GetBooleanField(callOptions, java_call_options_if_forward); TaskOptions task_options{ - name, numReturns, resources, concurrency_group_name, serialzied_runtime_env_info}; + name, numReturns, resources, concurrency_group_name, + serialzied_runtime_env_info, if_forward}; return task_options; } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index f51286f1548e..9e484b667af8 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -101,6 +101,7 @@ jfieldID java_task_creation_options_group; jfieldID java_task_creation_options_bundle_index; jfieldID java_call_options_concurrency_group_name; jfieldID java_call_options_serialized_runtime_env_info; +jfieldID java_call_options_if_forward; jclass java_actor_creation_options_class; jfieldID java_actor_creation_options_name; @@ -319,6 +320,9 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_call_options_class, "concurrencyGroupName", "Ljava/lang/String;"); java_call_options_serialized_runtime_env_info = env->GetFieldID( java_call_options_class, "serializedRuntimeEnvInfo", "Ljava/lang/String;"); + java_call_options_if_forward = env->GetFieldID( + java_call_options_class, "forwardObjectToParentTask", "Z"); + java_placement_group_class = LoadClass(env, "io/ray/runtime/placementgroup/PlacementGroupImpl"); diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 506b72aade40..18bddbb5e5cb 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -178,9 +178,12 @@ extern jfieldID java_task_creation_options_group; extern jfieldID java_task_creation_options_bundle_index; /// concurrencyGroupName field of CallOptions class extern jfieldID java_call_options_concurrency_group_name; + /// serializedRuntimeEnvInfo field of CallOptions class extern jfieldID java_call_options_serialized_runtime_env_info; +extern jfieldID java_call_options_if_forward; + /// ActorCreationOptions class extern jclass java_actor_creation_options_class; /// name field of ActorCreationOptions class diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 614e6834a547..862a946e8fbf 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -66,7 +66,8 @@ std::vector TaskManager::AddPendingTask( for (size_t i = 0; i < num_returns; i++) { auto return_id = spec.ReturnId(i); if (!spec.IsActorCreationTask()) { - bool is_reconstructable = max_retries != 0; + bool is_reconstructable = true; + // bool is_reconstructable = max_retries != 0; // We pass an empty vector for inner IDs because we do not know the return // value of the task yet. If the task returns an ID(s), the worker will // publish the WaitForRefRemoved message that we are now a borrower for @@ -98,7 +99,7 @@ std::vector TaskManager::AddPendingTask( { absl::MutexLock lock(&mu_); auto inserted = submissible_tasks_.emplace(spec.TaskId(), - TaskEntry(spec, max_retries, num_returns)); + TaskEntry(spec, 1, num_returns)); RAY_CHECK(inserted.second); num_pending_tasks_++; } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index fb48f1ceeb1d..9542e1999c18 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -559,6 +559,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { 0, RandomTaskId(), address, + -1, num_returns, resources, resources, diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 551dfe2cb164..c8fd749f58c3 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -45,6 +45,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r 0, TaskID::Nil(), empty_address, + -1, 1, resources, resources, diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 7b7298ab29cd..f2096be2a0eb 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -54,6 +54,7 @@ struct Mocker { 0, TaskID::Nil(), owner_address, + -1, 1, required_resources, required_placement_resources, diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 3dc35c941ea4..36c398f3ae66 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -284,42 +284,44 @@ message TaskSpec { Address caller_address = 10; // Task arguments. repeated TaskArg args = 11; + // Number of parent task's return objects. + int32 parent_num_returns = 12; // Number of return objects. - uint64 num_returns = 12; + uint64 num_returns = 13; // Quantities of the different resources required by this task. - map required_resources = 13; + map required_resources = 14; // The resources required for placing this task on a node. If this is empty, // then the placement resources are equal to the required_resources. - map required_placement_resources = 14; + map required_placement_resources = 15; // Task specification for an actor creation task. // This field is only valid when `type == ACTOR_CREATION_TASK`. - ActorCreationTaskSpec actor_creation_task_spec = 15; + ActorCreationTaskSpec actor_creation_task_spec = 16; // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. - ActorTaskSpec actor_task_spec = 16; + ActorTaskSpec actor_task_spec = 17; // Number of times this task may be retried on worker failure. - int32 max_retries = 17; + int32 max_retries = 18; // Whether or not to skip the execution of this task. When it's true, // the receiver will not execute the task. This field is used by async actors // to guarantee task submission order after restart. - bool skip_execution = 21; + bool skip_execution = 22; // Breakpoint if this task should drop into the debugger when it starts executing // and "" if the task should not drop into the debugger. - bytes debugger_breakpoint = 22; + bytes debugger_breakpoint = 23; // Runtime environment for this task. - RuntimeEnvInfo runtime_env_info = 23; + RuntimeEnvInfo runtime_env_info = 24; // The concurrency group name in which this task will be performed. - string concurrency_group_name = 24; + string concurrency_group_name = 25; // Whether application-level errors (exceptions) should be retried. - bool retry_exceptions = 25; + bool retry_exceptions = 26; // The depth of the task. The driver has depth 0, anything it calls has depth // 1, etc. - int64 depth = 26; + int64 depth = 27; // Strategy about how to schedule this task. - SchedulingStrategy scheduling_strategy = 27; + SchedulingStrategy scheduling_strategy = 28; // A count of the number of times this task has been attempted so far. 0 // means this is the first execution. - uint64 attempt_number = 28; + uint64 attempt_number = 29; } message TaskInfoEntry { diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index cc0ad1efab1b..aa6b684b2044 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -161,6 +161,7 @@ RayTask CreateTask( 0, TaskID::Nil(), address, + -1, 0, required_resources, {}, From d31676c1d3b2357e20d2fc91312244c437fa0048 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 19 Jul 2022 11:08:39 +0000 Subject: [PATCH 2/6] upd --- src/ray/common/task/task_spec.cc | 4 + src/ray/common/task/task_spec.h | 2 + src/ray/core_worker/core_worker.cc | 97 ++++++++++++------- src/ray/core_worker/core_worker.h | 6 ++ src/ray/core_worker/task_manager.cc | 43 ++++---- src/ray/core_worker/task_manager.h | 10 ++ .../test/dependency_resolver_test.cc | 1 + src/ray/core_worker/test/task_manager_test.cc | 5 + 8 files changed, 118 insertions(+), 50 deletions(-) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 163ae83e0840..e9c050fc31b7 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -206,6 +206,10 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const { } } +bool TaskSpecification::ForwardToParent() const { + return this->message_->parent_num_returns() >= 0; +} + bool TaskSpecification::ArgByRef(size_t arg_index) const { return message_->args(arg_index).has_object_ref(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index debe14267864..8d56dfaa1736 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -221,6 +221,8 @@ class TaskSpecification : public MessageWrapper { ObjectID ReturnId(size_t return_index) const; + bool ForwardToParent() const; + const uint8_t *ArgData(size_t arg_index) const; size_t ArgDataSize(size_t arg_index) const; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ff7b0d85f729..662312f4b94c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -277,6 +277,17 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ double timestamp) { return PushError(job_id, type, error_message, timestamp); }; + auto forward_object_callback = [this](const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { + return ForwardToOtherWorker(object_id, + contained_object_ids, + borrower_address, + owner_address, + object_size); + }; task_manager_.reset(new TaskManager( memory_store_, reference_counter_, @@ -308,6 +319,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } }, push_error_callback, + forward_object_callback, RayConfig::instance().max_lineage_bytes())); // Create an entry for the driver task in the task table. This task is @@ -950,6 +962,44 @@ Status CoreWorker::Put(const RayObject &object, return PutInLocalPlasmaStore(object, object_id, pin_object); } +Status CoreWorker::ForwardToOtherWorker(const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { + // Because in the remote worker's `HandleAssignObjectOwner`, + // a `WaitForRefRemoved` RPC request will be sent back to + // the current worker. So we need to make sure ref count is > 0 + // by invoking `AddLocalReference` first. Note that in worker.py we set + // skip_adding_local_ref=True to avoid double referencing the object. + AddLocalReference(object_id); + RAY_UNUSED( + reference_counter_->AddBorrowedObject(object_id, + ObjectID::Nil(), + owner_address, + /*foreign_owner_already_monitoring=*/true)); + + // Remote call `AssignObjectOwner()`. + rpc::AssignObjectOwnerRequest request; + request.set_object_id(object_id.Binary()); + request.mutable_borrower_address()->CopyFrom(borrower_address); + request.set_call_site(CurrentCallSite()); + + for (auto &contained_object_id : contained_object_ids) { + request.add_contained_object_ids(contained_object_id.Binary()); + } + request.set_object_size(object_size); + auto conn = core_worker_client_pool_->GetOrConnect(owner_address); + std::promise status_promise; + conn->AssignObjectOwner(request, + [&status_promise](const Status &returned_status, + const rpc::AssignObjectOwnerReply &reply) { + status_promise.set_value(returned_status); + }); + // Block until the remote call `AssignObjectOwner` returns. + return status_promise.get_future().get(); +} + Status CoreWorker::CreateOwnedAndIncrementLocalRef( const std::shared_ptr &metadata, const size_t data_size, @@ -978,37 +1028,11 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef( /*add_local_ref=*/true, NodeID::FromBinary(rpc_address_.raylet_id())); } else { - // Because in the remote worker's `HandleAssignObjectOwner`, - // a `WaitForRefRemoved` RPC request will be sent back to - // the current worker. So we need to make sure ref count is > 0 - // by invoking `AddLocalReference` first. Note that in worker.py we set - // skip_adding_local_ref=True to avoid double referencing the object. - AddLocalReference(*object_id); - RAY_UNUSED( - reference_counter_->AddBorrowedObject(*object_id, - ObjectID::Nil(), - real_owner_address, - /*foreign_owner_already_monitoring=*/true)); - - // Remote call `AssignObjectOwner()`. - rpc::AssignObjectOwnerRequest request; - request.set_object_id(object_id->Binary()); - request.mutable_borrower_address()->CopyFrom(rpc_address_); - request.set_call_site(CurrentCallSite()); - - for (auto &contained_object_id : contained_object_ids) { - request.add_contained_object_ids(contained_object_id.Binary()); - } - request.set_object_size(data_size + metadata->Size()); - auto conn = core_worker_client_pool_->GetOrConnect(real_owner_address); - std::promise status_promise; - conn->AssignObjectOwner(request, - [&status_promise](const Status &returned_status, - const rpc::AssignObjectOwnerReply &reply) { - status_promise.set_value(returned_status); - }); - // Block until the remote call `AssignObjectOwner` returns. - status = status_promise.get_future().get(); + status = ForwardToOtherWorker(*object_id, + contained_object_ids, + rpc_address_, + real_owner_address, + data_size + metadata->Size()); } if (options_.is_local_mode && owned_by_us && inline_small_object) { @@ -1936,8 +1960,15 @@ std::optional> CoreWorker::SubmitActorTask( lock.Release(); returned_refs = ExecuteTaskLocalMode(task_spec, actor_id); } else { - returned_refs = task_manager_->AddPendingTask( - rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); + // auto caller_address = rpc_address_; + if (task_spec.ForwardToParent()) { + auto parent_address = worker_context_.GetCurrentTask()->CallerAddress(); + returned_refs = task_manager_->AddPendingTask( + parent_address, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); + } else { + returned_refs = task_manager_->AddPendingTask( + rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); + } RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec)); } return {std::move(returned_refs)}; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index f04db9a0da6b..481452f94724 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -266,6 +266,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const ObjectID &object_id, bool pin_object = false); + Status ForwardToOtherWorker(const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size); + /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `SealOwned()` to /// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 862a946e8fbf..61d787bd71ff 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -67,23 +67,32 @@ std::vector TaskManager::AddPendingTask( auto return_id = spec.ReturnId(i); if (!spec.IsActorCreationTask()) { bool is_reconstructable = true; - // bool is_reconstructable = max_retries != 0; - // We pass an empty vector for inner IDs because we do not know the return - // value of the task yet. If the task returns an ID(s), the worker will - // publish the WaitForRefRemoved message that we are now a borrower for - // the inner IDs. Note that this message can be received *before* the - // PushTaskReply. - // NOTE(swang): We increment the local ref count to ensure that the - // object is considered in scope before we return the ObjectRef to the - // language frontend. Note that the language bindings should set - // skip_adding_local_ref=True to avoid double referencing the object. - reference_counter_->AddOwnedObject(return_id, - /*inner_ids=*/{}, - caller_address, - call_site, - -1, - /*is_reconstructable=*/is_reconstructable, - /*add_local_ref=*/true); + if (spec.ForwardToParent()){ + auto contained_object_ids = std::vector(); + forward_object_callback_(return_id, + contained_object_ids, + spec.CallerAddress(), + caller_address, + -1); + } else { + // bool is_reconstructable = max_retries != 0; + // We pass an empty vector for inner IDs because we do not know the return + // value of the task yet. If the task returns an ID(s), the worker will + // publish the WaitForRefRemoved message that we are now a borrower for + // the inner IDs. Note that this message can be received *before* the + // PushTaskReply. + // NOTE(swang): We increment the local ref count to ensure that the + // object is considered in scope before we return the ObjectRef to the + // language frontend. Note that the language bindings should set + // skip_adding_local_ref=True to avoid double referencing the object. + reference_counter_->AddOwnedObject(return_id, + /*inner_ids=*/{}, + caller_address, + call_site, + -1, + /*is_reconstructable=*/is_reconstructable, + /*add_local_ref=*/true); + } } return_ids.push_back(return_id); diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 044ee6b141bb..8cfb56f0fbf6 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -82,6 +82,11 @@ using PushErrorCallback = std::function; +using ForwardObjectCallback = std::function &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size)>; class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { public: @@ -90,12 +95,14 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa PutInLocalPlasmaCallback put_in_local_plasma_callback, RetryTaskCallback retry_task_callback, PushErrorCallback push_error_callback, + ForwardObjectCallback forward_object_callback, int64_t max_lineage_bytes) : in_memory_store_(in_memory_store), reference_counter_(reference_counter), put_in_local_plasma_callback_(put_in_local_plasma_callback), retry_task_callback_(retry_task_callback), push_error_callback_(push_error_callback), + forward_object_callback_(forward_object_callback), max_lineage_bytes_(max_lineage_bytes) { reference_counter_->SetReleaseLineageCallback( [this](const ObjectID &object_id, std::vector *ids_to_release) { @@ -371,6 +378,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // Called to push an error to the relevant driver. const PushErrorCallback push_error_callback_; + // Called to forward a returned object to another worker + const ForwardObjectCallback forward_object_callback_; + const int64_t max_lineage_bytes_; // The number of task failures we have logged total. diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index 4968dcc73ace..7fd68d1870fc 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -40,6 +40,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r 0, TaskID::Nil(), empty_address, + -1, 1, resources, resources, diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index ef005771b6dc..b6fa8e3a8897 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -64,6 +64,11 @@ class TaskManagerTest : public ::testing::Test { const std::string &type, const std::string &error_message, double timestamp) { return Status::OK(); }, + [](const ObjectID &object_id, + const std::vector &contained_object_ids, + const rpc::Address &borrower_address, + const rpc::Address &owner_address, + const size_t object_size) { return Status::OK(); }, max_lineage_bytes) {} virtual void TearDown() { AssertNoLeaks(); } From 24940118e2dd4e0c1226ab40a74ff7c85335799a Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 29 Jul 2022 07:48:53 +0000 Subject: [PATCH 3/6] add update_forwarded_object --- src/mock/ray/core_worker/core_worker.h | 6 +++ src/mock/ray/rpc/worker/core_worker_client.h | 5 +++ src/ray/core_worker/core_worker.cc | 42 +++++++++++++++++++- src/ray/core_worker/core_worker.h | 7 ++++ src/ray/core_worker/task_manager.cc | 12 +++++- src/ray/core_worker/task_manager.h | 8 ++++ src/ray/protobuf/core_worker.proto | 13 ++++++ src/ray/rpc/worker/core_worker_client.h | 10 +++++ src/ray/rpc/worker/core_worker_server.h | 6 ++- 9 files changed, 105 insertions(+), 4 deletions(-) diff --git a/src/mock/ray/core_worker/core_worker.h b/src/mock/ray/core_worker/core_worker.h index 01afa150cc46..15f7c8180d39 100644 --- a/src/mock/ray/core_worker/core_worker.h +++ b/src/mock/ray/core_worker/core_worker.h @@ -165,6 +165,12 @@ class MockCoreWorker : public CoreWorker { rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); + MOCK_METHOD(void, + HandleUpdateForwardedObject, + (const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback), + (override)); }; } // namespace core diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index a101da1c19e5..e946214ce6f1 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -129,6 +129,11 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn (const AssignObjectOwnerRequest &request, const ClientCallback &callback), (override)); + MOCK_METHOD(void, + UpdateForwardedObject, + (const UpdateForwardedObjectRequest &request, + const ClientCallback &callback), + (override)); MOCK_METHOD(int64_t, ClientProcessedUpToSeqno, (), (override)); }; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 662312f4b94c..000768bf913d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -288,6 +288,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ owner_address, object_size); }; + auto update_forwarded_object_callback = [this](const rpc::PushTaskReply &reply, + const std::string &pinned_at_raylet_id, + const rpc::Address &owner_address) { + return UpdateForwardedObject(reply, + pinned_at_raylet_id, + owner_address); + }; task_manager_.reset(new TaskManager( memory_store_, reference_counter_, @@ -320,6 +327,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }, push_error_callback, forward_object_callback, + update_forwarded_object_callback, RayConfig::instance().max_lineage_bytes())); // Create an entry for the driver task in the task table. This task is @@ -962,6 +970,25 @@ Status CoreWorker::Put(const RayObject &object, return PutInLocalPlasmaStore(object, object_id, pin_object); } +Status CoreWorker::UpdateForwardedObject(const rpc::PushTaskReply &reply, + const std::string &pinned_at_raylet_id, + const rpc::Address &owner_address) { + rpc::UpdateForwardedObjectRequest request; + for (int i = 0; i < reply.return_objects_size(); i++) { + request.add_return_objects()->CopyFrom(reply.return_objects(i)); + } + request.set_pinned_at_raylet_id(pinned_at_raylet_id); + auto conn = core_worker_client_pool_->GetOrConnect(owner_address); + std::promise status_promise; + conn->UpdateForwardedObject(request, + [&status_promise](const Status &returned_status, + const rpc::UpdateForwardedObjectReply &reply) { + status_promise.set_value(returned_status); + }); + // Block until the remote call `UpdateForwardedObject` returns. + return status_promise.get_future().get(); +} + Status CoreWorker::ForwardToOtherWorker(const ObjectID &object_id, const std::vector &contained_object_ids, const rpc::Address &borrower_address, @@ -3256,7 +3283,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re rpc_address_, call_site, request.object_size(), - /*is_reconstructable=*/false, + request.is_reconstructable(), /*add_local_ref=*/false, /*pinned_at_raylet_id=*/NodeID::FromBinary(borrower_address.raylet_id())); reference_counter_->AddBorrowerAddress(object_id, borrower_address); @@ -3264,6 +3291,19 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re send_reply_callback(Status::OK(), nullptr, nullptr); } +void CoreWorker::HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto pinned_at_raylet_id = NodeID::FromBinary(request.pinned_at_raylet_id()); + for (int i = 0; i < request.return_objects_size(); i++) { + const auto &return_object = request.return_objects(i); + ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + reference_counter_->UpdateObjectSize(object_id, return_object.size()); + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void CoreWorker::YieldCurrentFiber(FiberEvent &event) { RAY_CHECK(worker_context_.CurrentActorIsAsync()); boost::this_fiber::yield(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 481452f94724..6a4e7b429dc1 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -272,6 +272,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const rpc::Address &owner_address, const size_t object_size); + Status UpdateForwardedObject(const rpc::PushTaskReply &reply, + const std::string &pinned_at_raylet_id, + const rpc::Address &owner_address); + /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `SealOwned()` to /// finalize the object. The `CreateOwnedAndIncrementLocalRef()` and @@ -791,6 +795,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectRequest &request, + rpc::UpdateForwardedObjectReply *reply, + rpc::SendReplyCallback send_reply_callback) override; /// /// Public methods related to async actor call. This should only be used when /// the actor is (1) direct actor and (2) using asyncio mode. diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 61d787bd71ff..5f132cd61521 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -241,6 +241,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // reference holders that are already scheduled at the raylet can retrieve // these objects through plasma. absl::flat_hash_set store_in_plasma_ids = {}; + bool forward_to_parent = false; + rpc::Address caller_address; { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); @@ -249,12 +251,18 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (it->second.num_successful_executions > 0) { store_in_plasma_ids = it->second.reconstructable_return_ids; } + forward_to_parent = it->second.spec.ForwardToParent(); + caller_address = it->second.spec.CallerAddress(); } - std::vector direct_return_ids; + if (forward_to_parent) { + update_forwarded_object_callback_(reply, worker_addr.raylet_id(), caller_address); + } else { + for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + // in rpc reference_counter_->UpdateObjectSize(object_id, return_object.size()); RAY_LOG(DEBUG) << "Task return object " << object_id << " has size " << return_object.size(); @@ -266,6 +274,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // it as local in the in-memory store so that the data locality policy // will choose the right raylet for any queued dependent tasks. const auto pinned_at_raylet_id = NodeID::FromBinary(worker_addr.raylet_id()); + // in rpc reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); // Mark it as in plasma with a dummy object. RAY_CHECK( @@ -311,6 +320,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reference_counter_->AddNestedObjectIds(object_id, nested_ids, owner_address); } } + } TaskSpecification spec; bool release_lineage = true; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 8cfb56f0fbf6..5d5080578c75 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -87,6 +87,9 @@ using ForwardObjectCallback = std::function; +using UpdateForwardedObjectCallback = std::function; class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { public: @@ -96,6 +99,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa RetryTaskCallback retry_task_callback, PushErrorCallback push_error_callback, ForwardObjectCallback forward_object_callback, + UpdateForwardedObjectCallback update_forwarded_object_callback, int64_t max_lineage_bytes) : in_memory_store_(in_memory_store), reference_counter_(reference_counter), @@ -103,6 +107,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa retry_task_callback_(retry_task_callback), push_error_callback_(push_error_callback), forward_object_callback_(forward_object_callback), + update_forwarded_object_callback_(update_forwarded_object_callback), max_lineage_bytes_(max_lineage_bytes) { reference_counter_->SetReleaseLineageCallback( [this](const ObjectID &object_id, std::vector *ids_to_release) { @@ -381,6 +386,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa // Called to forward a returned object to another worker const ForwardObjectCallback forward_object_callback_; + // Called to forward a returned object to another worker + const UpdateForwardedObjectCallback update_forwarded_object_callback_; + const int64_t max_lineage_bytes_; // The number of task failures we have logged total. diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 78171ec8f07c..3c5b8cf23e9b 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -353,10 +353,21 @@ message AssignObjectOwnerRequest { Address borrower_address = 4; // Description of the call site where the reference was created. string call_site = 5; + // whether the object is reconstrutable + bool is_reconstructable = 6; } message AssignObjectOwnerReply {} +message UpdateForwardedObjectRequest { + // The returned objects. + repeated ReturnObject return_objects = 1; + // + string pinned_at_raylet_id = 2; +} + +message UpdateForwardedObjectReply {} + message RayletNotifyGCSRestartRequest {} message RayletNotifyGCSRestartReply {} @@ -414,4 +425,6 @@ service CoreWorkerService { rpc Exit(ExitRequest) returns (ExitReply); // Assign the owner of an object to the intended worker. rpc AssignObjectOwner(AssignObjectOwnerRequest) returns (AssignObjectOwnerReply); + // + rpc UpdateForwardedObject(UpdateForwardedObjectRequest) returns (UpdateForwardedObjectReply); } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index dc80444cf846..5f76508cf76f 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -193,6 +193,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { const ClientCallback &callback) { } + virtual void UpdateForwardedObject(const UpdateForwardedObjectRequest &request, + const ClientCallback &callback) { + } + virtual void RayletNotifyGCSRestart( const RayletNotifyGCSRestartRequest &request, const ClientCallback &callback) {} @@ -331,6 +335,12 @@ class CoreWorkerClient : public std::enable_shared_from_this, /*method_timeout_ms*/ -1, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, + UpdateForwardedObject, + grpc_client_, + /*method_timeout_ms*/ -1, + override) + void PushActorTask(std::unique_ptr request, bool skip_queue, const ClientCallback &callback) override { diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index a66ef4657745..dcf9912ec839 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -47,7 +47,8 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, Exit, -1) \ - RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner, -1) + RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner, -1) \ + RPC_SERVICE_HANDLER(CoreWorkerService, UpdateForwardedObject, -1) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask) \ @@ -69,7 +70,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner) + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(UpdateForwardedObject) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. class CoreWorkerServiceHandler { From 87309b0d69bfe1eb9236daff9e5a721d63be864e Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 29 Jul 2022 08:25:15 +0000 Subject: [PATCH 4/6] upd --- src/ray/protobuf/core_worker.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 3c5b8cf23e9b..cb7161a74cf1 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -363,7 +363,7 @@ message UpdateForwardedObjectRequest { // The returned objects. repeated ReturnObject return_objects = 1; // - string pinned_at_raylet_id = 2; + bytes pinned_at_raylet_id = 2; } message UpdateForwardedObjectReply {} From d990a1f7ce86d7cf6d056e0d9eb16d527fbcccab Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 2 Aug 2022 11:28:11 +0000 Subject: [PATCH 5/6] fix --- src/ray/core_worker/core_worker.cc | 33 ++++++++++--------- src/ray/core_worker/core_worker.h | 2 +- src/ray/core_worker/reference_count.cc | 5 +-- src/ray/core_worker/task_manager.cc | 12 ++++--- src/ray/core_worker/task_manager.h | 2 +- .../transport/direct_actor_transport.cc | 2 +- 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 000768bf913d..47c349443435 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -289,10 +289,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ object_size); }; auto update_forwarded_object_callback = [this](const rpc::PushTaskReply &reply, - const std::string &pinned_at_raylet_id, + const std::string &raylet_id, const rpc::Address &owner_address) { return UpdateForwardedObject(reply, - pinned_at_raylet_id, + raylet_id, owner_address); }; task_manager_.reset(new TaskManager( @@ -971,22 +971,27 @@ Status CoreWorker::Put(const RayObject &object, } Status CoreWorker::UpdateForwardedObject(const rpc::PushTaskReply &reply, - const std::string &pinned_at_raylet_id, + const std::string &raylet_id, const rpc::Address &owner_address) { rpc::UpdateForwardedObjectRequest request; + RAY_LOG(ERROR) << "update forward object callback: " << ObjectID::FromBinary(reply.return_objects(0).object_id()); for (int i = 0; i < reply.return_objects_size(); i++) { request.add_return_objects()->CopyFrom(reply.return_objects(i)); } - request.set_pinned_at_raylet_id(pinned_at_raylet_id); + request.set_pinned_at_raylet_id(raylet_id); auto conn = core_worker_client_pool_->GetOrConnect(owner_address); std::promise status_promise; + RAY_LOG(ERROR) << "point 1"; conn->UpdateForwardedObject(request, [&status_promise](const Status &returned_status, const rpc::UpdateForwardedObjectReply &reply) { status_promise.set_value(returned_status); }); + RAY_LOG(ERROR) << "point 2"; // Block until the remote call `UpdateForwardedObject` returns. - return status_promise.get_future().get(); + auto status = status_promise.get_future().get(); + RAY_LOG(ERROR) << "update forwarded object has status " << status; + return status; } Status CoreWorker::ForwardToOtherWorker(const ObjectID &object_id, @@ -1928,9 +1933,11 @@ std::optional> CoreWorker::SubmitActorTask( // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; int parent_num_returns = -1; + auto caller_address = rpc_address_; if (task_options.use_parent_task_id) { auto current_task = worker_context_.GetCurrentTask(); parent_num_returns = current_task->NumReturns(); + caller_address = current_task->CallerAddress(); } RAY_LOG(ERROR) << parent_num_returns; RAY_LOG(ERROR) << task_options.use_parent_task_id; @@ -1957,7 +1964,7 @@ std::optional> CoreWorker::SubmitActorTask( worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, + caller_address, function, args, parent_num_returns, @@ -1987,15 +1994,8 @@ std::optional> CoreWorker::SubmitActorTask( lock.Release(); returned_refs = ExecuteTaskLocalMode(task_spec, actor_id); } else { - // auto caller_address = rpc_address_; - if (task_spec.ForwardToParent()) { - auto parent_address = worker_context_.GetCurrentTask()->CallerAddress(); - returned_refs = task_manager_->AddPendingTask( - parent_address, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); - } else { - returned_refs = task_manager_->AddPendingTask( - rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); - } + returned_refs = task_manager_->AddPendingTask( + rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec)); } return {std::move(returned_refs)}; @@ -3269,6 +3269,7 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { ObjectID object_id = ObjectID::FromBinary(request.object_id()); + RAY_LOG(ERROR) << "Handling assign ownership for " << object_id; const auto &borrower_address = request.borrower_address(); std::string call_site = request.call_site(); // Get a list of contained object ids. @@ -3294,10 +3295,12 @@ void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &re void CoreWorker::HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectRequest &request, rpc::UpdateForwardedObjectReply *reply, rpc::SendReplyCallback send_reply_callback) { + RAY_LOG(ERROR) << "Start handling updateForwarded"; auto pinned_at_raylet_id = NodeID::FromBinary(request.pinned_at_raylet_id()); for (int i = 0; i < request.return_objects_size(); i++) { const auto &return_object = request.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + RAY_LOG(ERROR) << "Handling updating raylet for " << object_id; reference_counter_->UpdateObjectSize(object_id, return_object.size()); reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 6a4e7b429dc1..ebe98a8e0d52 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -273,7 +273,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const size_t object_size); Status UpdateForwardedObject(const rpc::PushTaskReply &reply, - const std::string &pinned_at_raylet_id, + const std::string &raylet_id, const rpc::Address &owner_address); /// Create and return a buffer in the object store that can be directly written diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index d8a5e5e6a00d..971ac95db574 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -197,7 +197,7 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, bool is_reconstructable, bool add_local_ref, const absl::optional &pinned_at_raylet_id) { - RAY_LOG(DEBUG) << "Adding owned object " << object_id; + RAY_LOG(ERROR) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); RAY_CHECK(object_id_refs_.count(object_id) == 0) << "Tried to create an owned object that already exists: " << object_id; @@ -692,6 +692,7 @@ std::vector ReferenceCounter::FlushObjectsToRecover() { void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const NodeID &raylet_id) { absl::MutexLock lock(&mutex_); + RAY_LOG(ERROR) << "updating raylet of " << object_id; auto it = object_id_refs_.find(object_id); if (it != object_id_refs_.end()) { if (freed_objects_.count(object_id) > 0) { @@ -702,7 +703,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, // The object is still in scope. Track the raylet location until the object // has gone out of scope or the raylet fails, whichever happens first. if (it->second.pinned_at_raylet_id.has_value()) { - RAY_LOG(INFO) << "Updating primary location for object " << object_id << " to node " + RAY_LOG(ERROR) << "Updating primary location for object " << object_id << " to node " << raylet_id << ", but it already has a primary location " << *it->second.pinned_at_raylet_id << ". This should only happen during reconstruction"; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 5f132cd61521..a2106921d405 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -35,7 +35,7 @@ std::vector TaskManager::AddPendingTask( int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries << " retries"; - + auto owner_address= spec.CallerAddress(); // Add references for the dependencies to the task. std::vector task_deps; for (size_t i = 0; i < spec.NumArgs(); i++) { @@ -71,8 +71,8 @@ std::vector TaskManager::AddPendingTask( auto contained_object_ids = std::vector(); forward_object_callback_(return_id, contained_object_ids, - spec.CallerAddress(), caller_address, + owner_address, -1); } else { // bool is_reconstructable = max_retries != 0; @@ -87,7 +87,7 @@ std::vector TaskManager::AddPendingTask( // skip_adding_local_ref=True to avoid double referencing the object. reference_counter_->AddOwnedObject(return_id, /*inner_ids=*/{}, - caller_address, + owner_address, call_site, -1, /*is_reconstructable=*/is_reconstructable, @@ -98,7 +98,7 @@ std::vector TaskManager::AddPendingTask( return_ids.push_back(return_id); rpc::ObjectReference ref; ref.set_object_id(spec.ReturnId(i).Binary()); - ref.mutable_owner_address()->CopyFrom(caller_address); + ref.mutable_owner_address()->CopyFrom(owner_address); ref.set_call_site(call_site); returned_refs.push_back(std::move(ref)); } @@ -255,7 +255,9 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, caller_address = it->second.spec.CallerAddress(); } std::vector direct_return_ids; + RAY_LOG(ERROR) << "task manager completing task " << task_id; if (forward_to_parent) { + RAY_LOG(ERROR) << "updating forwarded object"; update_forwarded_object_callback_(reply, worker_addr.raylet_id(), caller_address); } else { @@ -321,7 +323,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, } } } - + RAY_LOG(ERROR) << "Complete pending task continued..."; TaskSpecification spec; bool release_lineage = true; int64_t min_lineage_bytes_to_evict = 0; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 5d5080578c75..2f39db05bb81 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -88,7 +88,7 @@ using ForwardObjectCallback = std::function; using UpdateForwardedObjectCallback = std::function; class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 31a911c1a3b9..e69f970ed038 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -104,7 +104,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask( if (objects_valid) { for (size_t i = 0; i < return_objects.size(); i++) { auto return_object = reply->add_return_objects(); - ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1); + ObjectID id = task_spec.ReturnId(i); return_object->set_object_id(id.Binary()); if (!return_objects[i]) { From 6706c3e64c191a554d72583a031fa85240bf75ee Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 4 Aug 2022 08:47:26 +0000 Subject: [PATCH 6/6] add to submissible tasks --- src/ray/core_worker/core_worker.cc | 7 ++++--- src/ray/core_worker/task_manager.cc | 15 ++++++++++++++- src/ray/core_worker/task_manager.h | 3 +++ src/ray/core_worker/test/task_manager_test.cc | 3 +++ 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 47c349443435..22b9de3c8f79 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1016,7 +1016,7 @@ Status CoreWorker::ForwardToOtherWorker(const ObjectID &object_id, request.set_object_id(object_id.Binary()); request.mutable_borrower_address()->CopyFrom(borrower_address); request.set_call_site(CurrentCallSite()); - + request.set_is_reconstructable(true); for (auto &contained_object_id : contained_object_ids) { request.add_contained_object_ids(contained_object_id.Binary()); } @@ -1662,7 +1662,7 @@ std::vector CoreWorker::SubmitTask( debugger_breakpoint, depth, task_options.serialized_runtime_env_info); - builder.SetNormalTaskSpec(max_retries, retry_exceptions, scheduling_strategy); + builder.SetNormalTaskSpec(1, retry_exceptions, scheduling_strategy); TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submitting normal task " << task_spec.DebugString(); std::vector returned_refs; @@ -1670,7 +1670,7 @@ std::vector CoreWorker::SubmitTask( returned_refs = ExecuteTaskLocalMode(task_spec); } else { returned_refs = task_manager_->AddPendingTask( - task_spec.CallerAddress(), task_spec, CurrentCallSite(), max_retries); + task_spec.CallerAddress(), task_spec, CurrentCallSite(), 1); io_service_.post( [this, task_spec]() { RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); @@ -3303,6 +3303,7 @@ void CoreWorker::HandleUpdateForwardedObject(const rpc::UpdateForwardedObjectReq RAY_LOG(ERROR) << "Handling updating raylet for " << object_id; reference_counter_->UpdateObjectSize(object_id, return_object.size()); reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); + task_manager_->AddReconstructableObject(object_id); } send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index a2106921d405..19446cf75d94 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -123,12 +123,14 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector *tas { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); + RAY_LOG(ERROR) << "finding in submissible tasks..."; if (it == submissible_tasks_.end()) { // This can happen when the task has already been // retried up to its max attempts. + RAY_LOG(ERROR) << "Cannot find the task in submissible tasks"; return false; } - + RAY_LOG(ERROR) << "found in submissible tasks"; if (!it->second.IsPending()) { resubmit = true; it->second.status = rpc::TaskStatus::WAITING_FOR_DEPENDENCIES; @@ -353,8 +355,10 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // A finished task can only be re-executed if it has some number of // retries left and returned at least one object that is still in use and // stored in plasma. + RAY_LOG(ERROR) << "task " << task_id << " state: num_retries_left=" << it->second.num_retries_left; bool task_retryable = it->second.num_retries_left != 0 && !it->second.reconstructable_return_ids.empty(); + RAY_LOG(ERROR) << "task " << task_id << "retryable is " << task_retryable; if (task_retryable) { // Pin the task spec if it may be retried again. release_lineage = false; @@ -382,6 +386,15 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, ShutdownIfNeeded(); } +void TaskManager::AddReconstructableObject(const ObjectID &object_id) { + absl::MutexLock lock(&mu_); + auto task_id = object_id.TaskId(); + auto it = submissible_tasks_.find(task_id); + RAY_CHECK(it != submissible_tasks_.end()) + << "Tried to add reconstructable object to finished task" << task_id; + it->second.reconstructable_return_ids.insert(object_id); +} + bool TaskManager::RetryTaskIfPossible(const TaskID &task_id) { int num_retries_left = 0; TaskSpecification spec; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 2f39db05bb81..d87aaa80a74d 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -284,6 +284,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Fill every task information of the current worker to GetCoreWorkerStatsReply. void FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply, const int64_t limit) const; + /// Add a reconstructable object to a task, used in forward + void AddReconstructableObject(const ObjectID &object_id); + private: struct TaskEntry { TaskEntry(const TaskSpecification &spec_arg, diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index b6fa8e3a8897..83f75135d26f 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -69,6 +69,9 @@ class TaskManagerTest : public ::testing::Test { const rpc::Address &borrower_address, const rpc::Address &owner_address, const size_t object_size) { return Status::OK(); }, + [](const rpc::PushTaskReply &reply, + const std::string &raylet_id, + const rpc::Address &owner_address) { return Status::OK(); }, max_lineage_bytes) {} virtual void TearDown() { AssertNoLeaks(); }