diff --git a/.flake/pkgs/legion.nix b/.flake/pkgs/legion.nix
index 814ef85e00..ebef22f4c5 100644
--- a/.flake/pkgs/legion.nix
+++ b/.flake/pkgs/legion.nix
@@ -18,13 +18,13 @@ in
 stdenv.mkDerivation rec {
   pname = "legion_flexflow";
-  version = "2024-03-13";
+  version = "2025-01-21";
 
   src = fetchFromGitLab {
     owner = "StanfordLegion";
     repo = "legion";
-    rev = "24e8c452341dea41427e0ce61e154d61715e6835";
-    sha256 = "sha256-NjCSjphOIew/V24i74I6DModSGcWKLeiSIjts3cFtx4=";
+    rev = "0c5a181e59c07e3af1091a2007378ff9355047fa";
+    sha256 = "sha256-oapo7klN17gmRsmaSsrpup4YJ0dtHxiKFtwz8jyPqzU=";
     fetchSubmodules = true;
   };
 
@@ -33,7 +33,7 @@ stdenv.mkDerivation rec {
   ];
 
   cmakeFlags = [
-    "-DLegion_USE_Python=1"
+    "-DLegion_USE_Python=0"
     "-DLegion_BUILD_BINDINGS=1"
     "-DLegion_USE_CUDA=1"
     "-DLegion_CUDA_ARCH=${lib.concatStringsSep "," cudaCapabilities}"
diff --git a/.proj.toml b/.proj.toml
index b14d763339..20a10c98da 100644
--- a/.proj.toml
+++ b/.proj.toml
@@ -70,6 +70,13 @@ has-cpu-only-benchmarks = false
 has-cuda-tests = true
 has-cuda-benchmarks = false
 
+[targets.realm-backend]
+type = "lib"
+has-cpu-only-tests = true
+has-cpu-only-benchmarks = false
+has-cuda-tests = true
+has-cuda-benchmarks = false
+
 [targets.models]
 type = "lib"
 has-cpu-only-tests = true
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index e2e561c384..4d5fe5f566 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -4,6 +4,7 @@ add_subdirectory(runtime)
 add_subdirectory(op-attrs)
 add_subdirectory(kernels)
 add_subdirectory(local-execution)
+add_subdirectory(realm-backend)
 add_subdirectory(task-spec)
 add_subdirectory(utils)
 add_subdirectory(ffi)
diff --git a/lib/local-execution/include/local-execution/local_args_backing.h b/lib/local-execution/include/local-execution/local_args_backing.h
index 94748cf7ed..1e9a30d293 100644
--- a/lib/local-execution/include/local-execution/local_args_backing.h
+++ b/lib/local-execution/include/local-execution/local_args_backing.h
@@ -24,24 +24,14 @@ std::optional<DeviceSpecificDeviceStates>
 std::unordered_map<slot_id_t, ConcreteArgSpec>
     construct_arg_slots_backing(TaskBinding const &, RuntimeArgConfig const &);
 
-std::optional<DeviceSpecificDeviceStates>
-    create_per_device_op_state(LocalTaskRegistry const &,
-                               LocalTensorBacking const &,
-                               RuntimeArgConfig const &,
-                               Allocator &,
-                               TrainingLayerPlusContext const &);
-
 TaskArgumentAccessor
     get_task_arg_accessor(LocalTensorBacking const &,
                           RuntimeArgConfig const &,
                           TaskInvocation const &,
                           Allocator &);
 
 LocalArgsBacking make_local_args_backing_for_computation_graph(
-    LocalTaskRegistry const &,
-    TrainingComputationGraph const &,
     RuntimeArgConfig const &,
-    LocalTensorBacking const &,
-    Allocator &);
+    std::unordered_map<layer_guid_t,
+                       std::optional<DeviceSpecificDeviceStates>> const &);
 
 } // namespace FlexFlow
diff --git a/lib/local-execution/include/local-execution/local_training_backing.h b/lib/local-execution/include/local-execution/local_training_backing.h
index f2177016fa..5484adef75 100644
--- a/lib/local-execution/include/local-execution/local_training_backing.h
+++ b/lib/local-execution/include/local-execution/local_training_backing.h
@@ -6,6 +6,7 @@
 #include "pcg/optimizer_attrs.dtg.h"
 #include "task-spec/training_computation_graph.dtg.h"
 #include "task-spec/training_tensor_guid_t.dtg.h"
+#include "utils/containers/generate_map.h"
 #include "utils/units/milliseconds_t.h"
 
 namespace FlexFlow {
@@ -18,6 +19,13 @@ LocalTrainingBacking make_local_training_backing_for_computation_graph(
     RuntimeArgConfig const &runtime_arg_config,
     OptimizerAttrs const &optimizer_attrs);
 
+std::optional<DeviceSpecificDeviceStates>
+    create_per_device_op_state(LocalTaskRegistry const &,
+                               LocalTensorBacking const &,
+                               RuntimeArgConfig const &,
+                               Allocator &,
+                               TrainingLayerPlusContext const &);
+
 std::optional<milliseconds_t>
     execute_forward(LocalTaskRegistry const &,
                     LocalTensorBacking const &,
                     LocalArgsBacking const &,
diff --git a/lib/local-execution/src/local-execution/local_args_backing.cc b/lib/local-execution/src/local-execution/local_args_backing.cc
index eb1c7b067e..a672b9d164 100644
--- a/lib/local-execution/src/local-execution/local_args_backing.cc
+++ b/lib/local-execution/src/local-execution/local_args_backing.cc
@@ -35,38 +35,6 @@ std::unordered_map<slot_id_t, ConcreteArgSpec>
       ;
 }
 
-std::optional<DeviceSpecificDeviceStates>
-    create_per_device_op_state(LocalTaskRegistry const &local_task_registry,
-                               LocalTensorBacking const &tensor_backing,
-                               RuntimeArgConfig const &runtime_arg_config,
-                               Allocator &allocator,
-                               TrainingLayerPlusContext const &training_layer) {
-  std::optional<registered_task_t> maybe_registered_task = try_get_registered_task(
-      local_task_registry, training_layer.layer_guid, OpTaskType::INIT);
-
-  ASSERT(maybe_registered_task.has_value());
-
-  registered_task_t registered_task = maybe_registered_task.value();
-  if (registered_task.is_noop_task()) {
-    return std::nullopt;
-  }
-
-  TaskInvocation invocation = lower_to_task_invocation(
-      /*op_task_invocation=*/get_init_op_task_invocation(
-          training_layer.layer_attrs.op_attrs),
-      /*training_layer=*/training_layer,
-      /*device_specific_device_states=*/std::nullopt);
-
-  TaskArgumentAccessor accessor = get_task_arg_accessor(
-      tensor_backing, runtime_arg_config, invocation, allocator);
-  TaskSignatureAndImpl task_sig_impl =
-      local_task_registry.task_mapping.at(invocation.task_id);
-  auto fn =
-      task_sig_impl.impl_function.get<InitOpTaskImplFunction>().function_ptr;
-  DeviceSpecificDeviceStates device_state = fn(accessor);
-  return device_state;
-}
-
 TaskArgumentAccessor
     get_task_arg_accessor(LocalTensorBacking const &local_tensor_backing,
                           RuntimeArgConfig const &runtime_arg_config,
@@ -82,24 +50,9 @@ TaskArgumentAccessor
 }
 
 LocalArgsBacking make_local_args_backing_for_computation_graph(
-    LocalTaskRegistry const &task_registry,
-    TrainingComputationGraph const &training_computation_graph,
     RuntimeArgConfig const &runtime_arg_config,
-    LocalTensorBacking const &local_tensor_backing,
-    Allocator &allocator) {
-  std::unordered_map<layer_guid_t, std::optional<DeviceSpecificDeviceStates>>
-      per_device_op_states = generate_map(
-          topological_ordering(training_computation_graph.computation_graph),
-          [&](layer_guid_t const &layer_guid) {
-            return create_per_device_op_state(
-                task_registry,
-                local_tensor_backing,
-                runtime_arg_config,
-                allocator,
-                get_training_layer_plus_context(training_computation_graph,
-                                                layer_guid));
-          });
-
+    std::unordered_map<layer_guid_t,
+                       std::optional<DeviceSpecificDeviceStates>> const &
+        per_device_op_states) {
   return LocalArgsBacking{
       runtime_arg_config,
       per_device_op_states,
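This refactor turns make_local_args_backing_for_computation_graph into a plain constructor over an already-computed per-device-state map, so a backend that launches init tasks differently (the Realm backend added below) can reuse it. A minimal sketch of what the decoupling buys, with a hypothetical caller:

    // Hypothetical: with the new signature, a LocalArgsBacking can be built
    // without a task registry, computation graph, or allocator in scope.
    LocalArgsBacking args_backing =
        make_local_args_backing_for_computation_graph(
            runtime_arg_config,
            /*per_device_op_states=*/{});
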
diff --git a/lib/local-execution/src/local-execution/local_training_backing.cc b/lib/local-execution/src/local-execution/local_training_backing.cc
index 1aac8506f2..9c67d3acd3 100644
--- a/lib/local-execution/src/local-execution/local_training_backing.cc
+++ b/lib/local-execution/src/local-execution/local_training_backing.cc
@@ -39,12 +39,22 @@ LocalTrainingBacking make_local_training_backing_for_computation_graph(
       preallocated,
       allocator);
 
+  std::unordered_map<layer_guid_t, std::optional<DeviceSpecificDeviceStates>>
+      per_device_op_states = generate_map(
+          topological_ordering(training_computation_graph.computation_graph),
+          [&](layer_guid_t const &layer_guid) {
+            return create_per_device_op_state(
+                local_task_registry,
+                local_tensor_backing,
+                runtime_arg_config,
+                allocator,
+                get_training_layer_plus_context(training_computation_graph,
+                                                layer_guid));
+          });
+
   LocalArgsBacking local_args_backing =
-      make_local_args_backing_for_computation_graph(local_task_registry,
-                                                    training_computation_graph,
-                                                    runtime_arg_config,
-                                                    local_tensor_backing,
-                                                    allocator);
+      make_local_args_backing_for_computation_graph(runtime_arg_config,
+                                                    per_device_op_states);
 
   return LocalTrainingBacking{
       /*computation_graph=*/training_computation_graph,
@@ -54,6 +64,38 @@ LocalTrainingBacking make_local_training_backing_for_computation_graph(
   };
 }
 
+std::optional<DeviceSpecificDeviceStates>
+    create_per_device_op_state(LocalTaskRegistry const &local_task_registry,
+                               LocalTensorBacking const &tensor_backing,
+                               RuntimeArgConfig const &runtime_arg_config,
+                               Allocator &allocator,
+                               TrainingLayerPlusContext const &training_layer) {
+  std::optional<registered_task_t> maybe_registered_task = try_get_registered_task(
+      local_task_registry, training_layer.layer_guid, OpTaskType::INIT);
+
+  ASSERT(maybe_registered_task.has_value());
+
+  registered_task_t registered_task = maybe_registered_task.value();
+  if (registered_task.is_noop_task()) {
+    return std::nullopt;
+  }
+
+  TaskInvocation invocation = lower_to_task_invocation(
+      /*op_task_invocation=*/get_init_op_task_invocation(
+          training_layer.layer_attrs.op_attrs),
+      /*training_layer=*/training_layer,
+      /*device_specific_device_states=*/std::nullopt);
+
+  TaskArgumentAccessor accessor = get_task_arg_accessor(
+      tensor_backing, runtime_arg_config, invocation, allocator);
+  TaskSignatureAndImpl task_sig_impl =
+      local_task_registry.task_mapping.at(invocation.task_id);
+  auto fn =
+      task_sig_impl.impl_function.get<InitOpTaskImplFunction>().function_ptr;
+  DeviceSpecificDeviceStates device_state = fn(accessor);
+  return device_state;
+}
+
 std::optional<milliseconds_t>
     execute_forward(LocalTaskRegistry const &local_task_registry,
                     LocalTensorBacking const &local_tensor_backing,
diff --git a/lib/realm-backend/CMakeLists.txt b/lib/realm-backend/CMakeLists.txt
new file mode 100644
index 0000000000..a325e14955
--- /dev/null
+++ b/lib/realm-backend/CMakeLists.txt
@@ -0,0 +1,21 @@
+ff_add_library(
+  NAME
+    realm-backend
+  SRC_PATTERNS
+    src/*.cc
+  PUBLIC_INCLUDE
+    include/
+  PRIVATE_INCLUDE
+    src/
+  DEPS
+    op-attrs
+    utils
+    kernels
+    compiler
+    local-execution
+    pcg
+    spdlog
+    legion
+)
+
+add_subdirectory(test)
diff --git a/lib/realm-backend/include/realm-backend/driver.h b/lib/realm-backend/include/realm-backend/driver.h
new file mode 100644
index 0000000000..d4b373099b
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/driver.h
@@ -0,0 +1,13 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_DRIVER_H
+#define _FLEXFLOW_REALM_BACKEND_DRIVER_H
+
+#include "realm.h"
+#include "realm/cmdline.h"
+#include "task-spec/op_task_invocation.h"
+
+Realm::Processor::TaskFuncID get_realm_task_id(FlexFlow::task_id_t task_id);
+
+void top_level_task(const void *args, size_t arglen, const void *userdata,
+                    size_t userlen, Realm::Processor p);
+
+#endif
diff --git a/lib/realm-backend/include/realm-backend/model_training_instance.h b/lib/realm-backend/include/realm-backend/model_training_instance.h
new file mode 100644
index 0000000000..e95b4c81ea
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/model_training_instance.h
@@ -0,0 +1,31 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_MODEL_TRAINING_INSTANCE_H
+#define _FLEXFLOW_REALM_BACKEND_MODEL_TRAINING_INSTANCE_H
+
+#include "realm-backend/realm_training_backing.h"
+#include "op-attrs/ops/loss_functions/loss_attrs.dtg.h"
+#include "pcg/tensor_guid_t.dtg.h"
+#include "task-spec/loss_tensor_guid_t.dtg.h"
+
+namespace FlexFlow {
+
+struct ModelTrainingInstance {
+  ModelTrainingInstance(RealmRuntimeState &,
+                        LocalTrainingBacking const &,
+                        LossAttrs const &,
+                        OptimizerAttrs const &);
+
+  RealmRuntimeState &runtime_state;
+  LocalTrainingBacking training_backing;
+  LossAttrs loss_attrs;
+  OptimizerAttrs optimizer_attrs;
+
+public:
+  std::unordered_map<layer_guid_t, std::optional<milliseconds_t>> forward();
+  std::unordered_map<layer_guid_t, std::optional<milliseconds_t>> backward();
+  void update();
+  GenericTensorAccessorR get_loss_tensor_accessor() const;
+};
+
+} // namespace FlexFlow
+
+#endif
diff --git a/lib/realm-backend/include/realm-backend/realm_allocator.h b/lib/realm-backend/include/realm-backend/realm_allocator.h
new file mode 100644
index 0000000000..2c6c854837
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/realm_allocator.h
@@ -0,0 +1,34 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_REALM_ALLOCATOR_H
+#define _FLEXFLOW_REALM_BACKEND_REALM_ALLOCATOR_H
+
+#include "realm-backend/driver.h"
+#include "realm.h"
+#include "kernels/allocation.h"
+#include <unordered_map>
+
+namespace FlexFlow {
+
+struct RealmAllocatorImpl : public IAllocator {
+  RealmAllocatorImpl() = delete;
+  RealmAllocatorImpl(RealmAllocatorImpl const &) = delete;
+  RealmAllocatorImpl(RealmAllocatorImpl &&) = delete;
+  RealmAllocatorImpl(Realm::Processor);
+  ~RealmAllocatorImpl() = default;
+
+  void *allocate(size_t) override;
+  void deallocate(void *) override;
+
+  DeviceType get_allocation_device_type() const override;
+
+private:
+  std::unordered_map<void *, Realm::RegionInstance> ptrs;
+  Realm::Processor proc;
+  Realm::Memory mem;
+  std::vector<size_t> field_sizes = {sizeof(char)};
+};
+
+Allocator create_realm_memory_allocator(Realm::Processor);
+
+} // namespace FlexFlow
+
+#endif
\ No newline at end of file
diff --git a/lib/realm-backend/include/realm-backend/realm_training_backing.h b/lib/realm-backend/include/realm-backend/realm_training_backing.h
new file mode 100644
index 0000000000..57fc7147ce
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/realm_training_backing.h
@@ -0,0 +1,64 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_REALM_TRAINING_BACKING_H
+#define _FLEXFLOW_REALM_BACKEND_REALM_TRAINING_BACKING_H
+
+#include "local-execution/local_training_backing.dtg.h"
+#include "op-attrs/ops/loss_functions/loss_attrs.dtg.h"
+#include "pcg/optimizer_attrs.dtg.h"
+#include "task-spec/training_computation_graph.dtg.h"
+#include "task-spec/training_tensor_guid_t.dtg.h"
+#include "utils/containers/generate_map.h"
+#include "utils/units/milliseconds_t.h"
+#include "realm-backend/driver.h"
+#include "realm-backend/realm_allocator.h"
+#include "realm-backend/task_wrapper.h"
+
+namespace FlexFlow {
+
+struct RealmRuntimeState {
+  Realm::Processor master_proc;
+  Realm::Event master_event;
+  Realm::Memory master_mem;
+  std::vector<Realm::Processor> worker_procs;
+  std::vector<Realm::Event> worker_events;
+  std::vector<Allocator> allocators;
+};
+
+LocalTrainingBacking make_realm_training_backing_for_computation_graph(
+    RealmRuntimeState &runtime_state,
+    std::unordered_map<training_tensor_guid_t, GenericTensorAccessorW> const
+        &preallocated_tensors,
+    TrainingComputationGraph const &training_computation_graph,
+    RuntimeArgConfig const &runtime_arg_config,
+    OptimizerAttrs const &optimizer_attrs);
+
+void register_tasks_for_realm(LocalTaskRegistry const &, RealmRuntimeState &);
+
+std::optional<DeviceSpecificDeviceStates>
+    create_per_device_op_state(LocalTaskRegistry const &,
+                               LocalTensorBacking const &,
+                               RuntimeArgConfig const &,
+                               RealmRuntimeState &,
+                               TrainingLayerPlusContext const &);
+
+Future<std::optional<milliseconds_t>>
+    execute_forward(LocalTaskRegistry const &,
+                    LocalTensorBacking const &,
+                    LocalArgsBacking const &,
+                    TrainingLayerPlusContext const &,
+                    RealmRuntimeState &);
+
+Future<std::optional<milliseconds_t>>
+    execute_backward(LocalTaskRegistry const &,
+                     LocalTensorBacking const &,
+                     LocalArgsBacking const &,
+                     TrainingLayerPlusContext const &,
+                     RealmRuntimeState &);
+
+Future<void> compute_loss(LocalTrainingBacking const &,
+                          LossAttrs const &,
+                          RealmRuntimeState &);
+
+Future<void> execute_update(LocalTrainingBacking const &,
+                            layer_guid_t const &,
+                            OptimizerAttrs const &,
+                            RealmRuntimeState &);
+
+} // namespace FlexFlow
+
+#endif
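A sketch of how these Future-returning entry points are meant to be driven, condensed from ModelTrainingInstance::forward later in this diff (cg_layers, registry, tensors, args, and layer_ctx are shorthand for the corresponding topological_ordering, backing, and get_training_layer_plus_context expressions):

    std::unordered_map<layer_guid_t, Future<std::optional<milliseconds_t>>> futures;
    for (layer_guid_t const &layer_guid : cg_layers) {
      // issue every layer's task first so Realm can pipeline the launches ...
      futures.insert({layer_guid,
                      execute_forward(registry, tensors, args,
                                      layer_ctx(layer_guid), runtime_state)});
    }
    for (auto &[layer_guid, future] : futures) {
      // ... then block on each result only after everything is in flight
      std::optional<milliseconds_t> elapsed = future.get();
    }
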
diff --git a/lib/realm-backend/include/realm-backend/task_result.h b/lib/realm-backend/include/realm-backend/task_result.h
new file mode 100644
index 0000000000..46e5f89274
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/task_result.h
@@ -0,0 +1,161 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_TASK_RESULT_H
+#define _FLEXFLOW_REALM_BACKEND_TASK_RESULT_H
+
+#include "realm-backend/driver.h"
+#include <memory>
+#include <optional>
+
+namespace FlexFlow {
+
+/**
+ * @brief SharedState class template that holds the state for both the Promise
+ * and Future objects. It is responsible for storing the result value and
+ * synchronization between the producer (Promise) and consumer (Future).
+ */
+template <typename T> struct SharedState {
+  // synchronization primitives
+  Realm::Event event = Realm::Event::NO_EVENT;
+  // where the result is stored
+  Realm::RegionInstance inst = Realm::RegionInstance::NO_INST;
+
+  SharedState() = default;
+  SharedState(Realm::Memory mem) {
+    Realm::Rect<1> bounds(Realm::Point<1>(0), Realm::Point<1>(0));
+    Realm::RegionInstance::create_instance(
+        this->inst, mem, bounds, {sizeof(T)}, /*SOA*/ 1,
+        Realm::ProfilingRequestSet(), Realm::Event::NO_EVENT)
+        .wait();
+  }
+  void set_event(Realm::Event e) { this->event = e; }
+  void set_value(T &&value) const {
+    assert(this->inst.exists());
+    Realm::GenericAccessor<T, 1> acc(this->inst, 0);
+    acc[Realm::Point<1>(0)] = std::move(value);
+  }
+  void wait() { this->event.wait(); }
+  T get_value() {
+    wait();
+    assert(this->inst.exists());
+    Realm::GenericAccessor<T, 1> acc(this->inst, 0);
+    T value = acc[Realm::Point<1>(0)];
+    this->inst.destroy();
+    return value;
+  }
+};
+
+// Specialization of SharedState for the `void` type, as it does not carry a
+// value.
+template <> struct SharedState<void> {
+  // synchronization primitives
+  Realm::Event event = Realm::Event::NO_EVENT;
+
+  SharedState() = default;
+  void set_event(Realm::Event e) { this->event = e; }
+  void wait() { this->event.wait(); }
+};
+
+/**
+ * @brief Future class template that allows retrieving the result from a
+ * SharedState object. It is used to access the value once the Promise has been
+ * fulfilled, and provides mechanisms to block the current thread until the
+ * result is available.
+ */
+template <typename T> class Future {
+public:
+  explicit Future(SharedState<T> state) : state_(state) {}
+  explicit Future() = default;
+  explicit Future(T value) : value_(std::move(value)) {}
+  void set_event(Realm::Event e) { state_.set_event(e); }
+  T get() {
+    if (!value_.has_value()) {
+      value_ = std::make_optional(state_.get_value());
+    }
+    return value_.value();
+  }
+  void wait() { state_.wait(); }
+
+private:
+  SharedState<T> state_;
+  std::optional<T> value_;
+};
+
+// Specialization of Future for the `void` type, as it does not carry a value.
+template <> class Future<void> {
+public:
+  explicit Future(SharedState<void> state) : state_(state) {}
+  explicit Future() = default;
+  void set_event(Realm::Event e) { state_.set_event(e); }
+  void get() { state_.wait(); }
+  void wait() { state_.wait(); }
+
+private:
+  SharedState<void> state_;
+};
+
+template <> class Future<DeviceSpecificDeviceStates> {
+public:
+  explicit Future(
+      std::shared_ptr<std::optional<DeviceSpecificDeviceStates>> value)
+      : value_(value) {}
+  Future() = delete;
+  void set_event(Realm::Event e) { event_ = e; }
+  std::optional<DeviceSpecificDeviceStates> get() {
+    wait();
+    return *value_;
+  }
+  void wait() { event_.wait(); }
+
+private:
+  Realm::Event event_;
+  std::shared_ptr<std::optional<DeviceSpecificDeviceStates>> value_;
+};
+
+/**
+ * @brief Promise class template that allows setting a result in a SharedState
+ * object. It is used to fulfill a Future with a value, and provides methods to
+ * notify the waiting Future of completion.
+ */
+template <typename T> class Promise {
+public:
+  Promise() = delete;
+  Promise(Realm::Memory mem) : state_(SharedState<T>(mem)) {}
+  Future<T> get_future() { return Future<T>(state_); }
+  void set_value(T &&value) const { state_.set_value(std::move(value)); }
+
+private:
+  SharedState<T> state_;
+};
+
+// Specialization of Promise for the `void` type, as it does not carry a value.
+template <> class Promise<void> {
+public:
+  Promise() : state_(SharedState<void>()) {}
+  Future<void> get_future() { return Future<void>(state_); }
+
+private:
+  SharedState<void> state_;
+};
+
+// Specialization of Promise for the `DeviceSpecificDeviceStates` type. The
+// value holds an inner shared_ptr, so we need to keep it alive and prevent it
+// from being destroyed early. A `shared_ptr` works here because the device
+// states are stored on the same node as the device that launches the init
+// task. The value is wrapped in a std::optional because the size of the
+// concrete DeviceSpecificDeviceStates is not known up front.
+template <> class Promise<DeviceSpecificDeviceStates> {
+public:
+  Promise()
+      : value_(std::make_shared<std::optional<DeviceSpecificDeviceStates>>()) {}
+  void set_value(DeviceSpecificDeviceStates value) const {
+    *value_ = std::make_optional(value);
+  }
+  Future<DeviceSpecificDeviceStates> get_future() {
+    return Future<DeviceSpecificDeviceStates>(value_);
+  }
+
+private:
+  std::shared_ptr<std::optional<DeviceSpecificDeviceStates>> value_;
+};
+
+} // namespace FlexFlow
+
+#endif
\ No newline at end of file
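The handshake between these types, condensed from the call sites later in this diff (master_mem, worker_proc, args, and task_id stand in for the corresponding RealmRuntimeState fields and launch arguments): the spawned task fills the value through the Promise, and the spawn event is attached to the Future so that get() blocks until the producer has run.

    Promise<std::optional<milliseconds_t>> promise(master_mem); // backed by a RegionInstance
    Future<std::optional<milliseconds_t>> future = promise.get_future();
    // std::move(promise) travels to the worker inside a RealmTaskArgs; the
    // task wrapper calls set_value(...) once the kernel has finished.
    Realm::Event e = worker_proc.spawn(get_realm_task_id(task_id), args,
                                       sizeof(uintptr_t));
    future.set_event(e);
    std::optional<milliseconds_t> elapsed = future.get(); // waits on e, then reads the value
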
diff --git a/lib/realm-backend/include/realm-backend/task_wrapper.h b/lib/realm-backend/include/realm-backend/task_wrapper.h
new file mode 100644
index 0000000000..fa6c9f0ed3
--- /dev/null
+++ b/lib/realm-backend/include/realm-backend/task_wrapper.h
@@ -0,0 +1,38 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_TASK_WRAPPER_H
+#define _FLEXFLOW_REALM_BACKEND_TASK_WRAPPER_H
+
+#include "local-execution/local_task_registry.h"
+#include "realm-backend/task_result.h"
+
+namespace FlexFlow {
+
+/* The following are general task wrappers to be invoked by the Realm runtime */
+
+template <typename T> struct RealmTaskArgs {
+  task_id_t task_id;
+  TaskImplFunction impl_function;
+  TaskArgumentAccessor accessor;
+  Promise<T> promise;
+};
+
+void init_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                       size_t userlen, Realm::Processor p);
+
+void fwdbwd_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                         size_t userlen, Realm::Processor p);
+
+void generic_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                          size_t userlen, Realm::Processor p);
+
+void register_wrapper_tasks_init(int p_id, Realm::Processor p, task_id_t task_id);
+
+void register_wrapper_tasks_fwdbwd(int p_id, Realm::Processor p, task_id_t task_id);
+
+void register_wrapper_tasks_generic(int p_id, Realm::Processor p, task_id_t task_id);
+
+void register_wrapper_tasks(int p_id, Realm::Processor p, task_id_t task_id,
+                            TaskSignatureAndImpl task_sig_impl);
+
+} // namespace FlexFlow
+
+#endif
\ No newline at end of file
diff --git a/lib/realm-backend/src/driver.cc b/lib/realm-backend/src/driver.cc
new file mode 100644
index 0000000000..e656836c10
--- /dev/null
+++ b/lib/realm-backend/src/driver.cc
@@ -0,0 +1,31 @@
+#include "realm-backend/driver.h"
+
+using namespace Realm;
+using namespace FlexFlow;
+
+Processor::TaskFuncID get_realm_task_id(task_id_t task_id) {
+  return static_cast<Processor::TaskFuncID>(task_id) +
+         Processor::TASK_ID_FIRST_AVAILABLE;
+}
+
+int main(int argc, char **argv) {
+  Runtime rt;
+  rt.init(&argc, &argv);
+
+  Processor::register_task_by_kind(
+      Processor::LOC_PROC, false /*!global*/,
+      get_realm_task_id(task_id_t::TOP_LEVEL_TASK_ID),
+      CodeDescriptor(top_level_task), ProfilingRequestSet())
+      .external_wait();
+
+  Processor p = Machine::ProcessorQuery(Machine::get_machine())
+                    .only_kind(Processor::LOC_PROC)
+                    .first();
+  assert(p.exists());
+
+  Event e = rt.collective_spawn(
+      p, get_realm_task_id(task_id_t::TOP_LEVEL_TASK_ID), 0, 0);
+  rt.shutdown(e);
+
+  return rt.wait_for_shutdown();
+}
diff --git a/lib/realm-backend/src/model_training_instance.cc b/lib/realm-backend/src/model_training_instance.cc
new file mode 100644
index 0000000000..a7d359b638
--- /dev/null
+++ b/lib/realm-backend/src/model_training_instance.cc
@@ -0,0 +1,115 @@
+#include "pcg/computation_graph.h"
+#include "pcg/optimizer_attrs.h"
+#include "realm-backend/model_training_instance.h"
+#include "task-spec/training_computation_graph.h"
+#include "utils/containers/reversed.h"
+
+namespace FlexFlow {
+
+ModelTrainingInstance::ModelTrainingInstance(
+    RealmRuntimeState &runtime_state,
+    LocalTrainingBacking const &local_training_backing,
+    LossAttrs const &loss_attrs,
+    OptimizerAttrs const &optimizer_attrs)
+    : runtime_state(runtime_state), training_backing(local_training_backing),
+      loss_attrs(loss_attrs), optimizer_attrs(optimizer_attrs) {}
+
+std::unordered_map<layer_guid_t, std::optional<milliseconds_t>>
+    ModelTrainingInstance::forward() {
+
+  std::unordered_map<layer_guid_t, std::optional<milliseconds_t>>
+      per_layer_elapsed_time;
+  std::unordered_map<layer_guid_t, Future<std::optional<milliseconds_t>>>
+      per_layer_elapsed_time_future;
+
+  for (layer_guid_t const &layer_guid :
+       topological_ordering(this->training_backing.training_computation_graph
+                                .computation_graph)) {
+    per_layer_elapsed_time_future.insert(
+        {layer_guid,
+         execute_forward(
+             this->training_backing.local_task_registry,
+             this->training_backing.local_tensor_backing,
+             this->training_backing.local_args_backing,
+             get_training_layer_plus_context(
+                 this->training_backing.training_computation_graph,
+                 layer_guid),
+             this->runtime_state)});
+  }
+
+  for (layer_guid_t const &layer_guid : topological_ordering(
+           this->training_backing.training_computation_graph
+               .computation_graph)) {
+    std::optional<milliseconds_t> elapsed_time =
+        per_layer_elapsed_time_future[layer_guid].get();
+    per_layer_elapsed_time.insert({layer_guid, elapsed_time});
+  }
+  return per_layer_elapsed_time;
+}
+
+std::unordered_map<layer_guid_t, std::optional<milliseconds_t>>
+    ModelTrainingInstance::backward() {
+  compute_loss(this->training_backing, this->loss_attrs, this->runtime_state);
+
+  std::unordered_map<layer_guid_t, std::optional<milliseconds_t>>
+      per_layer_elapsed_time;
+  std::unordered_map<layer_guid_t, Future<std::optional<milliseconds_t>>>
+      per_layer_elapsed_time_future;
+
+  for (layer_guid_t const &layer_guid : reversed(topological_ordering(
+           this->training_backing.training_computation_graph
+               .computation_graph))) {
+    per_layer_elapsed_time_future.insert(
+        {layer_guid,
+         execute_backward(
+             this->training_backing.local_task_registry,
+             this->training_backing.local_tensor_backing,
+             this->training_backing.local_args_backing,
+             get_training_layer_plus_context(
+                 this->training_backing.training_computation_graph,
+                 layer_guid),
+             this->runtime_state)});
+  }
+
+  for (layer_guid_t const &layer_guid : reversed(topological_ordering(
+           this->training_backing.training_computation_graph
+               .computation_graph))) {
+    std::optional<milliseconds_t> elapsed_time =
+        per_layer_elapsed_time_future[layer_guid].get();
+    per_layer_elapsed_time.insert({layer_guid, elapsed_time});
+  }
+  return per_layer_elapsed_time;
+}
+
+void ModelTrainingInstance::update() {
+  std::unordered_map<layer_guid_t, Future<void>> per_layer_update_future;
+  for (layer_guid_t const &layer_guid : topological_ordering(
+           this->training_backing.training_computation_graph
+               .computation_graph)) {
+    per_layer_update_future.insert(
+        {layer_guid, execute_update(this->training_backing,
+                                    layer_guid,
+                                    this->optimizer_attrs,
+                                    this->runtime_state)});
+  }
+  for (layer_guid_t const &layer_guid : topological_ordering(
+           this->training_backing.training_computation_graph
+               .computation_graph)) {
+    per_layer_update_future[layer_guid].wait();
+  }
+  this->optimizer_attrs = get_optimizer_attrs_for_next_iter(
+      this->optimizer_attrs);
+}
+
+GenericTensorAccessorR ModelTrainingInstance::get_loss_tensor_accessor() const {
+  gradient_tensor_guid_t loss_tensor = get_gradient_tensor_guid_for_tensor_guid(
+      this->training_backing.training_computation_graph,
+      this->training_backing.training_computation_graph.logit_tensor);
+  GenericTensorAccessorW loss_tensor_backing =
+      this->training_backing.local_tensor_backing
+          .backing_for_training_tensor_map.at(
+              training_tensor_guid_t{loss_tensor});
+  return read_only_accessor_from_write_accessor(loss_tensor_backing);
+}
+
+} // namespace FlexFlow
diff --git a/lib/realm-backend/src/realm_allocator.cc b/lib/realm-backend/src/realm_allocator.cc
new file mode 100644
index 0000000000..287de0f2d5
--- /dev/null
+++ b/lib/realm-backend/src/realm_allocator.cc
@@ -0,0 +1,52 @@
+#include "realm-backend/realm_allocator.h"
+#include "utils/containers/contains_key.h"
+
+namespace FlexFlow {
+
+using namespace Realm;
+
+/*********** RealmAllocatorImpl ***********/
+
+RealmAllocatorImpl::RealmAllocatorImpl(Processor proc) : proc(proc) {
+  mem = Machine::MemoryQuery(Machine::get_machine())
+            .only_kind(Memory::GPU_FB_MEM)
+            .best_affinity_to(proc)
+            .first();
+}
+
+// TODO: for now each region instance corresponds to exactly one tensor
+void *RealmAllocatorImpl::allocate(size_t requested_memory_size) {
+  Rect<1> bounds(Point<1>(0), Point<1>(requested_memory_size - 1));
+  RegionInstance requested_instance = RegionInstance::NO_INST;
+  RegionInstance::create_instance(requested_instance, mem, bounds, field_sizes,
+                                  /*SOA*/ 1, ProfilingRequestSet())
+      .wait();
+  // TODO: explicit zero-filling appears unnecessary because the memory is
+  // already zeroed out
+  // char *zero_data = new char[requested_memory_size];
+  // memset(zero_data, 0, requested_memory_size);
+  // requested_instance.write_untyped(0, (const void *)zero_data, requested_memory_size);
+  // delete[] zero_data;
+  void *ptr = requested_instance.pointer_untyped(0, 0);
+  this->ptrs.insert({ptr, requested_instance});
+  return ptr;
+}
+
+void RealmAllocatorImpl::deallocate(void *ptr) {
+  if (this->ptrs.count(ptr)) {
+    RegionInstance region = this->ptrs.at(ptr);
+    region.destroy();
+  } else {
+    throw std::runtime_error(
+        "Deallocating a pointer that was not allocated by this Allocator");
+  }
+}
+
+DeviceType RealmAllocatorImpl::get_allocation_device_type() const {
+  return DeviceType::GPU;
+}
+
+Allocator create_realm_memory_allocator(Processor proc) {
+  return Allocator::create<RealmAllocatorImpl>(proc);
+}
+
+} // namespace FlexFlow
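A hypothetical usage sketch (worker_proc and tensor_shape are illustrative): every allocate() call creates one RegionInstance in GPU framebuffer memory with best affinity to the given processor and hands out its base pointer, which deallocate() later maps back to the instance it destroys.

    Allocator gpu_allocator = create_realm_memory_allocator(worker_proc);
    GenericTensorAccessorW accessor = gpu_allocator.allocate_tensor(tensor_shape);
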
diff --git a/lib/realm-backend/src/realm_training_backing.cc b/lib/realm-backend/src/realm_training_backing.cc
new file mode 100644
index 0000000000..66bc098e07
--- /dev/null
+++ b/lib/realm-backend/src/realm_training_backing.cc
@@ -0,0 +1,315 @@
+#include "local-execution/local_args_backing.h"
+#include "pcg/computation_graph.h"
+#include "pcg/optimizer_attrs.h"
+#include "task-spec/loss_functions.h"
+#include "task-spec/op_task_to_task_invocation.h"
+#include "task-spec/optimizer.h"
+#include "task-spec/task_invocation.h"
+#include "task-spec/task_signature_impl.h"
+#include "task-spec/training_computation_graph.h"
+#include "utils/containers/contains.h"
+#include "utils/containers/contains_key.h"
+#include "utils/containers/get_only.h"
+#include "utils/containers/is_subseteq_of.h"
+#include "utils/containers/keys.h"
+#include "utils/containers/values.h"
+#include "utils/exception.h"
+#include "realm-backend/realm_training_backing.h"
+#include "realm-backend/task_result.h"
+#include "realm-backend/task_wrapper.h"
+
+namespace FlexFlow {
+
+using namespace Realm;
+
+LocalTrainingBacking make_realm_training_backing_for_computation_graph(
+    RealmRuntimeState &runtime_state,
+    std::unordered_map<training_tensor_guid_t, GenericTensorAccessorW> const
+        &preallocated,
+    TrainingComputationGraph const &training_computation_graph,
+    RuntimeArgConfig const &runtime_arg_config,
+    OptimizerAttrs const &optimizer_attrs) {
+
+  ASSERT(is_subseteq_of(
+      keys(preallocated),
+      keys(get_all_training_tensor_shapes(training_computation_graph))));
+
+  LocalTaskRegistry local_task_registry =
+      construct_local_task_registry_for_layers(get_layer_attrs_mapping(
+          training_computation_graph.computation_graph));
+
+  register_tasks_for_realm(local_task_registry, runtime_state);
+
+  LocalTensorBacking local_tensor_backing = construct_local_tensor_backing(
+      get_all_training_tensor_shapes(training_computation_graph),
+      preallocated,
+      runtime_state.allocators[0]);
+
+  std::unordered_map<layer_guid_t, std::optional<DeviceSpecificDeviceStates>>
+      per_device_op_states = generate_map(
+          topological_ordering(training_computation_graph.computation_graph),
+          [&](layer_guid_t const &layer_guid) {
+            return create_per_device_op_state(
+                local_task_registry,
+                local_tensor_backing,
+                runtime_arg_config,
+                runtime_state,
+                get_training_layer_plus_context(training_computation_graph,
+                                                layer_guid));
+          });
+
+  LocalArgsBacking local_args_backing =
+      make_local_args_backing_for_computation_graph(runtime_arg_config,
+                                                    per_device_op_states);
+
+  return LocalTrainingBacking{
+      /*computation_graph=*/training_computation_graph,
+      /*local_task_registry=*/local_task_registry,
+      /*local_tensor_backing=*/local_tensor_backing,
+      /*local_args_backing=*/local_args_backing,
+  };
+}
+
+// register tasks with the Realm runtime
+void register_tasks_for_realm(LocalTaskRegistry const &local_task_registry,
+                              RealmRuntimeState &runtime_state) {
+  for (std::pair<task_id_t, TaskSignatureAndImpl> const &task :
+       local_task_registry.task_mapping) {
+    task_id_t task_id = task.first;
+    TaskSignatureAndImpl task_signature_impl = task.second;
+    // TODO: multi gpu
+    register_wrapper_tasks(0, runtime_state.worker_procs[0], task_id,
+                           task_signature_impl);
+  }
+}
+
+std::optional<DeviceSpecificDeviceStates>
+    create_per_device_op_state(LocalTaskRegistry const &local_task_registry,
+                               LocalTensorBacking const &tensor_backing,
+                               RuntimeArgConfig const &runtime_arg_config,
+                               RealmRuntimeState &runtime_state,
+                               TrainingLayerPlusContext const &training_layer) {
+  std::optional<registered_task_t> maybe_registered_task = try_get_registered_task(
+      local_task_registry, training_layer.layer_guid, OpTaskType::INIT);
+
+  ASSERT(maybe_registered_task.has_value());
+
+  registered_task_t registered_task = maybe_registered_task.value();
+  if (registered_task.is_noop_task()) {
+    return std::nullopt;
+  }
+
+  TaskInvocation invocation = lower_to_task_invocation(
+      /*op_task_invocation=*/get_init_op_task_invocation(
+          training_layer.layer_attrs.op_attrs),
+      /*training_layer=*/training_layer,
+      /*device_specific_device_states=*/std::nullopt);
+
+  TaskArgumentAccessor accessor = get_task_arg_accessor(
+      tensor_backing, runtime_arg_config, invocation,
+      runtime_state.allocators[0]);
+
+  task_id_t task_id = invocation.task_id;
+  TaskImplFunction impl_function =
+      local_task_registry.task_mapping.at(task_id).impl_function;
+  // TODO: multi gpu launching
+  Promise<DeviceSpecificDeviceStates> promise = Promise<DeviceSpecificDeviceStates>();
+  Future<DeviceSpecificDeviceStates> future = promise.get_future();
+  RealmTaskArgs<DeviceSpecificDeviceStates> *task_arg =
+      new RealmTaskArgs<DeviceSpecificDeviceStates>{
+          task_id, impl_function, accessor,
+          std::move(promise)};
+  uintptr_t args[1] = {reinterpret_cast<uintptr_t>(task_arg)};
+  Event e = runtime_state.worker_procs[0].spawn(
+      get_realm_task_id(task_id), args, sizeof(uintptr_t),
+      runtime_state.worker_events[0]);
+  runtime_state.worker_events[0] = e;
+  future.set_event(e);
+  return future.get().value();
+}
+
+Future<std::optional<milliseconds_t>>
+    execute_forward(LocalTaskRegistry const &local_task_registry,
+                    LocalTensorBacking const &local_tensor_backing,
+                    LocalArgsBacking const &local_args_backing,
+                    TrainingLayerPlusContext const &training_layer,
+                    RealmRuntimeState &runtime_state) {
+
+  std::optional<registered_task_t> maybe_registered_task = try_get_registered_task(
+      local_task_registry, training_layer.layer_guid, OpTaskType::FWD);
+
+  ASSERT(maybe_registered_task.has_value());
+
+  registered_task_t registered_task = maybe_registered_task.value();
+  if (registered_task.is_noop_task()) {
+    return Future<std::optional<milliseconds_t>>(std::nullopt);
+  }
+
+  std::optional<DeviceSpecificDeviceStates> device_state =
+      get_per_device_op_state_if_exists(local_args_backing,
+                                        training_layer.layer_guid);
+
+  TaskInvocation invocation = lower_to_task_invocation(
+      /*op_task_invocation=*/get_forward_op_task_invocation(
+          training_layer.layer_attrs.op_attrs),
+      /*training_layer=*/training_layer,
+      /*device_specific_device_states=*/device_state);
+
+  TaskArgumentAccessor accessor =
+      get_task_arg_accessor(local_tensor_backing,
+                            local_args_backing.runtime_arg_config,
+                            invocation,
+                            runtime_state.allocators[0]);
+
+  task_id_t task_id = invocation.task_id;
+  TaskImplFunction impl_function =
+      local_task_registry.task_mapping.at(task_id).impl_function;
+  // TODO: multi gpu launching
+  Promise<std::optional<milliseconds_t>> promise(runtime_state.master_mem);
+  Future<std::optional<milliseconds_t>> future = promise.get_future();
+  RealmTaskArgs<std::optional<milliseconds_t>> *task_arg =
+      new RealmTaskArgs<std::optional<milliseconds_t>>{
+          task_id, impl_function, accessor,
+          std::move(promise)};
+  uintptr_t args[1] = {reinterpret_cast<uintptr_t>(task_arg)};
+  Event e = runtime_state.worker_procs[0].spawn(
+      get_realm_task_id(task_id), args, sizeof(uintptr_t),
+      runtime_state.worker_events[0]);
+  runtime_state.worker_events[0] = e;
+  future.set_event(e);
+  return future;
+}
+
+Future<std::optional<milliseconds_t>>
+    execute_backward(LocalTaskRegistry const &local_task_registry,
+                     LocalTensorBacking const &local_tensor_backing,
+                     LocalArgsBacking const &local_args_backing,
+                     TrainingLayerPlusContext const &training_layer,
+                     RealmRuntimeState &runtime_state) {
+
+  std::optional<registered_task_t> maybe_registered_task = try_get_registered_task(
+      local_task_registry, training_layer.layer_guid, OpTaskType::BWD);
+
+  ASSERT(maybe_registered_task.has_value());
+
+  registered_task_t registered_task = maybe_registered_task.value();
+  if (registered_task.is_noop_task()) {
+    return Future<std::optional<milliseconds_t>>(std::nullopt);
+  }
+
+  std::optional<DeviceSpecificDeviceStates> device_state =
+      get_per_device_op_state_if_exists(local_args_backing,
+                                        training_layer.layer_guid);
+  TaskInvocation invocation = lower_to_task_invocation(
+      get_backward_op_task_invocation(training_layer.layer_attrs.op_attrs),
+      training_layer,
+      device_state);
+  TaskArgumentAccessor accessor =
+      get_task_arg_accessor(local_tensor_backing,
+                            local_args_backing.runtime_arg_config,
+                            invocation,
+                            runtime_state.allocators[0]);
+
+  task_id_t task_id = invocation.task_id;
+  TaskImplFunction impl_function =
+      local_task_registry.task_mapping.at(task_id).impl_function;
+  // TODO: multi gpu launching
+  Promise<std::optional<milliseconds_t>> promise(runtime_state.master_mem);
+  Future<std::optional<milliseconds_t>> future = promise.get_future();
+  RealmTaskArgs<std::optional<milliseconds_t>> *task_arg =
+      new RealmTaskArgs<std::optional<milliseconds_t>>{
+          task_id, impl_function, accessor,
+          std::move(promise)};
+  uintptr_t args[1] = {reinterpret_cast<uintptr_t>(task_arg)};
+  Event e = runtime_state.worker_procs[0].spawn(
+      get_realm_task_id(task_id), args, sizeof(uintptr_t),
+      runtime_state.worker_events[0]);
+  runtime_state.worker_events[0] = e;
+  future.set_event(e);
+  return future;
+}
+
+Future<void> execute_update(LocalTrainingBacking const &local_training_backing,
+                            layer_guid_t const &layer_guid,
+                            OptimizerAttrs const &optimizer_attrs,
+                            RealmRuntimeState &runtime_state) {
+  TrainingLayerPlusContext training_layer = get_training_layer_plus_context(
+      local_training_backing.training_computation_graph, layer_guid);
+
+  if (training_layer.layer_attrs.op_attrs.has<WeightAttrs>()) {
+    TrainingTensorGroupWithAttrs weight_tensor_group =
+        get_only(training_layer.output_tensor_groups);
+
+    TaskInvocation invocation =
+        get_update_invocation(optimizer_attrs,
+                              weight_tensor_group.forward_tensor,
+                              weight_tensor_group.gradient_tensor,
+                              weight_tensor_group.optimizer_tensors);
+
+    // TODO: https://github.com/flexflow/flexflow-train/issues/1442
+    // assert(is_invocation_valid(get_update_signature(attrs), invocation));
+
+    TaskArgumentAccessor accessor = get_task_arg_accessor(
+        local_training_backing.local_tensor_backing,
+        local_training_backing.local_args_backing.runtime_arg_config,
+        invocation,
+        runtime_state.allocators[0]);
+    TaskImplFunction update_impl_fn = get_update_task_impl(optimizer_attrs);
+
+    task_id_t task_id = invocation.task_id;
+    register_wrapper_tasks_generic(0, runtime_state.worker_procs[0],
+                                   task_id);
+    // TODO: multi gpu launching
+    Promise<void> promise;
+    Future<void> future = promise.get_future();
+    RealmTaskArgs<void> *task_arg = new RealmTaskArgs<void>{
+        task_id, update_impl_fn, accessor,
+        std::move(promise)};
+    uintptr_t args[1] = {reinterpret_cast<uintptr_t>(task_arg)};
+    Event e = runtime_state.worker_procs[0].spawn(
+        get_realm_task_id(task_id), args, sizeof(uintptr_t),
+        runtime_state.worker_events[0]);
+    runtime_state.worker_events[0] = e;
+    future.set_event(e);
+    return future;
+  } else {
+    return Future<void>();
+  }
+}
+
+Future<void> compute_loss(LocalTrainingBacking const &local_training_backing,
+                          LossAttrs const &loss_attrs,
+                          RealmRuntimeState &runtime_state) {
+
+  TrainingComputationGraph training_cg =
+      local_training_backing.training_computation_graph;
+  tensor_guid_t logit_tensor = training_cg.logit_tensor;
+  loss_tensor_guid_t label_tensor = training_cg.label_tensor;
+
+  TaskInvocation loss_invocation = backward(
+      loss_attrs,
+      get_forward_tensor_guid_for_tensor_guid(training_cg, logit_tensor),
+      get_gradient_tensor_guid_for_tensor_guid(training_cg, logit_tensor),
+      label_tensor);
+  // TODO: https://github.com/flexflow/flexflow-train/issues/1442
+  // assert(is_invocation_valid(get_loss_bwd_signature(), loss_invocation));
+  TaskArgumentAccessor loss_accessor = get_task_arg_accessor(
+      local_training_backing.local_tensor_backing,
+      local_training_backing.local_args_backing.runtime_arg_config,
+      loss_invocation,
+      runtime_state.allocators[0]);
+  TaskImplFunction loss_impl_fn = get_loss_bwd_task_impl();
+
+  task_id_t task_id = loss_invocation.task_id;
+  register_wrapper_tasks_generic(0, runtime_state.worker_procs[0],
+                                 task_id);
+  // TODO: multi gpu launching
+  Promise<void> promise;
+  Future<void> future = promise.get_future();
+  RealmTaskArgs<void> *task_arg = new RealmTaskArgs<void>{
+      task_id, loss_impl_fn, loss_accessor,
+      std::move(promise)};
+  uintptr_t args[1] = {reinterpret_cast<uintptr_t>(task_arg)};
+  Event e = runtime_state.worker_procs[0].spawn(
+      get_realm_task_id(task_id), args, sizeof(uintptr_t),
+      runtime_state.worker_events[0]);
+  runtime_state.worker_events[0] = e;
+  future.set_event(e);
+  return future;
+}
+
+} // namespace FlexFlow
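The launch pattern repeated throughout the file above, condensed: each spawn names worker_events[0] as its precondition and then replaces it, so all tasks issued to worker 0 form one dependency chain and run in issue order even though callers only block on the Futures afterwards.

    Realm::Event e = runtime_state.worker_procs[0].spawn(
        get_realm_task_id(task_id), args, sizeof(uintptr_t),
        runtime_state.worker_events[0]); // precondition: the previously issued task
    runtime_state.worker_events[0] = e;  // the next spawn will depend on this one
    future.set_event(e);
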
diff --git a/lib/realm-backend/src/task_wrapper.cc b/lib/realm-backend/src/task_wrapper.cc
new file mode 100644
index 0000000000..b81494dce4
--- /dev/null
+++ b/lib/realm-backend/src/task_wrapper.cc
@@ -0,0 +1,103 @@
+#include "realm-backend/task_wrapper.h"
+#include <cassert>
+#include <unordered_set>
+
+namespace FlexFlow {
+
+using namespace Realm;
+
+std::unordered_set<std::pair<int, task_id_t>> registered_tasks;
+
+void init_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                       size_t userlen, Processor p) {
+  assert(arglen == sizeof(uintptr_t));
+  uintptr_t task_arg_ptr = *reinterpret_cast<const uintptr_t *>(args);
+  RealmTaskArgs<DeviceSpecificDeviceStates> *task_args =
+      reinterpret_cast<RealmTaskArgs<DeviceSpecificDeviceStates> *>(task_arg_ptr);
+  auto fn =
+      task_args->impl_function.get<InitOpTaskImplFunction>().function_ptr;
+  DeviceSpecificDeviceStates result = fn(task_args->accessor);
+  task_args->promise.set_value(result);
+  delete task_args;
+}
+
+void fwdbwd_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                         size_t userlen, Processor p) {
+  assert(arglen == sizeof(uintptr_t));
+  uintptr_t task_arg_ptr = *reinterpret_cast<const uintptr_t *>(args);
+  RealmTaskArgs<std::optional<milliseconds_t>> *task_args =
+      reinterpret_cast<RealmTaskArgs<std::optional<milliseconds_t>> *>(task_arg_ptr);
+  auto fn =
+      task_args->impl_function.get<FwdBwdOpTaskImplFunction>().function_ptr;
+  std::optional<milliseconds_t> result = transform(
+      fn(task_args->accessor),
+      [](float running_time) { return milliseconds_t{running_time}; });
+  task_args->promise.set_value(std::move(result));
+  delete task_args;
+}
+
+void generic_wrapper_task(const void *args, size_t arglen, const void *userdata,
+                          size_t userlen, Processor p) {
+  assert(arglen == sizeof(uintptr_t));
+  uintptr_t task_arg_ptr = *reinterpret_cast<const uintptr_t *>(args);
+  RealmTaskArgs<void> *task_args =
+      reinterpret_cast<RealmTaskArgs<void> *>(task_arg_ptr);
+  auto fn =
+      task_args->impl_function.get<GenericTaskImplFunction>().function_ptr;
+  fn(task_args->accessor);
+  delete task_args;
+}
+
+void register_wrapper_tasks_init(int p_id, Processor p, task_id_t task_id) {
+  std::pair<int, task_id_t> key = {p_id, task_id};
+  if (registered_tasks.find(key) != registered_tasks.end()) {
+    return;
+  }
+  registered_tasks.insert(key);
+  Processor::register_task_by_kind(
+      p.kind(), false /*!global*/, get_realm_task_id(task_id),
+      CodeDescriptor(init_wrapper_task), ProfilingRequestSet())
+      .external_wait();
+}
+
+void register_wrapper_tasks_fwdbwd(int p_id, Realm::Processor p, task_id_t task_id) {
+  std::pair<int, task_id_t> key = {p_id, task_id};
+  if (registered_tasks.find(key) != registered_tasks.end()) {
+    return;
+  }
+  registered_tasks.insert(key);
+  Processor::register_task_by_kind(
+      p.kind(), false /*!global*/, get_realm_task_id(task_id),
+      CodeDescriptor(fwdbwd_wrapper_task), ProfilingRequestSet())
+      .external_wait();
+}
+
+void register_wrapper_tasks_generic(int p_id, Realm::Processor p, task_id_t task_id) {
+  std::pair<int, task_id_t> key = {p_id, task_id};
+  if (registered_tasks.find(key) != registered_tasks.end()) {
+    return;
+  }
+  registered_tasks.insert(key);
+  Processor::register_task_by_kind(
+      p.kind(), false /*!global*/, get_realm_task_id(task_id),
+      CodeDescriptor(generic_wrapper_task), ProfilingRequestSet())
+      .external_wait();
+}
+
+void register_wrapper_tasks(int p_id, Processor p, task_id_t task_id,
+                            TaskSignatureAndImpl task_sig_impl) {
+  switch (task_sig_impl.task_signature.type) {
+  case OpTaskType::INIT:
+    register_wrapper_tasks_init(p_id, p, task_id);
+    break;
+  case OpTaskType::FWD:
+  case OpTaskType::BWD:
+    register_wrapper_tasks_fwdbwd(p_id, p, task_id);
+    break;
+  default:
+    register_wrapper_tasks_generic(p_id, p, task_id);
+    break;
+  }
+}
+
+} // namespace FlexFlow
\ No newline at end of file
diff --git a/lib/realm-backend/test/CMakeLists.txt b/lib/realm-backend/test/CMakeLists.txt
new file mode 100644
index 0000000000..6658784d9e
--- /dev/null
+++ b/lib/realm-backend/test/CMakeLists.txt
@@ -0,0 +1,17 @@
+ff_add_executable(
+  NAME
+    realm-backend-tests
+  SRC_PATTERNS
+    src/*.cc
+  PRIVATE_INCLUDE
+    src/
+  DEPS
+    realm-backend
+)
+
+set(FF_TEST_EXEC_NAME "realm-backend-tests")
+add_custom_command(
+  TARGET ${FF_TEST_EXEC_NAME} POST_BUILD
+  COMMAND ${CMAKE_COMMAND} -DFF_TEST_EXEC_NAME=${FF_TEST_EXEC_NAME} -P ${CMAKE_CURRENT_LIST_DIR}/modify_test_commands.cmake
+  DEPENDS ${FF_TEST_EXEC_NAME}
+)
diff --git a/lib/realm-backend/test/modify_test_commands.cmake b/lib/realm-backend/test/modify_test_commands.cmake
new file mode 100644
index 0000000000..6494ae2d78
--- /dev/null
+++ b/lib/realm-backend/test/modify_test_commands.cmake
@@ -0,0 +1,21 @@
+# modify_test_commands.cmake
+
+file(GLOB ctest_tests_files "${CMAKE_CURRENT_BINARY_DIR}/${FF_TEST_EXEC_NAME}_tests-*.cmake")
+
+foreach(ctest_tests_file IN LISTS ctest_tests_files)
+  file(READ "${ctest_tests_file}" content)
+
+  # add nix run prefix
+  string(REGEX REPLACE
+    "add_test\\([ \t\r\n]*\\[==\\[([^]]+)\\]==\\][ \t\r\n]+([^ ]+)[ \t\r\n]+\\[==\\[([^]]+)\\]==\\]\\)"
+    "add_test( [==[\\1]==] nixGL -- \\2 [==[\\3]==])"
+    content "${content}")
+
+  # add environment
+  # string(REGEX REPLACE
+  #   "set_tests_properties\\([ \t\r\n]*\\[==\\[([^]]+)\\]==\\][ \t\r\n]+PROPERTIES[ \t\r\n]+([^)]+)\\)"
+  #   "set_tests_properties( [==[\\1]==] PROPERTIES \\2 ENVIRONMENT \"NIXPKGS_ALLOW_UNFREE=1\")"
+  #   content "${content}")
+
+  file(WRITE "${ctest_tests_file}" "${content}")
+endforeach()
diff --git a/lib/realm-backend/test/src/test_e2e.cc b/lib/realm-backend/test/src/test_e2e.cc
new file mode 100644
index 0000000000..f7a338d32b
--- /dev/null
+++ b/lib/realm-backend/test/src/test_e2e.cc
@@ -0,0 +1,199 @@
+#include "test_utils.h"
+#include "kernels/compare_tensor_accessors.h"
+#include "kernels/copy_tensor_accessor.h"
+#include "kernels/format_accessor_contents.h"
+#include "kernels/local_cpu_allocator.h"
+#include "kernels/local_cuda_allocator.h"
+#include "kernels/managed_ff_stream.h"
+#include "kernels/managed_per_device_ff_handle.h"
+#include "kernels/tensor_accessor_reductions.h"
+#include "realm-backend/realm_training_backing.h"
+#include "realm-backend/model_training_instance.h"
+#include "op-attrs/ops/loss_functions/loss_attrs.dtg.h"
+#include "pcg/computation_graph.h"
+#include "pcg/computation_graph_builder.h"
+#include "pcg/optimizer_attrs.dtg.h"
+#include "task-spec/forward_tensor_source.h"
+#include "task-spec/gradient_tensor_source.h"
+#include "task-spec/loss_tensor_source.h"
+#include "task-spec/optimizer_tensor_source.h"
"task-spec/runtime_arg_config.h" +#include "task-spec/training_computation_graph.h" +#include "utils/containers/get_only.h" + +using namespace ::FlexFlow; +using namespace Realm; + +bool did_loss_decrease(GenericTensorAccessorR const &first_epoch, + GenericTensorAccessorR const &last_epoch) { + Allocator cpu_allocator = create_local_cpu_memory_allocator(); + + return tensor_accessor_all( + compare_tensor_accessors_le(last_epoch, first_epoch, cpu_allocator)); +} + +void top_level_task(const void *args, size_t arglen, const void *userdata, + size_t userlen, Realm::Processor p) { + // initialize runtime + ManagedFFStream managed_stream{}; + ManagedPerDeviceFFHandle managed_handle = initialize_single_gpu_handle( + /*workSpaceSize=*/1024 * 1024, + /*allowTensorOpMathConversion=*/true); + + Memory master_mem = Machine::MemoryQuery(Machine::get_machine()) + .only_kind(Memory::SYSTEM_MEM) + .best_affinity_to(p) + .first(); + std::vector worker_procs; + std::vector worker_events; + std::vector allocators; + Machine::ProcessorQuery pq = Machine::ProcessorQuery(Machine::get_machine()) + .only_kind(Processor::TOC_PROC); + assert(pq.count() > 0); + for (Processor p : pq) { + worker_procs.push_back(p); + worker_events.push_back(Event::NO_EVENT); + allocators.push_back(create_realm_memory_allocator(p)); + } + RealmRuntimeState runtime_state = RealmRuntimeState{ + p, Event::NO_EVENT, master_mem, worker_procs, worker_events, allocators}; + + positive_int batch_size = 10_p; + positive_int data_dim = 16_p; + positive_int hidden_dim = 32_p; + positive_int output_dim = 1_p; + + TensorShape output_tensor_shape = TensorShape{ + TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT}; + + // TODO: multi gpu + GenericTensorAccessorW label_tensor_backing = + runtime_state.allocators[0].allocate_tensor(output_tensor_shape); + + // construct computation graph + ComputationGraph computation_graph = make_empty_computation_graph(); + + TensorShape input_tensor_shape = TensorShape{ + TensorDims{FFOrdered{batch_size, data_dim}}, DataType::FLOAT}; + + TensorShape weight_shape_1 = TensorShape{ + TensorDims{FFOrdered{data_dim, hidden_dim}}, DataType::FLOAT}; + TensorShape weight_shape_2 = TensorShape{ + TensorDims{FFOrdered{hidden_dim, output_dim}}, DataType::FLOAT}; + + LayerAddedResult inputs_layer = + add_input_layer_with_grad(computation_graph, input_tensor_shape); + + LayerAddedResult weights_layer_1 = add_layer( + computation_graph, + LayerAttrs{ComputationGraphOpAttrs{WeightAttrs{ + weight_shape_1, InitializerAttrs{GlorotNormalAttrs{0}}}}, + std::nullopt}, + {}, + {}); + + LayerAddedResult weights_layer_2 = add_layer( + computation_graph, + LayerAttrs{ComputationGraphOpAttrs{WeightAttrs{ + weight_shape_2, InitializerAttrs{GlorotNormalAttrs{0}}}}, + std::nullopt}, + {}, + {}); + + LayerAddedResult linear_operator_1 = add_layer( + computation_graph, + LayerAttrs{ComputationGraphOpAttrs{LinearAttrs{hidden_dim, + /*use_bias=*/false, + DataType::FLOAT, + Activation::RELU, + std::nullopt}}, + std::nullopt}, + inputs_layer.outputs, + weights_layer_1.outputs); + + LayerAddedResult linear_operator_2 = add_layer( + computation_graph, + LayerAttrs{ComputationGraphOpAttrs{LinearAttrs{output_dim, + /*use_bias=*/false, + DataType::FLOAT, + Activation::RELU, + std::nullopt}}, + std::nullopt}, + linear_operator_1.outputs, + weights_layer_2.outputs); + + tensor_guid_t logit_tensor = get_only(linear_operator_2.outputs); + + RuntimeArgConfig runtime_arg_config = gpu_make_runtime_arg_config( + managed_handle.raw_handle(), + 
+      EnableProfiling::YES,
+      ProfilingSettings{/*warmup_iters=*/0, /*measure_iters=*/1});
+
+  // initialize training backing
+  LossAttrs loss_attrs = LossAttrs{
+      NonconfigurableLossAttrs{LossFunction::CATEGORICAL_CROSSENTROPY}};
+  OptimizerAttrs optimizer_attrs = OptimizerAttrs{
+      SGDOptimizerAttrs{
+          /*lr=*/0.001,
+          /*momentum=*/0.9,
+          /*nesterov=*/false,
+          /*weight_decay=*/0.001,
+      },
+  };
+
+  ForwardTensorSource forward_tensor_source;
+  GradientTensorSource gradient_tensor_source;
+  OptimizerTensorSource optimizer_tensor_source;
+  LossTensorSource loss_tensor_source;
+
+  TrainingComputationGraph training_computation_graph =
+      generate_training_computation_graph(computation_graph,
+                                          optimizer_attrs,
+                                          logit_tensor,
+                                          forward_tensor_source,
+                                          gradient_tensor_source,
+                                          optimizer_tensor_source,
+                                          loss_tensor_source);
+
+  LocalTrainingBacking local_training_backing =
+      make_realm_training_backing_for_computation_graph(
+          /*runtime_state=*/runtime_state,
+          /*preallocated_tensors=*/
+          {
+              {
+                  training_tensor_guid_t{
+                      training_computation_graph.label_tensor},
+                  label_tensor_backing,
+              },
+          },
+          /*training_computation_graph=*/training_computation_graph,
+          /*runtime_arg_config=*/runtime_arg_config,
+          /*optimizer_attrs=*/optimizer_attrs);
+
+  // begin training loop
+  ModelTrainingInstance model_training_instance = ModelTrainingInstance{
+      runtime_state, local_training_backing, loss_attrs, optimizer_attrs};
+
+  {
+    printf("\nRunning test %d: E2ETest...\n", 1);
+    Allocator cpu_allocator = create_local_cpu_memory_allocator();
+
+    int num_epochs = 5;
+    std::vector<GenericTensorAccessorR> loss_values;
+
+    for (int i = 0; i < num_epochs; i++) {
+      model_training_instance.forward();
+      model_training_instance.backward();
+      model_training_instance.update();
+      loss_values.push_back(copy_tensor_accessor_r(
+          model_training_instance.get_loss_tensor_accessor(), cpu_allocator));
+    }
+
+    // Assert that each sample in the batch has a lower loss in the last epoch
+    // than in the first epoch
+    GenericTensorAccessorR first_epoch_loss = loss_values.at(0);
+    GenericTensorAccessorR last_epoch_loss = loss_values.back();
+    assert(did_loss_decrease(first_epoch_loss, last_epoch_loss));
+    printf("passed!\n");
+  }
+}
diff --git a/lib/realm-backend/test/src/test_utils.cc b/lib/realm-backend/test/src/test_utils.cc
new file mode 100644
index 0000000000..b7a4e16b97
--- /dev/null
+++ b/lib/realm-backend/test/src/test_utils.cc
@@ -0,0 +1,19 @@
+#include "test_utils.h"
+#include "pcg/tensor_guid_t.dtg.h"
+
+namespace FlexFlow {
+
+PerDeviceFFHandle get_mock_per_device_ff_handle() {
+  return {nullptr, nullptr, nullptr, 0, false};
+}
+
+size_t MockTensorGuidSource::next_available_mock_tensor_guid = 0;
+
+MockTensorGuidSource::MockTensorGuidSource() {}
+
+tensor_guid_t MockTensorGuidSource::new_mock_tensor_guid() {
+  size_t next_guid = MockTensorGuidSource::next_available_mock_tensor_guid++;
+  return tensor_guid_t{DataflowOutput{Node{0}, nonnegative_int{next_guid}}};
+}
+
+} // namespace FlexFlow
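Hypothetical usage of the mock guid source (test-only): guids are just increasing DataflowOutput indices on a dummy node, so tests can mint distinct tensor_guid_t values without building a graph.

    MockTensorGuidSource source;
    tensor_guid_t t0 = source.new_mock_tensor_guid(); // DataflowOutput{Node{0}, 0}
    tensor_guid_t t1 = source.new_mock_tensor_guid(); // DataflowOutput{Node{0}, 1}
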
diff --git a/lib/realm-backend/test/src/test_utils.h b/lib/realm-backend/test/src/test_utils.h
new file mode 100644
index 0000000000..056e92687c
--- /dev/null
+++ b/lib/realm-backend/test/src/test_utils.h
@@ -0,0 +1,23 @@
+#ifndef _FLEXFLOW_REALM_BACKEND_TEST_UTILS
+#define _FLEXFLOW_REALM_BACKEND_TEST_UTILS
+
+#include "kernels/ff_handle.h"
+#include "pcg/tensor_guid_t.dtg.h"
+
+namespace FlexFlow {
+
+struct MockTensorGuidSource {
+public:
+  MockTensorGuidSource();
+
+  tensor_guid_t new_mock_tensor_guid();
+
+private:
+  static size_t next_available_mock_tensor_guid;
+};
+
+PerDeviceFFHandle get_mock_per_device_ff_handle();
+
+} // namespace FlexFlow
+
+#endif
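How the pieces compose end-to-end, condensed from test_e2e.cc above (graph construction and tensor setup elided):

    ModelTrainingInstance instance{runtime_state, local_training_backing,
                                   loss_attrs, optimizer_attrs};
    for (int epoch = 0; epoch < num_epochs; epoch++) {
      instance.forward();  // one Realm task per layer, joined through Futures
      instance.backward(); // compute_loss, then the layers in reverse order
      instance.update();   // per-layer optimizer tasks, then advance optimizer_attrs
    }
    GenericTensorAccessorR loss = instance.get_loss_tensor_accessor();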