00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include <iterator>
00031 #include <new>
00032
00033 namespace tbb {
00034
00035 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00036
00037
00038 namespace strict_ppl {
00039 template<typename T, typename A> class concurrent_queue;
00040 }
00041
00042 template<typename T, typename A> class concurrent_bounded_queue;
00043
00044 namespace deprecated {
00045 template<typename T, typename A> class concurrent_queue;
00046 }
00047 #endif
00048
00050 namespace strict_ppl {
00051
00053 namespace internal {
00054
00055 using namespace tbb::internal;
00056
00057 typedef size_t ticket;
00058
00059 template<typename T> class micro_queue ;
00060 template<typename T> class micro_queue_pop_finalizer ;
00061 template<typename T> class concurrent_queue_base_v3;
00062
00064
00067 struct concurrent_queue_rep_base : no_copy {
00068 template<typename T> friend class micro_queue;
00069 template<typename T> friend class concurrent_queue_base_v3;
00070
00071 protected:
00073 static const size_t phi = 3;
00074
00075 public:
00076
00077 static const size_t n_queue = 8;
00078
00080 struct page {
00081 page* next;
00082 uintptr_t mask;
00083 };
00084
00085 atomic<ticket> head_counter;
00086 char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00087 atomic<ticket> tail_counter;
00088 char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00089
00091 size_t items_per_page;
00092
00094 size_t item_size;
00095
00097 atomic<size_t> n_invalid_entries;
00098
00099 char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
00100 } ;
00101
00102 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
00103 return uintptr_t(p)>1;
00104 }
00105
00107
00110 class concurrent_queue_page_allocator
00111 {
00112 template<typename T> friend class micro_queue ;
00113 template<typename T> friend class micro_queue_pop_finalizer ;
00114 protected:
00115 virtual ~concurrent_queue_page_allocator() {}
00116 private:
00117 virtual concurrent_queue_rep_base::page* allocate_page() = 0;
00118 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
00119 } ;
00120
00121 #if _MSC_VER && !defined(__INTEL_COMPILER)
00122
00123 #pragma warning( push )
00124 #pragma warning( disable: 4146 )
00125 #endif
00126
00128
00130 template<typename T>
00131 class micro_queue : no_copy {
00132 typedef concurrent_queue_rep_base::page page;
00133
00135 class destroyer: no_copy {
00136 T& my_value;
00137 public:
00138 destroyer( T& value ) : my_value(value) {}
00139 ~destroyer() {my_value.~T();}
00140 };
00141
00142 T& get_ref( page& page, size_t index ) {
00143 return static_cast<T*>(static_cast<void*>(&page+1))[index];
00144 }
00145
00146 void copy_item( page& dst, size_t index, const void* src ) {
00147 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
00148 }
00149
00150 void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00151 new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00152 }
00153
00154 void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00155 T& from = get_ref(src,index);
00156 destroyer d(from);
00157 *static_cast<T*>(dst) = from;
00158 }
00159
00160 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
00161
00162 public:
00163 friend class micro_queue_pop_finalizer<T>;
00164
00165 atomic<page*> head_page;
00166 atomic<ticket> head_counter;
00167
00168 atomic<page*> tail_page;
00169 atomic<ticket> tail_counter;
00170
00171 spin_mutex page_mutex;
00172
00173 void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
00174
00175 bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
00176
00177 micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
00178
00179 page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
00180
00181 void invalidate_page_and_rethrow( ticket k ) ;
00182 };
00183
00184 template<typename T>
00185 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
00186 atomic_backoff backoff;
00187 do {
00188 backoff.pause();
00189 if( counter&1 ) {
00190 ++rb.n_invalid_entries;
00191 throw_bad_last_alloc_exception_v4();
00192 }
00193 } while( counter!=k ) ;
00194 }
00195
//! Copy-construct *item into the slot that belongs to ticket k.
/** A fresh page is allocated before waiting when the slot is the first one of
    a page. The page is linked into the list only after tail_counter has reached
    k. If allocation or the item's copy constructor throws, n_invalid_entries is
    incremented and the exception propagates to the caller. */
