跳至正文

go语言Log结构体

我们在第一部分实现了日志文件的读写;第二部分索引文件读写;第三部分sgement结构体的实现。今天就来看看Log结构体实现,它的主要功能是管理segments列表。我们整个代码逻辑是自底向上来实现的。

在internal/log目录创建log.go文件,添加以下代码:

package log

import (
	"io"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"

	api "github.com/wangmingjunabc/proglog/api/v1"
)

type Log struct {
	mu            sync.RWMutex //锁的力度可以更细到segment
	Dir           string
	Config        Config
	activeSegment *Segment
	segments      []*Segment
}

日志包含一个segments列表存放所有日志文件和索引指针;一个指向当前读写的segment指针,称为活跃segment,用于写入当前日志。Dir是我们存储日志和索引文件的地方。

接着添加NewLog函数,创建Log实例:

proglog/internal/log/log.go

func NewLog(dir string, c Config) (*Log, error) {
	if c.Segment.MaxStoreBytes == 0 {
		c.Segment.MaxStoreBytes = 1024
	}
	if c.Segment.MaxIndexBytes == 0 {
		c.Segment.MaxIndexBytes = 1024
	}
	l := &Log{
		Dir:    dir,
		Config: c,
	}
	return l, l.setUp()
}

在NewLog(dir string, c Config)中,我们为调用者没有指定的配置设置默认值,创建一个Log实例,并调用setup方法对新建的Log实例做一些设置。

下面看看Log结构体的setup方法:

func (l *Log) setUp() error {
	//根据目录读取文件
	files, err := os.ReadDir(l.Dir)
	if err != nil {
		return err
	}
	//每个文件的名称都是根据baseOffset来命名的
	var baseOffsets []uint64
	for _, f := range files {
		//根据文件名提取baseOffset
		offStr := strings.TrimSuffix(f.Name(), path.Ext(f.Name())) //去掉文件后缀
		off, _ := strconv.ParseUint(offStr, 10, 0) //将字符串转Uint类型
		baseOffsets = append(baseOffsets, off)
	}
	//根据baseOffsets从小到大排序
	sort.Slice(baseOffsets, func(i, j int) bool {
		return baseOffsets[i] < baseOffsets[j]
	})
	//根据baseOffsets创建segment对象
	for i := 0; i < len(baseOffsets); i++ {
		if err = l.newSegment(baseOffsets[i]); err != nil {
			return err
		}
		i++
	}
	//如果目录中还没有日志和索引文件,就新建
	if l.segments == nil {
		if err = l.newSegment(l.Config.Segment.InitialOffset); err != nil {
			return err
		}
	}
	return nil
}

当日志启动时,它负责为磁盘上已经存在的日志文件,对segment初始化,或者,如果日志是新的并且没有历史日志,则负责创建第一个segment。我们在磁盘上获取文件列表并初始化segments,根据baseOffsets解析和排序(因为我们希望segments按照从最老到最新的顺序排列),然后使用newSegment方法创建segment对象。

接下来看看newSegment方法:

func (l *Log) newSegment(off uint64) error {
	s, err := newSegment(l.Dir, off, l.Config)
	if err != nil {
		return err
	}
	l.segments = append(l.segments, s)
	l.activeSegment = s
	return nil
}

newSegment(off int64)创建一个新segment实例,将该segment追加到日志的列表中,并成为当前可写入日志记录的活跃segment,后续会调用对其进行写操作。

下面是在Log中添加日志记录的Append方法:

func (l *Log) Append(record *api.Record) (uint64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	off, err := l.activeSegment.Append(record)
	if err != nil {
		return 0, err
	}
	if l.activeSegment.IsMax() {
		err = l.newSegment(off + 1)
	}
	return off, nil
}

Append(*api.Record)向日志中追加一条记录。我们将记录添加到当前活跃segment中。之后,如果segment超出最大字节限制(根据最大配置),则创建一个新的segment。注意,这个方法我们用互斥锁,以协调对这部分代码的访问。当没有写操作时,我们使用RWMutex来授予读的访问权。如果您愿意的话,可以进一步优化,将锁用在每个segment,而不是整个日志对象(这里我没有这样做,因为保持代码简单。)效果会更好。

