concurrent_queue.h

00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023 
00024 #include "_concurrent_queue_internal.h"
00025 
00026 namespace tbb {
00027 
00028 namespace strict_ppl {
00029 
00031 
00034 template<typename T, typename A = cache_aligned_allocator<T> > 
00035 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
00036     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00037 
00039     typedef typename A::template rebind<char>::other page_allocator_type;
00040     page_allocator_type my_allocator;
00041 
00043     /*overide*/ virtual void *allocate_block( size_t n ) {
00044         void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
00045         if( !b )
00046             internal::throw_exception(internal::eid_bad_alloc); 
00047         return b;
00048     }
00049 
00051     /*override*/ virtual void deallocate_block( void *b, size_t n ) {
00052         my_allocator.deallocate( reinterpret_cast<char*>(b), n );
00053     }
00054 
00055 public:
00057     typedef T value_type;
00058 
00060     typedef T& reference;
00061 
00063     typedef const T& const_reference;
00064 
00066     typedef size_t size_type;
00067 
00069     typedef ptrdiff_t difference_type;
00070 
00072     typedef A allocator_type;
00073 
00075     explicit concurrent_queue(const allocator_type& a = allocator_type()) : 
00076         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00077     {
00078     }
00079 
00081     template<typename InputIterator>
00082     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00083         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00084     {
00085         for( ; begin != end; ++begin )
00086             internal_push(&*begin);
00087     }
00088     
00090     concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) : 
00091         internal::concurrent_queue_base_v3<T>( sizeof(T) ), my_allocator( a )
00092     {
00093         assign( src );
00094     }
00095     
00097     ~concurrent_queue();
00098 
00100     void push( const T& source ) {
00101         internal_push( &source );
00102     }
00103 
00105 
00107     bool try_pop( T& result ) {
00108         return internal_try_pop( &result );
00109     }
00110 
00112     size_type unsafe_size() const {return this->internal_size();}
00113 
00115     bool empty() const {return this->internal_empty();}
00116 
00118     void clear() ;
00119 
00121     allocator_type get_allocator() const { return this->my_allocator; }
00122 
00123     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00124     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00125 
00126     //------------------------------------------------------------------------
00127     // The iterators are intended only for debugging.  They are slow and not thread safe.
00128     //------------------------------------------------------------------------
00129     iterator unsafe_begin() {return iterator(*this);}
00130     iterator unsafe_end() {return iterator();}
00131     const_iterator unsafe_begin() const {return const_iterator(*this);}
00132     const_iterator unsafe_end() const {return const_iterator();}
00133 } ;
00134 
00135 template<typename T, class A>
00136 concurrent_queue<T,A>::~concurrent_queue() {
00137     clear();
00138     this->internal_finish_clear();
00139 }
00140 
00141 template<typename T, class A>
00142 void concurrent_queue<T,A>::clear() {
00143     while( !empty() ) {
00144         T value;
00145         internal_try_pop(&value);
00146     }
00147 }
00148 
00149 } // namespace strict_ppl
00150     
00152 
00157 template<typename T, class A = cache_aligned_allocator<T> >
00158 class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
00159     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00160 
00162     typedef typename A::template rebind<char>::other page_allocator_type;
00163     page_allocator_type my_allocator;
00164 
00166     class destroyer: internal::no_copy {
00167         T& my_value;
00168     public:
00169         destroyer( T& value ) : my_value(value) {}
00170         ~destroyer() {my_value.~T();}          
00171     };
00172 
00173     T& get_ref( page& page, size_t index ) {
00174         __TBB_ASSERT( index<items_per_page, NULL );
00175         return static_cast<T*>(static_cast<void*>(&page+1))[index];
00176     }
00177 
00178     /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
00179         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
00180     }
00181 
00182     /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00183         new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00184     }
00185 
00186     /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00187         T& from = get_ref(src,index);
00188         destroyer d(from);
00189         *static_cast<T*>(dst) = from;
00190     }
00191 
00192     /*overide*/ virtual page *allocate_page() {
00193         size_t n = sizeof(page) + items_per_page*item_size;
00194         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00195         if( !p )
00196             internal::throw_exception(internal::eid_bad_alloc); 
00197         return p;
00198     }
00199 
00200     /*override*/ virtual void deallocate_page( page *p ) {
00201         size_t n = sizeof(page) + items_per_page*item_size;
00202         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00203     }
00204 
00205 public:
00207     typedef T value_type;
00208 
00210     typedef A allocator_type;
00211 
00213     typedef T& reference;
00214 
00216     typedef const T& const_reference;
00217 
00219 
00221     typedef std::ptrdiff_t size_type;
00222 
00224     typedef std::ptrdiff_t difference_type;
00225 
00227     explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) : 
00228         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00229     {
00230     }
00231 
00233     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) : 
00234         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00235     {
00236         assign( src );
00237     }
00238 
00240     template<typename InputIterator>
00241     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00242         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00243     {
00244         for( ; begin != end; ++begin )
00245             internal_push_if_not_full(&*begin);
00246     }
00247 
00249     ~concurrent_bounded_queue();
00250 
00252     void push( const T& source ) {
00253         internal_push( &source );
00254     }
00255 
00257 
00258     void pop( T& destination ) {
00259         internal_pop( &destination );
00260     }
00261 
00263 
00265     bool try_push( const T& source ) {
00266         return internal_push_if_not_full( &source );
00267     }
00268 
00270 
00272     bool try_pop( T& destination ) {
00273         return internal_pop_if_present( &destination );
00274     }
00275 
00277 
00280     size_type size() const {return internal_size();}
00281 
00283     bool empty() const {return internal_empty();}
00284 
00286     size_type capacity() const {
00287         return my_capacity;
00288     }
00289 
00291 
00293     void set_capacity( size_type capacity ) {
00294         internal_set_capacity( capacity, sizeof(T) );
00295     }
00296 
00298     allocator_type get_allocator() const { return this->my_allocator; }
00299 
00301     void clear() ;
00302 
00303     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
00304     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
00305 
00306     //------------------------------------------------------------------------
00307     // The iterators are intended only for debugging.  They are slow and not thread safe.
00308     //------------------------------------------------------------------------
00309     iterator unsafe_begin() {return iterator(*this);}
00310     iterator unsafe_end() {return iterator();}
00311     const_iterator unsafe_begin() const {return const_iterator(*this);}
00312     const_iterator unsafe_end() const {return const_iterator();}
00313 
00314 }; 
00315 
00316 template<typename T, class A>
00317 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
00318     clear();
00319     internal_finish_clear();
00320 }
00321 
00322 template<typename T, class A>
00323 void concurrent_bounded_queue<T,A>::clear() {
00324     while( !empty() ) {
00325         T value;
00326         internal_pop_if_present(&value);
00327     }
00328 }
00329 
00330 namespace deprecated {
00331 
00333 
00338 template<typename T, class A = cache_aligned_allocator<T> > 
00339 class concurrent_queue: public concurrent_bounded_queue<T,A> {
00340 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00341     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00342 #endif 
00343 
00344 public:
00346     explicit concurrent_queue(const A& a = A()) : 
00347         concurrent_bounded_queue<T,A>( a )
00348     {
00349     }
00350 
00352     concurrent_queue( const concurrent_queue& src, const A& a = A()) : 
00353         concurrent_bounded_queue<T,A>( src, a )
00354     {
00355     }
00356 
00358     template<typename InputIterator>
00359     concurrent_queue( InputIterator begin, InputIterator end, const A& a = A()) :
00360         concurrent_bounded_queue<T,A>( begin, end, a )
00361     {
00362     }
00363 
00365 
00367     bool push_if_not_full( const T& source ) {
00368         return try_push( source );
00369     }
00370 
00372 
00376     bool pop_if_present( T& destination ) {
00377         return try_pop( destination );
00378     }
00379 
00380     typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
00381     typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
00382     //
00383     //------------------------------------------------------------------------
00384     // The iterators are intended only for debugging.  They are slow and not thread safe.
00385     //------------------------------------------------------------------------
00386     iterator begin() {return this->unsafe_begin();}
00387     iterator end() {return this->unsafe_end();}
00388     const_iterator begin() const {return this->unsafe_begin();}
00389     const_iterator end() const {return this->unsafe_end();}
00390 }; 
00391 
00392 }
00393     
00394 
00395 #if TBB_DEPRECATED
00396 using deprecated::concurrent_queue;
00397 #else
00398 using strict_ppl::concurrent_queue;    
00399 #endif
00400 
00401 } // namespace tbb
00402 
00403 #endif /* __TBB_concurrent_queue_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.