LRUCache 的实现

4/22/2019

缘起

刷 leetcode 的时候碰到的这道题。LRUCache 在现实中也经常用到:

  • 内存换页,需要淘汰掉不常用的 page。
  • 缓存函数的结果,比如 Python 就自带的 lru_cache 的实现。
  • redis 在设置了 maxmemory 时,在内存占用达到最大值时会通过 LRU 淘汰掉对应的 key。

要求

Leetcode 题目要求如下

/* 
Design and implement a data structure for Least Recently Used (LRU) cache. It should support the following operations: get and put.

get(key) - Get the value (will always be positive) of the key if the key exists in the cache, otherwise return -1.
put(key, value) - Set or insert the value if the key is not already present. When the cache reached its capacity, it should invalidate the least recently used item before inserting a new item.

Follow up:
Could you do both operations in O(1) time complexity?

Example:
*/

LRUCache cache = new LRUCache( 2 /* capacity */ );

cache.put(1, 1);
cache.put(2, 2);
cache.get(1);       // returns 1
cache.put(3, 3);    // evicts key 2
cache.get(2);       // returns -1 (not found)
cache.put(4, 4);    // evicts key 1
cache.get(1);       // returns -1 (not found)
cache.get(3);       // returns 3
cache.get(4);       // returns 4

关键点在于 getput 操作的时间复杂度都需为 O(1)。

初版

为了实现 O(1) 的复杂度,使用哈希表来存储对应的元素。然后通过双向链表来实现 lru cache 相关的逻辑,

  • get 时,将命中的节点移动到头部
  • put 时如果命中已存在的节点,参照 get 操作,如果为新节点
    • 达到最大大小,删除尾部节点,然后在头部插入新节点
    • 未达到最大大小,则直接在头部插入新节点即可
# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
    def __init__(self, key, value, prev=None, next=None):
        self.key = key
        self.value = value
        self.prev = prev
        self.next = next
        
class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._map = {}
        # linked list with head
        self._list = Node(None, None)
        # 使用 last 指针来加快删除最后一个节点的速度
        # 在以下情况需要更新 last 指针
        # 1. 第一次插入数据时,需要将 last 指针指向新插入的节点
        # 2. 删除最后一个节点时,需要将 last 指针前移
        # 3. 当 get 或者 put 命中节点需要移动到链表顶端时。
        #    如果需要移动的节点正好是最后一个节点(这时候 last 指针才有可能发生变化)
        #    而且 last 指针之前不是头结点(如果是的话,移动后该节点还是尾节点,不需要变动 last 指针)
        self._last = self._list
        self.size = 0
        

    def get(self, key: int) -> int:
        if key not in self._map:
            return -1
        node = self._map[key]
        self._move_to_first(node)
        return node.value
   
    def _move_to_first(self, node):
        # 当移动的节点为最后一个节点,且该节点之前不为头结点
        # 将 last 指针前移    
        if node == self._last and node.prev != self._list:
            self._last = node.prev
        # 首先删除当前节点,要额外考虑该节点为最后一个节点,
        # 此时不需要调整之后节点的 prev 指针
        node.prev.next = node.next
        if node.next is not None:
            node.next.prev = node.prev
        self._insert_to_first(node)            
        
    def _insert_to_first(self, node):
        node.next = self._list.next
        node.prev = self._list
        self._list.next = node
        # 插入到头结点之后,如果该节点不是最后一个节点
        # 同样要调整之后节点的 prev 指针
        if node.next:
            node.next.prev = node
        
    def put(self, key: int, value: int) -> None:
        if key in self._map:
            node = self._map[key]
            node.value = value
            self._move_to_first(node)
            return
        self.size += 1
        node = Node(key, value)
        self._map[key] = node
        if self.size > self.capacity:
            # 直接根据 last 指针删除最后一个节点,然后前移 last 指针
            self._last.prev.next = None
            del self._map[self._last.key]
            self._last = self._last.prev
                
        # 如果插入时为空链表,设置 last 指针
        if self._list.next is None:
            self._last = node
        self._insert_to_first(node)        
        

Leetcode 经典实现

初版实现的方法,需要不断判断删除,移动的是不是尾部指针,引入了很多不必要的 if 判断。而 Leetcode 讨论区里面提出了一个更好的方法。

原本我们通过引入 Dummy Head 已经简化了头部相关的操作,这里额外再引入一个 Dummy tail ,这样的话在移动删除尾部节点的时候就不需要额外判断了。

# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
    def __init__(self, key, value, prev=None, next=None):
        self.key = key
        self.value = value
        self.prev = prev
        self.next = next
        
