我们在第一部分实现了日志文件的读写;第二部分索引文件读写;第三部分sgement结构体的实现。今天就来看看Log结构体实现,它的主要功能是管理segments列表。我们整个代码逻辑是自底向上来实现的。
在internal/log目录创建log.go文件,添加以下代码:
package log
import (
"io"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
api "github.com/wangmingjunabc/proglog/api/v1"
)
type Log struct {
mu sync.RWMutex //锁的力度可以更细到segment
Dir string
Config Config
activeSegment *Segment
segments []*Segment
}
日志包含一个segments列表存放所有日志文件和索引指针;一个指向当前读写的segment指针,称为活跃segment,用于写入当前日志。Dir是我们存储日志和索引文件的地方。
接着添加NewLog函数,创建Log实例:
proglog/internal/log/log.go
func NewLog(dir string, c Config) (*Log, error) {
if c.Segment.MaxStoreBytes == 0 {
c.Segment.MaxStoreBytes = 1024
}
if c.Segment.MaxIndexBytes == 0 {
c.Segment.MaxIndexBytes = 1024
}
l := &Log{
Dir: dir,
Config: c,
}
return l, l.setUp()
}
在NewLog(dir string, c Config)中,我们为调用者没有指定的配置设置默认值,创建一个Log实例,并调用setup方法对新建的Log实例做一些设置。
下面看看Log结构体的setup方法:
func (l *Log) setUp() error {
//根据目录读取文件
files, err := os.ReadDir(l.Dir)
if err != nil {
return err
}
//每个文件的名称都是根据baseOffset来命名的
var baseOffsets []uint64
for _, f := range files {
//根据文件名提取baseOffset
offStr := strings.TrimSuffix(f.Name(), path.Ext(f.Name())) //去掉文件后缀
off, _ := strconv.ParseUint(offStr, 10, 0) //将字符串转Uint类型
baseOffsets = append(baseOffsets, off)
}
//根据baseOffsets从小到大排序
sort.Slice(baseOffsets, func(i, j int) bool {
return baseOffsets[i] < baseOffsets[j]
})
//根据baseOffsets创建segment对象
for i := 0; i < len(baseOffsets); i++ {
if err = l.newSegment(baseOffsets[i]); err != nil {
return err
}
i++
}
//如果目录中还没有日志和索引文件,就新建
if l.segments == nil {
if err = l.newSegment(l.Config.Segment.InitialOffset); err != nil {
return err
}
}
return nil
}
当日志启动时,它负责为磁盘上已经存在的日志文件,对segment初始化,或者,如果日志是新的并且没有历史日志,则负责创建第一个segment。我们在磁盘上获取文件列表并初始化segments,根据baseOffsets解析和排序(因为我们希望segments按照从最老到最新的顺序排列),然后使用newSegment方法创建segment对象。
接下来看看newSegment方法:
func (l *Log) newSegment(off uint64) error {
s, err := newSegment(l.Dir, off, l.Config)
if err != nil {
return err
}
l.segments = append(l.segments, s)
l.activeSegment = s
return nil
}
newSegment(off int64)创建一个新segment实例,将该segment追加到日志的列表中,并成为当前可写入日志记录的活跃segment,后续会调用对其进行写操作。
下面是在Log中添加日志记录的Append方法:
func (l *Log) Append(record *api.Record) (uint64, error) {
l.mu.Lock()
defer l.mu.Unlock()
off, err := l.activeSegment.Append(record)
if err != nil {
return 0, err
}
if l.activeSegment.IsMax() {
err = l.newSegment(off + 1)
}
return off, nil
}
Append(*api.Record)向日志中追加一条记录。我们将记录添加到当前活跃segment中。之后,如果segment超出最大字节限制(根据最大配置),则创建一个新的segment。注意,这个方法我们用互斥锁,以协调对这部分代码的访问。当没有写操作时,我们使用RWMutex来授予读的访问权。如果您愿意的话,可以进一步优化,将锁用在每个segment,而不是整个日志对象(这里我没有这样做,因为保持代码简单。)效果会更好。
然后是读日志Read方法:
func (l *Log) Read(off uint64) (*api.Record, error) {
l.mu.RLock()
defer l.mu.RUnlock()
var s *Segment
for _, segment := range l.segments {
//根据offset找到要读取的记录所在segment
if segment.baseOffset <= off && off < segment.nextOffset {
s = segment
break
}
}
if s == nil || s.nextOffset < off {
return nil, api.ErrOutOfRange{Offset: off}
}
return s.Read(off)
}
Read(offset uint64)读取存储在给定偏移量上的记录。在Read(offset uint64)中,我们首先找到包含给定记录的segment。由于这些segment是按从最老到最新的顺序排列的,并且segment.baseOffset是当前实例的最小的偏移量,所以我们遍历这些segment,直到找到其基偏移量小于或等于我们要查找的偏移量的第一个segment。一旦我们知道了包含记录的segment,就从索引中获取索引条目,然后从segment的存储文件中读取数据并返回给调用者。
下面是一些常规方法,例如Close、Remove和Reset方法:
func (l *Log) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
//关闭日志的话,需要关闭所有segment对象
for _, segment := range l.segments {
if err := segment.Close(); err != nil {
return err
}
}
return nil
}
func (l *Log) Remove() error {
if err := l.Close(); err != nil {
return err
}
return os.RemoveAll(l.Dir)
}
func (l *Log) Reset() error {
if err := l.Remove(); err != nil {
return err
}
return l.setUp()
}
- Close遍历segments并关闭它们。
- Rmove关闭日志并删除所有文件。
- Reset删除目录文件,新建对Log实例重新设置。
下面再为Log添加两个方法:LowestOffset和HighestOffset:
//获取所有记录最小偏移量,也就是第一条记录
func (l *Log) LowestOffset() (uint64, error) {
l.mu.RLock()
defer l.mu.RUnlock()
return l.segments[0].baseOffset, nil
}
//获取最后一条记录的偏移量
func (l *Log) HighestOffset() (uint64, error) {
l.mu.RLock()
l.mu.RUnlock()
off := l.segments[len(l.segments)-1].nextOffset
if off == 0 {
return 0, nil
}
return off - 1, nil
}
这两个方法可以知道Log存储的日志偏移量范围。在后面集群服务的一致性当中会用到。因为我们需要节点间相互同步日志、协调集群状态,我们需要这些信息来了解哪些节点拥有最老和最新的日志,以及哪些节点落后于其他节点,需要进行拷贝。
再为Log增加Truncate方法:
//将某个偏移量前面的所有记录都截断,也就是留下最新的部分记录
func (l *Log) Truncate(lowest uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
var segments []*Segment
for _, s := range l.segments {
if s.nextOffset <= lowest+1 {
if err := s.Remove(); err != nil {
return err
}
continue
}
segments = append(segments, s)
}
l.segments = segments
return nil
}
Truncate(minimum uint64)删除所有最高偏移量小于lowest。因为实际当中没有无限磁盘空间的,所以我们会定期调用Truncate来删除我们(希望)已经处理过的日志,并且不再需要这些记录了。
最后添加一个Reader方法:
func (l *Log) Reader() io.Reader {
l.mu.RLock()
defer l.mu.RUnlock()
readers := make([]io.Reader, len(l.segments))
for i, segment := range l.segments {
readers[i] = &originReader{segment.store, 0}
}
return io.MultiReader(readers...)
}
func (o *originReader) Read(p []byte) (int, error) {
n, err := o.store.ReadAt(p, o.off)
o.off += int64(n)
return n, err
}
type originReader struct {
*store
off int64
}
Reader方法返回可以读取整个日志的io.Reader对象。在后面实现集群的一致性时需要日志支持快照功能,并根据快照恢复日志实例。Reader方法使用io.MultiReader将segment的存储文件串起来。通过originReader结构体将所有segment封装起来,传给io.MultiReader函数,确保可以从头将所有日志内容读取到。
下面对Log进行单元测试:
package log
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
api "github.com/wangmingjunabc/proglog/api/v1"
"google.golang.org/protobuf/proto"
)
func TestLog(t *testing.T) {
for scenario, fn := range map[string]func(t *testing.T, log *Log){
"append and read a record success": testAppendRead,
"offset out of range error": testOutOfRangeErr,
"init the existing segments": testInitExisting,
"read": testReader,
"truncate": testTruncate,
} {
t.Run(scenario, func(t *testing.T) {
dir, err := ioutil.TempDir("", "store-test")
require.NoError(t, err)
defer os.RemoveAll(dir)
c := Config{}
c.Segment.MaxStoreBytes = 32
c.Segment.MaxIndexBytes = 1024
log, err := NewLog(dir, c)
require.NoError(t, err)
fn(t, log)
})
}
}
TestLog(t *testing.T)定义了一个map来测试日志。这样我们就不必重复为每个测试用例创建新Log实例。
现在,让我们定义测试用例。将以下测试用例放在TestLog函数下面:
func testAppendRead(t *testing.T, log *Log) {
record := &api.Record{
Value: []byte("hello world"),
}
off, err := log.Append(record)
require.NoError(t, err)
require.Equal(t, uint64(0), off)
read, err := log.Read(off)
require.NoError(t, err)
require.Equal(t, record.Value, read.Value)
}
testAppendRead(*testing.T, *log. log)可以测试成功添加记录到日志,并从日志中读取记录。当我们向日志追加一条记录时,日志将返回与该记录相关的偏移量。所以,当我们向log请求该偏移量的记录时,期望得到与我们追加的相同的记录。
func testOutOfRangeErr(t *testing.T, log *Log) {
read, err := log.Read(1)
require.Nil(t, read)
apiError := err.(api.ErrOutOfRange)
require.Equal(t, uint64(1), apiError.Offset)
}
testOutOfRangeErr(*testing.T, *log. log)测试当尝试读取超出日志已存储偏移量范围时,日志是否返回错误。
func testInitExisting(t *testing.T, log *Log) {
record := &api.Record{Value: []byte("hello world")}
for i := 0; i < 3; i++ {
_, err := log.Append(record)
require.NoError(t, err)
}
require.NoError(t, log.Close())
offset, err := log.LowestOffset()
require.NoError(t, err)
require.Equal(t, uint64(0), offset)
highestOffset, err := log.HighestOffset()
require.NoError(t, err)
require.Equal(t, uint64(2), highestOffset)
//在现有目录下新建Log实例
n, err := NewLog(log.Dir, log.Config)
require.NoError(t, err)
offset, err = n.LowestOffset()
require.NoError(t, err)
require.Equal(t, uint64(0), offset)
highestOffset, err = n.HighestOffset()
require.NoError(t, err)
require.Equal(t, uint64(2), highestOffset)
}
testInitExisting(*testing.T, *log. log)测试在创建日志时,日志是否从先前日志实例存储的数据中创建对象。在关闭原始日志之前,向原始日志追加三条记录。然后我们创建一个配置与旧日志相同目录的新日志对象。最后,我们确认新日志是根据原始日志存储的数据创建实例的。
func testReader(t *testing.T, log *Log) {
record := &api.Record{Value: []byte("hello world")}
u, err := log.Append(record)
require.NoError(t, err)
require.Equal(t, uint64(0), u)
reader := log.Reader()
b, err := ioutil.ReadAll(reader)
require.NoError(t, err)
read := &api.Record{}
err = proto.Unmarshal(b[lenWidth:], read)
require.NoError(t, err)
require.Equal(t, record.Value, read.Value)
}
testReader(*testing.T, *log. log)测试我们是否可以读取存储在磁盘上的完整的日志,以便我们可以在有限状态机中做快照和恢复日志。
func testTruncate(t *testing.T, log *Log) {
record := &api.Record{Value: []byte("hello world")}
for i := 0; i < 3; i++ {
_, err := log.Append(record)
require.NoError(t, err)
}
err := log.Truncate(1)
require.NoError(t, err)
//读取被截断的记录会报错
_, err = log.Read(0)
require.Error(t, err)
}
testTruncate(*testing.T, *log. log)测试,我们可以截断日志并删除不再需要的旧segments。
以上就是我们的Log代码,整个代码实现已经可以用在实际项目当中了。很多分布式项目例如最新版本的kafka在集群的协调中就使用到了类似日志模块。
厉害
受教了感谢作者