00196 template<typename T>
00197 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
// Round the ticket down to a multiple of n_queue.
00198 k &= -concurrent_queue_rep_base::n_queue;
00199 page* p = NULL;
// Slot within the page; items_per_page-1 is used as a bit mask.
00200 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00201 if( !index ) {
// First slot of a page: allocate the page up front, before waiting for our turn.
00202 try {
00203 concurrent_queue_page_allocator& pa = base;
00204 p = pa.allocate_page();
00205 } catch (...) {
// Allocation failed: count the entry as invalid and mark the page list invalid (rethrows).
00206 ++base.my_rep->n_invalid_entries;
00207 invalidate_page_and_rethrow( k );
00208 }
00209 p->mask = 0;
00210 p->next = NULL;
00211 }
00212
// Wait until tail_counter equals k; may throw if the counter becomes odd.
00213 if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
00214
00215 if( p ) {
// Append the new page to the list under page_mutex.
00216 spin_mutex::scoped_lock lock( page_mutex );
00217 page* q = tail_page;
00218 if( is_valid_page(q) )
00219 q->next = p;
00220 else
00221 head_page = p;
00222 tail_page = p;
00223 } else {
// Not the first slot: the item goes into the current tail page.
00224 p = tail_page;
00225 }
00226
00227 try {
00228 copy_item( *p, index, item );
00229
// Mark the slot as holding an item, then advance tail_counter to the next ticket of this micro-queue.
00230 p->mask |= uintptr_t(1)<<index;
00231 tail_counter += concurrent_queue_rep_base::n_queue;
00232 } catch (...) {
// Copy failed: the mask bit stays clear, the entry is counted as invalid,
// and tail_counter is still advanced before rethrowing.
00233 ++base.my_rep->n_invalid_entries;
00234 tail_counter += concurrent_queue_rep_base::n_queue;
00235 throw;
00236 }
00237 }
00238
//! Move the item that belongs to ticket k into *dst.
/** Returns true if the slot held an item. Returns false if the slot's mask bit
    was clear, in which case n_invalid_entries is decremented. In both cases
    the finalizer advances head_counter when the inner scope ends. */
00239 template<typename T>
00240 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
// Round the ticket down to a multiple of n_queue.
00241 k &= -concurrent_queue_rep_base::n_queue;
// Wait until head_counter equals k, then until tail_counter has moved away from k.
00242 if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
00243 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
00244 page& p = *head_page;
00245 __TBB_ASSERT( &p, NULL );
00246 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00247 bool success = false;
00248 {
// On scope exit the finalizer sets head_counter to k+n_queue; the page is handed to it
// for release only when this is the last slot of the page.
00249 micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
00250 if( p.mask & uintptr_t(1)<<index ) {
00251 success = true;
00252 assign_and_destroy_item( dst, p, index );
00253 } else {
// Slot holds no item: it was counted as invalid by the failed push.
00254 --base.my_rep->n_invalid_entries;
00255 }
00256 }
00257 return success;
00258 }
00259
//! Make this micro-queue a copy of src, allocating new pages through base.
/** Counters and page_mutex are copied from src, then every page from
    src.head_page to src.tail_page is duplicated with make_copy. If an exception
    escapes the copying, invalidate_page_and_rethrow is called with the running
    index g_index. Returns *this. */
