说到索引,很多人肯定会立马想到 MySQL 中使用了 B+ 树这种索引结构,毕竟是面试常考题型。我在准备面试的过程中,也只是浅显的了解到 MySQL 在创建主键时会自动创建一个主键索引,除此之外,还有联合索引、唯一索引等索引结构。创建主键索引的时候会通过 B+ 树来存储数据。所以,MySQL 为什么要使用 B+ 树作为索引?还有没有其他索引结构?索引到底是干什么的?带着这些疑问,我查阅了相关的资料,同时以项目背景为出发点,希望对索引做一次重新认识。

要实现一个最最简单的数据库,利用 Bash 函数就可以做到:

#!/bin/bash
db_set () {
	echo "$1,$2" >> database
}

db_get () {
	grep "^$1," database | sed -e "s/^$1,//" | tail -n 1
}

这两个函数就是一个最简单的键值数据库,存数据时,不断在 database 文件末尾添加一行键值对即可,查数据时,通过 grep 会找到数据及之前所有的修改记录。这个数据库文件就像一个日志,不断的在末尾追加数据,由于传统磁盘对顺序写表现出的优异性能,数据库的写入性能会很高。但是有一个问题,如果一直往该文件中添加数据,久而久之,这个文件会变得异常大,文件中会出现很多重复的数据,同时,查找数据也是个问题,每次都要从头到尾找一次,别人都下班了,你还在等着查询结果。

既然文件很大,把它拆分成一个个小文件不就行了,同时定期对这些小文件进行压缩,剔除掉重复的和不要的数据,节省存储空间。但是查数据的问题怎么解决,现在的方式,每次找数据都要对整个表扫描一次,黄花菜都凉了。解决办法就是使用索引。

索引是什么

索引的本质就像一部字典前面的检索页,查字典时,通过拼音开头的字母可以快速查找到某个字的位置,索引也一样,通过给数据库中的数据添加类似路标的记号,这样从索引中就可以直接检索到该数据的位置。

让我们用最简单的 哈希索引 试试,哈希索引就是 在内存中将每个键都映射到数据文件中的字节偏移量,这个偏移量就是键对应值的位置。查找时,只要通过哈希映射找到偏移量,然后寻找该位置读取即可。

hash 索引

听起来是个不错的办法,但是大部分数据库都不会使用哈希索引,因为实际的数据库系统需要考虑的东西很多,比如发生崩溃后怎么处理,数据如何保证不丢失。除此之外,哈希索引需要整个放进内存中,如果我的键很多,散列冲突的可能性也会变大,同时,对于数据的范围查询支持也不够。

有没有更好的办法?

LSM-Tree 索引

聪明的大脑已经帮我们想出了解决办法。Google 传奇工程师 Jeff Dean 等人开发了一个可持久化的键值数据库 LevelDB。LevelDB 使用了一种称之为 SSTable(Sorted String Table)的表结构,顾名思义,SSTable 中数据会按照键的顺序排列,同时 LevelDB 在内存中维护着一个称之为 LSM-Tree (Log Structured Merge Tree)的索引结构,LSM-Tree 通过某种平衡树(比如基数树)结构保证键的有序,在超出树的容量后,就将其作为 SSTable 写入磁盘中,同时新的数据会被写入一个新的 LSM-Tree 中。

正如文章之前说的,LevelDB 会定期的在后台执行合并操作,将多个 SSTable 压缩为一个,以去除重复的或删除不用的数据。那么如何查找数据呢?既然键是有序的,查找就好办多了,比如使用二分查找,时间复杂度可以降到 O(log2n) 。LevelDB 会先在内存中查找关键字,然后在最近的 SSTable 中查找,然后在之前的表中查找。当然,在实际的实现中,LevelDB 对细节做了很多的优化,比如使用多层压缩来提升性能,这些都是后话了。

除了 LevelDB,Facebook 基于 LevelDB 开发了性能更好的 RocksDB,国内大名鼎鼎的分布式数据库 TiDB 底层存储就使用的 RocksDB。

B+Tree 索引

能不能使用 B+ 树来做索引?

当然可以,开源数据库 BoltDB 就使用 B+ 树来实现索引,而 etcd 的底层就使用的 BoltDB。除此之外,常见的关系型数据库(比如 MySQL、PostgresSQL 等)也常常使用 B+ 树来实现索引。相较于键值数据库,MySQL 等关系型数据库在数据存储上更为复杂,比如 Compact、Dynamic 行格式等,此处不做深究。所以,B+ 树索引有什么优点,为什么要用它来做索引。

