diff --git a/flecsi/data/hpx/dense.h b/flecsi/data/hpx/dense.h index 12501aeb2..dbb46a240 100644 --- a/flecsi/data/hpx/dense.h +++ b/flecsi/data/hpx/dense.h @@ -136,6 +136,8 @@ struct storage_class_u { auto & context = execution::context_t::instance(); + using client_type = typename DATA_CLIENT_TYPE::type_identifier_t; + // get field_info for this data handle auto & field_info = context.get_field_info_from_name( typeid(typename DATA_CLIENT_TYPE::type_identifier_t).hash_code(), diff --git a/flecsi/execution/execution.h b/flecsi/execution/execution.h index 484c69eb7..f5096ed36 100644 --- a/flecsi/execution/execution.h +++ b/flecsi/execution/execution.h @@ -433,7 +433,7 @@ clog_register_tag(execution); #define flecsi_execute_mpi_task(task, nspace, ...) \ /* MACRO IMPLEMENTATION */ \ \ - flecsi_execute_task(task, nspace, index, ##__VA_ARGS__) + flecsi_execute_task(task, nspace, index, ##__VA_ARGS__).get() //----------------------------------------------------------------------------// // Reduction Interface diff --git a/flecsi/execution/hpx/context_policy.cc b/flecsi/execution/hpx/context_policy.cc index 882b09fb4..08b124dbd 100644 --- a/flecsi/execution/hpx/context_policy.cc +++ b/flecsi/execution/hpx/context_policy.cc @@ -58,10 +58,6 @@ hpx_context_policy_t::hpx_main(void (*driver)(int, char *[]), int argc, char * argv[]) { - // initialize executors (possible only after runtime is active) - exec_ = std::make_unique("default"); - mpi_exec_ = std::make_unique("mpi"); - // execute user code (driver) (*driver)(argc, argv); @@ -84,29 +80,15 @@ hpx_context_policy_t::start_hpx(void (*driver)(int, char *[]), // disable HPX' short options "hpx.commandline.aliasing!=0"}; - auto init_rp = [](hpx::resource::partitioner & rp) { - // Create a thread pool encapsulating the default scheduler - rp.create_thread_pool("default", hpx::resource::local_priority_fifo); - - // Create a thread pool for executing MPI tasks - rp.create_thread_pool("mpi", hpx::resource::static_); - - // Add first core to mpi pool - rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "mpi"); - }; - // Now, initialize and run the HPX runtime, will return when done. #if HPX_VERSION_FULL < 0x010500 - hpx::resource::partitioner rp{ + return hpx::init( hpx::util::bind_front(&hpx_context_policy_t::hpx_main, this, driver), argc, - argv, cfg}; - init_rp(rp); - return hpx::init(); + argv, cfg); #else // Newer versions of HPX do not allow to explicitly initialize the // resource partitioner anymore hpx::init_params params; - params.rp_callback = init_rp; params.cfg = std::move(cfg); return hpx::init( diff --git a/flecsi/execution/hpx/context_policy.h b/flecsi/execution/hpx/context_policy.h index f1d6e2dd1..ecf0889dd 100644 --- a/flecsi/execution/hpx/context_policy.h +++ b/flecsi/execution/hpx/context_policy.h @@ -7,18 +7,19 @@ /@@ /@@/@@//// //@@ @@ /@@/@@ /@@ @@@//@@@@@@ //@@@@@@ @@@@@@@@ /@@ // /// ////// ////// //////// // -*/ + Copyright (c) 2016, Los Alamos National Security, LLC + All rights reserved. + */ #pragma once +#include #include -#include #include #include #include #include -#include #include #include #include @@ -50,49 +51,6 @@ #include -#if HPX_VERSION_FULL < 0x010500 -namespace flecsi { -namespace execution { - -using pool_executor = hpx::threads::executors::pool_executor; -} // namespace execution -} // namespace flecsi -#else -namespace flecsi { -namespace execution { - -// newer versions of HPX do not support pool_executor anymore -class pool_executor : public hpx::parallel::execution::thread_pool_executor -{ -public: - explicit pool_executor(std::string const & pool_name = "default") - : hpx::parallel::execution::thread_pool_executor( - &hpx::threads::get_thread_manager().get_pool(pool_name), - hpx::threads::thread_priority_default, - hpx::threads::thread_stacksize_default) {} -}; -} // namespace execution -} // namespace flecsi - -namespace hpx { -namespace parallel { -namespace execution { -template<> -struct is_one_way_executor : std::true_type { -}; - -template<> -struct is_two_way_executor : std::true_type { -}; - -template<> -struct is_bulk_two_way_executor - : std::true_type {}; -} // namespace execution -} // namespace parallel -} // namespace hpx -#endif - namespace flecsi { namespace execution { @@ -855,14 +813,6 @@ struct hpx_context_policy_t { start_hpx(void (*driver)(int, char *[]), int argc, char * argv[]); public: - flecsi::execution::pool_executor & get_default_executor() { - return *exec_; - } - - flecsi::execution::pool_executor & get_mpi_executor() { - return *mpi_exec_; - } - int rank; // private: @@ -886,11 +836,6 @@ struct hpx_context_policy_t { // Map to store task registration callback methods. std::map task_registry_; - -private: - std::unique_ptr exec_; - std::unique_ptr mpi_exec_; - }; // struct hpx_context_policy_t } // namespace execution diff --git a/flecsi/execution/hpx/execution_policy.h b/flecsi/execution/hpx/execution_policy.h index 46793b409..c969464e2 100644 --- a/flecsi/execution/hpx/execution_policy.h +++ b/flecsi/execution/hpx/execution_policy.h @@ -17,9 +17,6 @@ #include #include -#include -#include -#include #include #include @@ -52,11 +49,10 @@ struct executor_u { /*! FIXME documentation */ - template - static decltype(auto) execute(Exec && exec, T function, A && targs) { - auto user_fun = reinterpret_cast)>(function); - return hpx::async( - std::forward(exec), user_fun, std::forward(targs)); + template + static decltype(auto) execute(T function, A && targs) { + auto user_fun = reinterpret_cast(function); + return hpx::async(user_fun, utils::forward_tuple(std::forward(targs))); } // execute_task }; // struct executor_u @@ -164,27 +160,8 @@ struct FLECSI_EXPORT hpx_execution_policy_t { annotation::begin(tname); - // hpx_future_u future; - // auto processor_type = context_.processor_type(); - // if(processor_type == processor_type_t::mpi) { - // - // { - // clog_tag_guard(execution); - // clog(info) << "Executing MPI task: " << TASK << std::endl; - // } - // - // future = executor_u::execute( - // context_t::instance().get_mpi_executor(), std::move(function), - // task_args); - // } - // else { - // future = executor_u::execute( - // context_t::instance().get_default_executor(), - // std::move(function), task_args); - // } - hpx_future_u future = executor_u::execute( - context_t::instance().get_default_executor(), std::move(function), - task_args); + hpx_future_u future = + executor_u::execute(function, task_args); annotation::end(); @@ -204,7 +181,8 @@ struct FLECSI_EXPORT hpx_execution_policy_t { if constexpr(REDUCTION != ZERO) { return future - .then([&](hpx_future_u && future) { + .then([](hpx_future_u && future) { + context_t & context_ = context_t::instance(); MPI_Datatype datatype = flecsi::utils::mpi_typetraits_u::type(); diff --git a/flecsi/execution/hpx/runtime_main.cc b/flecsi/execution/hpx/runtime_main.cc index e9fc40c58..06cf6e800 100644 --- a/flecsi/execution/hpx/runtime_main.cc +++ b/flecsi/execution/hpx/runtime_main.cc @@ -19,6 +19,8 @@ #error FLECSI_ENABLE_MPI not defined! This file depends on MPI! #endif +#include + #include #include @@ -36,7 +38,14 @@ int main(int argc, char ** argv) { // Initialize the MPI runtime - MPI_Init(&argc, &argv); + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(provided < MPI_THREAD_MULTIPLE) { + printf("ERROR: Your implementation of MPI does not support " + "MPI_THREAD_MULTIPLE which is required for use of " + "MPI conduit with the HPX-MPI Interop!\n"); + } + assert(provided == MPI_THREAD_MULTIPLE); // get the rank int rank{0}; diff --git a/flecsi/topology/hpx/set_storage_policy.h b/flecsi/topology/hpx/set_storage_policy.h index 50d773f58..bfb3c91f6 100644 --- a/flecsi/topology/hpx/set_storage_policy.h +++ b/flecsi/topology/hpx/set_storage_policy.h @@ -61,16 +61,31 @@ struct hpx_set_topology_storage_policy_u { } void init_entities(size_t index_space, + size_t active_migrate_index_space, set_entity_t * entities, - size_t size, size_t num_entities, + set_entity_t * active_entities, + size_t num_active_entities, + set_entity_t * migrate_entities, + size_t num_migrate_entities, + size_t size, bool read) { + auto itr = index_space_map.find(index_space); clog_assert(itr != index_space_map.end(), "invalid index space"); auto & is = index_spaces[itr->second]; auto s = is.storage(); - s->set_buffer(entities, num_entities, read); + *s = {{entities, num_entities}, read ? num_entities : 0}; + + itr = index_space_map.find(active_migrate_index_space); + clog_assert( + itr != index_space_map.end(), "invalid active migrate index space"); + auto & amis = index_spaces[itr->second]; + auto s2 = amis.storage(); + + // how to handle migration buffer? + s2->set_buffer(active_entities, num_entities, read); if(!read) { return; @@ -101,15 +116,9 @@ struct hpx_set_topology_storage_policy_u { constexpr std::size_t index_space = find_set_index_space_u::find(); - auto & is = index_spaces[index_space].template cast(); - std::size_t entity = is.size(); - - auto placement_ptr = static_cast(is.storage()->buffer()) + entity; + auto & is = index_spaces[index_space]; + auto placement_ptr = static_cast(is.data.buffer()) + is.ids.size(); auto ent = new(placement_ptr) T(std::forward(args)...); - auto storage = is.storage(); - storage->pushed(); - is.pushed(); - return ent; } };