Line data Source code
1 : /** 2 : * \file BaseFilter.cpp 3 : */ 4 : 5 : #include "BaseFilter.h" 6 : #include <ATK/Core/Utilities.h> 7 : 8 : #if ATK_USE_THREADPOOL == 1 9 : #include <tbb/task_group.h> 10 : #endif 11 : 12 : #include <cassert> 13 : #include <cstdint> 14 : #include <stdexcept> 15 : 16 : #if ATK_PROFILING == 1 17 : #include <iostream> 18 : #endif 19 : 20 : namespace ATK 21 : { 22 2304 : BaseFilter::BaseFilter(gsl::index nb_input_ports, gsl::index nb_output_ports) 23 : :nb_input_ports(nb_input_ports), nb_output_ports(nb_output_ports), 24 2304 : connections(nb_input_ports, std::make_pair(-1, nullptr)), 25 4608 : input_mandatory_connection(nb_input_ports) 26 : { 27 2304 : } 28 : 29 35 : BaseFilter::BaseFilter(BaseFilter&& other) noexcept 30 35 : :nb_input_ports(other.nb_input_ports), nb_output_ports(other.nb_output_ports), input_sampling_rate(other.input_sampling_rate), output_sampling_rate(other.output_sampling_rate), connections(std::move(other.connections)), input_delay(other.input_delay), output_delay(std::move(other.output_delay)), latency(std::move(other.latency)), input_mandatory_connection(std::move(other.input_mandatory_connection)), is_reset(std::move(other.is_reset)) 31 : { 32 35 : } 33 : 34 2339 : BaseFilter::~BaseFilter() 35 : { 36 : #if ATK_PROFILING == 1 37 : std::cerr << "Object of type " << class_name << std::endl; 38 : std::cerr << "Input conversion time " << std::chrono::duration_cast<std::chrono::microseconds>(input_conversion_time).count() << "us" << std::endl; 39 : std::cerr << "Output conversion time " << std::chrono::duration_cast<std::chrono::microseconds>(output_conversion_time).count() << "us" << std::endl; 40 : std::cerr << "Process time " << std::chrono::duration_cast<std::chrono::microseconds>(process_time).count() << "us" << std::endl; 41 : #endif 42 2339 : } 43 : 44 61846 : void BaseFilter::reset() 45 : { 46 61846 : if (!is_reset) 47 : { 48 99379 : for (auto it = connections.begin(); it != connections.end(); ++it) 49 : { 50 50160 : if (it->second) 51 : { 52 50159 : it->second->reset(); 53 : } 54 : } 55 49219 : is_reset = true; 56 : } 57 61846 : } 58 : 59 2497 : void BaseFilter::setup() 60 : { 61 : // Nothing to do by default 62 2497 : } 63 : 64 2276 : void BaseFilter::full_setup() 65 : { 66 : #if ATK_PROFILING == 1 67 : class_name = typeid(*this).name(); 68 : #endif 69 2276 : setup(); 70 2276 : } 71 : 72 1411 : void BaseFilter::set_input_port(gsl::index input_port, BaseFilter& filter, gsl::index output_port) 73 : { 74 1411 : if(output_port >= filter.nb_output_ports) 75 : { 76 1 : throw RuntimeError("Output port does not exist for this filter"); 77 : } 78 1410 : if(input_port < nb_input_ports) 79 : { 80 1409 : connections[input_port] = std::make_pair(output_port, &filter); 81 1409 : if(filter.get_output_sampling_rate() != get_input_sampling_rate()) 82 : { 83 1 : throw RuntimeError("Input sample rate from this filter must be equal to the output sample rate of the connected filter"); 84 : } 85 : } 86 : else 87 : { 88 1 : throw RuntimeError("Input port doesn't exist for this filter"); 89 : } 90 1408 : } 91 : 92 1251 : void BaseFilter::set_input_sampling_rate(gsl::index rate) 93 : { 94 1251 : input_sampling_rate = rate; 95 1251 : if(output_sampling_rate == 0) 96 : { 97 1237 : output_sampling_rate = rate; 98 : } 99 1251 : full_setup(); 100 1251 : } 101 : 102 1441 : gsl::index BaseFilter::get_input_sampling_rate() const 103 : { 104 1441 : return input_sampling_rate; 105 : } 106 : 107 1180 : void BaseFilter::set_output_sampling_rate(gsl::index rate) 108 : { 109 1180 : output_sampling_rate = rate; 110 1180 : full_setup(); 111 1180 : } 112 : 113 1469 : gsl::index BaseFilter::get_output_sampling_rate() const 114 : { 115 1469 : return output_sampling_rate; 116 : } 117 : 118 3 : void BaseFilter::set_input_delay(gsl::index delay) 119 : { 120 3 : input_delay = delay; 121 3 : } 122 : 123 2 : gsl::index BaseFilter::get_input_delay() const 124 : { 125 2 : return input_delay; 126 : } 127 : 128 3 : void BaseFilter::set_output_delay(gsl::index delay) 129 : { 130 3 : output_delay = delay; 131 3 : } 132 : 133 50161 : gsl::index BaseFilter::get_output_delay() const 134 : { 135 50161 : return output_delay; 136 : } 137 : 138 11666 : void BaseFilter::process(gsl::index size) 139 : { 140 11666 : reset(); 141 11666 : process_conditionnally<true>(size); 142 11663 : } 143 : 144 1 : void BaseFilter::dryrun(gsl::index size) 145 : { 146 1 : reset(); 147 1 : process_conditionnally<false>(size); 148 1 : } 149 : 150 : #if ATK_USE_THREADPOOL == 1 151 : void BaseFilter::process_parallel(gsl::index size) 152 : { 153 : reset(); 154 : process_conditionnally_parallel(size); 155 : } 156 : #endif 157 : 158 : template<bool must_process> 159 61846 : void BaseFilter::process_conditionnally(gsl::index size) 160 : { 161 61846 : if(size == 0) 162 : { 163 1 : return; 164 : } 165 61845 : if(output_sampling_rate == 0) 166 : { 167 1 : throw RuntimeError("Output sampling rate is 0, must be non 0 to compute the needed size for filters processing"); 168 : } 169 61844 : if(!is_reset) 170 : { 171 12627 : return; 172 : } 173 99376 : for(size_t port = 0; port < connections.size(); ++port) 174 : { 175 50160 : if(connections[port].second == nullptr) 176 : { 177 1 : if(!input_mandatory_connection[port]) 178 : { 179 1 : throw RuntimeError("Input port " + std::to_string(port) + " is not connected"); 180 : } 181 : } 182 : else 183 : { 184 50159 : assert(output_sampling_rate); 185 50159 : connections[port].second->template process_conditionnally<must_process>(static_cast<uint64_t>(size) * input_sampling_rate / output_sampling_rate); 186 : } 187 : } 188 : #if ATK_PROFILING == 1 189 : auto timer = std::chrono::steady_clock::now(); 190 : #endif 191 49216 : prepare_process(static_cast<uint64_t>(size) * input_sampling_rate / output_sampling_rate); 192 : #if ATK_PROFILING == 1 193 : auto timer2 = std::chrono::steady_clock::now(); 194 : input_conversion_time += (timer2 - timer); 195 : timer = timer2; 196 : #endif 197 49215 : prepare_outputs(size); 198 : #if ATK_PROFILING == 1 199 : timer2 = std::chrono::steady_clock::now(); 200 : output_conversion_time += (timer2 - timer); 201 : timer = timer2; 202 : #endif 203 : if(must_process) 204 : { 205 49210 : process_impl(size); 206 : } 207 : #if ATK_PROFILING == 1 208 : timer2 = std::chrono::steady_clock::now(); 209 : process_time += (timer2 - timer); 210 : timer = timer2; 211 : #endif 212 49215 : is_reset = false; 213 49215 : last_size = size; 214 : } 215 : 216 : #if ATK_USE_THREADPOOL == 1 217 : void BaseFilter::process_conditionnally_parallel(gsl::index size) 218 : { 219 : if (size == 0) 220 : { 221 : return; 222 : } 223 : if (output_sampling_rate == 0) 224 : { 225 : throw RuntimeError("Output sampling rate is 0, must be non 0 to compute the needed size for filters processing"); 226 : } 227 : { // lock this entire loop, as we only want to do the processing if we are not reseted 228 : tbb::queuing_mutex::scoped_lock lock(mutex); 229 : if (!is_reset) 230 : { 231 : return; 232 : } 233 : tbb::task_group g; 234 : for(gsl::index port = 0; port < connections.size(); ++port) 235 : { 236 : if(connections[port].second == nullptr) 237 : { 238 : if(!input_mandatory_connection[port]) 239 : throw RuntimeError("Input port " + std::to_string(port) + " is not connected"); 240 : } 241 : else 242 : { 243 : assert(output_sampling_rate); 244 : auto filter = connections[port]; 245 : g.run([=]{filter->process_conditionnally_parallel(uint64_t(size) * input_sampling_rate / output_sampling_rate); }); 246 : } 247 : } 248 : g.wait(); 249 : #if ATK_PROFILING == 1 250 : boost::timer::cpu_timer timer; 251 : #endif 252 : prepare_process(size * input_sampling_rate / output_sampling_rate); 253 : #if ATK_PROFILING == 1 254 : boost::timer::cpu_times const input_elapsed_times(timer.elapsed()); 255 : input_conversion_time += (input_elapsed_times.system + input_elapsed_times.user); 256 : #endif 257 : prepare_outputs(size); 258 : #if ATK_PROFILING == 1 259 : boost::timer::cpu_times const output_elapsed_times(timer.elapsed()); 260 : output_conversion_time += (output_elapsed_times.system + output_elapsed_times.user); 261 : #endif 262 : process_impl(size); 263 : #if ATK_PROFILING == 1 264 : boost::timer::cpu_times const process_elapsed_times(timer.elapsed()); 265 : process_time += (process_elapsed_times.system + process_elapsed_times.user); 266 : #endif 267 : is_reset = false; 268 : last_size = size; 269 : } 270 : } 271 : #endif 272 : 273 1 : gsl::index BaseFilter::get_nb_input_ports() const 274 : { 275 1 : return nb_input_ports; 276 : } 277 : 278 4 : void BaseFilter::set_nb_input_ports(gsl::index nb_ports) 279 : { 280 4 : connections.resize(nb_ports, std::make_pair(-1, nullptr)); 281 4 : input_mandatory_connection.resize(nb_ports); 282 4 : nb_input_ports = nb_ports; 283 4 : } 284 : 285 0 : void BaseFilter::allow_inactive_connection(unsigned int port) 286 : { 287 0 : input_mandatory_connection[port] = true; 288 0 : } 289 : 290 1 : gsl::index BaseFilter::get_nb_output_ports() const 291 : { 292 1 : return nb_output_ports; 293 : } 294 : 295 16 : void BaseFilter::set_nb_output_ports(gsl::index nb_ports) 296 : { 297 16 : nb_output_ports = nb_ports; 298 16 : } 299 : 300 3 : void BaseFilter::set_latency(gsl::index latency) 301 : { 302 3 : this->latency = latency; 303 3 : } 304 : 305 2 : gsl::index BaseFilter::get_latency() const 306 : { 307 2 : return latency; 308 : } 309 : 310 7 : gsl::index BaseFilter::get_global_latency() const 311 : { 312 7 : gsl::index global_latency = 0; 313 10 : for(auto it = connections.begin(); it != connections.end(); ++it) 314 : { 315 4 : if(it->second == nullptr) 316 : { 317 1 : throw RuntimeError("Input port " + std::to_string(it - connections.begin()) + " is not connected"); 318 : } 319 : 320 3 : global_latency = std::max(global_latency, it->second->get_global_latency()); 321 : } 322 6 : return global_latency + latency; 323 : } 324 : 325 : }