存储是有限的,满了之后该去掉谁?
访问次数,去掉访问最少的
LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。
整体逻辑并不复杂,包装 entry 是为了删除队尾的时候方便
delete(c.cache, kv.key)
// Source file lru/lru.go (package lru). It needs the standard-library
// import "container/list".

// Value is implemented by anything stored in the cache. Len reports how many
// bytes the value occupies for the cache's size accounting.
type Value interface {
	Len() int
}

// Cache is an LRU cache bounded by the total size of its keys and values.
// It performs no locking of its own.
type Cache struct {
	// maxBytes is the capacity in bytes; 0 disables eviction.
	maxBytes int64
	// nbytes is the number of bytes currently accounted for.
	nbytes int64
	// ll orders entries by recency: the front is the most recently used,
	// the back is the least recently used.
	ll *list.List
	// cache maps a key to its element in ll.
	cache map[string]*list.Element
	// OnEvicted, if non-nil, is called for each entry removed by RemoveOldest.
	OnEvicted func(key string, value Value)
}

// New returns an empty Cache holding at most maxBytes bytes (0 means no
// limit). onEvicted may be nil.
func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
	return &Cache{
		maxBytes:  maxBytes,
		ll:        list.New(),
		cache:     make(map[string]*list.Element),
		OnEvicted: onEvicted,
	}
}

// entry is the payload of a list element. It keeps the key next to the value
// so that removing the back element can also delete the map entry.
type entry struct {
	key   string
	value Value
}

// Add inserts key with value, or replaces the value of an existing key, and
// marks the entry as most recently used. Afterwards it evicts the least
// recently used entries until the cache fits within maxBytes.
func (c *Cache) Add(key string, value Value) {
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		// The key is already accounted for; only the difference between the
		// new and the old value size changes the total.
		c.nbytes += int64(value.Len()) - int64(kv.value.Len())
		kv.value = value
	} else {
		ele := c.ll.PushFront(&entry{key, value})
		c.cache[key] = ele
		c.nbytes += int64(len(key)) + int64(value.Len())
	}
	// Stop once the list is empty so the loop terminates even if the limit
	// can never be met (for example a negative maxBytes).
	for c.maxBytes != 0 && c.maxBytes < c.nbytes && c.ll.Len() > 0 {
		c.RemoveOldest()
	}
}

// RemoveOldest evicts the least recently used entry, if there is one, and
// reports it to OnEvicted.
func (c *Cache) RemoveOldest() {
	ele := c.ll.Back()
	if ele == nil {
		return
	}
	c.ll.Remove(ele)
	kv := ele.Value.(*entry)
	delete(c.cache, kv.key)
	c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
	if c.OnEvicted != nil {
		c.OnEvicted(kv.key, kv.value)
	}
}

// Get returns the value stored under key and marks the entry as most
// recently used. ok is false when the key is not present.
func (c *Cache) Get(key string) (value Value, ok bool) {
	ele, found := c.cache[key]
	if !found {
		return nil, false
	}
	c.ll.MoveToFront(ele)
	return ele.Value.(*entry).value, true
}

// Len returns the number of entries in the cache.
func (c *Cache) Len() int {
	return c.ll.Len()
}
为现有的 Cache 增加并发读取的能力
先看看使用层
Gofunc TestGet(t *testing.T) { loadCounts := make(map[string]int, len(db)) gee := NewGroup("scores", 2<<10, GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { if _, ok := loadCounts[key]; !ok { loadCounts[key] = 0 } loadCounts[key] += 1 return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) })) for k, v := range db { if view, err := gee.Get(k); err != nil || view.String() != v { t.Fatal("failed to get value of Tom") } // load from callback function if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 { t.Fatalf("cache %s miss", k) } // cache hit } if view, err := gee.Get("unknown"); err == nil { t.Fatalf("the value of unknow should be empty, but %s got", view) } }
NewGroup
创建返回 gee,也就是缓存实例,创建的时候第三个参数是缓存未命中时的获取逻辑,获取到了,拿到的结果会加入缓存
NewGroup 的 核心 Get 实现
Go// A Getter loads data for a key. type Getter interface { Get(key string) ([]byte, error) } // A GetterFunc implements Getter with a function. type GetterFunc func(key string) ([]byte, error) // Get implements Getter interface function func (f GetterFunc) Get(key string) ([]byte, error) { return f(key) } // A Group is a cache namespace and associated data loaded spread over type Group struct { name string getter Getter mainCache cache } // Get value for a key from cache func (g *Group) Get(key string) (ByteView, error) { if key == "" { return ByteView{}, fmt.Errorf("key is required") } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit") return v, nil } return g.load(key) } func (g *Group) load(key string) (value ByteView, err error) { return g.getLocally(key) } func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{b: cloneBytes(bytes)} g.populateCache(key, value) return value, nil } func (g *Group) populateCache(key string, value ByteView) { g.mainCache.add(key, value) }
Getter 这里接口型函数的设计可看
cache 的实现
Gopackage geecache import ( "geecache/lru" "sync" ) type cache struct { mu sync.Mutex lru *lru.Cache cacheBytes int64 } func (c *cache) add(key string, value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil) } c.lru.Add(key, value) } func (c *cache) get(key string) (value ByteView, ok bool) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { return } if v, ok := c.lru.Get(key); ok { return v.(ByteView), ok } return }
这里为 lru 的读取套上了一层锁的机制
NOTE 多个协程(goroutine)同时读写同一个变量,在并发度较高的情况下,会发生冲突。确保一次只有一个协程(goroutine)可以访问该变量以避免冲突,这称之为
互斥
,互斥锁可以解决这个问题。sync.Mutex 是一个互斥锁,可以由不同的协程加锁和解锁。
sync.Mutex
是 Go 语言标准库提供的一个互斥锁,当一个协程(goroutine)获得了这个锁的拥有权后,其它请求锁的协程(goroutine) 就会阻塞在Lock()
方法的调用上,直到调用Unlock()
锁被释放。
byte 是很通用的存储结构,抽象一层这个结构,并满足我们 Value 的接口要求
// Value is the interface cached values must satisfy (shown here as an
// excerpt); Len reports how many bytes the value takes.
type Value interface {
	Len() int
}