然后是读日志Read方法:

func (l *Log) Read(off uint64) (*api.Record, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var s *Segment
	for _, segment := range l.segments {
		//根据offset找到要读取的记录所在segment
		if segment.baseOffset <= off && off < segment.nextOffset {
			s = segment
			break
		}
	}
	if s == nil || s.nextOffset < off {
		return nil, api.ErrOutOfRange{Offset: off}
	}
	return s.Read(off)
}

Read(offset uint64)读取存储在给定偏移量上的记录。在Read(offset uint64)中,我们首先找到包含给定记录的segment。由于这些segment是按从最老到最新的顺序排列的,并且segment.baseOffset是当前实例的最小的偏移量,所以我们遍历这些segment,直到找到其基偏移量小于或等于我们要查找的偏移量的第一个segment。一旦我们知道了包含记录的segment,就从索引中获取索引条目,然后从segment的存储文件中读取数据并返回给调用者。

下面是一些常规方法,例如Close、Remove和Reset方法:

func (l *Log) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	//关闭日志的话,需要关闭所有segment对象
	for _, segment := range l.segments {
		if err := segment.Close(); err != nil {
			return err
		}
	}
	return nil
}

func (l *Log) Remove() error {
	if err := l.Close(); err != nil {
		return err
	}
	return os.RemoveAll(l.Dir)
}

func (l *Log) Reset() error {
	if err := l.Remove(); err != nil {
		return err
	}
	return l.setUp()
}
  • Close遍历segments并关闭它们。
  • Rmove关闭日志并删除所有文件。
  • Reset删除目录文件,新建对Log实例重新设置。

下面再为Log添加两个方法:LowestOffset和HighestOffset:

//获取所有记录最小偏移量,也就是第一条记录
func (l *Log) LowestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.segments[0].baseOffset, nil
}

//获取最后一条记录的偏移量
func (l *Log) HighestOffset() (uint64, error) {
	l.mu.RLock()
	l.mu.RUnlock()
	off := l.segments[len(l.segments)-1].nextOffset
	if off == 0 {
		return 0, nil
	}
	return off - 1, nil
}

这两个方法可以知道Log存储的日志偏移量范围。在后面集群服务的一致性当中会用到。因为我们需要节点间相互同步日志、协调集群状态,我们需要这些信息来了解哪些节点拥有最老和最新的日志,以及哪些节点落后于其他节点,需要进行拷贝。

再为Log增加Truncate方法:

//将某个偏移量前面的所有记录都截断,也就是留下最新的部分记录
func (l *Log) Truncate(lowest uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	var segments []*Segment
	for _, s := range l.segments {
		if s.nextOffset <= lowest+1 {
			if err := s.Remove(); err != nil {
				return err
			}
			continue
		}
		segments = append(segments, s)
	}
	l.segments = segments
	return nil
}

Truncate(minimum uint64)删除所有最高偏移量小于lowest。因为实际当中没有无限磁盘空间的,所以我们会定期调用Truncate来删除我们(希望)已经处理过的日志,并且不再需要这些记录了。

最后添加一个Reader方法:

func (l *Log) Reader() io.Reader {
	l.mu.RLock()
	defer l.mu.RUnlock()
	readers := make([]io.Reader, len(l.segments))
	for i, segment := range l.segments {
		readers[i] = &originReader{segment.store, 0}
	}
	return io.MultiReader(readers...)
}

func (o *originReader) Read(p []byte) (int, error) {
	n, err := o.store.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}

type originReader struct {
	*store
	off int64
}

Reader方法返回可以读取整个日志的io.Reader对象。在后面实现集群的一致性时需要日志支持快照功能,并根据快照恢复日志实例。Reader方法使用io.MultiReader将segment的存储文件串起来。通过originReader结构体将所有segment封装起来,传给io.MultiReader函数,确保可以从头将所有日志内容读取到。

下面对Log进行单元测试:

