通惠河开始结冰之后,有人在岸边写到,生活是合理的。
世界正在科幻化,我这样对 lan 说过。
常数开始崩塌,不可变消失,生活变成一种缓慢受锤的过程。躯体也开始斑斑锈迹,挣扎,粉碎,甚至剥离,血管迸出蓝色液体。也只能自我束缚,并告知生活的艰难与深重。
所以,幸福在哪里?
lan 对我说,加点香菜简直香疯了。我开始慢慢爱上香菜。同时还有不要葱花,百事,法兹乐队,魏淑芬,动物园钉子户。还有敞开心扉。
糖醋排骨要把糖先慢慢炒化,炒方便面和老干妈是绝配,牛角包咬一口溢出火山熔岩般的馅,辣子鸡要炸两遍才好吃,番茄牛肉汤是人间美味。
我也开始写一本关于昼夜与厨房的诗,开始在晚上写这人间情事。
所以,你好哇,lan。
分布式缓存服务中,提供缓存服务的节点可能有很多个。在单机缓存服务中,数据被缓存的流程是这样的:
第一次查询数据时首先从源数据查询(比如数据库),找到之后,同时放入缓存服务器中,下次查询同样的数据时会直接从缓存服务器上查找。
但是缓存服务器一般不太可能是单机的,往往有多个节点。转换为分布式之后,会出现一些问题。
问题一数据冗余
考虑一下,单机服务的时候,利用 LRU 算法实现缓存的存取,一个 key 对应一个数据 value。分布式条件下,如果只是单纯的增加节点,这次查找 key 对应的数据在 A 节点上,下次查找的时候却在 B 服务器上。同一个 key 有多个缓存,完全没必要嘛,这样就是数据冗余了。
怎么解决?
利用哈希。首先对 key 值 hash,然后利用节点数取余。
h = hash(key) %len(node)
这样同一个 key 的数据只会被一个节点缓存。很 awesome 有没有。
But,我不可能一直是这几个节点呀,万一有的节点挂了呢,或者我要添加节点呢?
问题二容错性和扩展性
如果有节点挂了或者新增节点,都会导致len(node) 的变化,那么利用 hash 计算出来的值跟之前的就不一样。这样导致新增或者删除一个节点,之前的所有缓存都失效了!我的天哪!!!
这种问题就是 缓存雪崩 。
怎么办呢?利用一致性 hash 算法。
一致性哈希算法(Consistent Hashing)最早在论文《Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web》中被提出。
它的原理是,把所有的 hash 值空间(就是上面公式计算出来的 h)看成是一个环,取值范围从 0 到 2^32-1。将每个服务器节点通过 hash 映射到环上,同时将数据 key 通过 hash 函数也映射到环上,按顺时针方向,数据 key 跟哪个节点近就属于哪个节点。
举个例子,现在有三个缓存服务器节点 2,4,6,假设这个 hash 算法就是原样输出,我们将节点和数据(1,3,7,9)经过 hash 之后到环上:
按顺时针方向,数据 1 属于 node2,数据 3 属于 node4,数据 7、9 输入 node6。
貌似看起来不错,但是还有个问题。在节点较少的情况上,会发生 数据倾斜 的问题。比如上图所示,数据可能大量的堆积在 node6 和 node2 之间。
解决办法是添加虚拟节点,利用虚拟节点负载均衡每个数据。虚拟节点的做法是,对一个真实节点计算多个 hash,放到环上,所有这些虚拟节点的数据都属于真实节点。
这样所有的数据都均匀的分布在环上了。
算法实现
了解了原理,来动手实现一下一致性 hash 算法。整个算法模仿 go 语言的分布式缓存服务groupcache 实现,groupcache 可以说是memcached 的 go 语言实现。
首先定义一致性 hash 环结构体:
type Hash func(data []byte) uint32
// ConHash 一致性 hash
type ConHash struct {
hash Hash // hash 算法
replicas int // 虚拟节点数
nodes []int // hash 环节点数
hashMap map[int]string // 虚拟节点 - 真实节点
}
可以看到,类型 Hash 就是个回调函数,用户可以自定义 hash 算法。
然后需要往 hash 环上添加节点,根据指定的虚拟节点数用 hash 算法放到环上。
// Add 添加节点到 hash 环上
func (m *ConHash) Add(nodes ...string) {
for _, node := range nodes {
// 将节点值根据指定的虚拟节点数利用 hash 算法放置到环中
for i := 0; i < m.replicas; i++ {
h := int(m.hash([]byte(strconv.Itoa(i) + node)))
m.nodes = append(m.nodes, h)
// 映射虚拟节点到真实节点
m.hashMap[h] = node
}
}
sort.Ints(m.nodes)
}
同样还需要根据 key 值从环上获取对应的节点,获取到节点之后从该节点查找数据。
// Get 从 hash 环上获取 key 对应的节点
func (m *ConHash) Get(key string) string {
if len(m.nodes) == 0 {
return ""
}
// 计算 key 的 hash 值
h := int(m.hash([]byte(key)))
// 顺时针找到第一个匹配的虚拟节点
idx := sort.Search(len(m.nodes), func(i int) bool {
return m.nodes[i] >= h
})
// 从 hash 环查找
// 返回 hash 映射的真实节点
return m.hashMap[m.nodes[idx%len(m.nodes)]]
}
有的人说不对啊,为啥添加的都是服务器节点,数据不是也放在环上吗?
其实是因为 groupcache 将数据划分出一个 group 的概念,数据在内部存储上利用 hash+ 双向链表实现,缓存的数据被放在链表中。
整个流程是这样的,查找 key 值对应的数据时,根据 url 链接中的 group 和 key 值确定节点,如何确定的?上面的代码已经解释了,计算 key 值的 hash,看它属于哪个节点。
然后从该节点的双向链表中查找。如果节点不存在这个 key,从用户定义的数据源查找(比如数据库),找到之后将数据存入该 group 中。
以上,希望有帮助。
首先让我们来了解下 WSGI 规范是啥?
简单来说,WSGI 是服务器和应用之间的接口,前端过来的请求传到服务器之后比如 gunicorn,之后服务器会将请求转发给应用。因为有很多个服务器,如果我们为我们的应用根据不同的服务写不同的代码,会很麻烦,所以就出现了 WSGI。
WSGI 规定了 application 应该实现一个可调用的对象(函数,类,方法或者带__call__
的实例),这个对象应该接受两个位置参数:
- 环境变量(比如 header 信息,状态码等)
- 回调函数(WSGI 服务器负责),用来发送 http 状态和 header 等
同时,该对象需要返回可迭代的响应文本。
更具体的解释可以去 google 搜索相关知识。
一个最简单的实现:
def app(environ, start_response):
response_body = b"Hello, World!"
status = "200 OK"
# 将响应状态和 header 交给 WSGI 服务器比如 gunicorn
start_response(status, headers=[])
return iter([response_body])
我们可以直接使用 gunicorn 之类的服务启动这个 app。
有了 WSGI 规定,框架中就要实现规范中所要求的部分。我们来看看 Flask 是如何实现的。
Flask0.1 版本的实现中只有一个文件,一共 600 多行代码。根据官方文档,一个最简单的 web 服务像这样:
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello World!'
if __name__ == '__main__':
app.run()
调用 Flask() 之后发生了什么?
首先在 __init__
内置方法中有这么几个变量:
class Flask(object):
def __init__(self, package_name):
# view_functions 存储视图函数名称和视图函数
self.view_functions = {}
# 路由字典
self.url_map = Map()
根据名字可以猜测,view_functions 用来存放视图函数,url_map 用来存放路由字典。暂时跳过,来看看 __call__
内置方法:
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
environ,start_response,是不是在哪里见过?WSGI 规范中要求实现的对不对。它返回了 wsgi_app 方法:
def wsgi_app(self, environ, start_response):
with self.request_context(environ):
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
response = self.make_response(rv)
response = self.process_response(response)
return response(environ, start_response)
看到了没,跟我们上面实现的那个简单的 app 是不是很像。
首先预处理请求,然后分发请求到不同的视图函数,最后响应。
我们先来看 dispatch_request 是如何实现的:
def dispatch_request(self):
# 精简了下代码
try:
endpoint, values = self.match_request()
return self.view_functions[endpoint](**values)
except HTTPException, e:
......
def match_request(self):
rv = _request_ctx_stack.top.url_adapter.match()
request.endpoint, request.view_args = rv
return rv
dispatch_request 首先获取 endpoint 和一些变量,然后在视图函数字典里找到对应的视图函数返回。endpoint 和 values 就是我们在定义路由的处理函数时,比如:
url_for('profile', username='John Doe')
其中 profile 就是 endpoint,也就是对应视图函数的名称,username 就是变量。
match_request 中这个 _request_ctx_stack 又是个啥。看起来它像是用来匹配路由的。
_request_ctx_stack 是请求上下文栈,用一个栈把当前请求相关的数据压入栈中,然后进行路由分发和后续处理,处理完成后退出。
具体来说,我们回过头看 wsgi_app方法中有个 with 语句,控制请求上下文的进入和退出。
with self.request_context(environ):
这个 request_context是这样的:
class _RequestContext(object):
def __init__(self, app, environ):
...
self.url_adapter = app.url_map.bind_to_environ(environ)
...
def __enter__(self):
_request_ctx_stack.push(self)
def __exit__(self, exc_type, exc_value, tb):
if tb is None or not self.app.debug:
_request_ctx_stack.pop()
其中的 url_adapter 获取了路由字典,然后连同其他变量一起被压入栈中,这样在上面的 match_request 方法中,从栈中获取 url_adapter ,然后匹配路由找到对应的 endpoint 和参数,然后根据 endpoint 和参数从view_functions 中查找对应的视图函数。
self.url_adapter = app.url_map.bind_to_environ(environ)
rv = _request_ctx_stack.top.url_adapter.match()
其中,bind_to_environ 将 url 绑定到目前的环境返回一个适配器,然后适配器去匹配请求。这两个方法都来自 Flask 的底层调用 werkzeug 。
梳理一下流程,首先适配器在 url_map 中查找当前路由对应的 endpoint 和 values,然后 dispatch_request 根据 endpoint 找到对应的视图函数,然后返回。
那么,url_map 中的路由和 endpoint 对应关系是从哪里来的?
我们在使用 Flask 是不是要用装饰器给视图函数加上路由和方法对吧,像这样:
@app.route('/')
def hello_world():
return 'Hello World!'
这个 route 装饰器长这样,可以看到它调用了 add_url_rule 方法。
def route(self, rule, **options):
def decorator(f):
self.add_url_rule(rule, f.__name__, **options)
self.view_functions[f.__name__] = f
return f
return decorator
def add_url_rule(self, rule, endpoint, **options):
options['endpoint'] = endpoint
options.setdefault('methods', ('GET',))
self.url_map.add(Rule(rule, **options))
add_url_rule 添加了路由和 endpoint 到 url_map 中,这样一个请求的路由过来后,url_adapter.match() 就能匹配到对应的 endpoint,然后根据 endpoint 从 view_functions 里面查找视图函数。
url_map 是 werkzeug 中的 Map 对象,然后添加的是 Rule 对象。它看起来像这样:
self.url_map = Map([
Rule('/', endpoint='home'),
Rule('/book/<id>', endpoint='book')
])
除了利用装饰器,我们也可以这样使用:
def index():
pass
app.add_url_rule('index', '/')
app.view_functions['index'] = index
现在,一切都解释清楚了,定义好视图函数后,app.run运行即可。
可以看到,Flask 中路由匹配是利用字典实现的,还有一种利用前缀树来实现路由的,比如 go 语言中的 gin 框架,关于如何用前缀树实现路由可以看我的另一篇文章:
在 Flask 中可以通过 app.config[‘NAME’] = what 的形式指定一些配置,比如设置 debug = True :
app.debug = True
# 或者
app.config['DEBUG'] = True
有些配置比如设置 ENV 和 TESTING 还可以直接利用 Flask 对象来设置,像这样:
app.testing = True
除了在程序中指定配置,也可以将配置写在单独的文件中,比如:
app = Flask(__name__)
app.config.from_object('yourapplication.default_settings')
app.config.from_envvar('YOURAPPLICATION_SETTINGS')
应用首先从 yourapplication.default_settings 模块载入配置,然后根据 YOURAPPLICATION_SETTINGS
环境变量所指向的文件的内容重载配置的值。
除了从配置文件加载,也可以定义类类指定配置,具体用法去看看官方文档就知道了。
知道了在 Flask 中如何使用配置,我们来看看它是如何实现的。
首先 config 肯定是个变量,在 Flask 这个类中被定义为:
self.config = self.make_config(instance_relative_config)
然后 make_config 的代码是这样的:
def make_config(self, instance_relative=False):
root_path = self.root_path
if instance_relative:
root_path = self.instance_path
# 默认配置
defaults = dict(self.default_config)
defaults['ENV'] = get_env()
defaults['DEBUG'] = get_debug_flag()
return self.config_class(root_path, defaults)
make_config 方法获取 flask 中默认的配置,以及 ENV 和 DEBUG 这两个配置,之后返回了 self.config_class 对象,它在类中是这样定义的:
config_class = Config
class Config(dict):
def __init__(self, root_path, defaults=None):
# dict.__init__让 Config 实例拥有字典行为 config['ENV']
dict.__init__(self, defaults or {})
self.root_path = root_path
config_class 本质上是 Config 类,注意看 Config 类的初始化方法,
dict.__init__(self, defaults or {})
这一行代码使得 Config 类可以像字典一样使用,比如 app.config['TESTING']=True
。当然你也可以使用 __getitem__
和 __setitem__
内置方法使得类具有字典的行为。
那么像 app.testing = True 这样的配置是如何实现的?
在 Flask 类中可以看到,这些类变量都是 ConfigAttribute 对象。
testing = ConfigAttribute('TESTING')
secret_key = ConfigAttribute('SECRET_KEY')
ConfigAttribute类如下:
class ConfigAttribute(object):
def __init__(self, name, get_converter=None):
self.__name__ = name
self.get_converter = get_converter
# obj 是被托管类实例
def __get__(self, obj, type=None):
# 如果被托管实例不存在,返回描述符自身
if obj is None:
return self
# 返回 Flask 实例的 config[name]
rv = obj.config[self.__name__]
if self.get_converter is not None:
rv = self.get_converter(rv)
return rv
def __set__(self, obj, value):
obj.config[self.__name__] = value
ConfigAttribute 是一个描述符类,描述符是什么?
描述符是对多个属性运用相同存取逻辑的一种方式。——《流畅的 python》
描述符实现了特定的内置方法,__get__
, __set__
和 __delete__
,常见的比如 Django 中 ORM 中的实现就是用的描述符:
class Person(models.Model):
# models.CharField 就是一个描述符
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
还有 python 内置的 @property @classmethod staticmethod 装饰器就是用描述符实现的。
说了这么多,来看看描述符到底怎么用。
首先来看ConfigAttribute类中的 __get__
方法:
# obj 是被托管类实例
def __get__(self, obj, type=None):
# 如果被托管实例不存在,返回描述符自身
if obj is None:
return self
# 返回 Flask 实例的 config[name]
rv = obj.config[self.__name__]
if self.get_converter is not None:
rv = self.get_converter(rv)
return rv
__get__
方法的参数 obj 是被托管类的实例,在这里就是 Flask 类,方法中首先判断被托管类是否存在,不存在就返回描述符本身。之后返回 Flask 类实例中的 config[name],看到没有 env 类变量实际上就是 config[name] 中指定的值。
来看 __set__
方法:
def __set__(self, obj, value):
obj.config[self.__name__] = value
__set__
方法中的 obj 同样是被托管类的实例,然后 value 被存储在被托管类的 config 变量中。所以,我们才可以用 app.testing = True 来指定配置。
说到底,描述符有什么用?我们来看,在 Flask 类中我们要指定类变量 testing, env, secret_key, session_cookie_name 等等,都需要从 config 变量中接收(保证配置的一致性)。我们为这些变量都写一个存取方法是不是很麻烦,使用描述符就可以简化流程,对外封装了具体的存取细节,并且减少代码量。
当然我们也可以使用一个函数,通过构建特性工厂的方式来实现,比如:
def config_attribute(name):
def getter(instance):
return instance.config[name]
def setter(instance, value):
instance.config[name] = value
# property 实际上就是@property 装饰器
return property(getter, setter)
# 在 Flask 中调用方式一样
testing = config_attribute('TESTING')
关于描述符的更多细节可以查看 《流畅的 python》 这本书中第 20 章的内容,有详细的介绍。
看这篇文章之前,建议看一下我之前写的:[源码分析]Flask 中路由匹配是如何实现的
BluePrint(蓝图)的概念说白了就是路由组,所有注册到该蓝图上的路由都使用同一个前缀。这样方便了管理,不同的功能可以放在一个模块(比如 admin 模块)中实现,更加解耦。
首先来看看蓝图是如何使用的:
# 定义一个蓝图
simple_page = Blueprint('simple_page', __name__,
template_folder='templates')
# 绑定视图函数
@simple_page.route('/', defaults={'page': 'index'})
@simple_page.route('/<page>')
def show(page):
try:
return render_template('pages/%s.html' % page)
except TemplateNotFound:
abort(404)
# 在主模块中注册路由
app = Flask(__name__)
app.register_blueprint(simple_page)
看上面的例子,首先定义了一个蓝图 simple_page,然后经由这个蓝图来定义路由以及绑定到视图函数上,最后在主模块中,注册这个蓝图即可。
看起来跟常见的定义视图函数的方式一样,只不过在添加路由的时候,需要以蓝图开头。
来看看源码中是如何实现的。
蓝图的功能是在 flask 0.7 版本中被加入的,app 在调用 register_blueprint 方法的时候会调用 Blueprint 类中的 register 方法来注册该蓝图中添加的所有路由。
def register_blueprint(self, blueprint, **options):
...
blueprint.register(self, options, first_registration)
我们看一下 register 方法:
# blueprints.py
def register(self, app, options, first_registration=False):
...
state = self.make_setup_state(app, options, first_registration)
...
for deferred in self.deferred_functions:
deferred(state)
额,make_setup_state 是个啥,deferred_functions 又是个啥。我们跳到 make_setup_state 来看看它里面有什么:
def make_setup_state(self, app, options, first_registration=False):
return BlueprintSetupState(self, app, options, first_registration)
返回了一个类。先不管。来看看 deferred_functions 是什么,从名字上可以看出是延迟函数之类的。
来梳理一下流程,app.register_blueprint 注册蓝图之后,会激活 Buleprint 类中的 register 方法,在 register 方法中循环调用 deferred_functions 中的函数来执行,我们大概能猜出来这段代码的功能就是将蓝图中定义的路由都添加到路由组中。
以上面的蓝图例子,
@simple_page.route('/', defaults={'page': 'index'})
蓝图的 route 方法是这样的:
def route(self, rule, **options):
def decorator(f):
self.add_url_rule(rule, f.__name__, f, **options)
return f
return decorator
route 方法是个装饰器,实际上调用了 add_url_rule 方法:
def add_url_rule(self, rule, endpoint=None, view_func=None, **options):
self.record(lambda s: s.add_url_rule(rule, endpoint, view_func, **options))
def record(self, func):
....
self.deferred_functions.append(func)
在 record 方法中,将 func 添加到了 deferred_functions 列表中,而 add_url_rule 中调用了 record 方法,那么一切就都可以解释了:
register 方法中的这段代码,
state = self.make_setup_state(app, options, first_registration)
...
for deferred in self.deferred_functions:
deferred(state)
循环 deferred_functions,deferred_functions 里面是啥?是 lambda,具体来说,就是蓝图中定义的路由和视图函数,我们通过
@simple_page.route('/<page>')
定义路由之后,实际上就是在 deferred_functions 里面添加了一个 lambda,为什么说它是 defer,因为只有在 register 注册的时候才会真正添加到 app 的 url_map 中。
上面代码中的 state 是一个 BlueprintSetupState 示例,这个类里面有一个 add_url_rule 方法,会在全局 app 的 url_map 中添加路由和视图函数。
def add_url_rule(self, rule, endpoint=None, view_func=None, **options):
self.app.add_url_rule(rule, '%s.%s' % (self.blueprint.name, endpoint),view_func, defaults=defaults, **options)
来梳理一下:
# state 是 BlueprintSetupState 实例
BlueprintSetupState -> state
# deferred_functions 里面是蓝图路由的 lambda
lambda s: s.add_url_rule -> deferred_functions
for deferred in self.deferred_functions:
deferred(state)
意思就是 lambda 中的 s 被赋值为 state ,然后state.add_url_rule,
这样就执行了app.add_url_rule
这个延迟执行设计的太巧妙了,蓝图中添加的路由规则只有在 register 方法中才真正的被添加到全局的路由 map 中。