00260 template<typename T>
00261 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
00262 head_counter = src.head_counter;
00263 tail_counter = src.tail_counter;
00264 page_mutex = src.page_mutex;
00265
00266 const page* srcp = src.head_page;
00267 if( is_valid_page(srcp) ) {
// g_index is advanced by make_copy once per slot it processes.
00268 ticket g_index = head_counter;
00269 try {
// Number of tickets between head and tail, and the slot of the first one within its page.
00270 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
00271 size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
// The first page is copied up to index+n_items, capped at the page size.
00272 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
00273
00274 head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
00275 page* cur_page = head_page;
00276
00277 if( srcp != src.tail_page ) {
// Pages strictly between the first and the last are copied in full.
00278 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
00279 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
00280 cur_page = cur_page->next;
00281 }
00282
00283 __TBB_ASSERT( srcp==src.tail_page, NULL );
// The last page is copied up to the tail slot; a tail slot of 0 means the whole page.
00284 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00285 if( last_index==0 ) last_index = base.my_rep->items_per_page;
00286
00287 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
00288 cur_page = cur_page->next;
00289 }
00290 tail_page = cur_page;
00291 } catch (...) {
00292 invalidate_page_and_rethrow( g_index );
00293 }
00294 } else {
// The source has no valid head page: the copy gets an empty page list.
00295 head_page = tail_page = NULL;
00296 }
00297 return *this;
00298 }
00299
00300 template<typename T>
00301 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
00302
00303 page* invalid_page = (page*)uintptr_t(1);
00304 {
00305 spin_mutex::scoped_lock lock( page_mutex );
00306 tail_counter = k+concurrent_queue_rep_base::n_queue+1;
00307 page* q = tail_page;
00308 if( is_valid_page(q) )
00309 q->next = invalid_page;
00310 else
00311 head_page = invalid_page;
00312 tail_page = invalid_page;
00313 }
00314 throw;
00315 }
00316
00317 template<typename T>
00318 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00319 concurrent_queue_page_allocator& pa = base;
00320 page* new_page = pa.allocate_page();
00321 new_page->next = NULL;
00322 new_page->mask = src_page->mask;
00323 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00324 if( new_page->mask & uintptr_t(1)<<begin_in_page )
00325 copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00326 return new_page;
00327 }
00328
00329 template<typename T>
00330 class micro_queue_pop_finalizer: no_copy {
00331 typedef concurrent_queue_rep_base::page page;
00332 ticket my_ticket;
00333 micro_queue<T>& my_queue;
00334 page* my_page;
00335 concurrent_queue_page_allocator& allocator;
00336 public:
00337 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
00338 my_ticket(k), my_queue(queue), my_page(p), allocator(b)
00339 {}
00340 ~micro_queue_pop_finalizer() ;
00341 };
00342
//! Finish the pop: unlink the exhausted page (if any), publish the new head ticket, free the page.
00343 template<typename T>
00344 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
00345 page* p = my_page;
00346 if( is_valid_page(p) ) {
// Advance head_page past p under page_mutex; if no valid page follows, the list is empty.
00347 spin_mutex::scoped_lock lock( my_queue.page_mutex );
00348 page* q = p->next;
00349 my_queue.head_page = q;
00350 if( !is_valid_page(q) ) {
00351 my_queue.tail_page = NULL;
00352 }
00353 }
// Publish the next head ticket; pop() of that ticket waits on head_counter.
00354 my_queue.head_counter = my_ticket;
// The page is released after the lock has been dropped and the counter updated.
00355 if( is_valid_page(p) ) {
00356 allocator.deallocate_page( p );
00357 }
00358 }
00359
00360 #if _MSC_VER && !defined(__INTEL_COMPILER)
00361 #pragma warning( pop )
00362 #endif // warning 4146 is back
00363
00364 template<typename T> class concurrent_queue_iterator_rep ;
00365 template<typename T> class concurrent_queue_iterator_base_v3;
00366
00368
00371 template<typename T>
00372 struct concurrent_queue_rep : public concurrent_queue_rep_base {
00373 micro_queue<T> array[n_queue];
00374
00376 static size_t index( ticket k ) {
00377 return k*phi%n_queue;
00378 }
00379
00380 micro_queue<T>& choose( ticket k ) {
00381
00382 return array[index(k)];
00383 }
00384 };
00385
00387
00391 template<typename T>
00392 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
00394 concurrent_queue_rep<T>* my_rep;
00395
00396 friend struct concurrent_queue_rep<T>;
00397 friend class micro_queue<T>;
00398 friend class concurrent_queue_iterator_rep<T>;
00399 friend class concurrent_queue_iterator_base_v3<T>;
00400
00401 protected:
00402 typedef typename concurrent_queue_rep<T>::page page;
00403
00404 private:
00405 virtual page *allocate_page() {
00406 concurrent_queue_rep<T>& r = *my_rep;
00407 size_t n = sizeof(page) + r.items_per_page*r.item_size;
00408 return reinterpret_cast<page*>(allocate_block ( n ));
00409 }
00410
00411 virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
00412 concurrent_queue_rep<T>& r = *my_rep;
00413 size_t n = sizeof(page) + r.items_per_page*r.item_size;
00414 deallocate_block( reinterpret_cast<void*>(p), n );
00415 }
00416
00418 virtual void *allocate_block( size_t n ) = 0;
00419
00421 virtual void deallocate_block( void *p, size_t n ) = 0;
00422
00423 protected:
00424 concurrent_queue_base_v3( size_t item_size ) ;
00425
00426 virtual ~concurrent_queue_base_v3() {
00427 #if __TBB_USE_ASSERT
00428 size_t nq = my_rep->n_queue;
00429 for( size_t i=0; i<nq; i++ )
00430 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
00431 #endif
00432 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
00433 }
00434
00436 void internal_push( const void* src ) {
00437 concurrent_queue_rep<T>& r = *my_rep;
00438 ticket k = r.tail_counter++;
00439 r.choose(k).push( src, k, *this );
00440 }
00441
00443
00444 bool internal_try_pop( void* dst ) ;
00445
00447 size_t internal_size() const ;
00448
00450 bool internal_empty() const ;
00451
00453
00454 void internal_finish_clear() ;
00455
00457 void internal_throw_exception() const {
00458 throw_exception( eid_bad_alloc );
00459 }
00460
00462 void assign( const concurrent_queue_base_v3& src ) ;
00463 };
00464
00465 template<typename T>
00466 concurrent_queue_base_v3<T>::concurrent_queue_base_v3( size_t item_size ) {
00467 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
00468 __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
00469 __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
00470 __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
00471 __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
00472 memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
00473 my_rep->item_size = item_size;
00474 my_rep->items_per_page = item_size<=8 ? 32 :
00475 item_size<=16 ? 16 :
00476 item_size<=32 ? 8 :
00477 item_size<=64 ? 4 :
00478 item_size<=128 ? 2 :
00479 1;
00480 }
00481
//! Try to pop one item into *dst without blocking on an empty queue.
/** Claims a consumer ticket by compare-and-swap on head_counter and pops from
    the matching micro-queue. Returns false when tail_counter<=head_counter is
    observed. If the claimed slot holds no item (pop returns false), another
    ticket is claimed. */