// The declarations below belong to package geecache.

// ByteView holds an immutable view of bytes. Its methods hand out copies or
// strings only, never the underlying slice.
type ByteView struct {
	b []byte
}

// Len returns the number of bytes in the view, satisfying Value.
func (v ByteView) Len() int {
	return len(v.b)
}

// ByteSlice returns a copy of the data, so callers cannot modify the view.
func (v ByteView) ByteSlice() []byte {
	return cloneBytes(v.b)
}

// String returns the data converted to a string.
func (v ByteView) String() string {
	return string(v.b)
}

// cloneBytes returns a newly allocated slice with the same contents as b.
func cloneBytes(b []byte) []byte {
	dup := make([]byte, len(b))
	copy(dup, b)
	return dup
}
至此整体结构
geecache/
    |--lru/
        |--lru.go  // lru 缓存淘汰策略
    |--byteview.go // 缓存值的抽象与封装
    |--cache.go    // 并发控制
    |--geecache.go // 负责与外部交互,控制缓存存储和获取的主流程
至此,这一章节的单机并发缓存就已经完成了。
NOTE 分布式缓存需要实现节点间通信,建立基于 HTTP 的通信机制是比较常见和简单的做法。如果一个节点启动了 HTTP 服务,那么这个节点就可以被其他节点访问。今天我们就为单机节点搭建 HTTP Server。
目标是可以通过 HTTP 协议访问到 上面实现的 geecache
使用层为
Gofunc main() { geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) })) addr := "localhost:9999" peers := geecache.NewHTTPPool(addr) log.Println("geecache is running at", addr) log.Fatal(http.ListenAndServe(addr, peers)) }
启动后可以通过 HTTP 进行访问
Zsh$ curl http://localhost:9999/_geecache/scores/Tom 630 $ curl http://localhost:9999/_geecache/scores/kkk kkk not exist
URL 格式为 /<basepath>/<groupname>/<key>
通过 groupname 得到 group 实例,再使用 group.Get(key)
获取缓存数据。
// Create the pool for this address; it is passed below as the HTTP handler.
peers := geecache.NewHTTPPool(addr)
// Serve every request arriving at addr through the pool.
http.ListenAndServe(addr, peers)
peers 需要满足 ListenAndServe 的接口要求
// ListenAndServe (excerpt from net/http) wraps addr and handler in a Server
// and starts it.
func ListenAndServe(addr string, handler Handler) error {
	server := &Server{Addr: addr, Handler: handler}
	return server.ListenAndServe()
}

