Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Add divergence check blockchain for automatic detection of diverging tasks in debug mode (#217)
- Add automatic detection of diverging execution in debug mode (#217)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)
- Introduce new `experimental::hint` API for providing the runtime with additional information on how to execute a task (#227)
Expand Down
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ endif()

option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_DIVERGENCE_CHECK "Enable divergence check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

if(CELERITY_DIVERGENCE_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Divergence checker enabled - this will impact the overall performance")
endif()

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
Expand Down Expand Up @@ -186,7 +191,7 @@ set(SOURCES
src/command_graph.cc
src/config.cc
src/device_queue.cc
src/divergence_block_chain.cc
src/divergence_checker.cc
src/executor.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
Expand Down Expand Up @@ -289,6 +294,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_DIVERGENCE_CHECK=$<BOOL:${CELERITY_DIVERGENCE_CHECK}>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needs to be added to cmake/celerity-config.cmake.in!

CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

Expand Down
4 changes: 4 additions & 0 deletions docs/pitfalls.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,7 @@ if(rand() > 1337) {
celerity::buffer<float, 2> my_buffer(...);
}
```

> Diverging host execution can be detected at runtime by enabling the
> `CELERITY_DIVERGENCE_CHECK` CMake option, at the cost of some runtime
> overhead (enabled by default in debug builds).
28 changes: 14 additions & 14 deletions include/divergence_block_chain.h → include/divergence_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
#include "communicator.h"
#include "recorders.h"

namespace celerity::detail {
struct runtime_testspy;
}

namespace celerity::detail::divergence_checker_detail {
using task_hash = size_t;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
Expand Down Expand Up @@ -67,34 +63,38 @@ class divergence_block_chain {
std::vector<task_record> m_task_records;
size_t m_tasks_checked = 0;
size_t m_hashes_added = 0;
task_hash m_last_hash = 0;

std::vector<int> m_per_node_hash_counts;
std::mutex m_task_records_mutex;

std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0);

std::unique_ptr<communicator> m_communicator;

void divergence_out(const divergence_map& check_map, const int task_num);
void reprot_divergence(const divergence_map& check_map, const int task_num);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void reprot_divergence(const divergence_map& check_map, const int task_num);
void report_divergence(const divergence_map& check_map, const int task_num);


void add_new_hashes();
void clear(const int min_progress);
std::pair<int, int> collect_hash_counts();
per_node_task_hashes collect_hashes(const int min_hash_count) const;
divergence_map create_check_map(const per_node_task_hashes& task_hashes, const int task_num) const;
divergence_map create_divergence_map(const per_node_task_hashes& task_hashes, const int task_num) const;

void check_for_deadlock() const;
void check_for_deadlock();

static void log_node_divergences(const divergence_map& check_map, const int task_num);
static void log_node_divergences(const divergence_map& check_map, const int task_id);
static void log_task_record(const divergence_map& check_map, const task_record& task, const task_hash hash);
void log_task_record_once(const divergence_map& check_map, const int task_num);

void add_new_task(const task_record& task);
task_record thread_save_get_task_record(const size_t task_num);
};
}; // namespace celerity::detail::divergence_checker_detail

namespace celerity::detail {
class divergence_checker {
friend struct ::celerity::detail::runtime_testspy;
friend struct runtime_testspy;

public:
divergence_checker(task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false)
Expand All @@ -111,6 +111,10 @@ class divergence_checker {
~divergence_checker() { stop(); }

private:
std::thread m_thread;
bool m_is_running = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_is_running must be protected by a mutex!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just use an atomic.

divergence_checker_detail::divergence_block_chain m_block_chain;

void start() {
m_thread = std::thread(&divergence_checker::run, this);
m_is_running = true;
Expand All @@ -129,9 +133,5 @@ class divergence_checker {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

std::thread m_thread;
bool m_is_running = false;
divergence_block_chain m_block_chain;
};
}; // namespace celerity::detail::divergence_checker_detail
}; // namespace celerity::detail
2 changes: 1 addition & 1 deletion include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ template <int Dims>
struct std::hash<celerity::detail::region<Dims>> {
std::size_t operator()(const celerity::detail::region<Dims> r) {
std::size_t seed = 0;
for(auto box : r.get_boxes()) {
for(auto& box : r.get_boxes()) {
celerity::detail::utils::hash_combine(seed, std::hash<celerity::detail::box<Dims>>{}(box));
}
return seed;
Expand Down
7 changes: 4 additions & 3 deletions include/mpi_communicator.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

#include <memory>

#include <mpi.h>
Expand All @@ -11,6 +12,8 @@ class mpi_communicator : public communicator {
mpi_communicator(MPI_Comm comm) : m_comm(comm) {}

private:
MPI_Comm m_comm;

void allgather_inplace_impl(std::byte* sendrecvbuf, const int sendrecvcount) override {
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, sendrecvbuf, sendrecvcount, MPI_BYTE, m_comm);
};
Expand All @@ -32,7 +35,5 @@ class mpi_communicator : public communicator {
MPI_Comm_rank(m_comm, &rank);
return static_cast<node_id>(rank);
}

MPI_Comm m_comm;
};
} // namespace celerity::detail
} // namespace celerity::detail
2 changes: 1 addition & 1 deletion include/ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ struct std::hash<celerity::detail::coordinate<Interface, Dims>> {
std::size_t operator()(const celerity::detail::coordinate<Interface, Dims>& r) const noexcept {
std::size_t seed = 0;
for(int i = 0; i < Dims; ++i) {
celerity::detail::utils::hash_combine(seed, std::hash<int>{}(r[i]));
celerity::detail::utils::hash_combine(seed, std::hash<size_t>{}(r[i]));
}
return seed;
};
Expand Down
9 changes: 5 additions & 4 deletions include/recorders.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ class task_recorder {
void record_task(const task& tsk);

void add_callback(task_callback callback);
void invoke_callbacks(const task_record& tsk) const;

const task_records& get_tasks() const { return m_recorded_tasks; }

private:
task_records m_recorded_tasks;
std::vector<task_callback> m_callbacks{};
const buffer_manager* m_buff_mngr;

void invoke_callbacks(const task_record& tsk) const;
};

// Command recording
Expand Down Expand Up @@ -104,16 +105,16 @@ struct command_record {

class command_recorder {
public:
using command_record = std::vector<command_record>;
using command_records = std::vector<command_record>;

command_recorder(const task_manager* task_mngr, const buffer_manager* buff_mngr = nullptr) : m_task_mngr(task_mngr), m_buff_mngr(buff_mngr) {}

void record_command(const abstract_command& com);

const command_record& get_commands() const { return m_recorded_commands; }
const command_records& get_commands() const { return m_recorded_commands; }

private:
command_record m_recorded_commands;
command_records m_recorded_commands;
const task_manager* m_task_mngr;
const buffer_manager* m_buff_mngr;
};
Expand Down
4 changes: 2 additions & 2 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "command.h"
#include "config.h"
#include "device_queue.h"
#include "divergence_block_chain.h"
#include "divergence_checker.h"
#include "frame.h"
#include "host_queue.h"
#include "recorders.h"
Expand Down Expand Up @@ -102,7 +102,7 @@ namespace detail {
size_t m_num_nodes;
node_id m_local_nid;

std::unique_ptr<divergence_checker_detail::divergence_checker> m_divergence_check;
std::unique_ptr<divergence_checker> m_divergence_check;

// These management classes are only constructed on the master node.
std::unique_ptr<command_graph> m_cdag;
Expand Down
5 changes: 5 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ namespace detail {
const auto has_dry_run_nodes = parsed_and_validated_envs.get(env_dry_run_nodes);
if(has_dry_run_nodes) { m_dry_run_nodes = *has_dry_run_nodes; }

#if CELERITY_DIVERGENCE_CHECK
// divergence checker needs recording
m_recording = true;
#else
m_recording = parsed_and_validated_envs.get_or(env_recording, false);
#endif
Comment on lines +204 to +209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we print a warning that recording is being force-enabled here? What about the user explicitly setting CELERITY_RECORDING=0? cc @PeterTh @fknorr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of CELERITY_RECORDING as it exists, for that very reason. The user does not care about DAGs being recorded; they care about divergence checks or graph printing, from which we can decide whether recording needs to be active or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, maybe we should just get rid of it in a small follow-up PR.

m_horizon_step = parsed_and_validated_envs.get(env_horizon_step);
m_horizon_max_parallelism = parsed_and_validated_envs.get(env_horizon_max_para);

Expand Down
Loading