跳至正文

go语言日志索引

本文将为这些记录创建索引。索引的作用是为了加快读取特定日志记录。下面在项目的internal/log目录下创建index.go文件,并添加以下代码:

package log

import (
	"io"
	"os"

	"github.com/tysonmote/gommap"
)

var (
	offWidth uint64 = 4
	posWidth uint64 = 8
	entWidth        = offWidth + posWidth
)

type index struct {
	file *os.File     //存放索引的文件
	mmap gommap.MMap  //索引文件在内存中的映射
	size uint64
}

在文件顶部定义了一些常量方便后面使用。单条索引包含两个字断:1、日志记录的偏移量offset;2、记录在store文件中的位置。offset存储为uint32类型,位置pos字段存储为uint64,因此分别占用4和8个字节的存储空间。使用entWidth找到索引的具体位置,因为我们可以根据索引的offset计算记录在文件中的位置为:offset*entWidth。注意会有两个偏移量,一个是存储文件的一个是索引文件的。

结构体index定义了索引文件,包含一个持久化文件和内存映射对象。size字段存储索引文件的大小,也就是添加下一条索引的位置。下面增加newIndex函数:

func newIndex(f *os.File, c Config) (*index, error) {
	idx := &index{file: f}    //根据参数创建index对象

	fi, err := os.Stat(f.Name()) //获取文件信息
	if err != nil {
		return nil, err
	}

	idx.size = uint64(fi.Size())  //读取文件大小
	if err = os.Truncate(f.Name(), int64(c.Segment.MaxIndexBytes)); err != nil { //先给文件分配写空间
		return nil, err
	}
	//将文件存储空间映射到内存当中
	if idx.mmap, err = gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED); err != nil {
		return nil, err
	}
	return idx, nil
}

newIndex(*os.File)为给定的文件创建一个索引对象。我们创建索引并保存文件的当前大小,以便在添加索引条目时跟踪索引文件中的数据量。在对文件进行内存映射之前,我们将文件增长到最大文件容量,然后将创建的索引返回给调用者。注意gommap第三包的使用方法。

下面为索引对象添加Close方法:

func (i *index) Close() error {
  //将内存中内容刷新到文件中
	if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
		return nil
	}
  //刷新持久化文件内容到磁盘
	if err := i.file.Sync(); err != nil {
		return err
	}
  //根据实际写入的索引大小,截取后面未写入的空间
	if err := i.file.Truncate(int64(i.size)); err != nil {
		return err
	}
	return i.file.Close()
}

Close方法确保内存映射文件已将其数据同步到持久化文件,并且持久化文件已将其内容刷新到磁盘存储中。然后,它将持久化文件截断为实际的索引总数据量,并关闭文件。

现在我们已经看到了打开和关闭索引的代码,讨论了增长和截断文件业务的全部内容。

当我们启动日志服务时,需要知道下一个记录该添加到日志上的偏移量是多少。通过查看索引文件的最后一条索引来找到下一个记录的偏移量,只需要读取索引文件最后12个字节的内容。但是,当我们添加索引时,这个过程被打乱了,因为需要对它进行内存映射。(调整它们大小的原因是一旦它们被内存映射,我们就不能调整它们的大小了。)我们通过在文件末尾追加空格来扩展文件,这样最后一条索引就不再位于文件末尾了——在这条索引和文件末尾之间有一些未知的空间。此空间将阻止服务正常重新启动。这就是为什么我们在关闭索引文件时,通过截断索引文件来删除空白空间,这样可以做到最后一条索引是放在文件末尾。这种优雅关闭服务的方式,可以让服务能够正常有效地重启。

接下来就是索引的读写方法实现。先介绍Read方法:

func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
	//索引文件为空情况
	if i.size == 0 {
		return 0, 0, io.EOF
	}
  //如果in==-1,读取最后一条索引
	if in == -1 {
		out = uint32((i.size / entWidth) - 1)
	} else {
		out = uint32(in)
	}
  //根据偏移量找到对应的索引存放位置
	pos = uint64(out) * entWidth
	if i.size < pos+entWidth {
		return 0, 0, io.EOF
	}
  //索引第一个字段是日志记录的偏移量
	out = enc.Uint32(i.mmap[pos : pos+offWidth])
  //第二字段是日志记录存放位置
	pos = enc.Uint64(i.mmap[pos+offWidth : pos+entWidth])
	return out, pos, nil
}

