Line data Source code
1 : // SPDX-License-Identifier: BSD-3-Clause
2 : // Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
3 : /// @file Accumulation functions for variables, based on transform.
4 : /// @author Simon Heybrock
5 : #pragma once
6 :
7 : #include "scipp/variable/shape.h"
8 : #include "scipp/variable/transform.h"
9 : #include "scipp/variable/variable_factory.h"
10 :
11 : namespace scipp::variable {
12 :
13 : namespace detail {
/// Accumulate `other` into `var` in-place, threading when profitable.
///
/// Dispatches between a serial fallback and two threading strategies:
/// 1. chunking along the output's first dimension (`accumulate_parallel`),
/// 2. for small outputs reduced along the outer input dimension: chunking the
///    input, reducing each chunk into a per-chunk copy of the output, then
///    folding the copies back into `var`.
///
/// @param types Element-type combinations supported by transform_data.
/// @param op In-place reduction operation, applied as var <- op(var, other...).
/// @param name Operation name, passed through to transform_data (presumably
///        for error reporting — confirm against transform_data).
/// @param var Output variable; accumulation target.
/// @param other Input variable(s) to reduce into `var`.
template <class... Ts, class Op, class Var, class... Other>
static void do_accumulate(const std::tuple<Ts...> &types, Op op,
                          const std::string_view &name, Var &&var,
                          const Other &...other) {
  // Bail out (no threading) if:
  // - `other` is implicitly broadcast
  // - `other` are small, to avoid overhead (important for groupby), limit set
  //   by tuning BM_groupby_large_table
  // - reduction to scalar with more than 1 `other`
  const bool binned_input = (is_bins(other) || ...);
  // Binned inputs are expensive per element, so thread even for tiny sizes.
  const scipp::index small_input = binned_input ? 2 : 16384;
  if ((!other.dims().includes(var.dims()) || ...) ||
      ((other.dims().volume() < small_input) && ...) ||
      (sizeof...(other) != 1 && var.dims().ndim() == 0))
    return in_place<false>::transform_data(types, op, name, var, other...);

  // Reduce the given `slice` of all `other` into `out` (a slice of the
  // output, or of a per-chunk output copy).
  const auto reduce_chunk = [&](auto &&out, const Slice &slice) {
    // A typical cache line has 64 Byte, which would fit, e.g., 8 doubles. If
    // multiple threads write to different elements in the same cache lines we
    // have "false sharing", with a severe negative performance impact. 128 is a
    // somewhat arbitrary limit at which we can consider it unlikely that two
    // threads would frequently run into falsely shared elements. May need
    // further tuning.
    const bool avoid_false_sharing = out.dims().volume() < 128;
    // Small outputs are reduced into a thread-local copy, written back below.
    auto tmp = avoid_false_sharing ? copy(out) : out;
    [&](const auto &...args) { // force slices to const, avoid readonly issues
      in_place<false>::transform_data(types, op, name, tmp, args...);
    }(other.slice(slice)...);
    if (avoid_false_sharing)
      copy(tmp, out);
  };

  // TODO The parallelism could be improved for cases where the output has more
  // than one dimension, e.g., by flattening the output's dims in all inputs.
  // However, it is nontrivial to detect whether calling `flatten` on `other` is
  // possible without copies so this is not implemented at this point.
  // Strategy 1: thread over the output's first dimension; each task reduces
  // the matching slice of the inputs into its own disjoint output slice.
  const auto accumulate_parallel = [&]() {
    const auto dim = *var.dims().begin();
    const auto reduce = [&](const auto &range) {
      const Slice slice(dim, range.begin(), range.end());
      reduce_chunk(var.slice(slice), slice);
    };
    const auto size = var.dims()[dim];
    core::parallel::parallel_for(core::parallel::blocked_range(0, size),
                                 reduce);
  };
  if constexpr (sizeof...(other) == 1) {
    // True if the reduction dimension is the input's outermost dimension.
    const bool reduce_outer =
        (!var.dims().contains(other.dims().labels().front()) || ...);
    // This value is found from benchmarks reducing the outer dimension. Making
    // it larger can improve parallelism further, but increases the overhead
    // from copies. May need further tuning.
    constexpr scipp::index chunking_limit = 65536;
    if (var.dims().ndim() == 0 ||
        (reduce_outer && var.dims()[*var.dims().begin()] < chunking_limit)) {
      // Strategy 2: for small output sizes, especially with reduction along
      // the outer dimension, threading via the output's dimension does not
      // provide significant speedup, mainly due to partially transposed memory
      // access patterns. We thus chunk based on the input's dimension, for a
      // 5x speedup in many cases.
      const auto outer_dim = (*other.dims().begin(), ...);
      const auto outer_size = (other.dims()[outer_dim], ...);
      const auto nchunk = std::min(scipp::index(24), outer_size);
      const auto chunk_size = (outer_size + nchunk - 1) / nchunk;
      // The threading approach used here is possible only under the
      // assumption that op(var, broadcast(var, ...)) leaves var unchanged. This
      // is true for "idempotent" *operations* such as `min` and `max` as well
      // as for the output (initial) *values* used for, e.g., `sum` (zero).
      // However there are situations where *neither* is the case, most notably
      // groupby(...).sum which calls accumulate multiple times *with the same
      // output*.
      // We could still support threading in such cases if the caller can
      // provide an initial value to use for initializing the output buffer
      // (instead of a broadcast of the output). For now we simply bail out if
      // we detect non-idempotent initial values.
      // Probe: apply `op` to a copy of the output with itself; if the result
      // differs, the initial values are not idempotent under `op` and the
      // per-chunk scheme below would double-count, so run serially instead.
      auto v = copy(var);
      if (in_place<false>::transform_data(types, op, name, v, var); var != v)
        return in_place<false>::transform_data(types, op, name, var, other...);
      // One output copy per chunk along a temporary internal dimension.
      v = copy(
          broadcast(var, merge({Dim::InternalAccumulate, nchunk}, var.dims())));
      const auto reduce = [&](const auto &range) {
        for (scipp::index i = range.begin(); i < range.end(); ++i) {
          const Slice slice(outer_dim, std::min(i * chunk_size, outer_size),
                            std::min((i + 1) * chunk_size, outer_size));
          reduce_chunk(v.slice({Dim::InternalAccumulate, i}), slice);
        }
      };
      // Grain size 1: each task handles exactly one chunk.
      core::parallel::parallel_for(core::parallel::blocked_range(0, nchunk, 1),
                                   reduce);
      // Fold the per-chunk partial results back into the output.
      in_place<false>::transform_data(types, op, name, var, v);
    } else {
      accumulate_parallel();
    }
  } else {
    accumulate_parallel();
  }
}
111 :
112 : template <class... Ts, class Op, class Var, class... Other>
113 91626 : static void accumulate(const std::tuple<Ts...> &types, Op op,
114 : const std::string_view &name, Var &&var,
115 : Other &&...other) {
116 : // `other` not const, threading for cumulative ops not possible
117 : if constexpr ((!std::is_const_v<std::remove_reference_t<Other>> || ...))
118 38972 : return in_place<false>::transform_data(types, op, name, var, other...);
119 : else
120 52654 : do_accumulate(types, op, name, std::forward<Var>(var), other...);
121 52652 : }
122 :
123 : } // namespace detail
124 :
125 : /// Accumulate data elements of a variable in-place.
126 : ///
127 : /// This is equivalent to `transform_in_place`, with the only difference that
128 : /// the dimension check of the inputs is reversed. That is, it must be possible
129 : /// to broadcast the dimension of the first argument to that of the other
130 : /// argument. As a consequence, the operation may be applied multiple times to
131 : /// the same output element, effectively accumulating the result.
132 : ///
133 : /// WARNING: In contrast to the transform algorithms, accumulate does not touch
134 : /// the unit, since it would be hard to track, e.g., in multiplication
135 : /// operations.
136 : template <class... Ts, class Var, class Other, class Op>
137 79773 : void accumulate_in_place(Var &&var, Other &&other, Op op,
138 : const std::string_view &name) {
139 : // Note lack of dims check here and below: transform_data calls `merge` on the
140 : // dims which does the required checks, supporting broadcasting of outputs and
141 : // inputs but ensuring compatibility otherwise.
142 79773 : detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
143 : other);
144 79771 : }
145 :
146 : template <class... Ts, class Var, class Op>
147 10683 : void accumulate_in_place(Var &&var, const Variable &var1, const Variable &var2,
148 : Op op, const std::string_view &name) {
149 10683 : detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
150 : var1, var2);
151 10683 : }
152 :
153 : template <class... Ts, class Var, class Op>
154 1170 : void accumulate_in_place(Var &&var, Variable &var1, const Variable &var2,
155 : const Variable &var3, Op op,
156 : const std::string_view &name) {
157 1170 : detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
158 : var1, var2, var3);
159 1170 : }
160 :
161 : } // namespace scipp::variable
|