00482 template<typename T>
00483 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
00484 concurrent_queue_rep<T>& r = *my_rep;
00485 ticket k;
00486 do {
00487 k = r.head_counter;
00488 for(;;) {
00489 if( r.tail_counter<=k ) {
00490
// No ticket is available to claim.
00491 return false;
00492 }
00493
00494 ticket tk=k;
00495 #if defined(_MSC_VER) && defined(_Wp64)
00496 #pragma warning (push)
00497 #pragma warning (disable: 4267)
00498 #endif
// Try to advance head_counter from tk to tk+1; k receives the value seen by the CAS.
00499 k = r.head_counter.compare_and_swap( tk+1, tk );
00500 #if defined(_MSC_VER) && defined(_Wp64)
00501 #pragma warning (pop)
00502 #endif
// The CAS succeeded iff it observed tk; otherwise retry with the newly observed value.
00503 if( k==tk )
00504 break;
00505
00506 }
00507 } while( !r.choose( k ).pop( dst, k, *this ) );
00508 return true;
00509 }
00510
00511 template<typename T>
00512 size_t concurrent_queue_base_v3<T>::internal_size() const {
00513 concurrent_queue_rep<T>& r = *my_rep;
00514 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
00515 ticket hc = r.head_counter;
00516 size_t nie = r.n_invalid_entries;
00517 ticket tc = r.tail_counter;
00518 __TBB_ASSERT( hc!=tc || !nie, NULL );
00519 ptrdiff_t sz = tc-hc-nie;
00520 return sz<0 ? 0 : size_t(sz);
00521 }
00522
00523 template<typename T>
00524 bool concurrent_queue_base_v3<T>::internal_empty() const {
00525 concurrent_queue_rep<T>& r = *my_rep;
00526 ticket tc = r.tail_counter;
00527 ticket hc = r.head_counter;
00528
00529 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
00530 }
00531
00532 template<typename T>
00533 void concurrent_queue_base_v3<T>::internal_finish_clear() {
00534 concurrent_queue_rep<T>& r = *my_rep;
00535 size_t nq = r.n_queue;
00536 for( size_t i=0; i<nq; ++i ) {
00537 page* tp = r.array[i].tail_page;
00538 if( is_valid_page(tp) ) {
00539 __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
00540 deallocate_page( tp );
00541 r.array[i].tail_page = NULL;
00542 } else
00543 __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
00544 }
00545 }
00546
00547 template<typename T>
00548 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
00549 concurrent_queue_rep<T>& r = *my_rep;
00550 r.items_per_page = src.my_rep->items_per_page;
00551
00552
00553 r.head_counter = src.my_rep->head_counter;
00554 r.tail_counter = src.my_rep->tail_counter;
00555 r.n_invalid_entries = src.my_rep->n_invalid_entries;
00556
00557
00558 for( size_t i = 0; i<r.n_queue; ++i )
00559 r.array[i].assign( src.my_rep->array[i], *this);
00560
00561 __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
00562 "the source concurrent queue should not be concurrently modified." );
00563 }
00564
00565 template<typename Container, typename Value> class concurrent_queue_iterator;
00566
00567 template<typename T>
00568 class concurrent_queue_iterator_rep: no_assign {
00569 public:
00570 ticket head_counter;
00571 const concurrent_queue_base_v3<T>& my_queue;
00572 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
00573 concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
00574 head_counter(queue.my_rep->head_counter),
00575 my_queue(queue)
00576 {
00577 for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
00578 array[k] = queue.my_rep->array[k].head_page;
00579 }
00580
00582 bool get_item( void*& item, size_t k ) ;
00583 };
00584
00585 template<typename T>
00586 bool concurrent_queue_iterator_rep<T>::get_item( void*& item, size_t k ) {
00587 if( k==my_queue.my_rep->tail_counter ) {
00588 item = NULL;
00589 return true;
00590 } else {
00591 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00592 __TBB_ASSERT(p,NULL);
00593 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00594 item = static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue.my_rep->item_size*i;
00595 return (p->mask & uintptr_t(1)<<i)!=0;
00596 }
00597 }
00598
00600
00601 template<typename Value>
00602 class concurrent_queue_iterator_base_v3 : no_assign {
00604
00605 concurrent_queue_iterator_rep<Value>* my_rep;
00606
00607 template<typename C, typename T, typename U>
00608 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00609
00610 template<typename C, typename T, typename U>
00611 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00612 protected:
00614 mutable void* my_item;
00615
00616 public:
00618 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
00619 #if __GNUC__==4&&__GNUC_MINOR__==3
00620
00621 __asm__ __volatile__("": : :"memory");
00622 #endif
00623 }
00624
00626 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00627 assign(i);
00628 }
00629
00631 concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
00632
00633 protected:
00635 void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
00636
00638 void advance() ;
00639
00641 ~concurrent_queue_iterator_base_v3() {
00642 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00643 my_rep = NULL;
00644 }
00645 };
00646
00647 template<typename Value>
00648 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
00649 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00650 new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
00651 size_t k = my_rep->head_counter;
00652 if( !my_rep->get_item(my_item, k) ) advance();
00653 }
00654
00655 template<typename Value>
00656 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00657 if( my_rep!=other.my_rep ) {
00658 if( my_rep ) {
00659 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00660 my_rep = NULL;
00661 }
00662 if( other.my_rep ) {
00663 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00664 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00665 }
00666 }
00667 my_item = other.my_item;
00668 }
00669
00670 template<typename Value>
00671 void concurrent_queue_iterator_base_v3<Value>::advance() {
00672 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
00673 size_t k = my_rep->head_counter;
00674 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
00675 #if TBB_USE_ASSERT
00676 void* tmp;
00677 my_rep->get_item(tmp,k);
00678 __TBB_ASSERT( my_item==tmp, NULL );
00679 #endif
00680 size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
00681 if( i==queue.my_rep->items_per_page-1 ) {
00682 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
00683 root = root->next;
00684 }
00685
00686 my_rep->head_counter = ++k;
00687 if( !my_rep->get_item(my_item, k) ) advance();
00688 }
00689
00690 template<typename T>
00691 static inline const concurrent_queue_base_v3<const T>& add_constness( const concurrent_queue_base_v3<T>& q )
00692 {
00693 return *reinterpret_cast<const concurrent_queue_base_v3<const T> *>(&q) ;
00694 }
00695
00696 template<typename T>
00697 static inline const concurrent_queue_iterator_base_v3<const T>& add_constness( const concurrent_queue_iterator_base_v3<T>& q )
00698 {
00699 return *reinterpret_cast<const concurrent_queue_iterator_base_v3<const T> *>(&q) ;
00700 }
00701
00703
00705 template<typename Container, typename Value>
00706 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<Value>,
00707 public std::iterator<std::forward_iterator_tag,Value> {
00708 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00709 template<typename T, class A>
00710 friend class ::tbb::strict_ppl::concurrent_queue;
00711 #else
00712 public:
00713 #endif
00715 concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
00716 concurrent_queue_iterator_base_v3<Value>(queue)
00717 {
00718 }
00719
00720 template<typename T>
00721 concurrent_queue_iterator( const concurrent_queue_base_v3<T>& other ) :
00722 concurrent_queue_iterator_base_v3<Value>(add_constness(other))
00723 {
00724 }
00725
00726 public:
00727 concurrent_queue_iterator() {}
00728
00730 concurrent_queue_iterator( const concurrent_queue_iterator<Container,Value>& other ) :
00731 concurrent_queue_iterator_base_v3<Value>(other)
00732 {
00733 }
00734
00735 template<typename T>
00736 concurrent_queue_iterator( const concurrent_queue_iterator<Container,T>& other ) :
00737 concurrent_queue_iterator_base_v3<Value>(add_constness(other))
00738 {
00739 }
00740
00742 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00743 assign(other);
00744 return *this;
00745 }
00746
00748 Value& operator*() const {
00749 return *static_cast<Value*>(this->my_item);
00750 }
00751
00752 Value* operator->() const {return &operator*();}
00753
00755 concurrent_queue_iterator& operator++() {
00756 this->advance();
00757 return *this;
00758 }
00759
00761 Value* operator++(int) {
00762 Value* result = &operator*();
00763 operator++();
00764 return result;
00765 }
00766 };
00767
00768
00769 template<typename C, typename T, typename U>
00770 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00771 return i.my_item==j.my_item;
00772 }
00773
00774 template<typename C, typename T, typename U>
00775 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00776 return i.my_item!=j.my_item;
00777 }
00778
00779 }
00780
00782
00783 }
00784
00786 namespace internal {
00787
00788 class concurrent_queue_rep;
00789 class concurrent_queue_iterator_rep;
00790 class concurrent_queue_iterator_base_v3;
00791 template<typename Container, typename Value> class concurrent_queue_iterator;
00792
00794
//! Type-independent base class of the queue; its non-inline methods are exported from the library.
/** The methods marked __TBB_EXPORTED_METHOD are only declared here; their
    definitions are not part of this header. Item handling and page allocation
    are delegated to the derived class through pure virtual functions.
    NOTE(review): the order of virtual functions and data members presumably
    forms part of the library's binary interface - confirm before reordering. */
00796 class concurrent_queue_base_v3: no_copy {
// Internal representation (opaque in this header).
00798 concurrent_queue_rep* my_rep;
00799
00800 friend class concurrent_queue_rep;
00801 friend struct micro_queue;
00802 friend class micro_queue_pop_finalizer;
00803 friend class concurrent_queue_iterator_rep;
00804 friend class concurrent_queue_iterator_base_v3;
00805 protected:
// Page header: link to the next page and a bit mask.
// NOTE(review): presumably one mask bit per item slot, as in strict_ppl - confirm.
00807 struct page {
00808 page* next;
00809 uintptr_t mask;
00810 };
00811
// Capacity of the queue (signed). Presumably set through internal_set_capacity - confirm.
00813 ptrdiff_t my_capacity;
00814
// Number of items stored in one page.
00816 size_t items_per_page;
00817
// Size of one item in bytes.
00819 size_t item_size;
00820
00821 private:
// Hooks implemented by the typed derived class: copy an item into a page slot,
// and move an item out of a page slot while destroying the slot's contents.
00822 virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
00823 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00824 protected:
00825 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
00826 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00827
// Enqueue a copy of the item at src.
00829 void __TBB_EXPORTED_METHOD internal_push( const void* src );
00830
// Dequeue an item into dst.
00832 void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00833
// Enqueue a copy of the item at src unless the queue is full; the result reports success.
00835 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00836
00838
// Dequeue an item into dst if one is present; the result reports success.
00839 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00840
// Size of the queue as a signed value.
00842 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00843
// Whether the queue is empty.
00845 bool __TBB_EXPORTED_METHOD internal_empty() const;
00846
// Set the capacity of the queue for items of element_size bytes.
00848 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00849
// Hook: allocate storage for one page.
00851 virtual page *allocate_page() = 0;
00852
// Hook: release a page obtained from allocate_page.
00854 virtual void deallocate_page( page *p ) = 0;
00855
00857
// Final step of clearing the queue.
// NOTE(review): presumably frees remaining pages, as the strict_ppl variant does - confirm.
00858 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00859
// Throw an exception on behalf of the queue.
00861 void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00862
// Copy the contents of src into this queue.
00864 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00865
00866 private:
// Hook: copy slot sindex of page src into slot dindex of page dst.
00867 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00868 };
00869
00871
//! Type-independent base class of the queue iterator; its non-inline methods are exported from the library.
/** Keeps an opaque iteration state (my_rep) and a pointer to the current item
    (my_item). The comparison operators declared as friends compare my_item. */
00872 class concurrent_queue_iterator_base_v3 {
00874
// Iteration state (opaque in this header); NULL for a default-constructed iterator.
00875 concurrent_queue_iterator_rep* my_rep;
00876
00877 template<typename C, typename T, typename U>
00878 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00879
00880 template<typename C, typename T, typename U>
00881 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00882 protected:
// Pointer to the current item; NULL for a default-constructed iterator.
00884 mutable void* my_item;
00885
// Construct an iterator that refers to nothing.
00887 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00888
// Copy constructor: starts empty, then copies i through assign().
00890 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00891 assign(i);
00892 }
00893
// Construct an iterator over queue.
00895 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
00896
// Make this iterator a copy of i.
00898 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00899
// Move to the next item.
00901 void __TBB_EXPORTED_METHOD advance();
00902
00904 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00905 };
00906
00907 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00908
00910
00912 template<typename Container, typename Value>
00913 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00914 public std::iterator<std::forward_iterator_tag,Value> {
00915 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00916 template<typename T, class A>
00917 friend class ::tbb::concurrent_bounded_queue;
00918
00919 template<typename T, class A>
00920 friend class ::tbb::deprecated::concurrent_queue;
00921 #else
00922 public:
00923 #endif
00925 concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
00926 concurrent_queue_iterator_base_v3(queue)
00927 {
00928 }
00929
00930 public:
00931 concurrent_queue_iterator() {}
00932
00935 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00936 concurrent_queue_iterator_base_v3(other)
00937 {}
00938
00940 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00941 assign(other);
00942 return *this;
00943 }
00944
00946 Value& operator*() const {
00947 return *static_cast<Value*>(my_item);
00948 }
00949
00950 Value* operator->() const {return &operator*();}
00951
00953 concurrent_queue_iterator& operator++() {
00954 advance();
00955 return *this;
00956 }
00957
00959 Value* operator++(int) {
00960 Value* result = &operator*();
00961 operator++();
00962 return result;
00963 }
00964 };
00965
00966
00967 template<typename C, typename T, typename U>
00968 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00969 return i.my_item==j.my_item;
00970 }
00971
00972 template<typename C, typename T, typename U>
00973 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00974 return i.my_item!=j.my_item;
00975 }
00976
00977 }
00978
00980
00981 }
00982
00983 #endif