Line data Source code
1 : /** 2 : * \file PipelineGlobalSinkFilter.cpp 3 : */ 4 : 5 : #include "PipelineGlobalSinkFilter.h" 6 : #include <ATK/Core/Utilities.h> 7 : 8 : #if ATK_USE_THREADPOOL == 1 9 : #include <tbb/task_group.h> 10 : #endif 11 : 12 : #include <algorithm> 13 : 14 : namespace ATK 15 : { 16 13 : PipelineGlobalSinkFilter::PipelineGlobalSinkFilter() 17 13 : :Parent(0, 0) 18 : { 19 13 : } 20 : 21 20 : void PipelineGlobalSinkFilter::add_filter(gsl::not_null<BaseFilter*> filter) 22 : { 23 20 : if(std::find(filters.begin(), filters.end(), filter) == filters.end()) 24 : { 25 19 : filters.push_back(filter); 26 : } 27 : else 28 : { 29 1 : throw RuntimeError("Try to ad a filter that was aleady added"); 30 : } 31 19 : } 32 : 33 3 : void PipelineGlobalSinkFilter::remove_filter(gsl::not_null<const BaseFilter*> filter) 34 : { 35 3 : auto it = std::find(filters.begin(), filters.end(), filter); 36 3 : if(it != filters.end()) 37 : { 38 1 : filters.erase(it); 39 : } 40 : else 41 : { 42 2 : throw RuntimeError("Try to remove a filter that was not added"); 43 : } 44 1 : } 45 : 46 0 : int PipelineGlobalSinkFilter::get_type() const 47 : { 48 0 : return 0; // bogus 49 : } 50 : 51 1 : void PipelineGlobalSinkFilter::set_input_port(gsl::index input_port, gsl::not_null<BaseFilter*> filter, gsl::index output_port) 52 : { 53 1 : set_input_port(input_port, *filter, output_port); 54 0 : } 55 : 56 1 : void PipelineGlobalSinkFilter::set_input_port(gsl::index input_port, BaseFilter& filter, gsl::index output_port) 57 : { 58 1 : throw RuntimeError("This function must not be called on pipelines"); 59 : } 60 : 61 0 : void PipelineGlobalSinkFilter::set_parallel(bool parallel) 62 : { 63 0 : activate_parallel = parallel; 64 0 : } 65 : 66 9 : void PipelineGlobalSinkFilter::prepare_process(gsl::index size) 67 : { 68 : // Nothing to do 69 9 : } 70 : 71 9 : void PipelineGlobalSinkFilter::prepare_outputs(gsl::index size ) 72 : { 73 : // Nothing to do 74 9 : } 75 : 76 1 : void PipelineGlobalSinkFilter::dryrun(gsl::index size) 77 : { 78 3 : for(auto filter: filters) 79 : { 80 2 : filter->reset(); 81 : } 82 3 : for(auto filter: filters) 83 : { 84 2 : filter->process_conditionnally<false>(uint64_t(size) * filter->get_output_sampling_rate() / input_sampling_rate); 85 : } 86 1 : } 87 : 88 9 : void PipelineGlobalSinkFilter::process_impl(gsl::index size ) const 89 : { 90 27 : for(auto filter: filters) 91 : { 92 18 : filter->reset(); 93 : } 94 : #if ATK_USE_THREADPOOL == 1 95 : if (activate_parallel) 96 : { 97 : tbb::task_group g; 98 : for(auto filter: filters) 99 : { 100 : g.run([=] { 101 : filter->process_conditionnally<true>( 102 : uint64_t(size) * (*it)->get_output_sampling_rate() / input_sampling_rate); 103 : }); 104 : } 105 : g.wait(); 106 : return; 107 : } 108 : #endif 109 27 : for(auto filter: filters) 110 : { 111 18 : filter->process_conditionnally<true>(uint64_t(size) * filter->get_output_sampling_rate() / input_sampling_rate); 112 : } 113 9 : } 114 : 115 : }