20
2019
04

guava二级缓存原理深入理解

1、CacheBuilder 是如何创建的?

@GwtCompatible(emulated = true)

public final class CacheBuilder<K, V> {

  // Value returned by getInitialCapacity() when no initial capacity was configured.
  private static final int DEFAULT_INITIAL_CAPACITY = 16;
  // Value returned by getConcurrencyLevel() when no concurrency level was configured.
  private static final int DEFAULT_CONCURRENCY_LEVEL = 4;
  // Value returned by getExpireAfterWriteNanos() / getExpireAfterAccessNanos() when unset.
  private static final int DEFAULT_EXPIRATION_NANOS = 0;
  // Value returned by getRefreshNanos() when no refresh interval was configured.
  private static final int DEFAULT_REFRESH_NANOS = 0;

  // Default statsCounterSupplier: supplies one shared StatsCounter whose record* methods are
  // all empty and whose snapshot() always returns EMPTY_STATS.
  static final Supplier<? extends StatsCounter> NULL_STATS_COUNTER = Suppliers.ofInstance(
      new StatsCounter() {
        @Override
        public void recordHits(int count) {}
        @Override
        public void recordMisses(int count) {}
        @Override
        public void recordLoadSuccess(long loadTime) {}
        @Override
        public void recordLoadException(long loadTime) {}
        @Override
        public void recordEviction() {}
        @Override
        public CacheStats snapshot() {
          // EMPTY_STATS is declared below; it is only read when snapshot() is invoked, not
          // while this field is being initialised.
          return EMPTY_STATS;
        }
      });
  // All-zero statistics snapshot handed out by the no-op counter above.
  static final CacheStats EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0);

  // Supplier installed by recordStats(): every get() call creates a fresh SimpleStatsCounter.
  // isRecordingStats() detects "stats enabled" by identity comparison with this instance.
  static final Supplier<StatsCounter> CACHE_STATS_COUNTER =
      new Supplier<StatsCounter>() {
    @Override
    public StatsCounter get() {
      return new SimpleStatsCounter();
    }
  };

  // Singleton removal listener that ignores every notification; getRemovalListener() falls
  // back to it when no listener was configured.
  enum NullListener implements RemovalListener<Object, Object> {
    INSTANCE;
    @Override
    public void onRemoval(RemovalNotification<Object, Object> notification) {}
  }

  // Singleton weigher that assigns weight 1 to every entry; getWeigher() falls back to it when
  // no weigher was configured.
  enum OneWeigher implements Weigher<Object, Object> {
    INSTANCE;
    @Override
    public int weigh(Object key, Object value) {
      return 1;
    }
  }

  // Ticker that always reads 0; getTicker(false) returns it when no ticker was configured and
  // the caller does not need time to be recorded.
  static final Ticker NULL_TICKER = new Ticker() {
    @Override
    public long read() {
      return 0;
    }
  };

  private static final Logger logger = Logger.getLogger(CacheBuilder.class.getName());
  // Sentinel meaning "this numeric setting has not been configured yet".
  static final int UNSET_INT = -1;
  // When true, weigher() rejects a builder that already has a maximum size.
  boolean strictParsing = true;
  // Numeric settings start as UNSET_INT; the get*() accessors substitute the defaults.
  int initialCapacity = UNSET_INT;
  int concurrencyLevel = UNSET_INT;
  long maximumSize = UNSET_INT;
  long maximumWeight = UNSET_INT;
  // Null until weigher(...) is called; getWeigher() then falls back to OneWeigher.INSTANCE.
  Weigher<? super K, ? super V> weigher;
  // NOTE(review): no setter for the strength/equivalence fields is shown in this excerpt.
  Strength keyStrength;
  Strength valueStrength;
  long expireAfterWriteNanos = UNSET_INT;
  long expireAfterAccessNanos = UNSET_INT;
  long refreshNanos = UNSET_INT;
  Equivalence<Object> keyEquivalence;
  Equivalence<Object> valueEquivalence;
  // Null until removalListener(...) is called; getRemovalListener() falls back to NullListener.
  RemovalListener<? super K, ? super V> removalListener;
  // Null until ticker(...) is called; see getTicker(boolean) for the fallback.
  Ticker ticker;
  // Statistics are off by default; recordStats() swaps in CACHE_STATS_COUNTER.
  Supplier<? extends StatsCounter> statsCounterSupplier = NULL_STATS_COUNTER;
  // TODO(fry): make constructor private and update tests to use newBuilder
  CacheBuilder() {}

  /**

   * Constructs a new {@code CacheBuilder} instance with default settings, including strong keys,

   * strong values, and no automatic eviction of any kind.

   */

  public static CacheBuilder<Object, Object> newBuilder() {

    return new CacheBuilder<Object, Object>();

  }

  /**

   * Sets the minimum total size for the internal hash tables. For example, if the initial capacity

   * is {@code 60}, and the concurrency level is {@code 8}, then eight segments are created, each

   * having a hash table of size eight. Providing a large enough estimate at construction time

   * avoids the need for expensive resizing operations later, but setting this value unnecessarily

   * high wastes memory.

   *

   * @throws IllegalArgumentException if {@code initialCapacity} is negative

   * @throws IllegalStateException if an initial capacity was already set

   */

  public CacheBuilder<K, V> initialCapacity(int initialCapacity) {

    checkState(this.initialCapacity == UNSET_INT, "initial capacity was already set to %s",

        this.initialCapacity);

    checkArgument(initialCapacity >= 0);

    this.initialCapacity = initialCapacity;

    return this;

  }

  int getInitialCapacity() {

    return (initialCapacity == UNSET_INT) ? DEFAULT_INITIAL_CAPACITY : initialCapacity;

  }

  /**

   * Guides the allowed concurrency among update operations. Used as a hint for internal sizing. The

   * table is internally partitioned to try to permit the indicated number of concurrent updates

   * without contention. Because assignment of entries to these partitions is not necessarily

   * uniform, the actual concurrency observed may vary. Ideally, you should choose a value to

   * accommodate as many threads as will ever concurrently modify the table. Using a significantly

   * higher value than you need can waste space and time, and a significantly lower value can lead

   * to thread contention. But overestimates and underestimates within an order of magnitude do not

   * usually have much noticeable impact. A value of one permits only one thread to modify the cache

   * at a time, but since read operations and cache loading computations can proceed concurrently,

   * this still yields higher concurrency than full synchronization.

   *

   * <p> Defaults to 4. <b>Note:</b>The default may change in the future. If you care about this

   * value, you should always choose it explicitly.

   *

   * <p>The current implementation uses the concurrency level to create a fixed number of hashtable

   * segments, each governed by its own write lock. The segment lock is taken once for each explicit

   * write, and twice for each cache loading computation (once prior to loading the new value,

   * and once after loading completes). Much internal cache management is performed at the segment

   * granularity. For example, access queues and write queues are kept per segment when they are

   * required by the selected eviction algorithm. As such, when writing unit tests it is not

   * uncommon to specify {@code concurrencyLevel(1)} in order to achieve more deterministic eviction

   * behavior.

   *

   * <p>Note that future implementations may abandon segment locking in favor of more advanced

   * concurrency controls.

   *

   * @throws IllegalArgumentException if {@code concurrencyLevel} is nonpositive

   * @throws IllegalStateException if a concurrency level was already set

   */

  public CacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {

    checkState(this.concurrencyLevel == UNSET_INT, "concurrency level was already set to %s",

        this.concurrencyLevel);

    checkArgument(concurrencyLevel > 0);

    this.concurrencyLevel = concurrencyLevel;

    return this;

  }

  int getConcurrencyLevel() {

    return (concurrencyLevel == UNSET_INT) ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel;

  }

  /**

   * Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict

   * an entry before this limit is exceeded</b>. As the cache size grows close to the maximum, the

   * cache evicts entries that are less likely to be used again. For example, the cache may evict an

   * entry because it hasn't been used recently or very often.

   *

   * <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the

   * cache. This can be useful in testing, or to disable caching temporarily without a code change.

   *

   * <p>This feature cannot be used in conjunction with {@link #maximumWeight}.

   *

   * @param size the maximum size of the cache

   * @throws IllegalArgumentException if {@code size} is negative

   * @throws IllegalStateException if a maximum size or weight was already set

   */

  public CacheBuilder<K, V> maximumSize(long size) {

    checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",

        this.maximumSize);

    checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",

        this.maximumWeight);

    checkState(this.weigher == null, "maximum size can not be combined with weigher");

    checkArgument(size >= 0, "maximum size must not be negative");

    this.maximumSize = size;

    return this;

  }

  /**

   * Specifies the maximum weight of entries the cache may contain. Weight is determined using the

   * {@link Weigher} specified with {@link #weigher}, and use of this method requires a

   * corresponding call to {@link #weigher} prior to calling {@link #build}.

   *

   * <p>Note that the cache <b>may evict an entry before this limit is exceeded</b>. As the cache

   * size grows close to the maximum, the cache evicts entries that are less likely to be used

   * again. For example, the cache may evict an entry because it hasn't been used recently or very

   * often.

   *

   * <p>When {@code weight} is zero, elements will be evicted immediately after being loaded into

   * cache. This can be useful in testing, or to disable caching temporarily without a code

   * change.

   *

   * <p>Note that weight is only used to determine whether the cache is over capacity; it has no

   * effect on selecting which entry should be evicted next.

   *

   * <p>This feature cannot be used in conjunction with {@link #maximumSize}.

   *

   * @param weight the maximum total weight of entries the cache may contain

   * @throws IllegalArgumentException if {@code weight} is negative

   * @throws IllegalStateException if a maximum weight or size was already set

   * @since 11.0

   */

  @GwtIncompatible("To be supported")

  public CacheBuilder<K, V> maximumWeight(long weight) {

    checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",

        this.maximumWeight);

    checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",

        this.maximumSize);

    this.maximumWeight = weight;

    checkArgument(weight >= 0, "maximum weight must not be negative");

    return this;

  }

  /**

   * Specifies the weigher to use in determining the weight of entries. Entry weight is taken

   * into consideration by {@link #maximumWeight(long)} when determining which entries to evict, and

   * use of this method requires a corresponding call to {@link #maximumWeight(long)} prior to

   * calling {@link #build}. Weights are measured and recorded when entries are inserted into the

   * cache, and are thus effectively static during the lifetime of a cache entry.

   *

   * <p>When the weight of an entry is zero it will not be considered for size-based eviction

   * (though it still may be evicted by other means).

   *

   * <p><b>Important note:</b> Instead of returning <em>this</em> as a {@code CacheBuilder}

   * instance, this method returns {@code CacheBuilder<K1, V1>}. From this point on, either the

   * original reference or the returned reference may be used to complete configuration and build

   * the cache, but only the "generic" one is type-safe. That is, it will properly prevent you from

   * building caches whose key or value types are incompatible with the types accepted by the

   * weigher already provided; the {@code CacheBuilder} type cannot do this. For best results,

   * simply use the standard method-chaining idiom, as illustrated in the documentation at top,

   * configuring a {@code CacheBuilder} and building your {@link Cache} all in a single statement.

   *

   * <p><b>Warning:</b> if you ignore the above advice, and use this {@code CacheBuilder} to build

   * a cache whose key or value type is incompatible with the weigher, you will likely experience

   * a {@link ClassCastException} at some <i>undefined</i> point in the future.

   *

   * @param weigher the weigher to use in calculating the weight of cache entries

   * @throws IllegalArgumentException if {@code size} is negative

   * @throws IllegalStateException if a maximum size was already set

   * @since 11.0

   */

  @GwtIncompatible("To be supported")

  public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> weigher(

      Weigher<? super K1, ? super V1> weigher) {

    checkState(this.weigher == null);

    if (strictParsing) {

      checkState(this.maximumSize == UNSET_INT, "weigher can not be combined with maximum size",

          this.maximumSize);

    }

    // safely limiting the kinds of caches this can produce

    @SuppressWarnings("unchecked")

    CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;

    me.weigher = checkNotNull(weigher);

    return me;

  }

  // Make a safe contravariant cast now so we don't have to do it over and over.

  @SuppressWarnings("unchecked")

  <K1 extends K, V1 extends V> Weigher<K1, V1> getWeigher() {

    return (Weigher<K1, V1>) MoreObjects.firstNonNull(weigher, OneWeigher.INSTANCE);

  }

  /**

   * Specifies that each entry should be automatically removed from the cache once a fixed duration

   * has elapsed after the entry's creation, or the most recent replacement of its value.

   *

   * <p>When {@code duration} is zero, this method hands off to

   * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specified maximum

   * size or weight. This can be useful in testing, or to disable caching temporarily without a code

   * change.

   *

   * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or

   * write operations. Expired entries are cleaned up as part of the routine maintenance described

   * in the class javadoc.

   *

   * @param duration the length of time after an entry is created that it should be automatically

   *     removed

   * @param unit the unit that {@code duration} is expressed in

   * @throws IllegalArgumentException if {@code duration} is negative

   * @throws IllegalStateException if {@code expireAfterWrite} (the time to live) was already set

   */

  public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {

    checkState(expireAfterWriteNanos == UNSET_INT, "expireAfterWrite was already set to %s ns",

        expireAfterWriteNanos);

    checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);

    this.expireAfterWriteNanos = unit.toNanos(duration);

    return this;

  }

  long getExpireAfterWriteNanos() {

    return (expireAfterWriteNanos == UNSET_INT) ? DEFAULT_EXPIRATION_NANOS : expireAfterWriteNanos;

  }

  /**

   * Specifies that each entry should be automatically removed from the cache once a fixed duration

   * has elapsed after the entry's creation, the most recent replacement of its value, or its last

   * access. Access time is reset by all cache read and write operations (including

   * {@code Cache.asMap().get(Object)} and {@code Cache.asMap().put(K, V)}), but not by operations

   * on the collection-views of {@link Cache#asMap}.

   *

   * <p>When {@code duration} is zero, this method hands off to

   * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specified maximum

   * size or weight. This can be useful in testing, or to disable caching temporarily without a code

   * change.

   *

   * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or

   * write operations. Expired entries are cleaned up as part of the routine maintenance described

   * in the class javadoc.

   *

   * @param duration the length of time after an entry is last accessed that it should be

   *     automatically removed

   * @param unit the unit that {@code duration} is expressed in

   * @throws IllegalArgumentException if {@code duration} is negative

   * @throws IllegalStateException if {@code expireAfterAccess} (the time to idle) was already set

   */

  public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) {

    checkState(expireAfterAccessNanos == UNSET_INT, "expireAfterAccess was already set to %s ns",

        expireAfterAccessNanos);

    checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);

    this.expireAfterAccessNanos = unit.toNanos(duration);

    return this;

  }

  long getExpireAfterAccessNanos() {

    return (expireAfterAccessNanos == UNSET_INT)

        ? DEFAULT_EXPIRATION_NANOS : expireAfterAccessNanos;

  }

  /**

   * Specifies that active entries are eligible for automatic refresh once a fixed duration has

   * elapsed after the entry's creation, or the most recent replacement of its value. The semantics

   * of refreshes are specified in {@link LoadingCache#refresh}, and are performed by calling

   * {@link CacheLoader#reload}.

   *

   * <p>As the default implementation of {@link CacheLoader#reload} is synchronous, it is

   * recommended that users of this method override {@link CacheLoader#reload} with an asynchronous

   * implementation; otherwise refreshes will be performed during unrelated cache read and write

   * operations.

   *

   * <p>Currently automatic refreshes are performed when the first stale request for an entry

   * occurs. The request triggering refresh will make a blocking call to {@link CacheLoader#reload}

   * and immediately return the new value if the returned future is complete, and the old value

   * otherwise.

   *

   * <p><b>Note:</b> <i>all exceptions thrown during refresh will be logged and then swallowed</i>.

   *

   * @param duration the length of time after an entry is created that it should be considered

   *     stale, and thus eligible for refresh

   * @param unit the unit that {@code duration} is expressed in

   * @throws IllegalArgumentException if {@code duration} is negative

   * @throws IllegalStateException if the refresh interval was already set

   * @since 11.0

   */

  @Beta

  @GwtIncompatible("To be supported (synchronously).")

  public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {

    checkNotNull(unit);

    checkState(refreshNanos == UNSET_INT, "refresh was already set to %s ns", refreshNanos);

    checkArgument(duration > 0, "duration must be positive: %s %s", duration, unit);

    this.refreshNanos = unit.toNanos(duration);

    return this;

  }

  long getRefreshNanos() {

    return (refreshNanos == UNSET_INT) ? DEFAULT_REFRESH_NANOS : refreshNanos;

  }

  /**

   * Specifies a nanosecond-precision time source for use in determining when entries should be

   * expired. By default, {@link System#nanoTime} is used.

   *

   * <p>The primary intent of this method is to facilitate testing of caches which have been

   * configured with {@link #expireAfterWrite} or {@link #expireAfterAccess}.

   *

   * @throws IllegalStateException if a ticker was already set

   */

  public CacheBuilder<K, V> ticker(Ticker ticker) {

    checkState(this.ticker == null);

    this.ticker = checkNotNull(ticker);

    return this;

  }

  Ticker getTicker(boolean recordsTime) {

    if (ticker != null) {

      return ticker;

    }

    return recordsTime ? Ticker.systemTicker() : NULL_TICKER;

  }

  /**

   * Specifies a listener instance that caches should notify each time an entry is removed for any

   * {@linkplain RemovalCause reason}. Each cache created by this builder will invoke this listener

   * as part of the routine maintenance described in the class documentation above.

   *

   * <p><b>Warning:</b> after invoking this method, do not continue to use <i>this</i> cache

   * builder reference; instead use the reference this method <i>returns</i>. At runtime, these

   * point to the same instance, but only the returned reference has the correct generic type

   * information so as to ensure type safety. For best results, use the standard method-chaining

   * idiom illustrated in the class documentation above, configuring a builder and building your

   * cache in a single statement. Failure to heed this advice can result in a {@link

   * ClassCastException} being thrown by a cache operation at some <i>undefined</i> point in the

   * future.

   *

   * <p><b>Warning:</b> any exception thrown by {@code listener} will <i>not</i> be propagated to

   * the {@code Cache} user, only logged via a {@link Logger}.

   *

   * @return the cache builder reference that should be used instead of {@code this} for any

   *     remaining configuration and cache building

   * @throws IllegalStateException if a removal listener was already set

   */

  @CheckReturnValue

  public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener(

      RemovalListener<? super K1, ? super V1> listener) {

    checkState(this.removalListener == null);

    // safely limiting the kinds of caches this can produce

    @SuppressWarnings("unchecked")

    CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;

    me.removalListener = checkNotNull(listener);

    return me;

  }

  // Make a safe contravariant cast now so we don't have to do it over and over.

  @SuppressWarnings("unchecked")

  <K1 extends K, V1 extends V> RemovalListener<K1, V1> getRemovalListener() {

    return (RemovalListener<K1, V1>)

        MoreObjects.firstNonNull(removalListener, NullListener.INSTANCE);

  }

  /**

   * Enable the accumulation of {@link CacheStats} during the operation of the cache. Without this

   * {@link Cache#stats} will return zero for all statistics. Note that recording stats requires

   * bookkeeping to be performed with each operation, and thus imposes a performance penalty on

   * cache operation.

   *

   * @since 12.0 (previously, stats collection was automatic)

   */

  public CacheBuilder<K, V> recordStats() {

    statsCounterSupplier = CACHE_STATS_COUNTER;

    return this;

  }

  

  boolean isRecordingStats() {

    return statsCounterSupplier == CACHE_STATS_COUNTER;

  }

  Supplier<? extends StatsCounter> getStatsCounterSupplier() {

    return statsCounterSupplier;

  }

  /**

   * Builds a cache, which either returns an already-loaded value for a given key or atomically

   * computes or retrieves it using the supplied {@code CacheLoader}. If another thread is currently

   * loading the value for this key, simply waits for that thread to finish and returns its

   * loaded value. Note that multiple threads can concurrently load values for distinct keys.

   *

   * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be

   * invoked again to create multiple independent caches.

   *

   * @param loader the cache loader used to obtain new values

   * @return a cache having the requested features

   */

  public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(

      CacheLoader<? super K1, V1> loader) {

    checkWeightWithWeigher();

    return new LocalCache.LocalLoadingCache<K1, V1>(this, loader);

  }

  /**

   * Builds a cache which does not automatically load values when keys are requested.

   *

   * <p>Consider {@link #build(CacheLoader)} instead, if it is feasible to implement a

   * {@code CacheLoader}.

   *

   * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be

   * invoked again to create multiple independent caches.

   *

   * @return a cache having the requested features

   * @since 11.0

   */

  public <K1 extends K, V1 extends V> Cache<K1, V1> build() {

    checkWeightWithWeigher();

    checkNonLoadingCache();

    return new LocalCache.LocalManualCache<K1, V1>(this);

  }

}

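  如上,CacheBuilder 使用的是建造者模式:先通过链式调用设置最大容量(maximumSize / maximumWeight)、过期时间(expireAfterWrite / expireAfterAccess)等参数,最后调用 build(CacheLoader) 创建 LoadingCache<K1, V1> 缓存(或调用无参的 build() 创建需要手动放入数据的 Cache)。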

2、如何获取一个guava缓存?

  其实就是调用一次 get 方法而已,例如: stringDbCacheContainer.get(key); (这里的 stringDbCacheContainer 指业务代码中通过 CacheBuilder 构建出来的 LoadingCache 实例)

    // com.google.common.cache.LocalCache

    // LoadingCache methods

    @Override

    public V get(K key) throws ExecutionException {

        // 两种数据来源,一是直接获取,二是调用 load() 方法加载数据

      return localCache.getOrLoad(key);

    }

    // com.google.common.cache.LocalCache

  V getOrLoad(K key) throws ExecutionException {

    return get(key, defaultLoader);

  }

  V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {

    int hash = hash(checkNotNull(key));

    // 还记得 ConcurrentHashMap 吗? 先定位segment, 再定位 entry

    return segmentFor(hash).get(key, hash, loader);

  }

  Segment<K, V> segmentFor(int hash) {

    // TODO(fry): Lazily create segments?

    return segments[(hash >>> segmentShift) & segmentMask];

  }

    // The core lookup logic lives in this get.
    // loading
    /**
     * Returns the value for {@code key} in this segment. A live value is returned directly
     * (after recording the read and a hit, via {@code scheduleRefresh}); if the entry is
     * currently loading, waits for that load; otherwise falls through to
     * {@code lockedGetOrLoad}. {@code postReadCleanup()} always runs on the way out.
     *
     * @throws ExecutionException if the load failed with a cause that is neither an
     *     {@code Error} nor a {@code RuntimeException}
     * @throws ExecutionError if the cause is an {@code Error}
     * @throws UncheckedExecutionException if the cause is a {@code RuntimeException}
     */
    V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            // An entry exists: read the ticker and let getLiveValue decide whether the value is
            // still usable (the article discusses expiration later).
            long now = map.ticker.read();
            V value = getLiveValue(e, now);
            if (value != null) {
              recordRead(e, now);
              statsCounter.recordHits(1);
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // First load, or the previous value is no longer live: enter the locked load path.
        // at this point e is either null or expired;
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        // Re-wrap unchecked causes so callers see them as unchecked failures.
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }

    // static class Segment<K, V> extends ReentrantLock
    // The whole Segment extends ReentrantLock, so the lock()/unlock() calls below lock this
    // segment itself.
    /**
     * Under the segment lock, re-checks the bucket chain for {@code key}: returns a still-valid
     * value immediately, marks an in-flight load to be waited on, or installs a
     * {@code LoadingValueReference} (creating the entry if necessary). The actual load, or the
     * wait for another thread's load, happens after the lock has been released.
     */
    V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
        throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;

      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        // Clean up expired data before updating the value.
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        // Walk the bucket's chain (hash collisions) looking for this key.
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              // Another load is already in flight: wait for it instead of starting a new one.
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              if (value == null) {
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accommodate an incorrect expiration queue.
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
              } else {
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }

              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }

        // No load is in flight for this key: install a loading reference (creating the entry
        // when the key was not found) so the load below can proceed.
        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<K, V>();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        postWriteCleanup();
      }

      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          // Load the value synchronously from the loader.
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
            // Record a miss (a no-op with the default stats counter).
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        // Another thread is updating this entry, so just wait for its result.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

    // 加载原始值

    // at most one of loadSync.loadAsync may be called for any given LoadingValueReference

    V loadSync(K key, int hash, LoadingValueReference<K, V> loadingValueReference,

        CacheLoader<? super K, V> loader) throws ExecutionException {

        // loadingValueReference中保存了回调引用,加载原始值

      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);

      // 存储数据入缓存,以便下次使用

      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);

    }

    // Loads the data from the loader.
    /**
     * Starts the stopwatch and obtains a value for {@code key}: calls {@code loader.load} when
     * there is no previous value, otherwise {@code loader.reload} with the previous value. Any
     * {@code Throwable} is captured into the returned future rather than thrown.
     */
    public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      stopwatch.start();
      V previousValue = oldValue.get();
      try {
        // No previous value: load directly and return.
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }

        // A previous value exists, so ask the loader to reload it. A null future from reload()
        // is turned into an immediate null result.
        // NOTE(review): the original article says reload() must be overridden here — confirm
        // against CacheLoader.
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return Futures.transform(newValue, new Function<V, V>() {
          @Override
          public V apply(V newValue) {
            LoadingValueReference.this.set(newValue);
            return newValue;
          }
        });
      } catch (Throwable t) {
        // Restore the interrupt flag before reporting the failure through the future.
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return setException(t) ? futureValue : fullyFailedFuture(t);
      }
    }

    // com.google.common.cache.LocalCache.Segment (下面调用的 getUninterruptibly 来自 com.google.common.util.concurrent.Uninterruptibles)

    /**
     * Waits uninterruptibly for {@code newValue} to be loaded, and then records loading stats.
     *
     * <p>On success the value is stored via {@code storeLoadedValue} and returned. If no
     * non-null value was obtained (the future failed or produced null), a load exception is
     * recorded and the loading reference is removed.
     *
     * @throws InvalidCacheLoadException if the future completed with a null value
     */
    V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
        ListenableFuture<V> newValue) throws ExecutionException {
      V value = null;
      try {
        // Block for the load result. A null result is not allowed and raises an exception.
        value = getUninterruptibly(newValue);
        if (value == null) {
          throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        // Record the successful load (a no-op with the default stats counter).
        statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
        // Finally store the value in the cache (located via the hash) and return it.
        storeLoadedValue(key, hash, loadingValueReference, value);
        return value;
      } finally {
        // value is still null only when one of the steps above threw before a non-null value
        // was assigned.
        if (value == null) {
          statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
          removeLoadingValue(key, hash, loadingValueReference);
        }
      }
    }

    

  /**

   * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.

   * To get uninterruptibility and remove checked exceptions, see

   * {@link Futures#getUnchecked}.

   *

   * <p>If instead, you wish to treat {@link InterruptedException} uniformly

   * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}

   * or {@link Futures#makeChecked}.

   *

   * @throws ExecutionException if the computation threw an exception

   * @throws CancellationException if the computation was cancelled

   */

  public static <V> V getUninterruptibly(Future<V> future)

      throws ExecutionException {

    boolean interrupted = false;

    try {

      while (true) {

        try {

          return future.get();

        } catch (InterruptedException e) {

          interrupted = true;

        }

      }

    } finally {

      if (interrupted) {

        Thread.currentThread().interrupt();

      }

    }

  }

  如上,就是获取一个缓存的过程。总结下来就是:

  1. 先使用hash定位到 segment中,然后尝试直接到 map中获取结果;

  2. 如果没有找到或者已过期,则调用客户端的load()方法加载原始数据;

  3. 将结果存入 segment.map 中,本地缓存生效;

  4. 记录命中情况,读取计数;

3、如何处理过期?

   其实刚刚我们在看get()方法时,就看到了一些端倪。

  要确认两点: 1. 是否有创建异步清理线程进行过期数据清理? 2. 清理过程中,原始数据如何自处?

  其实guava的清理时机是在加载数据之前进行的!

    // com.google.common.cache.LocalCache

    // static class Segment<K, V> extends ReentrantLock

    // 整个 Segment 继承了 ReentrantLock, 所以 LocalCache 的锁是依赖于 ReentrantLock 实现的

    /**
     * Returns the value for {@code key}, starting a load through {@code loader} when no usable
     * value is present, and coordinating with other threads through the segment lock.
     *
     * <p>While holding the lock it runs {@code preWriteCleanup} and scans the bucket chain. A
     * live, unexpired value is returned immediately as a hit. A value that has been collected or
     * has expired is reported for removal notification and its entry is reused for the new load.
     * If the matching entry is already loading, no second load is started and this call waits on
     * the existing one. The load (or the wait) happens after the lock has been released.
     *
     * @param key the key to look up or load
     * @param hash the hash of {@code key}, used to select the bucket
     * @param loader the loader handed to {@code loadSync} when this call starts the load
     * @return the existing value, the result of {@code loadSync}, or the result of
     *     {@code waitForLoadingValue}
     * @throws ExecutionException declared for failures of the load or of waiting on it
     */
    V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)

        throws ExecutionException {

      ReferenceEntry<K, V> e;

      ValueReference<K, V> valueReference = null;

      LoadingValueReference<K, V> loadingValueReference = null;

      // Stays true unless a matching entry is found whose value is already being loaded.
      boolean createNewEntry = true;

      lock();

      try {

        // re-read ticker once inside the lock

        long now = map.ticker.read();

        // Purge expired data before updating the value.

        preWriteCleanup(now);

        // Count to publish if an invalid (collected or expired) entry is found and reused below.
        int newCount = this.count - 1;

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;

        int index = hash & (table.length() - 1);

        ReferenceEntry<K, V> first = table.get(index);

        // Walk the bucket's collision chain looking for an entry with this key.

        for (e = first; e != null; e = e.getNext()) {

          K entryKey = e.getKey();

          if (e.getHash() == hash && entryKey != null

              && map.keyEquivalence.equivalent(key, entryKey)) {

            valueReference = e.getValueReference();

            if (valueReference.isLoading()) {

              // A load for this key is already in flight; wait for it instead of starting one.
              createNewEntry = false;

            } else {

              V value = valueReference.get();

              if (value == null) {

                enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);

              } else if (map.isExpired(e, now)) {

                // This is a duplicate check, as preWriteCleanup already purged expired

                // entries, but let's accommodate an incorrect expiration queue.

                enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);

              } else {

                recordLockedRead(e, now);

                statsCounter.recordHits(1);

                // we were concurrent with loading; don't consider refresh

                return value;

              }

              // immediately reuse invalid entries

              writeQueue.remove(e);

              accessQueue.remove(e);

              this.count = newCount; // write-volatile

            }

            break;

          }

        }

        // No load is in flight for this key: install a LoadingValueReference, creating the
        // entry first if the chain had no match, so that the load below belongs to this call.

        if (createNewEntry) {

          loadingValueReference = new LoadingValueReference<K, V>();

          if (e == null) {

            e = newEntry(key, hash, first);

            e.setValueReference(loadingValueReference);

            table.set(index, e);

          } else {

            e.setValueReference(loadingValueReference);

          }

        }

      } finally {

        unlock();

        postWriteCleanup();

      }

      if (createNewEntry) {

        try {

          // Synchronizes on the entry to allow failing fast when a recursive load is

          // detected. This may be circumvented when an entry is copied, but will fail fast most

          // of the time.

          // Load the value synchronously through the loader.

          synchronized (e) {

            return loadSync(key, hash, loadingValueReference, loader);

          }

        } finally {

          // Record the miss whether the load succeeded or threw (a no-op when the null stats
          // counter is in use).

          statsCounter.recordMisses(1);

        }

      } else {

        // The entry already exists. Wait for loading.

        return waitForLoadingValue(e, key, valueReference);

      }

    }

    // 我们来细看下 preWriteCleanup(now);  是如何清理过期数据的

    /**

     * Performs routine cleanup prior to executing a write. This should be called every time a

     * write thread acquires the segment lock, immediately after acquiring the lock.

     *

     * <p>Post-condition: expireEntries has been run.

     */

    @GuardedBy("this")

    void preWriteCleanup(long now) {

      runLockedCleanup(now);

    }

    void runLockedCleanup(long now) {

        // 再次确保清理数据时,锁是存在的

      if (tryLock()) {

        try {

            // 当存在特殊类型数据时,可以先进行清理

          drainReferenceQueues();

          // 清理过期数据,按时间清理

          expireEntries(now); // calls drainRecencyQueue

          // 读计数清零

          readCount.set(0);

        } finally {

          unlock();

        }

      }

    }

    /**

     * Drain the key and value reference queues, cleaning up internal entries containing garbage

     * collected keys or values.

     */

    @GuardedBy("this")

    void drainReferenceQueues() {

      if (map.usesKeyReferences()) {

        drainKeyReferenceQueue();

      }

      if (map.usesValueReferences()) {

        drainValueReferenceQueue();

      }

    }

    @GuardedBy("this")

    void expireEntries(long now) {

        // 更新最近的访问队列

      drainRecencyQueue();

      ReferenceEntry<K, V> e;

      // 从头部开始取元素,如果过期就进行清理

      // 写队列超时: 清理, 访问队列超时: 清理

      while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {

        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {

          throw new AssertionError();

        }

      }

      while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {

        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {

          throw new AssertionError();

        }

      }

    }

    @Override

    public ReferenceEntry<K, V> peek() {

      ReferenceEntry<K, V> next = head.getNextInAccessQueue();

      return (next == head) ? null : next;

    }

    // 清理指定类型的元素,如 过期元素

    @GuardedBy("this")

    boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {

      int newCount = this.count - 1;

      AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;

      int index = hash & (table.length() - 1);

      ReferenceEntry<K, V> first = table.get(index);

      for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {

        if (e == entry) {

          ++modCount;

          // 调用 removeValueFromChain, 清理具体元素

          ReferenceEntry<K, V> newFirst = removeValueFromChain(

              first, e, e.getKey(), hash, e.getValueReference(), cause);

          newCount = this.count - 1;

          table.set(index, newFirst);

          this.count = newCount; // write-volatile

          return true;

        }

      }

      return false;

    }

    @GuardedBy("this")

    @Nullable

    ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,

        ReferenceEntry<K, V> entry, @Nullable K key, int hash, ValueReference<K, V> valueReference,

        RemovalCause cause) {

      enqueueNotification(key, hash, valueReference, cause);

      // 清理两队列

      writeQueue.remove(entry);

      accessQueue.remove(entry);

      if (valueReference.isLoading()) {

        valueReference.notifyNewValue(null);

        return first;

      } else {

        return removeEntryFromChain(first, entry);

      }

    }

    @GuardedBy("this")

    @Nullable

    ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,

        ReferenceEntry<K, V> entry) {

      int newCount = count;

      // 普通情况,则直接返回 next 元素链即可

      // 针对有first != entry 的情况,则依次将 first 移动到队尾,然后跳到下一个元素返回

      ReferenceEntry<K, V> newFirst = entry.getNext();

      for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {

        // 将first链表倒转到 newFirst 尾部

        ReferenceEntry<K, V> next = copyEntry(e, newFirst);

        if (next != null) {

          newFirst = next;

        } else {

          removeCollectedEntry(e);

          newCount--;

        }

      }

      this.count = newCount;

      return newFirst;

    }

    

  到此,我们就完整的看到了一个 key 的过期处理流程了。总结就是:

  1. 在读取的时候,触发清理操作;

  2. 使用 ReentrantLock 来进行线程安全的更新;

  3. 读取计数器清零,元素数量减少;

4、怎样主动放入一个缓存?

  这个和普通的map的put方法一样,简单看下即可!

    // com.google.common.cache.LocalCache$LocalManualCache

    @Override

    public void put(K key, V value) {

      localCache.put(key, value);

    }

    

    // com.google.common.cache.LocalCache

  @Override

  public V put(K key, V value) {

    checkNotNull(key);

    checkNotNull(value);

    int hash = hash(key);

    return segmentFor(hash).put(key, hash, value, false);

  }

  

    // com.google.common.cache.LocalCache$Segment

    /**
     * Associates {@code value} with {@code key} in this segment while holding the segment lock.
     *
     * <p>After running {@code preWriteCleanup} and, if needed, {@code expand()}, the bucket
     * chain is searched for the key. An existing entry is updated in place; otherwise a new
     * entry is linked at the head of the bucket. {@code evictEntries()} is called after every
     * store.
     *
     * @param key the key to store
     * @param hash the hash of {@code key}, used to select the bucket
     * @param value the value to store
     * @param onlyIfAbsent when {@code true}, an entry that already holds a non-null value is
     *     left untouched and that value is returned
     * @return the non-null value previously held by the entry, or {@code null} when there was
     *     no entry for the key or its value reference yielded {@code null}
     */
    @Nullable

    V put(K key, int hash, V value, boolean onlyIfAbsent) {

      lock();

      try {

        long now = map.ticker.read();

        // Run the pre-write cleanup (reference queues, expiration) before modifying anything.
        preWriteCleanup(now);

        int newCount = this.count + 1;

        if (newCount > this.threshold) { // ensure capacity

          expand();

          // Recomputed from the current count after expand().
          newCount = this.count + 1;

        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;

        int index = hash & (table.length() - 1);

        ReferenceEntry<K, V> first = table.get(index);

        // Look for an existing entry.

        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {

          K entryKey = e.getKey();

          if (e.getHash() == hash && entryKey != null

              && map.keyEquivalence.equivalent(key, entryKey)) {

            // We found an existing entry.

            ValueReference<K, V> valueReference = e.getValueReference();

            V entryValue = valueReference.get();

            if (entryValue == null) {

              // The entry exists but currently holds no value.
              ++modCount;

              if (valueReference.isActive()) {

                // An active reference with no value is reported as COLLECTED; the entry was
                // already counted, so the count stays as it is.
                enqueueNotification(key, hash, valueReference, RemovalCause.COLLECTED);

                setValue(e, key, value, now);

                newCount = this.count; // count remains unchanged

              } else {

                // The reference was not active, so storing the value adds one to the count.
                setValue(e, key, value, now);

                newCount = this.count + 1;

              }

              this.count = newCount; // write-volatile

              evictEntries();

              return null;

            } else if (onlyIfAbsent) {

              // Mimic

              // "if (!map.containsKey(key)) ...

              // else return map.get(key);

              recordLockedRead(e, now);

              return entryValue;

            } else {

              // clobber existing entry, count remains unchanged

              ++modCount;

              enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);

              setValue(e, key, value, now);

              evictEntries();

              return entryValue;

            }

          }

        }

        // Create a new entry.

        ++modCount;

        // The new entry is linked in front of the current bucket head.
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);

        setValue(newEntry, key, value, now);

        table.set(index, newEntry);

        newCount = this.count + 1;

        this.count = newCount; // write-volatile

        evictEntries();

        return null;

      } finally {

        unlock();

        postWriteCleanup();

      }

    }

  就这样,基于guava的二级缓存功能就搞定了。

原文链接:https://www.qiquanji.com/post/8448.html

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

微信扫码关注

更新实时通知

« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。