【Go | 从0实现简单分布式缓存】-3:分布式节点通信

news/2025/2/26 15:49:46

在这里插入图片描述

本文目录

  • 一、通信流程
  • 二、peers.go
  • 三、http.go
  • 四、geecache.go
  • 五、测试代码

本文为极客兔兔动手写分布式缓存GeeCache学习笔记。

一、通信流程

在前面一节中,已经为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此本节来实现 HTTPPool 客户端的功能。

最开始我们的流程定义如下,前面已经实现了1和3,那么现在需要实现2这个点。

在这里插入图片描述
现在我们进一步来细化2这个点的步骤,来看看是怎么实现的。

在这里插入图片描述

二、peers.go

首先抽象出 2 个接口,PeerPickerPickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter

接口 PeerGetterGet() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP 客户端。

type PeerPicker interface {
	PickPeer(key string) (peer PeerGetter, ok bool)
	//根据传进来的键,去选择相对应的接口(就是调用一致性哈希算法)。
}

// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
	Get(group string, key string) ([]byte, error)
	//从客户端去请求其他节点的对应的值的过程。
}

三、http.go

首先需要编写Peers的Set函数,也就是节点的注册函数。

先初始化一个一致性哈希环,将传入的节点地址添加到哈希环中。为每个节点创建一个 httpGetter 对象,并将它们存储到一个映射中,以便后续通过节点地址快速访问对应的 Getter 对象。整个过程是线程安全的,通过互斥锁保护共享资源的并发访问。

一句话来表示就是:Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter

peers ...string 是方法的参数,表示一个可变数量的字符串切片。调用时可以传入任意数量的字符串作为参数,这些字符串代表 HTTP 节点的地址。

func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

consistenthash.New 是一致性哈希库的构造函数,用于创建一个新的一致性哈希环。

defaultReplicas 是一个常量,表示每个节点在哈希环上的虚拟副本数量。一致性哈希通过虚拟节点(副本)来提高负载均衡的效果。

nil 是一致性哈希的哈希函数参数,这里使用默认的哈希函数。

p.peers.Add 方法会将每个节点添加到一致性哈希环中。一致性哈希环会根据节点的哈希值将它们分布在环上。

p.httpGetters 是一个映射(map),用于存储每个节点的 HTTP Getter 对象。

make(map[string]*httpGetter, len(peers)) 创建了一个映射,键是节点地址(string),值是指向 httpGetter 的指针。

for _, peer := range peers 遍历传入的 peers 切片,peer 是当前节点的地址。p.httpGetters[peer] 是将节点地址作为键存储到映射中。&httpGetter{baseURL: peer + p.basePath} 创建了一个 httpGetter 对象,并将其地址存储到映射中。也就是peer 是节点的地址。p.basePath 是 HTTPPool 中定义的一个字段,表示 HTTP 请求的路径前缀。peer + p.basePath 拼接成完整的 HTTP 请求的基地址。


然后是Pickeer函数。PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点peer,并返回该节点对应的 HTTP 客户端,也就是PeerGetter,bool就代表是否成功的找到对应的节点

p.peers.Get(key):调用一致性哈希环的 Get 方法,根据输入的 key 选择一个节点。peer != "":检查返回的节点地址是否为空。如果为空,说明没有找到合适的节点。peer != p.self:检查返回的节点是否是当前节点(p.self)。如果返回的节点是当前节点,说明不需要跨节点远程获取数据,可以直接跳过。

func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

四、geecache.go

geecache.go中需要添加方法。

首先,新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值,并将其封装为一个 ByteView 类型返回。。


func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

原本的load()函数非常简单,是直接调用getLocally()函数,现在 需要改进。

首先尝试从分布式缓存系统中加载指定键(key)的值。它首先尝试从远程节点加载数据,也就是getFromPeer这个方法会帮我们去调用peer的Get()方法,如果失败,则退回到本地加载,也就是getLocally(key)

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}


接下来又到了调用http.go中的Get方法。

首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口

baseURL 表示将要访问的远程节点的地址,例如 http://example.com/_geecache/

使用 http.Get() 方式获取返回值(http.Get(u)是发送一个 HTTP GET 请求到构造好的 URL),并转换为 []bytes 类型。

type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil)

接下来讲讲具体代码的作用:

url.QueryEscape(group)url.QueryEscape(key):使用 url.QueryEscape 函数对缓存组名称和键进行 URL 编码,确保它们可以安全地嵌入到 URL 中。

var _ PeerGetter = (*httpGetter)(nil)

这行代码就比较熟悉了:通过类型断言,用于确保 *httpGetter 类型实现了 PeerGetter 接口。

也就是通过将 (*httpGetter)(nil) 赋值给 PeerGetter 类型的变量,强制编译器检查 *httpGetter 是否实现了 PeerGetter 接口。


