刷 leetcode 的时候碰到的这道题。LRUCache 在现实中也经常用到:
lru_cache
的实现。Leetcode 题目要求如下
/*
Design and implement a data structure for Least Recently Used (LRU) cache. It should support the following operations: get and put.
get(key) - Get the value (will always be positive) of the key if the key exists in the cache, otherwise return -1.
put(key, value) - Set or insert the value if the key is not already present. When the cache reached its capacity, it should invalidate the least recently used item before inserting a new item.
Follow up:
Could you do both operations in O(1) time complexity?
Example:
*/
LRUCache cache = new LRUCache( 2 /* capacity */ );
cache.put(1, 1);
cache.put(2, 2);
cache.get(1); // returns 1
cache.put(3, 3); // evicts key 2
cache.get(2); // returns -1 (not found)
cache.put(4, 4); // evicts key 1
cache.get(1); // returns -1 (not found)
cache.get(3); // returns 3
cache.get(4); // returns 4
关键点在于 get
和 put
操作的时间复杂度都需为 O(1)。
为了实现 O(1) 的复杂度,使用哈希表来存储对应的元素。然后通过双向链表来实现 lru cache
相关的逻辑,
get
时,将命中的节点移动到头部put
时如果命中已存在的节点,参照 get
操作,如果为新节点# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
def __init__(self, key, value, prev=None, next=None):
self.key = key
self.value = value
self.prev = prev
self.next = next
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self._map = {}
# linked list with head
self._list = Node(None, None)
# 使用 last 指针来加快删除最后一个节点的速度
# 在以下情况需要更新 last 指针
# 1. 第一次插入数据时,需要将 last 指针指向新插入的节点
# 2. 删除最后一个节点时,需要将 last 指针前移
# 3. 当 get 或者 put 命中节点需要移动到链表顶端时。
# 如果需要移动的节点正好是最后一个节点(这时候 last 指针才有可能发生变化)
# 而且 last 指针之前不是头结点(如果是的话,移动后该节点还是尾节点,不需要变动 last 指针)
self._last = self._list
self.size = 0
def get(self, key: int) -> int:
if key not in self._map:
return -1
node = self._map[key]
self._move_to_first(node)
return node.value
def _move_to_first(self, node):
# 当移动的节点为最后一个节点,且该节点之前不为头结点
# 将 last 指针前移
if node == self._last and node.prev != self._list:
self._last = node.prev
# 首先删除当前节点,要额外考虑该节点为最后一个节点,
# 此时不需要调整之后节点的 prev 指针
node.prev.next = node.next
if node.next is not None:
node.next.prev = node.prev
self._insert_to_first(node)
def _insert_to_first(self, node):
node.next = self._list.next
node.prev = self._list
self._list.next = node
# 插入到头结点之后,如果该节点不是最后一个节点
# 同样要调整之后节点的 prev 指针
if node.next:
node.next.prev = node
def put(self, key: int, value: int) -> None:
if key in self._map:
node = self._map[key]
node.value = value
self._move_to_first(node)
return
self.size += 1
node = Node(key, value)
self._map[key] = node
if self.size > self.capacity:
# 直接根据 last 指针删除最后一个节点,然后前移 last 指针
self._last.prev.next = None
del self._map[self._last.key]
self._last = self._last.prev
# 如果插入时为空链表,设置 last 指针
if self._list.next is None:
self._last = node
self._insert_to_first(node)
初版实现的方法,需要不断判断删除,移动的是不是尾部指针,引入了很多不必要的 if
判断。而 Leetcode
讨论区里面提出了一个更好的方法。
原本我们通过引入 Dummy Head
已经简化了头部相关的操作,这里额外再引入一个 Dummy tail
,这样的话在移动删除尾部节点的时候就不需要额外判断了。
# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
class Node:
def __init__(self, key, value, prev=None, next=None):
self.key = key
self.value = value
self.prev = prev
self.next = next
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self._map = {}
# linked list with dummy head and tail
self._list = Node(None, None)
self._last = Node(None, None)
self._list.next = self._last
self.size = 0
def _delete_node(self, node):
node.prev.next = node.next
node.next.prev = node.prev
def get(self, key: int) -> int:
if key not in self._map:
return -1
node = self._map[key]
self._move_to_first(node)
return node.value
def _move_to_first(self, node):
self._delete_node(node)
self._insert_to_first(node)
def _insert_to_first(self, node):
node.next = self._list.next
node.prev = self._list
node.next.prev = node
self._list.next = node
def put(self, key: int, value: int) -> None:
if key in self._map:
node = self._map[key]
node.value = value
self._move_to_first(node)
return
self.size += 1
node = Node(key, value)
self._map[key] = node
if self.size > self.capacity:
del self._map[self._last.prev.key]
self._delete_node(self._last.prev)
self._insert_to_first(node)
# Your LRUCache object will be instantiated and called as such:
# obj = LRUCache(capacity)
# param_1 = obj.get(key)
# obj.put(key,value)
但是上面的方式还有一个问题,就是在 lru_cache
满了的时候,此时新增一个节点,会导致需要从链表中删除一个尾部的旧节点,然后同时在头部插入一个新节点。
有没有办法直接使用旧的删除的节点来代替新增的节点呢?这样在 LRUCache
满了的时候,put
新元素的性能会获得很大的提升。
而 Python 内部实现正是考虑了这一点,利用了带头结点root
的循环双向链表,避免了该问题。
root
作为新节点,使用原来的尾部节点即 root.prev
作为新的 root 节点。list
代替 node
节省空间。下面是 Python
的 lru_cache
的实现,因为原实现是装饰器,这里略作修改为类的实现:
# 使用 双向链表和 hashmap 的组合来实现 get,put 都为 O(1) 的复杂度,空间复杂度为 O(N)
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self._cache = {}
# circular queue with root
self.root = []
self.root[:] = [self.root, self.root, None, None]
self.size = 0
def get(self, key: int) -> int:
if key not in self._cache:
return -1
node = self._cache[key]
self._move_to_front(node)
return node[RESULT]
def _delete_node(self, node):
node[PREV][NEXT] = node[NEXT]
node[NEXT][PREV] = node[PREV]
def _insert_to_front(self, node):
node[NEXT] = self.root
node[PREV] = self.root[PREV]
node[PREV][NEXT] = node[NEXT][PREV] = node
def _move_to_front(self, node):
self._delete_node(node)
self._insert_to_front(node)
def put(self, key: int, value: int) -> None:
if key in self._cache:
node = self._cache[key]
node[RESULT] = value
self._move_to_front(node)
return
self.size += 1
if self.size > self.capacity:
# 直接使用 root 节点作为新节点,然后通过将 root[NEXT] 的待删除节点设为新的 root 节点,避免了删除和分配新节点的消耗
self.root[KEY] = key
self.root[RESULT] = value
self._cache[key] = self.root
self.root = self.root[NEXT]
del self._cache[self.root[KEY]]
return
node = [None, None, key, value]
self._cache[key] = node
self._insert_to_front(node)
因为想到 redis 也实现了 lru cache
,就抽空看了下源码,发现跟想象中非常不一样,并不是常规的实现方式。
当 redis 达到设置的 maxmemory
,会从所有key 中随机抽样 5 个值,然后计算它们的 idle time,插入一个长度为 16 的待淘汰数组中,数组中的 entry 根据 idle time 升序排列,最右侧的就是接下来第一个被淘汰的。淘汰后如果内存还是不满足需要,则继续随机抽取 key
并循环以上过程。
因为是随机抽样,所以分为以下情况
抽样的 key idle 小于最左侧最小的 idle time,什么都不做,直接跳过
找到适合的插入位置 i
pool[i: end]
pool[0: i + 1]
,这样的话 idle 时间最小的就被淘汰了关键实现逻辑如下:
while (mem_freed < mem_tofree) {
sds bestkey = NULL;
struct evictionPoolEntry *pool = EvictionPoolLRU;
while(bestkey == NULL) {
evictionPoolPopulate(i, dict, db->dict, pool);
/* Go backward from best to worst element to evict. */
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
bestkey = dictGetKey(de);
break;
}
}
优点
get
、put
时间复杂度都为 O(1)locality
,即最近使用的数据很可能再次被使用缺点
long scan 的时候,会导致 lru 不断发生 evcit 行为。(数据库操作,从磁盘加载文件等,LFU 避免了该行为)
只利用了到了一部分的 locality,没有利用 最经常使用的数据很可能再次被使用
(LFU 做到了,但是更慢,Log(N))
在这篇文章还没完稿的时候,看到了 B 站的 LRUCache 的源码实现,下面就顺便来分析一下。下面是对应的源码:
package lrucache
// 这里是 Node 节点的定义,中规中矩
// Element - node to store cache item
type Element struct {
prev, next *Element
Key interface{}
Value interface{}
}
// Next - fetch older element
func (e *Element) Next() *Element {
return e.next
}
// Prev - fetch newer element
func (e *Element) Prev() *Element {
return e.prev
}
// 通过这个结构体,猜测使用的是跟 Leetcode 经典实现类似的带头尾伪节点的链表实现。
// 后面发现实际上并不是
// LRUCache - a data structure that is efficient to insert/fetch/delete cache items [both O(1) time complexity]
type LRUCache struct {
cache map[interface{}]*Element
head *Element
tail *Element
capacity int
}
// New - create a new lru cache object
func New(capacity int) *LRUCache {
// 可以看到初始化 LRUCache 时,并没有初始化 Dummy head 和 Dummy tail
// 不过这样岂不是在操作过程中需要进行很多 if 判断,后面的代码也验证了这点。
return &LRUCache{make(map[interface{}]*Element), nil, nil, capacity}
}
// Put - put a cache item into lru cache
func (lc *LRUCache) Put(key interface{}, value interface{}) {
if e, ok := lc.cache[key]; ok {
e.Value = value
// 等同之前的 _insert_to_front
lc.refresh(e)
return
}
if lc.capacity == 0 {
return
} else if len(lc.cache) >= lc.capacity {
// evict the oldest item
delete(lc.cache, lc.tail.Key)
// 等同之前的 _delete_node
lc.remove(lc.tail)
}
e := &Element{nil, lc.head, key, value}
lc.cache[key] = e
// 因为没有 Dummy head 和 Dummy tail 而带来的不必要的 if 判断
if len(lc.cache) != 1 {
lc.head.prev = e
} else {
lc.tail = e
}
lc.head = e
}
// Get - get value of key from lru cache with result
func (lc *LRUCache) Get(key interface{}) (interface{}, bool) {
if e, ok := lc.cache[key]; ok {
lc.refresh(e)
return e.Value, ok
}
return nil, false
}
func (lc *LRUCache) refresh(e *Element) {
// 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
if e.prev != nil {
e.prev.next = e.next
if e.next == nil {
lc.tail = e.prev
} else {
e.next.prev = e.prev
}
e.prev = nil
e.next = lc.head
lc.head.prev = e
lc.head = e
}
}
func (lc *LRUCache) remove(e *Element) {
// 可以看到这里很多的 if 判断,严重干扰了代码的阅读逻辑。
if e.prev == nil {
lc.head = e.next
} else {
e.prev.next = e.next
}
if e.next == nil {
lc.tail = e.prev
} else {
e.next.prev = e.prev
}
}
怎么说呢,有点失望。一开始在看到结构定义了 head 、tail时以为是使用了 Dummy head 和 Dummy tail 的经典实现,但是在初始化时发现没有初始化对应的 head、tail,以为是使用了一种未知的新方法,但是一看 refresh 和 remove 的逻辑,发现是通过大量的 if、else 来判断 corner cases。
而大量使用 if 会严重干扰代码的可读性和可维护性,具体可见 Applying the Linus Torvalds “ Good Taste ” Coding Requirement 这篇文章。
跟网友的讨论中,有人又贴出了 google 的一版实现,在 Golang/groupcache
项目下。就顺便看了下对应的源码。
// Package lru implements an LRU cache.
package lru
import "container/list"
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
关键点在于 container/list
,这是一个带头结点的循环双向链表,但是并没有暴露 root 节点,所以 google
的实现同 Leetcode
经典实现是一致的。
我还发了一个 issue 去询问为什么不采用类似 Python
的实现。官方的回答是目前够用,如果需要变更的话,需要 benchmark 的支持。理论上 Python
的实现在不断读取新数值的时候性能会好很多。
综合下来 Python 内置库 functools.lru_cache 的带头结点的双向循环队列的实现是最优雅的