// SPDX-License-Identifier: BSD-3-Clause
// Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
/// @file Accumulation functions for variables, based on transform.
/// @author Simon Heybrock
#pragma once

#include "scipp/variable/shape.h"
#include "scipp/variable/transform.h"
#include "scipp/variable/variable_factory.h"

namespace scipp::variable {

namespace detail {
template <class... Ts, class Op, class Var, class... Other>
static void do_accumulate(const std::tuple<Ts...> &types, Op op,
                          const std::string_view &name, Var &&var,
                          const Other &...other) {
  // Bail out (no threading) if:
  // - `other` is implicitly broadcast
  // - `other` are small, to avoid overhead (important for groupby); the limit
  //   is set by tuning BM_groupby_large_table
  // - reduction to scalar with more than 1 `other`
  const bool binned_input = (is_bins(other) || ...);
  const scipp::index small_input = binned_input ? 2 : 16384;
  if ((!other.dims().includes(var.dims()) || ...) ||
      ((other.dims().volume() < small_input) && ...) ||
      (sizeof...(other) != 1 && var.dims().ndim() == 0))
    return in_place<false>::transform_data(types, op, name, var, other...);
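  // E.g. (hypothetical shapes): reducing a single input with dims {x: 100}
  // into a scalar stays serial since 100 < 16384; an input with dims {y: 10}
  // accumulated into an output with dims {x: 10, y: 10} stays serial since it
  // is implicitly broadcast along x.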

  const auto reduce_chunk = [&](auto &&out, const Slice &slice) {
    // A typical cache line has 64 Byte, which fits, e.g., 8 doubles. If
    // multiple threads write to different elements in the same cache line we
    // get "false sharing", with a severe negative performance impact. 128 is
    // a somewhat arbitrary limit above which we consider it unlikely that two
    // threads would frequently run into falsely shared elements. May need
    // further tuning.
    const bool avoid_false_sharing = out.dims().volume() < 128;
    auto tmp = avoid_false_sharing ? copy(out) : out;
    [&](const auto &...args) { // force slices to const, avoid readonly issues
      in_place<false>::transform_data(types, op, name, tmp, args...);
    }(other.slice(slice)...);
    if (avoid_false_sharing)
      copy(tmp, out);
  };
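
  // Worked arithmetic for the limit above (assuming 8-byte doubles): a
  // 127-element output spans at most 17 cache lines, so with a few dozen
  // threads most chunks share a line with a neighbour; a 10000-element output
  // spans ~1250 lines, making such collisions rare.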

  // TODO The parallelism could be improved for cases where the output has more
  // than one dimension, e.g., by flattening the output's dims in all inputs.
  // However, it is nontrivial to detect whether calling `flatten` on `other` is
  // possible without copies so this is not implemented at this point.
  const auto accumulate_parallel = [&]() {
    const auto dim = *var.dims().begin();
    const auto reduce = [&](const auto &range) {
      const Slice slice(dim, range.begin(), range.end());
      reduce_chunk(var.slice(slice), slice);
    };
    const auto size = var.dims()[dim];
    core::parallel::parallel_for(core::parallel::blocked_range(0, size),
                                 reduce);
  };
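  // E.g. (hypothetical shape): with var dims {x: 100000}, each thread reduces
  // its own x-subrange of both the output and the inputs, so no two threads
  // write to the same output element.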
  if constexpr (sizeof...(other) == 1) {
    const bool reduce_outer =
        (!var.dims().contains(other.dims().labels().front()) || ...);
    // This value is found from benchmarks reducing the outer dimension. Making
    // it larger can improve parallelism further, but increases the overhead
    // from copies. May need further tuning.
    constexpr scipp::index chunking_limit = 65536;
    if (var.dims().ndim() == 0 ||
        (reduce_outer && var.dims()[*var.dims().begin()] < chunking_limit)) {
      // For small output sizes, especially with reduction along the outer
      // dimension, threading via the output's dimension does not provide
      // significant speedup, mainly due to partially transposed memory access
      // patterns. We thus chunk based on the input's dimension, for a 5x
      // speedup in many cases.
      const auto outer_dim = (*other.dims().begin(), ...);
      const auto outer_size = (other.dims()[outer_dim], ...);
      const auto nchunk = std::min(scipp::index(24), outer_size);
      const auto chunk_size = (outer_size + nchunk - 1) / nchunk;
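      // E.g. (hypothetical size): outer_size = 1000 yields nchunk = 24 and
      // chunk_size = (1000 + 23) / 24 = 42; the chunks cover [0, 42),
      // [42, 84), ..., with the last ones clipped to outer_size below.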
      // The threading approach used here is possible only under the
      // assumption that op(var, broadcast(var, ...)) leaves var unchanged.
      // This is true for "idempotent" *operations* such as `min` and `max` as
      // well as for the output (initial) *values* used for, e.g., `sum`
      // (zero). However, there are situations where *neither* is the case,
      // most notably groupby(...).sum, which calls accumulate multiple times
      // *with the same output*.
      // We could still support threading in such cases if the caller could
      // provide an initial value for initializing the output buffer (instead
      // of a broadcast of the output). For now we simply bail out if we
      // detect non-idempotent initial values.
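      // Concretely: max(x, x) == x and 0 + x == x, so the check below passes
      // for `max` and for a zero-initialized sum; but if `var` already holds
      // a partial sum s != 0 then s + s != s, and we take the serial path.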
      auto v = copy(var);
      if (in_place<false>::transform_data(types, op, name, v, var); var != v)
        return in_place<false>::transform_data(types, op, name, var, other...);
      v = copy(
          broadcast(var, merge({Dim::InternalAccumulate, nchunk}, var.dims())));
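      // Each of the nchunk slices along Dim::InternalAccumulate starts out as
      // a copy of `var`; every thread then accumulates its own input chunk
      // into its own slice, and the partial results are folded back into
      // `var` by the final transform_data call below.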
      const auto reduce = [&](const auto &range) {
        for (scipp::index i = range.begin(); i < range.end(); ++i) {
          const Slice slice(outer_dim, std::min(i * chunk_size, outer_size),
                            std::min((i + 1) * chunk_size, outer_size));
          reduce_chunk(v.slice({Dim::InternalAccumulate, i}), slice);
        }
      };
      core::parallel::parallel_for(core::parallel::blocked_range(0, nchunk, 1),
                                   reduce);
      in_place<false>::transform_data(types, op, name, var, v);
    } else {
      accumulate_parallel();
    }
  } else {
    accumulate_parallel();
  }
}

template <class... Ts, class Op, class Var, class... Other>
static void accumulate(const std::tuple<Ts...> &types, Op op,
                       const std::string_view &name, Var &&var,
                       Other &&...other) {
  // Non-const `other` implies a cumulative op (it writes to `other`), so the
  // chunked threading in do_accumulate is not possible.
  if constexpr ((!std::is_const_v<std::remove_reference_t<Other>> || ...))
    return in_place<false>::transform_data(types, op, name, var, other...);
  else
    do_accumulate(types, op, name, std::forward<Var>(var), other...);
}

} // namespace detail

/// Accumulate data elements of a variable in-place.
///
/// This is equivalent to `transform_in_place`, with the only difference that
/// the dimension check of the inputs is reversed: it must be possible to
/// broadcast the dimensions of the first argument to those of the other
/// arguments. As a consequence, the operation may be applied multiple times to
/// the same output element, effectively accumulating the result.
///
/// WARNING: In contrast to the transform algorithms, accumulate does not touch
/// the unit, since it would be hard to track, e.g., in multiplication
/// operations.
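///
/// A minimal usage sketch (illustrative only; the element lambda and type
/// list shown are assumptions, not operations defined by this header):
///
///   // Sum all elements of `data` into the scalar `sum`:
///   accumulate_in_place<double>(
///       sum, data, [](auto &a, const auto &b) { a += b; }, "sum");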
template <class... Ts, class Var, class Other, class Op>
void accumulate_in_place(Var &&var, Other &&other, Op op,
                         const std::string_view &name) {
  // Note the lack of a dims check here and below: transform_data calls `merge`
  // on the dims, which does the required checks, supporting broadcasting of
  // outputs and inputs but ensuring compatibility otherwise.
  detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
                     other);
}

template <class... Ts, class Var, class Op>
void accumulate_in_place(Var &&var, const Variable &var1, const Variable &var2,
                         Op op, const std::string_view &name) {
  detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
                     var1, var2);
}

template <class... Ts, class Var, class Op>
void accumulate_in_place(Var &&var, Variable &var1, const Variable &var2,
                         const Variable &var3, Op op,
                         const std::string_view &name) {
  detail::accumulate(type_tuples<Ts...>(op), op, name, std::forward<Var>(var),
                     var1, var2, var3);
}

} // namespace scipp::variable
