跳至正文

Go语言实现Log读写

本文将从下往上实现日志包代码,从存储和索引文件开始,由于“日志”一词至少可以指三种不同的东西—记录、存储记录的文件和将段连接在一起的抽象数据类型—为了避免混淆,我将始终使用以下术语来表示这些概念:

  • Record:存储在日志中的记录(一条数据)。
  • Store:存Records的文件。
  • Index:存索引的文件。
  • Segment:将存储和索引联系在一起的抽象数据类型。
  • Log:将以上所有类型联系在一起的抽象数据类型。

Store代码实现

首先为Log包创建internal/log目录,并新建store.go文件,增加以下代码:

package log

import (
	"bufio"
	"encoding/binary"
	"os"
	"sync"
)

var enc = binary.BigEndian

const lenWidth = 8 //存放单条记录长度

type store struct {
	*os.File           //存储日志文件
	mu   sync.Mutex    //并发控制
	buf  *bufio.Writer //写日志缓存
	size uint64        //日志文件大小
}

func newStore(f *os.File) (*store, error) {
	fi, err := os.Stat(f.Name())
	if err != nil {
		return nil, err
	}
	size := uint64(fi.Size())

	return &store{
		File: f,
		buf:  bufio.NewWriter(f),
		size: size,
	}, nil
}

store结构体对文件进行封装,包含两个API:在文件中添加以及读取二进制记录。

newStore(*os.File)函数根据指定文件创建store对象。os.Stat(name string)获取文件的当前大小,以防我们从已有数据的文件中重新创建store,例如,我们的服务重新启动了,就会发生这种情况。

代码中反复引用enc变量和lenWidth常量,所以把它们放在最上面,这样很容易找到。enc定义了保存记录大小和索引条目的编码方式,lenWidth定义了存储记录长度的字节数。

下面实现向文件中添加日志记录Append方法:

func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	pos = s.size
  //将p的长度写入缓存
	if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
		return 0, 0, err
	}
  //实际字节内容写入缓存
	w, err := s.buf.Write(p)
	if err != nil {
		return 0, 0, err
	}
	w += lenWidth
	s.size += uint64(w)
	return uint64(w), pos, nil
}

Append([]byte)将给定的字节数组p存储到store对象中。这里保存记录的长度,这样在读取记录时,就知道要读多少字节。代码中将数据写入缓存,而不是直接写文件,可以减少系统调用并提高性能。如果用户要写大量的小记录,效率更高。然后我们返回写入的字节数(Go API通常会返回写入字节数),以及记录存储在文件中的位置。后面为记录创建一个关联的索引条目时,将使用这个存储位置。

下面再实现Read方法,用于读取记录:

func (s *store) Read(pos uint64) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.buf.Flush(); err != nil {
		return nil, err
	}
	size := make([]byte, lenWidth)
  //先读取记录的字节数
	if _, err := s.File.ReadAt(size, int64(pos)); err != nil {
		return nil, err
	}
	b := make([]byte, enc.Uint64(size))
  //读取记录
	if _, err := s.File.ReadAt(b, int64(pos+lenWidth)); err != nil {
		return nil, err
	}
	return b, nil
}

Read(pos uint64)返回存储在给定位置的记录。首先刷新缓冲区,以防我们读取缓冲区尚未刷新到磁盘的记录。第一步找出需要读取多少字节才能获得整个记录,然后获取并返回记录。注意编译器分配的字节切片不会离开在堆栈中声明的函数。当值的生命周期超过函数调用的生命周期时(例如,如果您返回该值),会发生变量逃逸,也就是存储到堆内存中。

下面在Read方法下面增加一个ReadAt方法:

func (s *store) ReadAt(p []byte, off int64) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.buf.Flush(); err != nil {
		return 0, err
	}
	return s.File.ReadAt(p, off)
}

ReadAt(p []byte, off int64)方法,从off指定的偏移量开始读取长度len(p)字节数据。该方法实现了io.ReadAt接口。

最后添加Close方法:

func (s *store) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	err := s.buf.Flush() //刷新缓存保存到文件
	if err != nil {
		return err
	}
	return s.File.Close() //最后关闭文件
}

Close方法在关闭文件时,将缓冲区数据刷新到文件中。

接下来,我们对上面的代码进行测试。在log目录中创建store_test.go文件,添加以下单元测试代码:

package log

import (
	"fmt"
	"io/ioutil"
	"os"
	"testing"

	"github.com/stretchr/testify/require"   //用于单元测的第三方包
)

var (
	write = []byte("hello world")
	width = uint64(len(write)) + lenWidth
)

func TestStoreAppendRead(t *testing.T) {
	f, err := ioutil.TempFile("", "store_append_read_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())
	s, err := newStore(f)
	require.NoError(t, err)

	testAppend(t, s)
	testRead(t, s)
	testReadAt(t, s)

	s, err = newStore(f)
	require.NoError(t, err)
	testRead(t, s)
}

在这个测试中,我们创建了一个带有临时文件的store对象,并调用两个帮助函数来测试对store对象追加和读取内容。然后我们再次创建store对象并测试从中读取记录,验证我们的服务在重新启动后能恢复原有的状态。

在TestStoreAppendRead函数之后,添加这些帮助函数:

func testAppend(t *testing.T, s *store) {
	t.Helper()
	for i := uint64(1); i < 4; i++ {
		n, pos, err := s.Append(write)
		require.NoError(t, err)
		require.Equal(t, pos+n, width*i)
	}
}

func testRead(t *testing.T, s *store) {
	t.Helper()
	var pos uint64
	for i := uint64(1); i < 4; i++ {
		read, err := s.Read(pos)
		require.NoError(t, err)
		require.Equal(t, write, read)
		pos += width
	}
}

func testReadAt(t *testing.T, s *store) {
	t.Helper()
	off := int64(0)
	for i := uint64(1); i < 4; i++ {
		b := make([]byte, lenWidth)
		n, err := s.ReadAt(b, off)
		require.NoError(t, err)
		require.Equal(t, lenWidth, n)
		off += int64(n)
		size := enc.Uint64(b)
		b = make([]byte, size)
		n, err = s.ReadAt(b, off)
		require.NoError(t, err)
		require.Equal(t, write, b)
		require.Equal(t, int(size), n)
		off += int64(n)
	}
}

在testReadAt后面增添Close方法到测试代码:

func TestStoreClose(t *testing.T) {
	f, err := ioutil.TempFile("", "store_append_read_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())
	s, err := newStore(f)
	require.NoError(t, err)
	_, _, err = s.Append(write)
	require.NoError(t, err)

	f, beforeSize, err := openFile(f.Name())
	require.NoError(t, err)
	err = s.Close()
	require.NoError(t, err)

	_, afterSize, err := openFile(f.Name())
	require.NoError(t, err)
	require.True(t, afterSize > beforeSize)
}

func openFile(name string) (file *os.File, size int64, err error) {
	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		return nil, 0, err
	}
	fi, err := f.Stat()
	if err != nil {
		return nil, 0, err
	}
	fmt.Println(name)
	return f, fi.Size(), nil
}

如果这些测试通过的话,我们的日志包已经可以追加和读取持久划记录了。

总结:

本文主要实现了提交日志服务中的两个底层核心功能,日志的存储和读取。我们对每个功能都进行了单元测试。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注