消息持久化
源码地址
[go-diskqueu源码地址](https://github.com/nsqio/go-diskqueue)
自己学习用[c版本](https://github.com/feifeiiiiiiiiiii/cnsq/blob/master/src/diskqueue/)
数据结构
定义
struct diskQueue {
readPos int64 // 当前读取文件的位置
writePos int64 // 当前写入文件文件位置
readFileNum int64 // 当前正在读取文件编号
writeFileNum int64 // 当前正在写入文件的编号
depth int64 // 当前队列已有消息的总量
name string // 队列名称
dataPath string // 队列持久化存储位置
maxBytesPerFile int64 // 消息存储文件的最大存储字节
minMsgSize int32 // 消息最小字节数
maxMsgSize int32 // 消息最大字节数
syncEvery int64 // 消息写入的数量需要fsync
syncTimeout time.Duration // 定时fsync
exitFlag int32 // 标志位
needSync bool // 同步标志位
nextReadPos int64 // 下一个待读取文件的位置
nextReadFileNum // 下一个待读取文件编号
readFile *os.File // 当前正在读取文件的句柄
writeFile *os.File // 当前正在写入文件的句柄
reader *bufio.Reader //
writeBuf bytes.Buffer // 写入缓冲区
readChan chan []byte // 消息读取channel
writeChan chan []byte // 消息写入的channel
writeResponseChan chan error
emptyChan chan int // 清空队列 对应接口 Empty()
emptyResponseChan chan error
exitChan chan int // 关闭 对应接口是 Close
exitSyncChan chan int
logf AppLogFunc
}
存储结构
元数据存储
元数据记录五个信息:
depth、readFileNum, readPos, writeFileNum, writePos, 具体意义看上面
文件命名:
metaName = name + ".diskqueue.meta.dat"
存储格式:
depth + "\n" +
readFileNum + "," + readPos + "," + "\n" +
writeFileNum + "," + writePos + "," + "\n"
数据存储
数据存储的内容:
dataLen(消息的大小,转换大端存储),消息内容(data)
文件命名:
name = dataPath + name + ".diskqueue.%06d.dat" + fileNum
存储格式:
BigEndian(dataLen)+Binary(data)
对外提供的接口
API列表
New(...) // 创建diskQueue实例
Put([]byte) error // 队列中放入消息
ReadChan() chan []byte // 读取消息
Close() error // 关闭diskQueue
Delete() error // 删除队列
Depth() int64 // 获取当前队列的消息数量
Empty() error // 清空队列
私有接口
API接口
exit(deleted bool) // 清空和关闭调用
deleteAllFiles() // 删除队列所有文件
skipToNextRWFile() // 确定下一个要读写的文件的位置和编号
readOne() // 读取消息
writeOne() // 写入消息
sync() // flush数据
retrieveMetaData() // 获取meta数据
persistMetaData() // 持久化数据
metaDataFileName() // 获取meta文件名
fileName(fileNum int64) // 获取文件编号名字
checkTailCorruption() // 检查是否正常
moveForward() // 读取下个消息
handleReadError() // 处理读取消息错误
ioLoop() // 核心函数
处理流程
创建队列
调用 New(...) 创建队列
1. 创建 diskQueue实例 (d)
2. 调用函数 `retrieveMetaData` 读取元数据信息
3. 生成一个协程 处理函数是 `ioLoop`, diskQueue最核心的处理
4. ioLoop 主要逻辑
var dataRead []byte // 存储消息
var err error
var count int64 // 操作(写入、读取)消息的计数器 和 syncEvery 一起决定是否需要flush磁盘的脏数据
var r chan []byte // 用来操作读取消息的chan,所有读取的消息都用chan(管道)的send出去
syncTicker := time.NewTicker(d.syncTimeout) // 创建一个syncTimeout的定时器,用来flush磁盘的脏数据
for {
if count == d.syncEvery {
d.needSync = true
}
if d.needSync {
d.sync() // flush磁盘脏数据
count = 0 // 计数器归零
}
// 读取消息 判断消息是否可读
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne() // 读取一条消息
if err != nil {
d.handleReadError() // 出现读取异常的处理函数
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
case r <- dataRead:
count++
d.moveForward()
case <-d.emptyChan:
// 清空消息的指令
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
// 处理写入消息的指令
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}
Tips:
ioLoop实现了个CSP(Communicating Sequential Processes)并发模型,所有的操作都是基于chan通讯的,其实nsqd将go的chan玩的最溜了
读取消息
更简单,直接从 readChan 中读取,有数据则读取 无数据则阻塞直到有数据可读
写入消息
很简单 就是 把待写入的消息放入 writeChan 中, 写入的处理逻辑在 ioLoop 中