Read(int64)方法接收偏移量并返回相关记录在存储文件中的位置。给定的索引偏移量是相对偏移量;每个索引文件,0总是第一条索引的的偏移量,1是第二个条,依此类推。通过将偏移量存储为uint32s,我们使用相对偏移量来减少索引的大小。如果我们使用绝对偏移量,必须将偏移量存储为uint64s,每个条目需要4个字节。4个字节听起来并不多,如果你把它乘以人们经常使用分布式日志的记录数量,而像LinkedIn这样的公司每天有数万亿的记录。即使是相对较小的公司每天也能产生数十亿条记录。

然后是索引的write方法:

func (i *index) Write(off uint32, pos uint64) error {
  //检查剩余空间是否能存下整条索引,否则返回io.EOF代码文件已存满了
	if uint64(len(i.mmap)) < i.size+entWidth {
		return io.EOF
	}
	//记录偏移量写入第一个字段
	enc.PutUint32(i.mmap[i.size:i.size+offWidth], off)
  //记录存储位置写入第二字段
	enc.PutUint64(i.mmap[i.size+offWidth:i.size+entWidth], pos)
	i.size += entWidth
	return nil
}

Write(off uint32, pos uint32)将给定的偏移量和位置写入到索引文件。首先,验证是否有写入条目的空间。如果有空间,则对偏移量和位置进行编码,并将它们写入内存映射文件。然后我们更新下一个写入的位置。size保存的是下一条索引写入的位置。

下面再为索引增加一个Name方法:

func (i *index) Name() string {
	return i.file.Name()
}

接下来我们为上面的代码写单元测试,在项目的internal/log目录中创建index_test.go文件,添加以下代码:

package log

import (
	"io"
	"io/ioutil"
	"os"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
	f, err := ioutil.TempFile(os.TempDir(), "index_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())

	c := Config{}
	c.Segment.MaxIndexBytes = 1024
	idx, err := newIndex(f, c)
	require.NoError(t, err)
	_, _, err = idx.Read(-1)
	require.Equal(t, io.EOF, err)
	require.Equal(t, f.Name(), idx.Name())

	//准备测试用例
	entries := []struct {
		Offset uint32
		Pos    uint64
	}{{
		Offset: 0,
		Pos:    0,
	},
		{
			Offset: 1,
			Pos:    10,
		},
	}

这段代码设置了单元测试。创建一个索引文件,并使其足够大,可以通过Truncate调用,写入我们的测试条目。我们必须在使用之前对读文件进行截取,因为要将文件映射到内存的一个字节片,如果我们在写入文件之前不增加文件的大小,就会发生越界错误。

最后添加以下代码完成测试:

for _, want := range entries {
		err = idx.Write(want.Offset, want.Pos)
		require.NoError(t, err)

		_, pos, err := idx.Read(int64(want.Offset))
		require.NoError(t, err)
		require.Equal(t, want.Pos, pos)
	}
 //读取还未写入索引位置,报错
	_, _, err = idx.Read(int64(len(entries)))
	require.Equal(t, io.EOF, err)
	_ = idx.Close()

  //重新打开索引文件,可恢复状态
	f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600)
	idx, err = newIndex(f, c)
	require.NoError(t, err)
  //这里读取最后一条索引,也就是第二条
	off, pos, err := idx.Read(-1)
	require.NoError(t, err)
	require.Equal(t, uint32(1), off)
	require.Equal(t, entries[1].Pos, pos)
}

我们遍历每个条目并将其写入索引文件。检查是否可以通过read方法将相同的条目读回。然后,在尝试读取超出索引条目数量的数据时,验证索引和扫描文件是否出错。当服务使用现有索引文件重新启动时,检查索引是否从现有文件恢复已有状态。

最后,我们需要配置存储文件和索引文件的最大存储容量。添加config结构体来集中存放日志的配置,便于统一对日志相关参数进行设置。创建internal/log/config.go文件

type Config struct {
	Segment struct {
		MaxStoreBytes uint64
		MaxIndexBytes uint64
		InitialOffset uint64
	}
}

到这里就完成了日志存储和索引类型的代码,它们构成了日志的基础功能。后面将介绍日志段的结构体类型,也就是将日志存储和索引关联起来的结构体。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注