class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._map = {}
        # linked list with dummy head and tail
        self._list = Node(None, None)
        self._last = Node(None, None)
        self._list.next = self._last
        self.size = 0
    
    def _delete_node(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev

    def get(self, key: int) -> int:
        if key not in self._map:
            return -1
        node = self._map[key]
        self._move_to_first(node)
        return node.value
   
    def _move_to_first(self, node):
        self._delete_node(node)
        self._insert_to_first(node)            
        
    def _insert_to_first(self, node):
        node.next = self._list.next
        node.prev = self._list
        
        node.next.prev = node
        self._list.next = node
        
    def put(self, key: int, value: int) -> None:
        if key in self._map:
            node = self._map[key]
            node.value = value
            self._move_to_first(node)
            return
        self.size += 1
        node = Node(key, value)
        self._map[key] = node
        if self.size > self.capacity:
            del self._map[self._last.prev.key]
            self._delete_node(self._last.prev)
                
        self._insert_to_first(node)        
        
            
        


# Your LRUCache object will be instantiated and called as such:
# obj = LRUCache(capacity)
# param_1 = obj.get(key)
# obj.put(key,value)

Python 内置的 LRUCache 实现

但是上面的方式还有一个问题,就是在 lru_cache 满了的时候,此时新增一个节点,会导致需要从链表中删除一个尾部的旧节点,然后同时在头部插入一个新节点。

有没有办法直接使用旧的删除的节点来代替新增的节点呢?这样在 LRUCache 满了的时候,put 新元素的性能会获得很大的提升。

而 Python 内部实现正是考虑了这一点,利用了带头结点root的循环双向链表,避免了该问题。

  • 添加新元素时,如果容量达到最大值,则直接利用 root 作为新节点,使用原来的尾部节点即 root.prev 作为新的 root 节点。
  • 使用 list 代替 node 节省空间。

下面是 Pythonlru_cache 的实现,因为原实现是装饰器,这里略作修改为类的实现:

# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
        
class LRUCache:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._cache = {}
        # circular queue with root
        self.root = []
        self.root[:] = [self.root, self.root, None, None]
        self.size = 0
    
    def get(self, key: int) -> int:
        if key not in self._cache:
            return -1
        node = self._cache[key]
        self._move_to_front(node)
        
        return node[RESULT]
    
    def _delete_node(self, node):
        node[PREV][NEXT] = node[NEXT]
        node[NEXT][PREV] = node[PREV]
    
    def _insert_to_front(self, node):
        node[NEXT] = self.root
        node[PREV] = self.root[PREV]
        node[PREV][NEXT] = node[NEXT][PREV] = node
        
    def _move_to_front(self, node):
        self._delete_node(node)
        self._insert_to_front(node)            
        
        
    def put(self, key: int, value: int) -> None:
        if key in self._cache:
            node = self._cache[key]
            node[RESULT] = value
            self._move_to_front(node)
            return
        self.size += 1
        if self.size > self.capacity:
            # 直接使用 root 节点作为新节点,然后通过将 root[NEXT] 的待删除节点设为新的 root 节点,避免了删除和分配新节点的消耗
            self.root[KEY] = key
            self.root[RESULT] = value
            self._cache[key] = self.root
            
            self.root = self.root[NEXT]
            del self._cache[self.root[KEY]]
            return
        
        node = [None, None, key, value]
        self._cache[key] = node
        self._insert_to_front(node)        
        
            

redis 的 LRU 淘汰算法的实现

因为想到 redis 也实现了 lru cache,就抽空看了下源码,发现跟想象中非常不一样,并不是常规的实现方式。

当 redis 达到设置的 maxmemory,会从所有key 中随机抽样 5 个值,然后计算它们的 idle time,插入一个长度为 16 的待淘汰数组中,数组中的 entry 根据 idle time 升序排列,最右侧的就是接下来第一个被淘汰的。淘汰后如果内存还是不满足需要,则继续随机抽取 key 并循环以上过程。

因为是随机抽样,所以分为以下情况

  • 抽样的 key idle 小于最左侧最小的 idle time,什么都不做,直接跳过

  • 找到适合的插入位置 i

    • 如果 pool 还有剩 余空间,右移 pool[i: end]
    • 没有的话,左移pool[0: i + 1],这样的话 idle 时间最小的就被淘汰了

关键实现逻辑如下:

    while (mem_freed < mem_tofree) {
          sds bestkey = NULL;
          struct evictionPoolEntry *pool = EvictionPoolLRU;
           while(bestkey == NULL) {  
                evictionPoolPopulate(i, dict, db->dict, pool);
       
              /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                                        if (pool[k].key == NULL) continue;
                                            de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    
                                        /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                     bestkey = dictGetKey(de);
                        break;
                }
           }

优缺点

优点

  1. LRU 实现简单,getput 时间复杂度都为 O(1)
  2. 利用了 locality ,即最近使用的数据很可能再次被使用
  3. 能更快的对短期行为作出反应

缺点

  1. long scan 的时候,会导致 lru 不断发生 evcit 行为。(数据库操作,从磁盘加载文件等,LFU 避免了该行为)

  2. 只利用了到了一部分的 locality,没有利用 最经常使用的数据很可能再次被使用(LFU 做到了,但是更慢,Log(N))

B 站源码解析之 LRUCache 实现

在这篇文章还没完稿的时候,看到了 B 站的 LRUCache 的源码实现,下面就顺便来分析一下。下面是对应的源码:

package lrucache

// 这里是 Node 节点的定义,中规中矩
// Element - node to store cache item
type Element struct {
	prev, next *Element
	Key        interface{}
	Value      interface{}
}

// Next - fetch older element
func (e *Element) Next() *Element {
	return e.next
}

// Prev - fetch newer element
func (e *Element) Prev() *Element {
	return e.prev
}

// 通过这个结构体,猜测使用的是跟 Leetcode 经典实现类似的带头尾伪节点的链表实现。
// 后面发现实际上并不是
// LRUCache - a data structure that is efficient to insert/fetch/delete cache items [both O(1) time complexity]
type LRUCache struct {
	cache    map[interface{}]*Element
	head     *Element
	tail     *Element
	capacity int
}

// New - create a new lru cache object
func New(capacity int) *LRUCache {
    // 可以看到初始化 LRUCache 时,并没有初始化 Dummy head 和 Dummy tail
    // 不过这样岂不是在操作过程中需要进行很多 if 判断,后面的代码也验证了这点。
	return &LRUCache{make(map[interface{}]*Element), nil, nil, capacity}
}

// Put - put a cache item into lru cache
func (lc *LRUCache) Put(key interface{}, value interface{}) {
	if e, ok := lc.cache[key]; ok {
		e.Value = value
        // 等同之前的 _insert_to_front
		lc.refresh(e)
		return
	}

	if lc.capacity == 0 {
		return
	} else if len(lc.cache) >= lc.capacity {
		// evict the oldest item
		delete(lc.cache, lc.tail.Key)
        // 等同之前的 _delete_node
		lc.remove(lc.tail)
	}

	e := &Element{nil, lc.head, key, value}
	lc.cache[key] = e
    // 因为没有 Dummy head 和 Dummy tail 而带来的不必要的 if 判断
	if len(lc.cache) != 1 {
		lc.head.prev = e
	} else {
		lc.tail = e
	}
	lc.head = e
}

// Get - get value of key from lru cache with result
func (lc *LRUCache) Get(key interface{}) (interface{}, bool) {
	if e, ok := lc.cache[key]; ok {
		lc.refresh(e)
		return e.Value, ok
	}
	return nil, false
}


func (lc *LRUCache) refresh(e *Element) {
    // 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
	if e.prev != nil {
		e.prev.next = e.next
		if e.next == nil {
			lc.tail = e.prev
		} else {
			e.next.prev = e.prev
		}
		e.prev = nil
		e.next = lc.head
		lc.head.prev = e
		lc.head = e
	}
}

func (lc *LRUCache) remove(e *Element) {
    // 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
	if e.prev == nil {
		lc.head = e.next
	} else {
		e.prev.next = e.next
	}
	if e.next == nil {
		lc.tail = e.prev
	} else {
		e.next.prev = e.prev
	}
}

  怎么说呢,有点失望。一开始在看到结构定义了 head 、tail时以为是使用了 Dummy head 和 Dummy tail 的经典实现,但是在初始化时发现没有初始化对应的 head、tail,以为是使用了一种未知的新方法,但是一看 refresh 和 remove 的逻辑,发现是通过大量的 if、else 来判断 corner cases。

  而大量使用 if 会严重干扰代码的可读性和可维护性,具体可见 Applying the Linus Torvalds “ Good Taste ” Coding Requirement 这篇文章。

Golang/groupcache LRUCache 实现

跟网友的讨论中,有人又贴出了 google 的一版实现,在 Golang/groupcache 项目下。就顺便看了下对应的源码。

// Package lru implements an LRU cache.
package lru

import "container/list"

// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
	// MaxEntries is the maximum number of cache entries before
	// an item is evicted. Zero means no limit.
	MaxEntries int

	// OnEvicted optionally specifies a callback function to be
	// executed when an entry is purged from the cache.
	OnEvicted func(key Key, value interface{})

	ll    *list.List
	cache map[interface{}]*list.Element
}

// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
	if c.cache == nil {
		c.cache = make(map[interface{}]*list.Element)
		c.ll = list.New()
	}
	if ee, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ee)
		ee.Value.(*entry).value = value
		return
	}
	ele := c.ll.PushFront(&entry{key, value})
	c.cache[key] = ele
	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
		c.RemoveOldest()
	}
}

// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
	if c.cache == nil {
		return
	}
	if ele, hit := c.cache[key]; hit {
		c.ll.MoveToFront(ele)
		return ele.Value.(*entry).value, true
	}
	return
}

关键点在于 container/list ,这是一个带头结点的循环双向链表,但是并没有暴露 root 节点,所以 google 的实现同 Leetcode 经典实现是一致的。 我还发了一个 issue 去询问为什么不采用类似 Python 的实现。官方的回答是目前够用,如果需要变更的话,需要 benchmark 的支持。理论上 Python 的实现在不断读取新数值的时候性能会好很多。

综合下来 Python 内置库 functools.lru_cache 的带头结点的双向循环队列的实现是最优雅的