Functions: enable multi-threading when many nodes are scheduled at once

Nodes that are scheduled can be executed in any order in theory. So when
there are many scheduled nodes, it can be benefitial to start evaluating
them in parallel.

Note that it is not very common that many nodes are scheduled at the
same time in typical setups because the evaluator uses a depth-first heuristic
to decide in which order to evaluate nodes. It can happen more easily in
generated node trees though.

Also, this change only has an affect in practice if none of the scheduled nodes
uses multi-threading internally, as this would also trigger the user of multiple
threads in the graph executor.
This commit is contained in:
Jacques Lucke 2023-10-08 16:21:23 +02:00
parent 8822e4de73
commit 7bd509f73a
1 changed files with 39 additions and 5 deletions

View File

@ -250,6 +250,25 @@ struct ScheduledNodes {
{
return this->priority_.is_empty() && this->normal_.is_empty();
}
int64_t nodes_num() const
{
return priority_.size() + normal_.size();
}
/**
* Split up the scheduled nodes into two groups that can be worked on in parallel.
*/
void split_into(ScheduledNodes &other)
{
BLI_assert(this != &other);
const int64_t priority_split = priority_.size() / 2;
const int64_t normal_split = normal_.size() / 2;
other.priority_.extend(priority_.as_span().drop_front(priority_split));
other.normal_.extend(normal_.as_span().drop_front(normal_split));
priority_.resize(priority_split);
normal_.resize(normal_split);
}
};
struct CurrentTask {
@ -794,6 +813,16 @@ class Executor {
current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
this->run_node_task(*node, current_task, local_data);
/* If there are many nodes scheduled at the same time, it's benefitial to let multiple
* threads work on those. */
if (current_task.scheduled_nodes.nodes_num() > 128) {
if (this->try_enable_multi_threading()) {
std::unique_ptr<ScheduledNodes> split_nodes = std::make_unique<ScheduledNodes>();
current_task.scheduled_nodes.split_into(*split_nodes);
this->push_to_task_pool(std::move(split_nodes));
}
}
}
}
@ -1229,10 +1258,10 @@ class Executor {
/**
* Allow other threads to steal all the nodes that are currently scheduled on this thread.
*/
void move_scheduled_nodes_to_task_pool(CurrentTask &current_task)
void push_all_scheduled_nodes_to_task_pool(CurrentTask &current_task)
{
BLI_assert(this->use_multi_threading());
ScheduledNodes *scheduled_nodes = MEM_new<ScheduledNodes>(__func__);
std::unique_ptr<ScheduledNodes> scheduled_nodes = std::make_unique<ScheduledNodes>();
{
std::lock_guard lock{current_task.mutex};
if (current_task.scheduled_nodes.is_empty()) {
@ -1241,6 +1270,11 @@ class Executor {
*scheduled_nodes = std::move(current_task.scheduled_nodes);
current_task.has_scheduled_nodes.store(false, std::memory_order_relaxed);
}
this->push_to_task_pool(std::move(scheduled_nodes));
}
void push_to_task_pool(std::unique_ptr<ScheduledNodes> scheduled_nodes)
{
/* All nodes are pushed as a single task in the pool. This avoids unnecessary threading
* overhead when the nodes are fast to compute. */
BLI_task_pool_push(
@ -1254,9 +1288,9 @@ class Executor {
const LocalData local_data = executor.get_local_data();
executor.run_task(new_current_task, local_data);
},
scheduled_nodes,
scheduled_nodes.release(),
true,
[](TaskPool * /*pool*/, void *data) { MEM_delete(static_cast<ScheduledNodes *>(data)); });
[](TaskPool * /*pool*/, void *data) { delete static_cast<ScheduledNodes *>(data); });
}
LocalData get_local_data()
@ -1410,7 +1444,7 @@ inline void Executor::execute_node(const FunctionNode &node,
if (!this->try_enable_multi_threading()) {
return;
}
this->move_scheduled_nodes_to_task_pool(current_task);
this->push_all_scheduled_nodes_to_task_pool(current_task);
};
lazy_threading::HintReceiver blocking_hint_receiver{blocking_hint_fn};