package log

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/stretchr/testify/require"
	api "github.com/wangmingjunabc/proglog/api/v1"
	"google.golang.org/protobuf/proto"
)

func TestLog(t *testing.T) {
	for scenario, fn := range map[string]func(t *testing.T, log *Log){
		"append and read a record success": testAppendRead,
		"offset out of range error":        testOutOfRangeErr,
		"init the existing segments":       testInitExisting,
		"read":                             testReader,
		"truncate":                         testTruncate,
	} {
		t.Run(scenario, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "store-test")
			require.NoError(t, err)
			defer os.RemoveAll(dir)

			c := Config{}
			c.Segment.MaxStoreBytes = 32
			c.Segment.MaxIndexBytes = 1024
			log, err := NewLog(dir, c)
			require.NoError(t, err)
			fn(t, log)
		})
	}
}

TestLog(t *testing.T)定义了一个map来测试日志。这样我们就不必重复为每个测试用例创建新Log实例。

现在,让我们定义测试用例。将以下测试用例放在TestLog函数下面:

func testAppendRead(t *testing.T, log *Log) {
	record := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(record)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)

	read, err := log.Read(off)
	require.NoError(t, err)
	require.Equal(t, record.Value, read.Value)
}

testAppendRead(*testing.T, *log. log)可以测试成功添加记录到日志,并从日志中读取记录。当我们向日志追加一条记录时,日志将返回与该记录相关的偏移量。所以,当我们向log请求该偏移量的记录时,期望得到与我们追加的相同的记录。

func testOutOfRangeErr(t *testing.T, log *Log) {
	read, err := log.Read(1)
	require.Nil(t, read)
	apiError := err.(api.ErrOutOfRange)
	require.Equal(t, uint64(1), apiError.Offset)
}

testOutOfRangeErr(*testing.T, *log. log)测试当尝试读取超出日志已存储偏移量范围时,日志是否返回错误。

func testInitExisting(t *testing.T, log *Log) {
	record := &api.Record{Value: []byte("hello world")}
	for i := 0; i < 3; i++ {
		_, err := log.Append(record)
		require.NoError(t, err)
	}
	require.NoError(t, log.Close())

	offset, err := log.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), offset)

	highestOffset, err := log.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), highestOffset)

	//在现有目录下新建Log实例
	n, err := NewLog(log.Dir, log.Config)
	require.NoError(t, err)

	offset, err = n.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), offset)

	highestOffset, err = n.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), highestOffset)
}

testInitExisting(*testing.T, *log. log)测试在创建日志时,日志是否从先前日志实例存储的数据中创建对象。在关闭原始日志之前,向原始日志追加三条记录。然后我们创建一个配置与旧日志相同目录的新日志对象。最后,我们确认新日志是根据原始日志存储的数据创建实例的。

func testReader(t *testing.T, log *Log) {
	record := &api.Record{Value: []byte("hello world")}
	u, err := log.Append(record)
	require.NoError(t, err)
	require.Equal(t, uint64(0), u)

	reader := log.Reader()
	b, err := ioutil.ReadAll(reader)
	require.NoError(t, err)

	read := &api.Record{}
	err = proto.Unmarshal(b[lenWidth:], read)
	require.NoError(t, err)
	require.Equal(t, record.Value, read.Value)
}

testReader(*testing.T, *log. log)测试我们是否可以读取存储在磁盘上的完整的日志,以便我们可以在有限状态机中做快照和恢复日志。

func testTruncate(t *testing.T, log *Log) {
	record := &api.Record{Value: []byte("hello world")}
	for i := 0; i < 3; i++ {
		_, err := log.Append(record)
		require.NoError(t, err)
	}
	err := log.Truncate(1)
	require.NoError(t, err)
	
	//读取被截断的记录会报错
	_, err = log.Read(0)
	require.Error(t, err)
}

testTruncate(*testing.T, *log. log)测试,我们可以截断日志并删除不再需要的旧segments。

以上就是我们的Log代码,整个代码实现已经可以用在实际项目当中了。很多分布式项目例如最新版本的kafka在集群的协调中就使用到了类似日志模块。

《go语言Log结构体》有2个想法

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注