基于 LSM-Tree 索引的数据库由于顺序写入的特点,有着很高的写入吞吐量,因为所有的前台写入都发生在内存中,并且所有后台写入都保持着顺序访问的模式。但是对于查询来说,往往需要在多个 SSTable 中依次查找,导致读取的吞吐量下降。而 B+ 树就不同了,在 B+ 树中只有叶子节点存储着数据,并且叶子节点之间通过链表顺序连接,在查找时,通过根节点确定数据的范围,然后顺着叶子节点的链表查找即可。在找到数据行对应的页之后,数据库会把整个页读入到内存中,并在内存中查找具体的数据行。

B+Tree

这就是为什么 MySQL 等数据库不使用 B 树来实现索引的原因,B 树中每个节点都会存储数据,在查找时,总是需要从根节点向下遍历子树查找满足条件的数据行,这个特点带来了大量的随机 I/O。

从 B+ 树的特点中,我们也能看到,其对于读取数据有着很好的性能,同时对于条件范围查询也能很好的支持,但是由于存储数据时会发生重复的随机磁盘写入,写入性能较差。为此,我也通过实验比较了几种使用不同索引结构的数据库,实验设备为公司电脑,环境为 Centos7 虚拟机(4core+4GB+40GB 磁盘)。

chart

可以看到,基于 B+ 树索引的 boltdb 和 bboltdb 的读取吞吐量很高,但是写入吞吐量却很低,而 rocksdb 和 leveldb 等基于 LSM 的数据库在写入吞吐量上表现优异,读取稍逊。

总结

数据库使用索引可以加快查询的速度,LSM 树索引的写入性能优异,而 B+ 树索引的读取性能更高,每条路上都有自己独特的风景,我们需要场景来选择合适的索引结构。除此之外,还存在着为搜索引擎设计的全文索引、模糊索引等。而引入索引的过程也造成了写的缓慢,这也是需要权衡的事。

(由于我的水平有限,文中难免出现错误,还望指正。)

参考文献

  1. https://draveness.me/whys-the-design-mysql-b-plus-tree/
  2. 《design data intensive application》——Martin Kleppmann
  3. https://github.com/boltdb/bolt
  4. https://github.com/google/leveldb
  5. https://shiniao.fun/posts/%E5%9F%BA%E4%BA%8Elsm%E7%9A%84%E6%95%B0%E6%8D%AE%E5%BA%93%E9%94%AE%E5%80%BC%E5%AD%98%E5%82%A8/
  6. https://yetanotherdevblog.com/lsm/

ERROR

文章有误,待更新。。。

数据库中使用索引可以加快查询的速度,索引的意思是说,给某些数据添加类似路标的记号,这样从索引中就可以直接检索到该数据的位置。以 MySQL 为例,添加主键时默认会为该属性加上主键索引,除此之外,MySQL 中还有联合索引、唯一索引等。以 MySQL 等为代表的关系型数据中索引常常用 B+ 树来实现,在 B+ 树中,叶子节点包含了所有的关键字的信息,并且按照主键大小排列,非叶子节点中存放着指向叶子节点中的指针(页号和页对应列的最小记录)。除了用 B+ 树实现索引外,常见的还有 Hash 索引、全文索引以及 LSM 树索引。最简单的是 Hash 索引,标注键在数据库的位置,直接通过 hash 映射找到键的位置即可。而 LSM 树常常用在键值数据库的索引实现上。

LSM 树

LSM(Log-Structured Merge Tree)树索引伴随着键值数据库而出现,以 RocksDB、LevelDB 为代表,比如国内著名的分布式数据库 TiDB 的底层存储实现就是用的 RocksDB。存键值对最简单的方式是直接以追加写的方式写入一个文件即可,但是不能一直写呀,否则这个文件会变得很大,同时可能存在很多重复的值(比如对访问量的计数),所以要把文件分成多个段,这样一个文件写满之后进行压缩,剔除重复的、不要的数据,然后在新的文件中写入。因为磁盘的特性,顺序写入的性能很高,但是查找数据是个问题,每次都要全表扫描,如果磁盘中的数据是有序的就好了,查找就会很快(二分查找)。

