_concurrent_queue_internal.h

/*
    Copyright 2005-2010 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/
00020 
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include <iterator>
00031 #include <new>
00032 
00033 namespace tbb {
00034 
00035 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00036 
00037 // forward declaration
00038 namespace strict_ppl {
00039 template<typename T, typename A> class concurrent_queue;
00040 }
00041 
00042 template<typename T, typename A> class concurrent_bounded_queue;
00043 
00044 namespace deprecated {
00045 template<typename T, typename A> class concurrent_queue;
00046 }
00047 #endif

namespace strict_ppl {

namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue;
template<typename T> class micro_queue_pop_finalizer;
template<typename T> class concurrent_queue_base_v3;

//! Part of the queue representation that does not refer to micro_queue.
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Multiplier used to scatter tickets across the micro-queues; coprime with n_queue.
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page of items.
    struct page {
        page* next;
        uintptr_t mask;
    };

    // Counters are padded apart to avoid false sharing (NFS_MaxLineSize is the cache line size).
    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Number of items on a page; always a power of 2.
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

    //! Number of invalid entries currently in the queue.
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
};

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
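
// Page pointers double as status values: 0 (NULL) means "no page yet", and
// the address 1 marks a lane poisoned by a failed allocation (see
// micro_queue::invalidate_page_and_rethrow below); only values above 1 are
// real pages. A sketch of the convention (illustrative only):
#if 0
concurrent_queue_rep_base::page* none   = NULL;
concurrent_queue_rep_base::page* poison = (concurrent_queue_rep_base::page*)uintptr_t(1);
__TBB_ASSERT( !is_valid_page(none) && !is_valid_page(poison), "sentinels are not pages" );
#endif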

//! Abstract class that defines the interface for allocating and deallocating pages.
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue;
    template<typename T> friend class micro_queue_pop_finalizer;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception safety of method "pop".
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    // Items are stored immediately after the page header.
    T& get_ref( page& page, size_t index ) {
        return static_cast<T*>(static_cast<void*>(&page+1))[index];
    }

    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const;

public:
    friend class micro_queue_pop_finalizer<T>;

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base );

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base );

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base );

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index );

    void invalidate_page_and_rethrow( ticket k );
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&1 ) {
            ++rb.n_invalid_entries;
            throw_bad_last_alloc_exception_v4();
        }
    } while( counter!=k );
}
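
// Tickets handed to one micro-queue are multiples of n_queue (8), so a
// healthy tail_counter is always even; invalidate_page_and_rethrow() below
// sets it to k+n_queue+1, an odd value, which waiters detect via the
// counter&1 test above. Worked example: for lane ticket k==24, invalidation
// publishes 24+8+1 == 33, and 33&1 != 0 makes every waiter throw
// bad_last_alloc instead of spinning on a ticket that will never come.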

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        try {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } catch (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    try {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        p->mask |= uintptr_t(1)<<index;
        tail_counter += concurrent_queue_rep_base::n_queue;
    } catch (...) {
        ++base.my_rep->n_invalid_entries;
        tail_counter += concurrent_queue_rep_base::n_queue;
        throw;
    }
}
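
// How a global ticket maps onto a page slot (illustrative arithmetic,
// assuming n_queue==8 and items_per_page==32, the value the constructor
// below picks for item_size<=8):
//   k = 301          global ticket
//   lane  = 301*3 % 8       = 7    (see concurrent_queue_rep::index)
//   k &= -8          --> 296       lane-local ticket, a multiple of 8
//   index = 296/8 & (32-1)  = 5    sixth slot of the lane's current page
// index==0 is exactly the condition that makes push() allocate a new page.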

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
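
// A page's mask records which slots hold fully constructed items: push()
// sets bit "index" only after copy_item() succeeds, so a clear bit here
// means the producer's copy constructor threw. pop() then returns false and
// internal_try_pop() retries with the next ticket. Passing &p to the
// finalizer only when index is the last slot of the page is what retires
// the page exactly once, after its final item has been consumed.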

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex   = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        try {
            size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } catch (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        tail_counter = k+concurrent_queue_rep_base::n_queue+1;
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    throw;
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer();
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    my_queue.head_counter = my_ticket;
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
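
// micro_queue_pop_finalizer is the RAII half of pop(): whether or not the
// item copy throws, its destructor (a) unlinks an exhausted page under
// page_mutex, (b) publishes the advanced head_counter so the next ticket
// holder can proceed, and (c) frees the page only after the lock has been
// released, keeping the critical section short.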

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep;
template<typename T> class concurrent_queue_iterator_base_v3;

//! Representation of a concurrent queue: an array of micro-queues plus the shared counters.
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an index into the array of micro-queues.
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
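
// phi==3 is coprime with n_queue==8, so k*phi%n_queue is a bijection on
// tickets modulo 8 and eight consecutive tickets land on eight distinct
// lanes:
//   ticket k : 0 1 2 3 4 5 6 7
//   lane     : 0 3 6 1 4 7 2 5
// Consecutive pushes therefore never contend on the same micro_queue, while
// each lane still sees its own tickets in FIFO order. A check of the
// bijection (illustrative only):
#if 0
bool seen[8] = {false,false,false,false,false,false,false,false};
for( size_t k=0; k<8; ++k )
    seen[ concurrent_queue_rep<int>::index(k) ] = true;
for( size_t i=0; i<8; ++i )
    __TBB_ASSERT( seen[i], "index() must be a bijection mod 8" );
#endif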

//! Base class for concurrent_queue<T>; owns the representation and implements page allocation.
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation.
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(page) + r.items_per_page*r.item_size;
        return reinterpret_cast<page*>(allocate_block( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(page) + r.items_per_page*r.item_size;
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! Allocate a memory block of size n.
    virtual void *allocate_block( size_t n ) = 0;

    //! Deallocate a memory block of size n.
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3( size_t item_size );

    /* override */ virtual ~concurrent_queue_base_v3() {
#if __TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* __TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue.
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue an item; return false if the queue appeared empty.
    bool internal_try_pop( void* dst );

    //! Get size of queue; the result may be stale if the queue is modified concurrently.
    size_t internal_size() const;

    //! Check if the queue is empty; thread safe.
    bool internal_empty() const;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear();

    //! Throw bad_alloc.
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy internal representation from src.
    void assign( const concurrent_queue_base_v3& src );
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3( size_t item_size ) {
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
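
// The table above keeps page payload at no more than 256 bytes, since
// allocate_page() requests sizeof(page) + items_per_page*item_size bytes.
// Worked example (illustrative; assumes sizeof(page)==16, i.e. a 64-bit
// platform):
//   item_size =   4  -->  items_per_page = 32  -->  16 + 32*4  = 144 bytes
//   item_size =  24  -->  items_per_page =  8  -->  16 + 8*24  = 208 bytes
//   item_size = 200  -->  items_per_page =  1  -->  16 + 1*200 = 216 bytes
// items_per_page is always a power of 2 so slot indices can be computed with
// "& (items_per_page-1)" rather than a modulo.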

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked.  Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
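
// The inner loop is a standard CAS claim: atomic<T>::compare_and_swap(value,
// comparand) stores value only if the current contents equal comparand, and
// returns the contents it observed either way, so comparing the result with
// the comparand is the success test. The same idiom on a free-standing
// counter (illustrative sketch only):
#if 0
tbb::atomic<size_t> head;
size_t t = head;
for(;;) {
    size_t seen = head.compare_and_swap( t+1, t );
    if( seen==t ) break;   // claimed ticket t
    t = seen;              // lost the race; retry against the fresher value
}
#endif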

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
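
// The three counters are read without any lock, so the result is only exact
// while the queue is quiescent; under concurrent pushes, failed pushes, and
// pops the snapshot can lag reality, and the final clamp keeps a transiently
// negative difference from wrapping around to a huge size_t.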

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( void*& item, size_t k );
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( void*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
        item = static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue.my_rep->item_size*i;
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

//! Constness-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Representation of the queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item.
    mutable void* my_item;

public:
    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __GNUC__==4&&__GNUC_MINOR__==3
        // to get around a possible gcc 4.3 bug
        __asm__ __volatile__("": : :"memory");
#endif
    }

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue );

protected:
    //! Assignment.
    void assign( const concurrent_queue_iterator_base_v3<Value>& other );

    //! Advance iterator one step towards tail of queue.
    void advance();

    //! Destructor.
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    void* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename T>
static inline const concurrent_queue_base_v3<const T>& add_constness( const concurrent_queue_base_v3<T>& q )
{
    return *reinterpret_cast<const concurrent_queue_base_v3<const T> *>(&q);
}

template<typename T>
static inline const concurrent_queue_iterator_base_v3<const T>& add_constness( const concurrent_queue_iterator_base_v3<T>& q )
{
    return *reinterpret_cast<const concurrent_queue_iterator_base_v3<const T> *>(&q);
}

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<Value>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<Value>(queue)
    {
    }

    template<typename T>
    concurrent_queue_iterator( const concurrent_queue_base_v3<T>& other ) :
        concurrent_queue_iterator_base_v3<Value>(add_constness(other))
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy constructor.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,Value>& other ) :
        concurrent_queue_iterator_base_v3<Value>(other)
    {
    }

    template<typename T>
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,T>& other ) :
        concurrent_queue_iterator_base_v3<Value>(add_constness(other))
    {
    }

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);   // "this->" is required: assign is a member of the dependent base class
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment => return the item that was pointed to before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
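
// Minimal usage sketch. These iterators surface in the public API as the
// unsafe_begin()/unsafe_end() members of tbb::concurrent_queue (declared in
// concurrent_queue.h, not here); as the names warn, they are debugging aids
// and must not run concurrently with pushes or pops:
#if 0
#include "tbb/concurrent_queue.h"
#include <cstdio>

int main() {
    tbb::concurrent_queue<int> q;
    for( int i=0; i<10; ++i )
        q.push(i);
    // Walk the queue with the forward iterator defined above.
    for( tbb::concurrent_queue<int>::iterator it=q.unsafe_begin(); it!=q.unsafe_end(); ++it )
        std::printf( "%d\n", *it );
    return 0;
}
#endif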

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace strict_ppl

//! For internal use only.
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! Type-independent portion of the older concurrent queue representation.
/** Its exported methods live in the TBB library rather than in this header;
    it underlies concurrent_bounded_queue and the deprecated concurrent_queue. */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation.
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page.
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue.
    ptrdiff_t my_capacity;

    //! Number of items on a page; always a power of 2.
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue.
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue.
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item; fail if the queue is full.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item; fail if the queue is empty.
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue.
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty.
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity.
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! Custom allocator.
    virtual page *allocate_page() = 0;

    //! Custom de-allocator.
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear();

    //! Throw an exception.
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation from src.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src );

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

//! Type-independent portion of concurrent_queue_iterator.
class concurrent_queue_iterator_base_v3 {
    //! Representation of the queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item.
    mutable void* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Assignment.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor.
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;


//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment => return the item that was pointed to before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace tbb

#endif /* __TBB_concurrent_queue_internal_H */