// Handler is the single-method interface a value must implement to be
// passed to ListenAndServe.
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}
因此造一个 HTTPPool
实现 ServeHTTP 接口就好
Goconst defaultBasePath = "/_geecache/" // HTTPPool implements PeerPicker for a pool of HTTP peers. type HTTPPool struct { // this peer's base URL, e.g. "https://example.net:8000" self string basePath string } // NewHTTPPool initializes an HTTP pool of peers. func NewHTTPPool(self string) *HTTPPool { return &HTTPPool{ self: self, basePath: defaultBasePath, } } // Log info with server name func (p *HTTPPool) Log(format string, v ...interface{}) { log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...)) } // ServeHTTP handle all http requests func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, p.basePath) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } p.Log("%s %s", r.Method, r.URL.Path) // /<basepath>/<groupname>/<key> required parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2) if len(parts) != 2 { http.Error(w, "bad request", http.StatusBadRequest) return } groupName := parts[0] key := parts[1] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: "+groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/octet-stream") w.Write(view.ByteSlice()) }
接下来,运行 main 函数,使用 curl 做一些简单测试:
Bash$ curl http://localhost:9999/_geecache/scores/Tom 630 $ curl http://localhost:9999/_geecache/scores/kkk kkk not exist
GeeCache 的日志输出如下:
CODE2020/02/11 23:28:39 geecache is running at localhost:9999 2020/02/11 23:29:08 [Server localhost:9999] GET /_geecache/scores/Tom 2020/02/11 23:29:08 [SlowDB] search key Tom 2020/02/11 23:29:16 [Server localhost:9999] GET /_geecache/scores/kkk 2020/02/11 23:29:16 [SlowDB] search key kkk
当我们有多个 Cache Server 的时候,一个 key 的请求过来,去哪个 Cache Server 来请求呢 需要满足以下条件
方案 1 如下图
方案 2 一致性哈希 如上图
并且为了解决分布不均匀的问题,增加虚拟节点
// Source file consistenthash.go (package consistenthash). It needs the
// standard-library imports "hash/crc32", "sort" and "strconv".

// Hash maps bytes to a uint32 position on the ring.
type Hash func(data []byte) uint32

// Map is a consistent-hash ring. Every real key is placed on the ring
// several times as virtual nodes.
type Map struct {
	hash     Hash           // hash function placing keys on the ring
	replicas int            // number of virtual nodes per real key
	keys     []int          // sorted hashes of all virtual nodes
	hashMap  map[int]string // virtual-node hash -> real key
}

// New creates a Map with the given number of virtual nodes per key. A nil fn
// selects crc32.ChecksumIEEE.
func New(replicas int, fn Hash) *Map {
	if fn == nil {
		fn = crc32.ChecksumIEEE
	}
	return &Map{
		hash:     fn,
		replicas: replicas,
		hashMap:  make(map[int]string),
	}
}

// Add places the given real keys on the ring. Each one contributes
// m.replicas virtual nodes, hashed from the replica index followed by the key.
func (m *Map) Add(keys ...string) {
	for _, real := range keys {
		for replica := 0; replica < m.replicas; replica++ {
			virtual := strconv.Itoa(replica) + real
			pos := int(m.hash([]byte(virtual)))
			m.keys = append(m.keys, pos)
			m.hashMap[pos] = real
		}
	}
	sort.Ints(m.keys)
}

// Get returns the real key owning the first virtual node at or after the
// hash of key. It returns "" when nothing has been added.
func (m *Map) Get(key string) string {
	if len(m.keys) == 0 {
		return ""
	}

	pos := int(m.hash([]byte(key)))
	// Index of the first virtual node whose hash is >= pos.
	idx := sort.SearchInts(m.keys, pos)
	if idx == len(m.keys) {
		// Past the largest hash: the ring wraps around to the first node.
		idx = 0
	}
	return m.hashMap[m.keys[idx]]
}
最后一行 m.keys[idx%len(m.keys)]
有些理解成本
NOTE 如果
idx == len(m.keys)
,说明应选择m.keys[0]
,因为m.keys
是一个环状结构,所以用取余数的方式来处理这种情况。
一致性哈希 - Consistent Hashing 是什么?为什么系统设计面试中经常会提到?10 分钟讲解一致性哈希 | 系统设计 System Design EP1
现在的流程为
                            是
接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
                |  否                         是
                |-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
                            |  否
                            |-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶
1、3 已经在 单机并发缓存那里实现,本节主要来解决 2
使用一致性哈希选择节点        是                                    是
    |-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
                    |  否                                    ↓  否
                    |----------------------------> 回退到本地节点处理。
