// Copyright Sebastian Jeckel 2014. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include "react/engine/SubtreeEngine.h" #include #include #include "react/common/Types.h" /***************************************/ REACT_IMPL_BEGIN /**************************************/ namespace subtree { /////////////////////////////////////////////////////////////////////////////////////////////////// /// Parameters /////////////////////////////////////////////////////////////////////////////////////////////////// static const uint chunk_size = 8; static const uint dfs_threshold = 3; /////////////////////////////////////////////////////////////////////////////////////////////////// /// Turn /////////////////////////////////////////////////////////////////////////////////////////////////// Turn::Turn(TurnIdT id, TransactionFlagsT flags) : TurnBase( id, flags ) {} /////////////////////////////////////////////////////////////////////////////////////////////////// /// PulsecountEngine /////////////////////////////////////////////////////////////////////////////////////////////////// void EngineBase::OnNodeAttach(Node& node, Node& parent) { parent.Successors.Add(node); if (node.Level <= parent.Level) node.Level = parent.Level + 1; } void EngineBase::OnNodeDetach(Node& node, Node& parent) { parent.Successors.Remove(node); } void EngineBase::OnInputChange(Node& node, Turn& turn) { processChildren(node, turn); } /////////////////////////////////////////////////////////////////////////////////////////////////// /// UpdaterTask /////////////////////////////////////////////////////////////////////////////////////////////////// class UpdaterTask: public task { public: using BufferT = NodeBuffer; UpdaterTask(Turn& turn, Node* node) : turn_( turn ), nodes_( node ) {} UpdaterTask(UpdaterTask& other, SplitTag) : turn_( other.turn_ ), nodes_( other.nodes_, SplitTag( ) ) {} task* execute() { uint splitCount = 0; while (!nodes_.IsEmpty()) { Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront(); if (node.IsInitial() || node.ShouldUpdate()) node.Tick(&turn_); node.ClearInitialFlag(); node.SetShouldUpdate(false); // Defer if node was dynamically attached to a predecessor that // has not pulsed yet if (node.IsDeferred()) { node.ClearDeferredFlag(); continue; } // Repeat the update if a node was dynamically attached to a // predecessor that has already pulsed while (node.IsRepeated()) { node.ClearRepeatedFlag(); node.Tick(&turn_); } node.SetReadyCount(0); // Mark successors for update? bool update = node.IsChanged(); {// node.ShiftMutex Node::ShiftMutexT::scoped_lock lock(node.ShiftMutex, false); for (auto* succ : node.Successors) { if (update) succ->SetShouldUpdate(true); // Wait for more? if (succ->IncReadyCount()) continue; // Heavyweight - spawn new task if (succ->IsHeavyweight()) { auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(turn_, succ); spawn(t); } // Leightweight - add to buffer, split if full else { nodes_.PushBack(succ); if (nodes_.IsFull()) { ++splitCount; //Delegate half the work to new task auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(*this, SplitTag()); spawn(t); } } } node.ClearMarkedFlag(); }// ~node.ShiftMutex } return nullptr; } private: Turn& turn_; BufferT nodes_; }; /////////////////////////////////////////////////////////////////////////////////////////////////// /// EngineBase /////////////////////////////////////////////////////////////////////////////////////////////////// void EngineBase::Propagate(Turn& turn) { // Phase 1 while (scheduledNodes_.FetchNext()) { for (auto* curNode : scheduledNodes_.NextValues()) { if (curNode->Level < curNode->NewLevel) { curNode->Level = curNode->NewLevel; invalidateSuccessors(*curNode); scheduledNodes_.Push(curNode); continue; } curNode->ClearQueuedFlag(); curNode->Tick(&turn); } } // Phase 2 isInPhase2_ = true; assert((1 + subtreeRoots_.size()) <= static_cast((std::numeric_limits::max)())); rootTask_.set_ref_count(1 + static_cast(subtreeRoots_.size())); for (auto* node : subtreeRoots_) { // Ignore if root flag has been cleared because node was part of another subtree if (! node->IsRoot()) { rootTask_.decrement_ref_count(); continue; } spawnList_.push_back(*new(rootTask_.allocate_child()) UpdaterTask(turn, node)); node->ClearRootFlag(); } rootTask_.spawn_and_wait_for_all(spawnList_); subtreeRoots_.clear(); spawnList_.clear(); isInPhase2_ = false; } void EngineBase::OnNodePulse(Node& node, Turn& turn) { if (isInPhase2_) node.SetChangedFlag(); else processChildren(node, turn); } void EngineBase::OnNodeIdlePulse(Node& node, Turn& turn) { if (isInPhase2_) node.ClearChangedFlag(); } void EngineBase::OnDynamicNodeAttach(Node& node, Node& parent, Turn& turn) { if (isInPhase2_) { applyAsyncDynamicAttach(node, parent, turn); } else { OnNodeAttach(node, parent); invalidateSuccessors(node); // Re-schedule this node node.SetQueuedFlag(); scheduledNodes_.Push(&node); } } void EngineBase::OnDynamicNodeDetach(Node& node, Node& parent, Turn& turn) { if (isInPhase2_) applyAsyncDynamicDetach(node, parent, turn); else OnNodeDetach(node, parent); } void EngineBase::applyAsyncDynamicAttach(Node& node, Node& parent, Turn& turn) {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Add(node); // Level recalulation applied when added to topoqueue next time. // During the async phase 2 it's not needed. if (node.NewLevel <= parent.Level) node.NewLevel = parent.Level + 1; // Has already nudged its neighbors? if (! parent.IsMarked()) { node.SetRepeatedFlag(); } else { node.SetDeferredFlag(); node.SetShouldUpdate(true); node.DecReadyCount(); } }// ~parent.ShiftMutex (write) void EngineBase::applyAsyncDynamicDetach(Node& node, Node& parent, Turn& turn) {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Remove(node); }// ~parent.ShiftMutex (write) void EngineBase::processChildren(Node& node, Turn& turn) { // Add children to queue for (auto* succ : node.Successors) { // Ignore if node part of marked subtree if (succ->IsMarked()) continue; // Light nodes use sequential toposort in phase 1 if (! succ->IsHeavyweight()) { if (!succ->IsQueued()) { succ->SetQueuedFlag(); scheduledNodes_.Push(succ); } } // Heavy nodes + subtrees are deferred for parallel updating in phase 2 else { // Force an initial update for heavy non-input nodes. // (non-atomic flag, unlike ShouldUpdate) if (!succ->IsInputNode()) succ->SetInitialFlag(); succ->SetChangedFlag(); succ->SetRootFlag(); markSubtree(*succ); subtreeRoots_.push_back(succ); } } } void EngineBase::markSubtree(Node& root) { root.SetMarkedFlag(); root.WaitCount = 0; for (auto* succ : root.Successors) { if (!succ->IsMarked()) markSubtree(*succ); // Successor of another marked node? -> not a root anymore else if (succ->IsRoot()) succ->ClearRootFlag(); ++succ->WaitCount; } } void EngineBase::invalidateSuccessors(Node& node) { for (auto* succ : node.Successors) { if (succ->NewLevel <= node.Level) succ->NewLevel = node.Level + 1; } } } // ~namespace subtree /****************************************/ REACT_IMPL_END /***************************************/