这种数据存储的方式其实叫做SSTable(排序字符串表),在每个 SSTable 文件中,数据按照键的顺序排序,当一个 SSTable 满了之后,通过合并压缩的方式,删除旧值以及重复的值。另外,数据肯定不会直接写入磁盘中的 SSTable,首先会写入内存中,也叫做内存表,当内存表超出大小后才作为 SSTable 文件写入磁盘,然后在后台定期压缩。

那么如何保证写入内存表的键值对是有序的?可以使用红黑树、B+ 树来实现,也有使用基数树来实现的(比如下面介绍的 bitcask 就使用基数树来对键值对排序)。这种先在内存中构建一颗有序树,当大小超出后写入磁盘的方式就是LSM 树

在读取数据的时候,首先在内存表中查找键所在的文件位置,然后在最近的 SSTable 中查找,没有的话继续找之前的。同时磁盘中的 SSTable 会定期合并压缩,成为新的 SSTable,这样可以节省空间,提高性能。

LSM 树的主要优点是所有前台写入都发生在内存中,并且所有后台写入都保持顺序访问模式。有着很高的写入吞吐量。

上文说的 LevelDB 来源于 Google 的 SSTable 论文,而 RocksDB 是对 levelDB 的一些改进,为了更加深入的了解 LSM 以及 SSTable,本文以类似的键值数据库bitcask为例,看看它们具体是怎么实现的。

bitcask

bitcask 在保证键的顺序上使用了一种自适应基数树(ART)的算法结构,论文在这:

https://db.in.tum.de/~leis/papers/ART.pdf

ART 是对基数树的一种改进算法,基数树就是前缀(Trie)树,只不过更节省空间。在前缀树中,每个节点是一个单词,而基数树中,如果一个节点是父节点的唯一子节点的话,那么该子节点将会与父节点进行合并。以插入 hello、hat、have 三个单词为例:

# trie
		e - l - l - o
	  /
* - h - a - t
	      \
	       v - e

# radix
			*
           /
        (ello)
         /
* - h - * -(a) - * - (t) - *
                 \
                 (ve)
                   \
                    *

前缀算法需要十个节点,而基数树算法只需要五个节点就能表示。

在 bitcask 中,SSTable 表示如下:

type datafile struct {
	sync.RWMutex

	id           int
	r            *os.File
	ra           *mmap.ReaderAt
	w            *os.File
	offset       int64
	// decode and encode 二进制
	dec          *codec.Decoder
	enc          *codec.Encoder
	maxKeySize   uint32
	maxValueSize uint64
}

在查找数据的时候,首先会从基于 ART 实现的索引中找到键所在的位置,然后在当前 SSTable 和之前的分别查找。


// Get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) Get(key []byte) ([]byte, error) {
	var df data.Datafile

	b.mu.RLock()
	// 优化,可以通过 bloom 算法判断键存不存在,存在的话继续 search
	// 不存在的话直接 error
	// 从 ART 索引找到键的位置
	// key: [fileid, offset, size]
	value, found := b.trie.Search(key)
	if !found {
		b.mu.RUnlock()
		return nil, ErrKeyNotFound
	}

	item := value.(internal.Item)
	// 如果这个键在当前 SSTable 中
	if item.FileID == b.curr.FileID() {
		// 查当前
		df = b.curr
	} else {
		// 查之前 SSTable(磁盘中)
		df = b.datafiles[item.FileID]
	}
	// 读取
	e, err := df.ReadAt(item.Offset, item.Size)
	b.mu.RUnlock()
	if err != nil {
		return nil, err
	}
	// 校验
	checksum := crc32.ChecksumIEEE(e.Value)
	if checksum != e.Checksum {
		return nil, ErrChecksumFailed
	}

	return e.Value, nil
}

写入键值对的时候,

  1. 如果 SSTable 超出大小了,关闭当前 SSTable,并再次打开,不过这次只能读了,不能写入(归档)
  2. 新建一个 SSTable,分配读写权限
  3. 如果没有超出大小,写入即可(encode)
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
	......
	b.mu.Lock()
	// 写入
	offset, n, err := b.put(key, value)
	......
	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
	// 加入 ART 索引
	b.trie.Insert(key, item)
	b.mu.Unlock()
	return nil
}