Goimport ( "flag" "fmt" "geecache" "log" "net/http" "strings" ) var db = map[string]string{ "Tom": "630", "Jack": "589", "Sam": "567", } func createGroup() *geecache.Group { return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) })) } func startCacheServer(addr string, addrs []string, gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) peers.Set(addrs...) gee.RegisterPeers(peers) log.Println("geecache is running at", addr) log.Fatal(http.ListenAndServe(strings.TrimPrefix(addr, "http://"), peers)) } func startAPIServer(apiAddr string, gee *geecache.Group) { http.Handle("/api", http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") view, err := gee.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/octet-stream") w.Write(view.ByteSlice()) })) log.Println("fontend server is running at", apiAddr) log.Fatal(http.ListenAndServe(strings.TrimPrefix(apiAddr, "http://"), nil)) } func main() { var port int var api bool flag.IntVar(&port, "port", 8001, "Geecache server port") flag.BoolVar(&api, "api", false, "Start a api server?") flag.Parse() apiAddr := "http://0.0.0.0:9999" addrMap := map[int]string{ 8001: "http://0.0.0.0:8001", 8002: "http://0.0.0.0:8002", 8003: "http://0.0.0.0:8003", } var addrs []string for _, v := range addrMap { addrs = append(addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], addrs, gee) }
Bash#!/bin/bash trap "rm server;kill 0" EXIT go build -o server ./server -port=8001 & ./server -port=8002 & ./server -port=8003 -api=1 & sleep 2 echo ">>> start test" curl "http://127.0.0.1:9999/api?key=Tom" & curl "http://127.0.0.1:9999/api?key=Tom" & curl "http://127.0.0.1:9999/api?key=Tom" & wait
起了三个 Cache Server,端口分别在 8001 8002 8003 其中有个服务同时起了 Cache Server 和 对外暴露的 API 服务,这个 API 服务放在 9999 端口上
执行结果是
Zsh2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8001 2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8002 2023/11/22 09:29:09 fontend server is running at http://0.0.0.0:9999 2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8003 >>> start test 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [SlowDB] search key Tom 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [GeeCache] hit 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [GeeCache] hit 630630630
如下图
上面这两个图可以结合着理解 一致性哈希在分布式节点的使用
var _ PeerPicker = (*HTTPPool)(nil)
这行代码是用于验证 *HTTPPool
类型是否实现了 PeerPicker
接口的一种方式。
即利用强制类型转换,确保 struct HTTPPool 实现了接口 PeerPicker。这样 IDE 和编译期间就可以检查,而不是等到使用的时候。
缓存雪崩:缓存在同一时刻全部失效,造成瞬时 DB 请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
缓存击穿:一个存在的 key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时 DB 请求量大、压力骤增。
缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。
在分布式节点中有以下使用结果
Bash#!/bin/bash trap "rm server;kill 0" EXIT go build -o server ./server -port=8001 & ./server -port=8002 & ./server -port=8003 -api=1 & sleep 2 echo ">>> start test" curl "http://127.0.0.1:9999/api?key=Tom" & curl "http://127.0.0.1:9999/api?key=Tom" & curl "http://127.0.0.1:9999/api?key=Tom" & wait
Zsh2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8001 2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8002 2023/11/22 09:29:09 fontend server is running at http://0.0.0.0:9999 2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8003 >>> start test 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [SlowDB] search key Tom 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [GeeCache] hit 2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom 2023/11/22 09:29:11 [GeeCache] hit 630630630
对着 api 请求了 三次,8003 向 8002 发了三次请求
本次来实现一个叫作 singleflight 的包来解决这个问题,做到只对远端进行一次请求
从实现上来看,就是对向远端请求的那个流程加锁
// load fetches key through g.loader so that concurrent callers asking for
// the same key share one fetch. Inside that fetch it first tries the peer
// chosen by g.peers, if any, and falls back to getLocally when no peer is
// picked or the peer request fails.
func (g *Group) load(key string) (value ByteView, err error) {
	// each key is only fetched once (either locally or remotely)
	// regardless of the number of concurrent callers.
	viewi, err := g.loader.Do(key, func() (interface{}, error) {
		if g.peers != nil {
			if peer, ok := g.peers.PickPeer(key); ok {
				// Note: this assigns to load's named results, not to new
				// local variables.
				if value, err = g.getFromPeer(peer, key); err == nil {
					return value, nil
				}
				// The peer failed; log it and continue with the local getter.
				log.Println("[GeeCache] Failed to get from peer", err)
			}
		}

		return g.getLocally(key)
	})

	if err == nil {
		return viewi.(ByteView), nil
	}
	// On failure the named results are returned as they currently stand.
	return
}
实现如下
// Source file singleflight.go (package singleflight). It needs the
// standard-library import "sync".

// call represents one execution of fn, either in flight or completed.
type call struct {
	wg  sync.WaitGroup // released when fn has returned
	val interface{}    // result of fn
	err error          // error of fn
}

// Group suppresses duplicate work: calls to Do with the same key that
// overlap in time share a single execution.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // in-flight calls by key, created on first use
}

// Do runs fn and returns its results, making sure only one execution per key
// is in flight at a time. A caller arriving while an execution for the same
// key is running waits for it and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	inflight, dup := g.m[key]
	if dup {
		// Someone else is already running fn for this key: wait for the
		// result without holding the lock.
		g.mu.Unlock()
		inflight.wg.Wait()
		return inflight.val, inflight.err
	}

	// Register this call before releasing the lock so later callers find it.
	cur := &call{}
	cur.wg.Add(1)
	g.m[key] = cur
	g.mu.Unlock()

	cur.val, cur.err = fn()
	cur.wg.Done()

	// The call is finished; the next Do for this key runs fn again.
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return cur.val, cur.err
}
两个锁