aa
ConcurrentAutoTable.java
Go to the documentation of this file.
1 package com.cliffc.aa.util;
2 
3 import java.io.Serializable;
4 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5 
6 import sun.misc.Unsafe;
7 
19 public class ConcurrentAutoTable implements Serializable {
20 
21  // --- public interface ---
22 
30  public void add( long x ) { add_if( x); }
32  public void decrement() { add_if(-1L); }
34  public void increment() { add_if( 1L); }
35 
39  public void set( long x ) {
40  CAT newcat = new CAT(null,4,x);
41  // Spin until CAS works
42  while( !CAS_cat(_cat,newcat) ) {/*empty*/}
43  }
44 
50  public long get() { return _cat.sum(); }
52  public int intValue() { return (int)_cat.sum(); }
54  public long longValue() { return _cat.sum(); }
55 
60  public long estimate_get( ) { return _cat.estimate_sum(); }
61 
65  public String toString() { return _cat.toString(); }
66 
71  public void print() { _cat.print(); }
72 
77  public int internal_size() { return _cat._t.length; }
78 
79  // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can
80  // overflow. Value is CAS'd so no counts are lost. The CAS is retried until
81  // it succeeds. Returned value is the old value.
82  private long add_if( long x ) { return _cat.add_if(x,hash(),this); }
83 
84  // The underlying array of concurrently updated long counters
85  private volatile CAT _cat = new CAT(null,16/*Start Small, Think Big!*/,0L);
86  private static AtomicReferenceFieldUpdater<ConcurrentAutoTable,CAT> _catUpdater =
87  AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat");
88  private boolean CAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); }
89 
90  // Hash spreader
91  private static int hash() {
92  //int h = (int)Thread.currentThread().getId();
93  int h = System.identityHashCode(Thread.currentThread());
94  return h<<3; // Pad out cache lines. The goal is to avoid cache-line contention
95  }
96 
97  // --- CAT -----------------------------------------------------------------
98  private static class CAT implements Serializable {
99 
100  // Unsafe crud: get a function which will CAS arrays
101  private static final Unsafe _unsafe = UtilUnsafe.getUnsafe();
102  private static final int _Lbase = _unsafe.arrayBaseOffset(long[].class);
103  private static final int _Lscale = _unsafe.arrayIndexScale(long[].class);
104  private static long rawIndex(long[] ary, int i) {
105  assert i >= 0 && i < ary.length;
106  return _Lbase + i * _Lscale;
107  }
108  private static boolean CAS( long[] A, int idx, long old, long nnn ) {
109  return _unsafe.compareAndSwapLong( A, rawIndex(A,idx), old, nnn );
110  }
111 
112  //volatile long _resizers; // count of threads attempting a resize
113  //static private final AtomicLongFieldUpdater<CAT> _resizerUpdater =
114  // AtomicLongFieldUpdater.newUpdater(CAT.class, "_resizers");
115 
116  private final CAT _next;
117  private volatile long _fuzzy_sum_cache;
118  private volatile long _fuzzy_time;
119  private static final int MAX_SPIN=1;
120  private final long[] _t; // Power-of-2 array of longs
121 
122  CAT( CAT next, int sz, long init ) {
123  _next = next;
124  _t = new long[sz];
125  _t[0] = init;
126  }
127 
128  // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can
129  // overflow. Value is CAS'd so no counts are lost. The CAS is attempted
130  // ONCE.
131  public long add_if( long x, int hash, ConcurrentAutoTable master ) {
132  final long[] t = _t;
133  final int idx = hash & (t.length-1);
134  // Peel loop; try once fast
135  long old = t[idx];
136  final boolean ok = CAS( t, idx, old, old+x );
137  if( ok ) return old; // Got it
138  // Try harder
139  int cnt=0;
140  while( true ) {
141  old = t[idx];
142  if( CAS( t, idx, old, old+x ) ) break; // Got it!
143  cnt++;
144  }
145  if( cnt < MAX_SPIN ) return old; // Allowable spin loop count
146  if( t.length >= 1024*1024 ) return old; // too big already
147 
148  // Too much contention; double array size in an effort to reduce contention
149  //long r = _resizers;
150  //final int newbytes = (t.length<<1)<<3/*word to bytes*/;
151  //while( !_resizerUpdater.compareAndSet(this,r,r+newbytes) )
152  // r = _resizers;
153  //r += newbytes;
154  if( master._cat != this ) return old; // Already doubled, don't bother
155  //if( (r>>17) != 0 ) { // Already too much allocation attempts?
156  // // We could use a wait with timeout, so we'll wakeup as soon as the new
157  // // table is ready, or after the timeout in any case. Annoyingly, this
158  // // breaks the non-blocking property - so for now we just briefly sleep.
159  // //synchronized( this ) { wait(8*megs); } // Timeout - we always wakeup
160  // try { Thread.sleep(r>>17); } catch( InterruptedException e ) { }
161  // if( master._cat != this ) return old;
162  //}
163 
164  CAT newcat = new CAT(this,t.length*2,0);
165  // Take 1 stab at updating the CAT with the new larger size. If this
166  // fails, we assume some other thread already expanded the CAT - so we
167  // do not need to retry until it succeeds.
168  while( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/}
169  return old;
170  }
171 
172 
173  // Return the current sum of all things in the table. Writers can be
174  // updating the table furiously, so the sum is only locally accurate.
175  public long sum( ) {
176  long sum = _next == null ? 0 : _next.sum(); // Recursively get cached sum
177  final long[] t = _t;
178  for( long cnt : t ) sum += cnt;
179  return sum;
180  }
181 
182  // Fast fuzzy version. Used a cached value until it gets old, then re-up
183  // the cache.
184  public long estimate_sum( ) {
185  // For short tables, just do the work
186  if( _t.length <= 64 ) return sum();
187  // For bigger tables, periodically freshen a cached value
188  long millis = System.currentTimeMillis();
189  if( _fuzzy_time != millis ) { // Time marches on?
190  _fuzzy_sum_cache = sum(); // Get sum the hard way
191  _fuzzy_time = millis; // Indicate freshness of cached value
192  }
193  return _fuzzy_sum_cache; // Return cached sum
194  }
195 
196  public String toString( ) { return Long.toString(sum()); }
197 
198  public void print() {
199  long[] t = _t;
200  System.out.print("["+t[0]);
201  for( int i=1; i<t.length; i++ )
202  System.out.print(","+t[i]);
203  System.out.print("]");
204  if( _next != null ) _next.print();
205  }
206  }
207 }
com.cliffc.aa.util.ConcurrentAutoTable.CAT._fuzzy_time
volatile long _fuzzy_time
Definition: ConcurrentAutoTable.java:118
com.cliffc.aa.util.ConcurrentAutoTable
An auto-resizing table of.
Definition: ConcurrentAutoTable.java:19
com.cliffc.aa.util.UtilUnsafe.getUnsafe
static Unsafe getUnsafe()
Fetch the Unsafe.
Definition: UtilUnsafe.java:19
com.cliffc.aa.util.ConcurrentAutoTable.internal_size
int internal_size()
Return the internal counter striping factor.
Definition: ConcurrentAutoTable.java:77
com.cliffc.aa.util.ConcurrentAutoTable.CAT._next
final CAT _next
Definition: ConcurrentAutoTable.java:116
com.cliffc.aa.util.UtilUnsafe
Simple class to obtain access to the Unsafe object.
Definition: UtilUnsafe.java:15
com.cliffc.aa.util.ConcurrentAutoTable.CAT.CAS
static boolean CAS(long[] A, int idx, long old, long nnn)
Definition: ConcurrentAutoTable.java:108
com.cliffc.aa.util.ConcurrentAutoTable.add
void add(long x)
Add the given value to current counter value.
Definition: ConcurrentAutoTable.java:30
com.cliffc.aa.util.ConcurrentAutoTable.CAT.CAT
CAT(CAT next, int sz, long init)
Definition: ConcurrentAutoTable.java:122
com.cliffc.aa.util.ConcurrentAutoTable.CAT.estimate_sum
long estimate_sum()
Definition: ConcurrentAutoTable.java:184
com.cliffc.aa.util.ConcurrentAutoTable.CAT.print
void print()
Definition: ConcurrentAutoTable.java:198
com.cliffc.aa.util.ConcurrentAutoTable.CAT._unsafe
static final Unsafe _unsafe
Definition: ConcurrentAutoTable.java:101
com.cliffc.aa.util.ConcurrentAutoTable.decrement
void decrement()
add with -1
Definition: ConcurrentAutoTable.java:32
com.cliffc.aa.util.ConcurrentAutoTable._cat
volatile CAT _cat
Definition: ConcurrentAutoTable.java:85
com.cliffc.aa.util.ConcurrentAutoTable.intValue
int intValue()
Same as get, included for completeness.
Definition: ConcurrentAutoTable.java:52
com.cliffc.aa.util.ConcurrentAutoTable.CAT._t
final long[] _t
Definition: ConcurrentAutoTable.java:120
com.cliffc.aa.util.ConcurrentAutoTable.toString
String toString()
Return the counter's.
Definition: ConcurrentAutoTable.java:65
com.cliffc.aa.util.ConcurrentAutoTable.CAT.rawIndex
static long rawIndex(long[] ary, int i)
Definition: ConcurrentAutoTable.java:104
com.cliffc.aa.util.ConcurrentAutoTable.CAS_cat
boolean CAS_cat(CAT oldcat, CAT newcat)
Definition: ConcurrentAutoTable.java:88
com.cliffc.aa.util.ConcurrentAutoTable.CAT._fuzzy_sum_cache
volatile long _fuzzy_sum_cache
Definition: ConcurrentAutoTable.java:117
com.cliffc.aa.util.ConcurrentAutoTable.CAT.MAX_SPIN
static final int MAX_SPIN
Definition: ConcurrentAutoTable.java:119
com.cliffc.aa.util.ConcurrentAutoTable.estimate_get
long estimate_get()
A cheaper get.
Definition: ConcurrentAutoTable.java:60
com.cliffc.aa.util.ConcurrentAutoTable.CAT._Lscale
static final int _Lscale
Definition: ConcurrentAutoTable.java:103
com.cliffc.aa.util.ConcurrentAutoTable.longValue
long longValue()
Same as get, included for completeness.
Definition: ConcurrentAutoTable.java:54
com.cliffc.aa.util.ConcurrentAutoTable.increment
void increment()
add with +1
Definition: ConcurrentAutoTable.java:34
com.cliffc.aa.util.ConcurrentAutoTable.CAT
Definition: ConcurrentAutoTable.java:98
com.cliffc.aa.util.ConcurrentAutoTable.print
void print()
A more verbose print than toString, showing internal structure.
Definition: ConcurrentAutoTable.java:71
com.cliffc.aa.util.ConcurrentAutoTable.CAT.toString
String toString()
Definition: ConcurrentAutoTable.java:196
com.cliffc.aa.util.ConcurrentAutoTable.CAT.add_if
long add_if(long x, int hash, ConcurrentAutoTable master)
Definition: ConcurrentAutoTable.java:131
com.cliffc.aa.util.ConcurrentAutoTable._catUpdater
static AtomicReferenceFieldUpdater< ConcurrentAutoTable, CAT > _catUpdater
Definition: ConcurrentAutoTable.java:86
com.cliffc.aa.util.ConcurrentAutoTable.add_if
long add_if(long x)
Definition: ConcurrentAutoTable.java:82
com.cliffc.aa.util.ConcurrentAutoTable.CAT.sum
long sum()
Definition: ConcurrentAutoTable.java:175
com.cliffc.aa.util.ConcurrentAutoTable.hash
static int hash()
Definition: ConcurrentAutoTable.java:91
com.cliffc.aa.util.ConcurrentAutoTable.CAT._Lbase
static final int _Lbase
Definition: ConcurrentAutoTable.java:102