// put inserts a new (key, value). Both key and value are valid inputs.
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
	size := b.curr.Size()
	// 一个 SSTable(默认 1MB)装不下了
	if size >= int64(b.config.MaxDatafileSize) {
		// 关闭当前 SSTable
		err := b.curr.Close()
		if err != nil {
			return -1, 0, err
		}

		id := b.curr.FileID()
		// 将当前 SSTable 归档,设为只读
		df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize)
		if err != nil {
			return -1, 0, err
		}

		b.datafiles[id] = df

		id = b.curr.FileID() + 1
		// 新建 SSTable 文件(id+1),并分配读写权限
		curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize)
		if err != nil {
			return -1, 0, err
		}
		b.curr = curr
	}
	// 写入 key-value
	e := internal.NewEntry(key, value)
	return b.curr.Write(e)
}

而合并和压缩 SSTable 会在后台周期性的执行:

  1. 创建临时表
  2. 查找键是否在 ART 索引叶子节点上,将存在的放入临时表中(合并压缩,老的会被新的代替)
  3. 移除所有 SSTable
  4. 重命名临时表为新的 SSTable
  5. 重新打开数据库
	// Rewrite all key/value pairs into merged database
	// Doing this automatically strips deleted keys and
	// old key/value pairs
	err = b.Fold(func(key []byte) error {
		// b.Get 的 key 都是经过 ART 处理过的
		value, err := b.Get(key)
		if err != nil {
			return err
		}

		if err := mdb.Put(key, value); err != nil {
			return err
		}

		return nil
	})


// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	// key 在 ART 的叶子节点,返回 true,否则返回 false,不处理
	b.trie.ForEach(func(node art.Node) bool {
		if err = f(node.Key()); err != nil {
			return false
		}
		return true
	})

	return
}

参考文献:

[1] “The Adaptive Radix Tree:ARTful Indexing for Main-Memory Databases”, Viktor Leis, Alfons Kemper, Thomas Neumann.

[2] 《设计数据密集型应用》

[3] https://github.com/prologic/bitcask

[4] https://stackoverflow.com/questions/14708134/what-is-the-difference-between-trie-and-radix-trie-data-structures

数据库的发展经历了从传统的关系型数据库、NoSQL(Not Only SQL)数据库到近几年新出现的分布式 NewSQL 数据库,整个趋势由单机逐渐向分布式方向发展。关系型数据库自 1970 年由 Edgar Codd 提出以来[1],在相当长的一段时间内,成为市场占有量最大的数据库产品。除此之外,网络型数据库和分层型数据库也在一段时间内短暂出现过。

然而关系型数据库有自身的不足之处,其数据关系模型表达与实际应用层之间存在不连贯性,且无法高效的扩展到多个节点,以及对于大数据量、高吞吐量的写入支持有限等。NoSQL 的出现旨在解决这些不足,NoSQL 被解释为“不仅仅是 SQL”,以 Google 的 BigTable[2] 和 Amazon 的 DynamoDB[3] 为代表,其在模型上更加灵活,包括文档、键值、列族、图等多种数据模型。以开源 NoSQL 数据库 MongoDB 为例,其数据模型以文档作为基本结构,文档中可任意存放键值对,数据模型由存入数据的结构决定。

在数据关系的表达上,NoSQL 对于一对多关系有更强的灵活性,对查询更友好,不需要跨表连接,但是对于多对多关系,两种数据库并没有多大不同。除此之外,NoSQL 数据库往往针对可扩展性、高可用性等专门设计,这使得其支持更复杂的多数据中心架构,性能也更强。然而传统的关系型数据库近些年在其数据模型和高可用性等方面也添加了相应支持,比如开源数据库 PostgreSQL 在其 9.3 版本之后,添加了对文档模型的支持。在高可用上,MySQL 也支持主从复制以及自 5.7 版本之后出现的 MySQL Group Replication 技术,进一步增强了在扩展性和高可用性上的支持。可以说,关系型数据库和 NoSQL 数据库相互借鉴各自的优点,协同发展,呈现出混合持久化的状态。

