Skip to content

Commit

Permalink
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 16, 2023
1 parent 7ff70d6 commit 358d7b7
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 141 deletions.
22 changes: 11 additions & 11 deletions src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,26 +140,26 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
}

/*static*/ bool EpochManip::isDS(EpochType epoch) {
using T = typename std::underlying_type<epoch::eEpochCategory>::type;
if (isRooted(epoch)) {
auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch;
auto const cat = category(epoch);
bool const is_ds = static_cast<T>(cat) & static_cast<T>(ds_bit);
return is_ds;
using T = typename std::underlying_type<eEpochCategory>::type;
if (epoch != term::any_epoch_sentinel and isRooted(epoch)) {
BitPackerType::FieldType const ds_bit =
static_cast<T>(eEpochCategory::DijkstraScholtenEpoch) - 1;
auto cat = static_cast<T>(EpochManip::category(epoch));
return BitPackerType::boolGetField<ds_bit,1,decltype(cat)>(cat);
} else {
return false;
}
}

/*static*/ bool EpochManip::isDep(EpochType epoch) {
using T = typename std::underlying_type<epoch::eEpochCategory>::type;
using T = typename std::underlying_type<eEpochCategory>::type;
if (epoch == no_epoch or epoch == term::any_epoch_sentinel) {
return false;
}
auto const dep_bit = epoch::eEpochCategory::DependentEpoch;
auto const cat = epoch::EpochManip::category(epoch);
bool const is_dep = static_cast<T>(cat) & static_cast<T>(dep_bit);
return is_dep;
BitPackerType::FieldType const dep_bit =
static_cast<T>(eEpochCategory::DependentEpoch) - 1;
auto cat = static_cast<T>(EpochManip::category(epoch));
return BitPackerType::boolGetField<dep_bit,1,decltype(cat)>(cat);
}

/*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) {
Expand Down
4 changes: 4 additions & 0 deletions src/vt/objgroup/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,8 @@ std::unordered_map<ObjGroupProxyType, std::vector<ActionType>>& getPending() {
return theObjGroup()->pending_;
}

ObjGroupProxyType getProxyFromPtr(void* obj) {
return theObjGroup()->getProxyFromPtr(obj);
}

}} /* end namespace vt::objgroup */
2 changes: 2 additions & 0 deletions src/vt/objgroup/manager.fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ decltype(auto) invoke(messaging::MsgSharedPtr<MsgT> msg, HandlerType han, NodeTy
template <typename MsgT>
messaging::PendingSend broadcast(MsgSharedPtr<MsgT> msg, HandlerType han);

ObjGroupProxyType getProxyFromPtr(void* obj);

}} /* end namespace vt::objgroup */