所以在geecache.go中,我们一共实现需要添加下面的代码。

// A Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
}

// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。

新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。

修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

五、测试代码

测试总体代码如下:

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())

		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))

}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}

接下来挨个讲讲对应的功能。

定义了一个模拟的数据库,使用一个字符串到字符串的映射来存储键值对。它模拟了后端存储,用于在缓存未命中时提供数据。

也就是提供Getter回调函数,方便在没有找到数据的时候来返回。

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

创建了一个缓存组,名为 “scores”,缓存大小为 2KB。它使用了一个自定义的 GetterFunc,当缓存未命中时,会从模拟数据库 db(也就是上面的本地db模拟数据库) 中获取数据。如果键不存在,则返回错误。

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

启动了一个缓存服务器,监听指定的地址 addr。它创建了一个 HTTP 节点池 peers,并将所有节点地址 addrs 添加到池中。然后,它将节点池注册到缓存组 gee 中,并启动 HTTP 服务以监听缓存请求。

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

启动了一个 API 服务器,监听指定的地址 apiAddr。它为 /api 路径提供了一个 HTTP 处理函数,该函数从请求中提取键 key,并从缓存组 gee 中获取数据。如果获取成功,它将数据作为响应返回;如果失败,则返回错误。

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())
		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}

通过命令行参数解析来设置缓存服务器的端口和是否启动 API 服务器。它定义了一个地址映射 addrMap,用于存储所有缓存节点的地址。然后,它创建了一个缓存组 gee,并根据参数决定是否启动 API 服务器。最后,它启动一个缓存服务器,监听指定的端口。

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}

也就是总的来说,startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。

startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。

main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。


测试会发现一个明显的问题,就是测试的时候,并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次请求。

假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这就是需要优化的点,也就是防止缓存击穿。


http://www.niftyadmin.cn/n/5868903.html

相关文章

vscode设置自动换行

vscode设置自动换行 方法 方法 点击文件->首选项->设置。搜索word wrap -> 选择 on 。 搜索Word Wrap&#xff0c;并把选项改为on。

星海智算+ DeepSeek-R1:技术突破与行业应用的协同革新

一、前言 在当今数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;正以前所未有的速度改变着商业和社会的方方面面。最近爆火的DeepSeek-R1系列模型&#xff0c;以其强大的推理能力和在中文的推理、代码和数学任务高效的性能得到了全球用户的热议。该模型不仅在多项专…

C++之string类的模拟实现(超详细)

们学习东西&#xff0c;先学习如果使用它&#xff0c;然后再学习如何实现它 文章目录 目录 1. 命名空间以及头文件 2.string类的成员变量 3.string类的成员函数 3.1 构造函数 3.2 析构函数 3.3 拷贝构造函数 3.4 赋值运算符重载 3.5 c_str函数 3.6 size函数 3.7 clea…

HTML5特殊字符

HTML中常用的特殊符号一般都以“&”开头&#xff0c;以“;”结束。

Java数据结构第十四期:走进二叉树的奇妙世界(三)

专栏&#xff1a;数据结构(Java版) 个人主页&#xff1a;手握风云 目录 一、二叉树OJ练习题 1.1. 相同的树 1.2. 另一棵树的子树 1.3. 翻转二叉树 1.4. 平衡二叉树 1.5. 对称二叉树 一、二叉树OJ练习题 1.1. 相同的树 判断两棵树是否相同&#xff0c;我们是否只能遍历一…

vscode中使用PlatformIO创建工程加载慢

最近使用vscodeplatformIO开发esp32s3&#xff0c;第一次创建工程时加载速度很慢&#xff0c;查询资料解决问题&#xff0c;特此记录。 1.新建环境变量pyhton 此电脑-属性-高级系统设置中&#xff08;直接搜索高级系统设置也行&#xff09;&#xff0c;添加系统变量&#xff…

腾讯云安全加速:应对网络攻击与访问延迟的现实挑战

​​​​​​ 随着互联网业务的全球化发展&#xff0c;企业面临着网络攻击、访问延迟、跨境访问不稳定等问题。无论是电商、金融、在线教育&#xff0c;还是 SaaS 平台&#xff0c;用户体验的流畅性与安全性都直接影响着业务成败。而DDoS 攻击、爬虫、数据泄露等安全威胁不断增…

R-INLA实现绿地与狐狸寄生虫数据空间建模:含BYM、SPDE模型及PC先验应用可视化...

全文链接&#xff1a;https://tecdat.cn/?p40720 本论文旨在为对空间建模感兴趣的研究人员客户提供使用R-INLA进行空间数据建模的基础教程。通过对区域数据和地统计&#xff08;标记点&#xff09;数据的分析&#xff0c;介绍了如何拟合简单模型、构建和运行更复杂的空间模型&…