分布式 NewSQL 数据库的出现基于可扩展性、高可用性、数据一致性等方面的考虑。其中可扩展性是指水平方向(垂直扩展指的是扩展单机,以共享内存或者共享磁盘的方式存在)的扩展,将单台机器的负载分散到多台机器上,提供更强的处理能力。高可用保障了在单台机器出现故障的情况下,系统仍能继续提供服务。数据一致性作为分布式事务的必要条件,保证了所有节点对某个事件达成一致。NewSQL 概念的产生来源于 Google 于 2012 年发表的 Spanner[4] 数据库,该论文将传统关系模型以及 NoSQL 数据库的扩展性相结合,使得数据库同时支持分布式又具有传统 SQL 的能力。除了 Spanner 数据库,国外的 CockroachDB 以及国内的 TDSQL、MyCAT、TiDB[5]、OceanBase、SequoiaDB 等都是新兴的分布式数据库产品。

分布式数据库从实现上可以分为三类:

  • 一类是以传统数据库组成集群,利用主从复制等实现分布式,比如 MySQL 集群方案
  • 一类是在现有数据库之上以中间件代理的形式,提供自动分库分表、故障切换、分布式事务等支持,以 MyCAT、TDSQL 等为代表。
  • 一类是原生的分布式架构,通过共识算法实现高可用性、扩展性、数据一致性等支持,以 TiDB、OceanBase 等为代表。

分布式 CAP 理论指出[6],一个分布式系统不可能同时满足一致性、可用性和分区容错性这三个特点,其中分区容错性无法避免,势必要在一致性和可用性中做出权衡。

可用性的保证可以通过复制技术实现,通过在多台机器上保存数据副本,提高系统可用性和读取吞吐量。MyCAT 以及 TDSQL 均支持主从复制的方式。传统主从复制方式的问题在于无法保证数据的强一致性,如果主库故障,可能会出现多个节点成为主库(脑裂问题),导致数据丢失或损坏。MySQL 在 5.7 版本推出了 MySQL Group Replication 功能,实现了基于 Paxos 共识算法的高可用性和数据强一致性保证,TiDB 基于 Raft 共识算法[7]保证了数据的强一致。

分布式数据库的另一个特点是对事务的支持,分布式场景下保障事务的 ACID 原则常见的办法有 2PC 协议、TCC 协议以及 SAGA 协议,TiDB、OceanBase 等均使用两阶段提交协议(2PC)来实现跨多个节点的事务提交。

动态扩展的实现可使用分区的方式,将原有单个节点的压力分散到多个节点,提升系统性能。分区面临的问题是如何将数据和查询负载均匀分布在各个节点,常见的解决办法有基于 Hash 的分区和基于 Range 的分区,TiDB 使用 Range 的方式分区,而 OceanBase 两种都支持。除了对以上特点的支持,分布式数据库还具有 HTAP、SQL 引擎、兼容性等特点。

参考文献:

[1] Edgar F. Codd: “A Relational Model of Data for Large Shared Data Banks,” Communications of the ACM, volume 13, number 6, pages 377–387, June 1970.

[2] CHANG, Fay, DEAN, et al. Bigtable : A Distributed Storage System for Structured Data[J]. Acm Transactions on Computer Systems, 2008, 26(2):1-26.

[3] Decandia G, Hastorun D, Jampani M, et al. Dynamo: Amazon’s Highly Available Key-value Store[J]. Acm Sigops Operating Systems Review, 2007, 41(6):205-220.

[4] J. C. Corbett, J. Dean, M. Epstein, A. Fikes, et al. Spanner: Google’s Globally Distributed Database. ACMTrans. Comput. Syst., 31(3):8:1–8:22, 2013.

[5] Dongxu Huang, Qi Liu, Qiu Cui, Zhuhe Fang, Xiaoyu Ma, Fei Xu, Li Shen, Liu Tang, Yuxing Zhou, Menglong Huang, Wan Wei, Cong Liu, Jian Zhang, Jianjun Li, Xuelian Wu, Lingyu Song, Ruoxi Sun, Shuaipeng Yu, Lei Zhao, Nicholas Cameron, Liquan Pei, Xin Tang. TiDB: A Raft-based HTAP Database. PVLDB, 13(12): 3072-3084, 2020.

[6] Seth Gilbert and Nancy Lynch: “Perspectives on the CAP Theorem,” IEEE Computer Magazine, volume 45, number 2, pages 30–36, February 2012.

