在事务数据库中,借助事务才能保证读写数据的正确性。本文介绍事务系统的设计
简要的说,事务是对数据库数据的读写序列。 以事务生命周期为线索,串起相关模块,来理解事务系统的工作机制。 事务生命周期:
- 创建事务。为事务分配id和开始时间戳。
- 事务读数据。并发事务环境下,一个事务能读到哪些数据,并如何读到这些数据。
- 事务写数据。新增、删除、更新都属于写的范畴。生成undo log。冲突检测。
- 提交事务。为事务分配提交时间戳。生成redo log并落盘。有可能出发checkpoint。
- 撤销事务。回滚事务数据。
- GC事务。事务已经提交,但是事务的数据有可能被其他事务引用,还不能立即从系统删除事务对象。
设计了一些模块,共同完成事务功能。
- 事务管理器。创建、提交、销毁、GC事务。维护活跃事务队列。触发checkpoint.
- 事务对象。维护时间戳,记录undo log,事务写的数据。提交操作,回滚操作。
- walog。事务提交时,记录redo log。
- replay。存储引擎启动时,重放未checkpoint的redo log。
- checkpoint。checkpoint为动词时,表示打checkpoint的行为:redo log与表已有数据结合,生成最新事务修改后的数据。checkpoint为名词时,表示打checkpoint之后的结果。
- 数据多版本。多版本的生成、维护、销毁、可见性。
TxnMgr
创建、提交、回滚事务。
维护以下状态:
- 当前开始时间戳。整数值。从2开始。每创建新事务后加1。小于
2^62
- 当前事务id,整数值。从
2^62
开始。每创建新事务后加1。大于等于2^62
- 最低活跃事务id和最低活跃事务开始时间。不一定来自同一个活跃事务。
- 活跃事务队列。已经创建未commit或rollback的事务。
- 已经提交事务队列。已经commit但未GC的事务。
- 待GC事务队列。能GC但还未GC的事务。
TxnMgr{
currentStartTs uint64
currentTxnId uint64
lowestActiveId uint64
lowestActiveStart uint64
activeTxns []Txn
committedTxns []Txn
oldTxns []Txn
}
Txn
表示一个事务对象。
有以下状态:
- 名词。标识作用。
- 开始时间戳。
- id.
- commit时间戳。提交时更新。
- undo log。事务操作日志。
- local存储。事务写的但未提交的数据。
- 活跃query id. GC时用到。
- 最高活跃query id.GC时用到。
Txn{
name
startTs
id
commitid
undoBuffer UndoBuffer
local storage
active query id
highest active query id
}
初始化Txn
对象
- 分配开始时间戳
- 分配事务id
- 提交id = 0
- 初始化undo log
- 初始化local 存储
并记录Txn
对象到TxnMgr
的活跃事务队列。
checkpoint会在后面单独讲。这里只讲不带checkpoint的提交框架。
- 分配commit时间戳。也是从
TxnMgr.currentStartTs
分配。小于2^62
。小于事务id。大于事务开始时间戳。 Txn
对象的提交操作。后续会单独讲。TxnMgr
GC事务。
Txn
对象的回滚操作。后续会单独讲。TxnMgr
GC事务。
- 计算最低活跃事务id、最低活跃事务开始时间、最低active query id。遍历活跃事务, 分别求三个值的最小值。
- 从活跃队列删除
Txn
对象 - 如果是提交事务,
Txn
对象进已经提交事务队列。 - 如果是回滚事务,
Txn
对象进GC队列。并记录此时的活跃query id. - 清理已经提交事务队列,找出能可以去GC的
Txn
对象,并进入GC队列。并给Txn
对象最高活跃query id赋值,用于后续GC。判断条件: commitId < 最低活跃事务开始时间 - 清理GC队列,找出能被GC的事务。判断条件:最高活跃query id < 最低active query id。
与数据直接相关的组件:
- UndoBuffer。记录undo log.
- LocalStorage。local存储。事务已经写入但未提交的数据。
事务有insert、delete、update、catalog操作,一一对应有undo log entry。 UndoBuffer 是undo log entry的数组。不同的操作类型,会在不同的时机创建entry。这个在后面会具体讲。
UndoBuffer{
undo log entry []buffer ptr
}
LocalStorage记录事务对每张表写的数据。
LocalStorage{
txn
tableStorage map<DataTable,LocalTableStorage>
}
LocalTableStorage{
DataTable 对应的表
RowGroupCollection 事务对此表写入的数据。
}
Txn
提交事务。
- LocalStorage提交操作
- UndoBuffer提交操作
- walog刷盘
事务提交时,要先提交LocalStorage
的数据。
实质是遍历每个表,将事务写的数据合并到表中去。并更新索引和生成undo log.
func (LocalStorage) Commit() {
for table,localTableStorage := LocalStorage.tableStorage {
LocalStorage.Flush(table,localTableStorage)
}
}
func(LocalStroage) Flush(table,localTableStorage){
if (表为空 或者 批量append)且事务无删除操作 {
//特殊情况
更新索引;
将localTableStorage的数据直接挪动到表中;
}else{
//一般情况
将localTableStorage中的数据读出来,append到表中并更新索引
}
生成undo log insert entry = {start row,count,table}
}
其中Flush
函数完成合并数据、更新索引、生成undo log。
- 合并数据。将
localTableStorage
中的数据读出来,append到表中。同时也能插入索引。表的append操作和索引的内容后续再讲。 - 生成undo log insert entry。记录insert的内容:开始rowid,数据总行数,目标表对象。
事务提交时,在LocalStorage
提交后,提交UndoBuffer
。
遍历每个undo log entry,依据entry类型,基于entry 生成redo log写入walog.
func(UndoBuffer)Commit(walog,commitid){
for undo log entry := UndoBuffer.logs{
CommitEntry(entry type,entry buffer)
}
}
CommitEntry
对insert、delete、update、catalog类型的undo log entry,采用不同的做法生成redo log,并更新版本信息。
- insert entry
在
LocalStorage
提交时,已经将数据append表中,并且生成了insert entry={开始rowid,总行数,表}。 提交insert entry,实质是将表中由insert entry圈定的数据再读出来,写入walog. 这些再读出来的数据就是insert redo log。 写完walog后,更新insert entry圈定的数据的版本信息,即填充事务的commitid。
//写walog
insert entry.table.WriteToLog(walog,start rowid,count){
table.ScanTablesegment(start rowid, count,func(data){
walog.WriteInsert(data)
})
}
//更新版本信息,回填commitid
insert entry.table.CommitAppend(commitId,start rowid,count)
- delete entry 后续会讲delete entry是如何生成的。这里只介绍delete entry的提交方式。 delete entry={base row id,count,row offsets,table,ChunkInfo},表示对表删除了哪些行、总行数。对应的ChunkInfo. row offsets是相对于base row id的偏移。实际被删除的row id = base row id + row offset. 提交delete entry,是将实际被删除的row id写入walog. 更新版本信息ChunkInfo中每行的commitid。
//算时间被删除row id
for i < delete entry.count {
row ids[i] = base row id + row offsets[i]
}
//写walog
walog.WriteDelete(row ids);
//更新版本信息
delete entry.ChunkInfo.CommitDelete(
commitid,
delete entry.rows,
delete entry.count)
- update entry 后续会讲update entry是如何生成的。这里只介绍update entry的提交方式。 update entry = {UpdateSegment, vectorIndex, versionNumber,count,tuples,update entry ptr},表示对RowGroup的某个列的某个Vector中的某些行做了更新操作。RowGroup可以存60个Vector,每个Vector 2048行。 提交update entry,将update后的新值和rowid读出来,写入walog。 更新ChunkInfo的versionNumber,给版本信息填commit.
updateChunk {
updated data;
rowids;
}
//从版本链中取出update后的新值。实质是版本链表头结点中的值。
update entry.UpdateSegment.FetchCommitted(update entry.vectorIndex,updateChunk.updated data);
//计算row id
start rowid = column.start rowid + update entry.vectorIndex * 2048;
for i < update entry.count{
updateChunk.rowids[update entry.tuples[i]] = start rowid + update entry.tuples[i];
}
//写walog
walog.WriteUpdate(updateChunk)
//更新版本信息
update entry.versionNumber= commitId
- catalog entry 先跳过,后续介绍catalog时再讲。
Txn
回滚事务。
- LocalStorage回滚操作
- UndoBuffer回滚操作
事务对表的修改未提交,回滚时,只需要将LocalTableStorage
删除即可。
undo log相对复杂。undo log entry生成时,以追加方式进入UndoBuffer。回滚时,逆向遍历undo log entry,依据entry类型,回滚entry.
- insert entry
insert entry={开始rowid,总行数,表}。
回滚insert entry时,是将表中从
开始rowid
开始的行都删掉。也会删除索引中的数据。 通过开始rowid
计算所属的RowGroup。将此之后的RowGroup全部删掉。此RowGroup内部,对每列数据ColumnData计算开始rowid
所属的ColumnSegment
。将此之后的所有ColumnSegment
删掉。此ColumnSegment
内部,清理掉block上开始rowid
之后的数据即可。 同时删掉行版本信息。
insert entry.table.RevertAppend(insert entry.start rowid,insert entry.count){
table.rowGroupCollection.RevertAppend(start rowid,count)
}
table.rowGroupCollection.RevertAppend(start rowid,count){
segIdx :=计算start rowid所在的RowGroup;
删掉从segIdx+1开始的所有RowGroup;
所属RowGroup.RevertAppend(start rowid);
}
RowGroup.RevertAppend(start rowid){
删除版本信息
for columnData in RowGroup{
columnData.RevertAppend(start rowid);
}
}
columnData.RevertAppend(start rowid){
segIdx := 计算start rowid所在的columnSegment;
删掉从segIdx+1开始的所有ColumnSegment;
所属ColumnSegment.RevertAppend(start rowid);
}
ColumnSegment.RevertAppend(start rowid){
调整count即可清理block上的数据
}
- delete entry delete entry={base row id,count,row offsets,table,ChunkInfo},表示对表删除了哪些行、总行数。对应的ChunkInfo. row offsets是相对于base row id的偏移。实际被删除的row id = base row id + row offset. 回滚delete entry,是对行版本信息,打上特殊的标记,使删除无效。
ChunkInfo.CommitDelete(-1,delete entry.row offsets,delete entry.count){
for i < count{
ChunkInfo.deleted[row offsets[i]] = -1;
}
}
- update entry update entry = {UpdateSegment, vectorIndex, versionNumber,count,tuples,update entry ptr},表示对RowGroup的某个列的某个Vector中的某些行做了更新操作。RowGroup可以存60个Vector,每个Vector 2048行。 回滚update entry,是将最新的列版本信息中,此事务update的最新值 恢复到此事务update的旧值。最新的列版本信息存在于版本链表头结点中。 再将此事务产生的版本从版本链表中删掉。
update entry.UpdateSegment.RollbackUpdate(update entry){
//将列版本信息恢复到旧值。
//实质是将update entry的中的旧值,更新到版本链表头结点中。
head := updateSegment.UpdateNode.Info[update entry.vectorIndex].UpdateInfo
for i < update entry.count {
head.Data[j] = update entry.data[i]
}
从版本链表中删除此版本;
}
- catalog entry 先跳过,后续介绍catalog时再讲。
walog参与了事务系统的多个环节
- 事务提交时,将redo log存入walog;
- replay时,基于未checkpoint的redo log构建版本数据
- checkpoint时,将未checkpoint的redo log与表中数据合并落盘
walog管理redo log。物理上是文件,顺序存储redo log entry. 写与读redo log entry. 写redo log entry实质是redo log entry的序列化。读redo log entry实质是redo log entry的反序列化。 不同的redo log entry有不同的序列化和反序列化的接口。
redo log entry分类
- insert,delete,update。事务不同的写操作产生的。
- flush。表示一个事务提交结束。两个flush之间entry表示一个独立事务产生的。
- checkpoint。在checkpoint entry之前的entry都已经应用到表上了,不再需要了。
- create table, create schema,use table。创建表、schema和用表,后续catalog时再讲。
WriteAheadLog
是walog对象。
每种redo log entry类型,有单独的接口。
- WriteInsert,WriteDelete,WriteUpdate
- Flush
- WriteCheckpoint
- WriteCreateTable,WriteCreateSchema,WriteSetTable
存储引擎启动时,先加载元信息。基于这些元信息,加载checkpoint。之后再读redo log,并replay redo log entry. replay操作,实际是顺序读取每个entry,重新对表执行entry对应的操作,形成表的版本数据。
replay分两阶段:
- 扫描walog,找到checkpoint entry,拿到checkpoint block id. 正常情况下,做多个checkpoint entry。 此阶段不会应用每个entry. 如果checkpoint block id 与 元数据中的block id相同,说明walog已经应用到表上了。此时replay结束。
- 扫描walog,将每个entry应用到表上。
每种redo log entry类型,有不用的应用方法。
- insert,delete,update。对entry中的数据,分别insert,delete,update到当前表对象中。
- flush。提交当前事务。并创建下一个新事务。
- checkpoint。记录checkpoint block id.
- create table。创建一个新表。
- create schema。创建一个新schema.
- use table。从catalog取当前操作的表对象。
replay过程中事务对表对象的操作,不会再生成redo log,也不会触发checkpoint. replay 成功后,walog被清空。
checkpoint是将未checkpoint的redo log与表中数据合并落盘。
TxnMgr
在提交事务时,会检测是否能在此事务提交成功后,进行checkpoint.
checkpoint需要满足一定的条件。
- 同时只有一个checkpoint操作。
- 已经提交的事务是否都GC了。
- 此事务的redo log 大小 + walog size 是否超过阈值
- 此事务是否提交成功
在此事务提交成功后,再去做checkpoint的操作。要明确的是checkpoint不需要事务参与,期间也没有活跃事务会影响到系统数据。
分几步理解checkpoint过程。
- checkpoint整体操作流程是怎么样的。
- 被checkpoint的数据有哪些。
- 数据持久化后的layout是怎么样
- 存储引擎启动后,怎么使用checkpoint数据。
数据库的内存数据是多层次的
- schema。至少一个schema. 每个schema由多个表组成。
- 表。
- RowGroup集合。
- RowGroup
- ColumnData
- ColumnSegment
- Vector
- 行版本数据和列版本数据
- stats
简洁的说,checkpoint就是按层次关系持久化这些数据。 在程序框架上,checkpoint是多层循环。
for sch := schemas {
持久化schema元数据;
for tab := sch.tables {
for rowGroup := tab.RowGroups{
for columnData := rowGroup.Columns{
for columnSegment := columnData.ColumnSegments{
for vector := columnSegment.vectors{
读block数据;
读版本数据;
持久化;
}
}
}
}
持久化RowGroup元数据和行版本数据;
}
}
从循环内层向外层介绍持久化。 分多个方面讲每层的持久化:
- 要持久化的数据有哪些,在哪儿,怎么读取它们
- 持久化数据的方法是什么
- 数据被持久化后,放哪儿了。形成了哪些元数据。
- 元数据该放哪儿
对应最内层的循环。也是最复杂的层次。
for columnSegment := columnData.ColumnSegments{
for vector := columnSegment.vectors{
读block数据;
读版本数据;
持久化;
}
}
相关的数据有:
- 每个ColumnSegment中block上的数据。直接读每个vector即可。
- 列版本数据。在ColumnData.UpdateSegment上。版本链头结点就是最新的数据。
要持久化的数据 = vector+列版本数据。即用版本数据中对应此vector的最新数据替换vector中行。
分配一些新的ColumnSegment。要持久化的数据会追加到这些新ColumnSegment的block. 最终每个新ColumnSegment会从内存中写入存储,同时获得存储位置={blockid,offset}. 这些新的ColumnSegment会有一组存储位置={blockid,offset}。
最终用这些新的ColumnSegment替换原先ColumnData中的ColumnSegment。
func(ColumnData)Checkpoint(){
nodes := ColumnData.ColumnSegments;
ColumnDataCheckpointer.Checkpoint(nodes)
ColumnData.ColumnSegments = newNodes;
返回元数据;
}
func ColumnDataCheckpointer.Checkpoint(nodes){
WriteToDisk(){
分配新的ColumnSegment
ScanSegments(){
for 旧ColumnSegment := nodes{
for rowIdx := 旧ColumnSegment.Count {
CheckpointScan(rowIdx,count){
vector = 读旧ColumnSegment.block;
vector = vector + 列版本数据更新;
}
vector 追加到 新ColumnSegment ;
}
}
}
这些新的ColumnSegment 写入存储,获取存储位置;
元数据 = 存储位置{blockid,offset}的数组;
newNodes = 这些新的ColumnSegment
}
}
RowGroup的持久化过程分为:
- 每一列先持久化。并拿到持久化后的列元数据数组。就是上面环节。
- 每个列元数据再持久化,并得到存储位置。又获得列元数据的存储位置的数组。类似二级指针。
列元数据的存储位置的数组
列0元数据的存储位置;
...
列i元数据的存储位置;
ColumnSegment 0 的存储位置;
...
ColumnSegment M-1 的存储位置;
...
列N-1元数据的存储位置;
RowGroup的元数据 = 行版本信息 + 列元数据的存储位置的数组。
func(RowGroup) Checkpoint(){
列元数据数组=WriteToDisk(){
for columnData := rowGroup.Columns{
列元数据=columnData.Checkpoint()
}
}
for 列元数据 := 列元数据数组{
列元数据的位置 = WriteDataPointer(){
WriteColumnDataPointers(){
for ColumnSegment位置 := 列元数据{
将ColumnSegment位置写入存储
}
}
}
}
返回RowGroup元数据;
}
处理过程:
- 持久化每个RowGroup,得到RowGroup元数据数组
- 存储RowGroup元数据数组。并得到存储位置。
- 将RowGroup元数据数组的存储位置 再次 存储到meta block.
- 持久化索引的数据。相对简单,不再细说。
RowGroup元数据数组的存储位置:
RowGroup 0 的元数据;
...
RowGroup x-1 的元数据;
表的元数据 = RowGroup元数据数组的存储位置。
func(DataTable) Checkpoint(){
for rowGroup := tab.RowGroups{
RowGroup元数据 = rowGroup.Checkpoint()
}
FinalizeTable(){
for RowGroup元数据 := RowGroup元数据数组{
for 列元数据的存储位置 := RowGroup元数据.列元数据的存储位置的数组 {
写入存储
}
行版本信息 写入存储
}
RowGroup元数据数组的存储位置 存储到meta block;
持久化索引;
}
}
在本文中schema实质是database的含义,包含至少一张表。
- 每个schema的每个表逐一持久化。schema的定义和表的定义一并持久化。
- walog写入checkpoint entry,flush entry。标记checkpoint.
- 持久化database header。记录meta block. 相对简单,不细说。
- 清空walog.
func(CheckpointWriter)CreateCheckpoint(){
for sch := schemas {
WriteSchema(sch){
schema定义持久化;
for tab := sch.tables{
WriteTable(tab){
table定义持久化;
table.Checkpoint()
}
}
}
}
}
在物理存储上,数据是按block组织的。block上的数据是什么,由元数据指定。 元数据分为:
- 固定元数据。database header在文件的开头。
- 动态元数据。schema和表定义。RowGroup元数据、列元数据的元数据、列元数据。行和列版本。 表的非元数据和元数据在block层面上是紧挨着的、分散的、连续的。这是难以说清楚的。 ColumnSegment 可能会横跨多个block。这些block不保证连续排列。 依据数据逻辑关系,从顶层元数据往下直到列数据。
database header:
pointer to metablock;
metablock:
schema count;
schema i 定义;
table count;
table j 定义;
table j RowGroup元数据数组的地址;
table j 索引数据的地址个数;
table j 索引数据的地址数组;
table j 索引 x的地址;
某个block:
table j RowGroup元数据数组;
RowGroup count;
RowGroup k 元数据;
列元数据数组的地址;
行版本信息;
某个block:
RowGroup K 的列元数据数组:
列 m 元数据;
列 m 的数据占据block的地址count;
列 m 的第n个block的地址;
某个block:
列 m 的第n个block 的数据;
某个block:
table j 索引 x的数据;
依据layout,存储引擎启动时,先读database header,拿到meta block的数据。 从metablock中,依次能拿到schema定义。每个表的定义和RowGroup元数据数组的地址。 之后,依次读每个RowGroup,每个ColumnSegment 读到表的所有数据。
详细介绍了事务系统的TxnMgr
,Txn
对象,walog。解释了walog基础作用。着重讲解了replay、checkpoint工作机制。它们有机结合支撑事务的有序、安全、原子、持久化的运行。