Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
aggregator.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__aggregator_H
18 #define __TBB__aggregator_H
19 
20 #define __TBB_aggregator_H_include_area
22 
23 #if !TBB_PREVIEW_AGGREGATOR
24 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
25 #endif
26 
27 #include "atomic.h"
28 #include "tbb_profiling.h"
29 
30 namespace tbb {
31 namespace interface6 {
32 
33 using namespace tbb::internal;
34 
36  template<typename handler_type> friend class aggregator_ext;
37  uintptr_t status;
39 public:
40  enum aggregator_operation_status { agg_waiting=0, agg_finished };
41  aggregator_operation() : status(agg_waiting), my_next(NULL) {}
43  void start() { call_itt_notify(acquired, &status); }
45 
46  void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
49 };
50 
51 namespace internal {
52 
54  friend class basic_handler;
55  virtual void apply_body() = 0;
56 public:
58  virtual ~basic_operation_base() {}
59 };
60 
61 template<typename Body>
63  const Body& my_body;
64  void apply_body() __TBB_override { my_body(); }
65 public:
66  basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
67 };
68 
70 public:
72  void operator()(aggregator_operation* op_list) const {
73  while (op_list) {
74  // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
75  // The executing thread "acquires" the tag (see start()) and then performs
76  // the associated operation w/o triggering a race condition diagnostics.
77  // A thread that created the operation is waiting for its status (see execute_impl()),
78  // so when this thread is done with the operation, it will "release" the tag
79  // and update the status (see finish()) to give control back to the waiting thread.
80  basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
81  // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
82  op_list = op_list->next();
83  request.start();
84  request.apply_body();
85  request.finish();
86  }
87  }
88 };
89 
90 } // namespace internal
91 
93 
95 template <typename handler_type>
97 public:
98  aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
99 
101 
102  void process(aggregator_operation *op) { execute_impl(*op); }
103 
104 protected:
109 
110  // ITT note: &(op.status) tag is used to cover accesses to this operation. This
111  // thread has created the operation, and now releases it so that the handler
112  // thread may handle the associated operation w/o triggering a race condition;
113  // thus this tag will be acquired just before the operation is handled in the
114  // handle_operations functor.
116  // insert the operation into the list
117  do {
118  // ITT may flag the following line as a race; it is a false positive:
119  // This is an atomic read; we don't provide itt_hide_load_word for atomics
120  op.my_next = res = mailbox; // NOT A RACE
121  } while (mailbox.compare_and_swap(&op, res) != res);
122  if (!res) { // first in the list; handle the operations
123  // ITT note: &mailbox tag covers access to the handler_busy flag, which this
124  // waiting handler thread will try to set before entering handle_operations.
125  call_itt_notify(acquired, &mailbox);
126  start_handle_operations();
127  __TBB_ASSERT(op.status, NULL);
128  }
129  else { // not first; wait for op to be ready
133  }
134  }
135 
136 
137 private:
139  atomic<aggregator_operation *> mailbox;
140 
142 
143  uintptr_t handler_busy;
144 
145  handler_type handle_operations;
146 
149  aggregator_operation *pending_operations;
150 
151  // ITT note: &handler_busy tag covers access to mailbox as it is passed
152  // between active and waiting handlers. Below, the waiting handler waits until
153  // the active handler releases, and the waiting handler acquires &handler_busy as
154  // it becomes the active_handler. The release point is at the end of this
155  // function, when all operations in mailbox have been handled by the
156  // owner of this aggregator.
157  call_itt_notify(prepare, &handler_busy);
158  // get handler_busy: only one thread can possibly spin here at a time
159  spin_wait_until_eq(handler_busy, uintptr_t(0));
160  call_itt_notify(acquired, &handler_busy);
161  // acquire fence not necessary here due to causality rule and surrounding atomics
162  __TBB_store_with_release(handler_busy, uintptr_t(1));
163 
164  // ITT note: &mailbox tag covers access to the handler_busy flag itself.
165  // Capturing the state of the mailbox signifies that handler_busy has been
166  // set and a new active handler will now process that list's operations.
167  call_itt_notify(releasing, &mailbox);
168  // grab pending_operations
169  pending_operations = mailbox.fetch_and_store(NULL);
170 
171  // handle all the operations
172  handle_operations(pending_operations);
173 
174  // release the handler
175  itt_store_word_with_release(handler_busy, uintptr_t(0));
176  }
177 };
178 
180 class aggregator : private aggregator_ext<internal::basic_handler> {
181 public:
182  aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
184 
186  template<typename Body>
187  void execute(const Body& b) {
189  this->execute_impl(op);
190  }
191 };
192 
193 } // namespace interface6
194 
195 using interface6::aggregator;
196 using interface6::aggregator_ext;
197 using interface6::aggregator_operation;
198 
199 } // namespace tbb
200 
202 #undef __TBB_aggregator_H_include_area
203 
204 #endif // __TBB__aggregator_H
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function h
The graph class.
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:65
void itt_hide_store_word(T &dst, T src)
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
T itt_load_word_with_acquire(const tbb::atomic< T > &src)
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
T itt_hide_load_word(const T &src)
void call_itt_notify(notify_type, void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
void set_next(aggregator_operation *n)
Definition: aggregator.h:48
aggregator_operation * my_next
Definition: aggregator.h:38
void finish()
Call finish when done handling this operation.
Definition: aggregator.h:46
aggregator_operation * next()
Definition: aggregator.h:47
void start()
Call start before handling this operation.
Definition: aggregator.h:43
void operator()(aggregator_operation *op_list) const
Definition: aggregator.h:72
Aggregator base class and expert interface.
Definition: aggregator.h:96
atomic< aggregator_operation * > mailbox
An atomically updated list (aka mailbox) of aggregator_operations.
Definition: aggregator.h:139
void start_handle_operations()
Trigger the handling of operations when the handler is free.
Definition: aggregator.h:148
aggregator_ext(const handler_type &h)
Definition: aggregator.h:98
void execute_impl(aggregator_operation &op)
Definition: aggregator.h:107
void process(aggregator_operation *op)
EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
Definition: aggregator.h:102
uintptr_t handler_busy
Controls thread access to handle_operations.
Definition: aggregator.h:143
Basic aggregator interface.
Definition: aggregator.h:180
void execute(const Body &b)
BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
Definition: aggregator.h:187
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.