[7] Heidi Howard, Malte Schwarzkopf, Anil Madhavapeddy, and Jon Crowcroft: “Raft Refloated: Do We Have Consensus?,” ACM SIGOPS Operating Systems Review, volume 49, number 1, pages 12–21, January 2015. doi:10.1145/2723872.2723876

什么是 Lock-Free?

在并发访问某个资源的实现中,经常利用锁机制来保证对资源的正确访问。但是锁机制的问题在于会出先死锁、活锁或者线程调度优先级被抢占等问题,同时锁的增加和释放都会消耗时间,导致性能问题。

Lock-Free 指的是不通过锁机制来保证资源的并发访问。也就是说线程间不会相互阻塞了。

lock-free)

实现 Lock-Free 常见的解决办法是利用 CAS 操作,CAS 是啥?

CAS(Compare and Swap)是一种原子操作,原子很好理解,不可分割(比如原子事务),原子操作意味着 CPU 在操作内存时(读写)要么一次完成,要么失败,不会出现只完成一部分的现象。现代 CPU 对原子的读写操作都有相应的支持,比如 X86/64 架构就通过 CAS 的方式来实现,而 ARM 通过 LL/SC(Load-Link/Store-Conditional)来实现。

在 Go 语言中,可通过 atomic 包中的 CompareAndSwap** 方法来编程实现 CAS:

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

使用 CAS 的过程中有一个问题,考虑如下状况:

如果线程 1 读取共享内存地址得到 A,这时候线程 2 抢占线程 1,将 A 的值修改为 B,然后又改回 A,线程 1 再次读取得到 A,虽然结果相同,但是 A 已经被修改过了,这个就是ABA 问题

一种办法是通过类似版本号的方式来解决,每次更新的时候 counter+1,比如对于上面的问题,在线程 2 修改的时候,因为增加了版本号,导致修改前后的 A 值并不相同:

1A--2B--3A

在论文《 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms》 中,描述了一种利用 CAS 的 Lock-Free 队列的实现,通过 counter 机制解决了 CAS 中的 ABA 问题,并且给出了详细的伪代码实现,可查看论文中的详细介绍。

structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
 structure node_t {value: data type, next: pointer_t}
 structure queue_t {Head: pointer_t, Tail: pointer_t}

 initialize(Q: pointer to queue_t)
    node = new_node()		// Allocate a free node
    node->next.ptr = NULL	// Make it the only node in the linked list
    Q->Head.ptr = Q->Tail.ptr = node	// Both Head and Tail point to it

 enqueue(Q: pointer to queue_t, value: data type)
  E1:   node = new_node()	// Allocate a new node from the free list
  E2:   node->value = value	// Copy enqueued value into node
  E3:   node->next.ptr = NULL	// Set next pointer of node to NULL
  E4:   loop			// Keep trying until Enqueue is done
  E5:      tail = Q->Tail	// Read Tail.ptr and Tail.count together
  E6:      next = tail.ptr->next	// Read next ptr and count fields together
  E7:      if tail == Q->Tail	// Are tail and next consistent?
              // Was Tail pointing to the last node?
  E8:         if next.ptr == NULL
                 // Try to link node at the end of the linked list
  E9:            if CAS(&tail.ptr->next, next, <node, next.count+1>)
 E10:               break	// Enqueue is done.  Exit loop
 E11:            endif
 E12:         else		// Tail was not pointing to the last node
                 // Try to swing Tail to the next node
 E13:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 E14:         endif
 E15:      endif
 E16:   endloop
        // Enqueue is done.  Try to swing Tail to the inserted node
 E17:   CAS(&Q->Tail, tail, <node, tail.count+1>)

 dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
  D1:   loop			     // Keep trying until Dequeue is done
  D2:      head = Q->Head	     // Read Head
  D3:      tail = Q->Tail	     // Read Tail
  D4:      next = head.ptr->next    // Read Head.ptr->next
  D5:      if head == Q->Head	     // Are head, tail, and next consistent?
  D6:         if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
  D7:            if next.ptr == NULL  // Is queue empty?
  D8:               return FALSE      // Queue is empty, couldn't dequeue
  D9:            endif
                 // Tail is falling behind.  Try to advance it
 D10:            CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
 D11:         else		     // No need to deal with Tail
                 // Read value before CAS
                 // Otherwise, another dequeue might free the next node
 D12:            *pvalue = next.ptr->value
                 // Try to swing Head to the next node
 D13:            if CAS(&Q->Head, head, <next.ptr, head.count+1>)
 D14:               break             // Dequeue is done.  Exit loop
 D15:            endif
 D16:         endif
 D17:      endif
 D18:   endloop
 D19:   free(head.ptr)		     // It is safe now to free the old node
 D20:   return TRUE                   // Queue was not empty, dequeue succeeded