namespace vt {
Expand Down
17 changes: 17 additions & 0 deletions src/vt/objgroup/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,23 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
*/
elm::ElementIDStruct getNextElm(ObjGroupProxyType proxy);

public:
/**
* \brief Get an objgroup proxy from pointer
*
* \param[in] obj the pointer
*
* \return objgroup proxy
*/
ObjGroupProxyType getProxyFromPtr(void* obj) const {
auto iter = obj_to_proxy_.find(obj);
if (iter != obj_to_proxy_.end()) {
return iter->second;
} else {
return no_obj_group;
}
}

private:
/// The current obj ID, sequential on each node for collective construction
ObjGroupIDType cur_obj_id_ = fst_obj_group_id;
Expand Down
2 changes: 2 additions & 0 deletions src/vt/runnable/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ void RunnableNew::setupHandler(HandlerType handler) {
void RunnableNew::setupHandlerObjGroup(void* obj, HandlerType handler) {
f_.func_ = auto_registry::getAutoHandlerObjGroup(handler).get();
obj_ = obj;
is_objgroup_ = true;
}

void RunnableNew::setupHandlerElement(
Expand All @@ -92,6 +93,7 @@ void RunnableNew::setupHandlerElement(
auto_registry::getAutoHandlerCollectionMem(handler).get() :
auto_registry::getAutoHandlerCollection(handler).get();
obj_ = elm;
is_objgroup_ = false;
}

void RunnableNew::setupHandlerElement(
Expand Down
15 changes: 15 additions & 0 deletions src/vt/runnable/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ struct RunnableNew {
*/
EpochType getEpoch() const;

/**
* \brief Whether this runnable targets an object group
*
* \return targets object group
*/
bool isObjGroup() const { return is_objgroup_; }

/**
* \brief Get the object this runnable targets
*
* \return the obj
*/
void* getObj() const { return obj_; }

private:
detail::Contexts contexts_; /**< The contexts */
MsgSharedPtr<BaseMsgType> msg_ = nullptr; /**< The associated message */
Expand All @@ -369,6 +383,7 @@ struct RunnableNew {
DispatcherScatterType func_scat_;
} f_;
bool is_scatter_ = false;
bool is_objgroup_ = false;
#if vt_check_enabled(fcontext)
bool is_threaded_ = false; /**< Whether ULTs are supported */
bool done_ = false; /**< Whether task is complete */
Expand Down
7 changes: 7 additions & 0 deletions src/vt/scheduler/base_unit.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ struct BaseUnit {
*/
void execute();

/**
* \brief Get the runnable
*
* \return the runnable
*/
RunnablePtrType getRunnable() const { return r_; }

protected:
RunnablePtrType r_ = nullptr; /**< the runnable task */
ActionType work_ = nullptr; /**< the lambda task */
Expand Down
21 changes: 19 additions & 2 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ void Scheduler::resume(ThreadIDType tid) {
}

void Scheduler::releaseEpoch(EpochType ep) {
auto iter = pending_work_.find(ep);
if (iter != pending_work_.end()) {
if (auto iter = pending_work_.find(ep); iter != pending_work_.end()) {
auto& container = iter->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
Expand All @@ -395,6 +394,24 @@ void Scheduler::releaseEpoch(EpochType ep) {
}
}

void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) {
released_objgroups_[ep].insert(proxy);

if (auto iter = pending_objgroup_work_.find(ep);
iter != pending_objgroup_work_.end()) {
if (auto iter2 = iter->second.find(proxy); iter2 != iter->second.end()) {
auto& container = iter2->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
}
iter->second.erase(iter2);
}
if (iter->second.size() == 0) {
pending_objgroup_work_.erase(iter);
}
}
}

#if vt_check_enabled(fcontext)
ThreadManager* Scheduler::getThreadManager() {
return thread_manager_.get();
Expand Down
41 changes: 29 additions & 12 deletions src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,14 @@ struct Scheduler : runtime::component::Component<Scheduler> {
bool hasSchedRun() const { return has_executed_; }

/**
* \brief Enqueue an action to execute later with the default priority
* \brief Enqueue an action to execute later with a priority
* \c default_priority
*
* \param[in] r action to execute
* \param[in] priority the priority of the action
*/
template <typename RunT>
void enqueue(RunT r);
void enqueue(RunT r, PriorityType priority = default_priority);

/**
* \brief Enqueue a callable to execute later with the default priority
Expand All @@ -252,15 +253,6 @@ struct Scheduler : runtime::component::Component<Scheduler> {
template <typename Callable>
void enqueueLambda(PriorityType priority, Callable&& c);

/**
* \brief Enqueue an runnable with a priority to execute later
*
* \param[in] priority the priority of the action
* \param[in] r the runnable to execute later
*/
template <typename RunT>
void enqueue(PriorityType priority, RunT r);

/**
* \brief Print current memory usage
*/
Expand Down Expand Up @@ -295,6 +287,14 @@ struct Scheduler : runtime::component::Component<Scheduler> {
template <typename MsgT, typename RunT>
void enqueue(messaging::MsgSharedPtr<MsgT> const& msg, RunT r);

/**
* \brief Enqueue a work unit or postpone
*
* \param[in] u the work unit
*/
template <typename UnitT>
void enqueueOrPostpone(UnitT u);

/**
* \brief Get the work queue size
*
Expand Down Expand Up @@ -377,6 +377,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
*/
void releaseEpoch(EpochType ep);

/**
* \brief Release an epoch to run
*
* \param[in] ep the epoch to release
*/
void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy);

template <typename SerializerT>
void serialize(SerializerT& s) {
s | work_queue_
Expand Down Expand Up @@ -407,7 +414,9 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| schedLoopTime
| idleTime
| idleTimeMinusTerm
| pending_work_;
| pending_work_
| pending_objgroup_work_
| released_objgroups_;
}

private:
Expand Down Expand Up @@ -448,6 +457,14 @@ struct Scheduler : runtime::component::Component<Scheduler> {
/// Unreleased work pending an epoch release
std::unordered_map<EpochType, Queue<UnitType>> pending_work_;

/// Unreleased work pending on an objgroup epoch release
std::unordered_map<
EpochType, std::unordered_map<ObjGroupProxyType, Queue<UnitType>>
> pending_objgroup_work_;

/// Released epochs for an objgroup
std::unordered_map<EpochType, std::set<ObjGroupProxyType>> released_objgroups_;

#if vt_check_enabled(fcontext)
std::unique_ptr<ThreadManager> thread_manager_ = nullptr;
#endif
Expand Down
Loading

0 comments on commit 358d7b7

Please sign in to comment.