除此之外,该论文还给出了一种 two-lock 的并发队列实现,通过在 Head 和 Tail 分别添加锁,来保证入队和出队的完全并发操作。

Lock-Free 常用来实现底层的数据结构,比如队列、栈等,本文比较了使用单锁机制的队列实现和参考上述论文的 Lock-Free 队列实现,在 1<<12 个节点的出队入队中,两种算法实现的性能测试结果如下图所示:

性能测试

可以看到,随着处理器个数的增加,队列的 Lock-Free 算法一直稳定在 200ns/op,性能更佳,而使用锁的算法耗时要高出一倍。

代码实现参考:

https://github.com/rilkee/distributed/queue/

参考文献:

  1. http://preshing.com/20120612/an-introduction-to-lock-free-programming/
  2. Michael, M. M., & Scott, M. L. (1996). Simple, fast, and practical non-blocking and blocking concurrent queue algorithms. Proceedings of the Annual ACM Symposium on Principles of Distributed Computing, 267–275. https://doi.org/10.1145/248052.248106

事务在数据库中代表一系列操作要么全部都完成,要么全部都失败,ACID 规定了事务操作的原子性、一致性、隔离性和持久性。然而数据库的环境不可能只在单机上,在分布式环境下,一个事务中某个操作可能发往 A 节点,而另一个操作发往 B 节点,这就导致无法保证 ACID 的原则。

实现分布式事务常见的解决办法有以下几种:XA 两阶段提交协议、TCC 协议和 SAGA 协议。但是这些解决办法都不可能完全保证事务不出错。分布式系统中有一个 CAP 定理,说的是在分布式情况下,不可能同时满足一致性、可用性和容错性这三个条件,一般需要满足其中两个条件。

XA 两阶段提交协议

XA 协议规定了分布式事务的标准,其中 AP 代表应用程序,TM 代表事务管理器,负责协调和管理事务,而RM 代表着资源管理器。

image-20200723085002817

而事务的具体处理过程就是 TM 和 RM 之间的交互,分为两个阶段:

第一阶段:事务管理器要求每个涉及到事务的数据库预提交 (precommit) 此操作,并反映是否可以提交。

第二阶段:事务管理器要求每个数据库提交数据,或者回滚数据。

以 MySQL 中的 XA 处理逻辑为例(MySQL5.7 版本实现了对 XA 协议的支持),来看下这两个阶段的逻辑处理过程。

对于一个事务:

begin;
insert into student values ('xiaoming', 18);
update test set age = 18 where name = 'xiaoming';
commit;

第一阶段

事务管理器会生成一个全局的事务 ID,比如使用 uuid 生成一个唯一的 ID,为了方便用 xid1 代替。

首先,遇到 begin,不处理。

然后是 insert 操作,事务管理器根据表中主键的值计算(hash)出应该分布在哪个节点上,比如 insert 语句被计算出应该发到节点 A 上,事务管理器就像 A 节点发送命令开始 XA 事务,同时将 insert 语句发送过去。

xa start 'xid1';  # 开启事务
insert into student values ('xiaoming', 18);

接下来 update 操作,同样的,事务管理器根据主键计算所属节点,开启 XA,发送 update 语句。

xa start 'xid1';
update test set age = 18 where name = 'xiaoming';

commit 的时候,事务管理器分别向节点 A 和 B 发送一个预提交操作:

xa end 'xid1';
xa prepare 'xid1';

第二阶段

如果节点 A 和 B 都返回就绪 ready,此时进入 第二阶段

事务管理器分别向节点 AB 发送 commit 操作:

xa commit 'xid1';

相反的,如果有任何一个节点是 unready,事务管理器就会通知 A、B 节点的操作回滚:

xa rollback'xid1';

有一个问题,如果在进入第二阶段 commit 的时候,某个数据节点出现故障,会导致节点状态不一致。解决办法是把 XA 事务处理的过程也存入日志数据,比如 MySQL 将其写入了 binlog,这样在出现问题时还可以恢复。

整个 XA 的过程:

# 阶段一
xa start 'xid1';
insert into test values (1, 1);

xa start 'xid1';
update test set b = 1 where a = 10;

xa end 'xid1';
xa prepare 'xid1';

# 阶段二
xa commit 'xid1';
# or
xa rollback 'xid1'; # 失败回滚

EverDB 分布式事务的支持

MyCat 中的实现

EDB-Grid 组件中,借鉴了 MyCat(也是一个数据库中间件)的 XA 处理逻辑,MyCat 根据 XA 协议实现了对分布式事务的支持,具体来说:

通过数据库编程接口(比如 JDBC,也就是 XA 协议中的 AP)开启 XA 事务,然后执行 SQL 语句,预提交,最后 commit。

 // 开始 XA 事务
 conn.prepareStatement("set xa=on").execute();

// 插入语句
// 分别预提交
conn.prepareStatement(sql1).execute();
conn.prepareStatement(sql2).execute();

// commit
 conn.commit();

过程跟 MySQL 类似,在实现上,利用 uuid 生成了一个全局的事务 ID:

public void setXATXEnabled(boolean xaTXEnabled) {
   if (xaTXEnabled) {
       if (this.xaTXID == null) {
           xaTXID = genXATXID(); // 获得 XA 事务编号
       }
   } else {
       this.xaTXID = null;
   }
}
//......
public static String getUUID() {
   String s = UUID.randomUUID().toString();
   return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18) + s.substring(19, 23) + s.substring(24);
}

然后在事务管理器向节点分发语句时,会先写入 XA START:

if (expectAutocommit == false && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE) {
       xaCmd = "XA START " + xaTxID + ';';
       this.xaStatus = TxState.TX_STARTED_STATE;
   }

//......

// and our query sql to multi command at last
sb.append(rrn.getStatement() + ";");
// syn and execute others
this.sendQueryCmd(sb.toString());

MyCat 在执行事务操作是,会同时将其写入日志中,保证可恢复。

if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { // XA 事务
               //recovery Log
               participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());
               String[] cmds = new String[]{"XA END " + xaTxId, // XA END 命令
                       "XA PREPARE " + xaTxId}; // XA PREPARE 命令
               mysqlCon.execBatchCmd(cmds);

同样的,commit 时也会同步写入日志。

rollback:

if (needRollback) {
           for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {
               ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
               //XA rollback
               String xacmd = "XA ROLLBACK " + coordinatorLogEntry.id + ';';
               OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARollbackCallback());
               outloop:
               // ...

EverDB 中的实现

再来看下 EverDB 的处理过程:

首先是生成 xid,从 0 开始递增。

unsigned long XA_manager::generate_xid()
{
  unsigned long ret = 0;
  xid_mutex.acquire();
  try {
    //TODO: find a place to do init_max_xid
    if (!init_xid)
      init_max_xid();
    ret = xid_next++;
    if (!ret) // 0 is kept as the initial value
      ++ret;
      //...

开始 XA 事务:

void MySQLXA_helper::init_conn_to_start_xa(Session *session,
                                           DataSpace *space,
                                           Connection *conn)
{
  unsigned long xid = session->get_xa_id();

  // clear the pending transaction
  conn->execute_one_modify_sql("COMMIT;");

  // ......

    record_xa_redo_log(session, space, sql.c_str());  // log

  }

  // ...

  // start xa transaction
  sql += "XA START '";
  sql += tmp;
  sql += "';";
  conn->execute_one_modify_sql(sql.c_str());
  conn->set_start_xa_conn(true);
}

第二阶段:XA COMMIT 或者 ROLLBACK:

void xa_commit_or_rollback_xid(Connection *conn, string xid, int flag)
{
  string sql("");
  if (flag == TC_TRANSACTION_COMMIT)
    sql += "XA COMMIT '"; // xa commit
  else if (flag != TC_TRANSACTION_COMMIT)
    sql += "XA ROLLBACK '"; // xa rollback

  sql += xid.c_str();
  sql += "';";

  check_xa_sql_is_not_running(conn, sql);
  TimeValue timeout = TimeValue(backend_sql_net_timeout);
  //......
  }
}

同时事务处理的过程会写入 redolog 中,比如上面的开始 XA 事务中 record_xa_redo_log