本文将从下往上实现日志包代码,从存储和索引文件开始,由于“日志”一词至少可以指三种不同的东西—记录、存储记录的文件和将段连接在一起的抽象数据类型—为了避免混淆,我将始终使用以下术语来表示这些概念:
- Record:存储在日志中的记录(一条数据)。
- Store:存Records的文件。
- Index:存索引的文件。
- Segment:将存储和索引联系在一起的抽象数据类型。
- Log:将以上所有类型联系在一起的抽象数据类型。
Store代码实现
首先为Log包创建internal/log目录,并新建store.go文件,增加以下代码:
package log
import (
"bufio"
"encoding/binary"
"os"
"sync"
)
var enc = binary.BigEndian
const lenWidth = 8 //存放单条记录长度
type store struct {
*os.File //存储日志文件
mu sync.Mutex //并发控制
buf *bufio.Writer //写日志缓存
size uint64 //日志文件大小
}
func newStore(f *os.File) (*store, error) {
fi, err := os.Stat(f.Name())
if err != nil {
return nil, err
}
size := uint64(fi.Size())
return &store{
File: f,
buf: bufio.NewWriter(f),
size: size,
}, nil
}
store结构体对文件进行封装,包含两个API:在文件中添加以及读取二进制记录。
newStore(*os.File)函数根据指定文件创建store对象。os.Stat(name string)获取文件的当前大小,以防我们从已有数据的文件中重新创建store,例如,我们的服务重新启动了,就会发生这种情况。
代码中反复引用enc变量和lenWidth常量,所以把它们放在最上面,这样很容易找到。enc定义了保存记录大小和索引条目的编码方式,lenWidth定义了存储记录长度的字节数。
下面实现向文件中添加日志记录Append方法:
func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
s.mu.Lock()
defer s.mu.Unlock()
pos = s.size
//将p的长度写入缓存
if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
return 0, 0, err
}
//实际字节内容写入缓存
w, err := s.buf.Write(p)
if err != nil {
return 0, 0, err
}
w += lenWidth
s.size += uint64(w)
return uint64(w), pos, nil
}
Append([]byte)将给定的字节数组p存储到store对象中。这里保存记录的长度,这样在读取记录时,就知道要读多少字节。代码中将数据写入缓存,而不是直接写文件,可以减少系统调用并提高性能。如果用户要写大量的小记录,效率更高。然后我们返回写入的字节数(Go API通常会返回写入字节数),以及记录存储在文件中的位置。后面为记录创建一个关联的索引条目时,将使用这个存储位置。
下面再实现Read方法,用于读取记录:
func (s *store) Read(pos uint64) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.buf.Flush(); err != nil {
return nil, err
}
size := make([]byte, lenWidth)
//先读取记录的字节数
if _, err := s.File.ReadAt(size, int64(pos)); err != nil {
return nil, err
}
b := make([]byte, enc.Uint64(size))
//读取记录
if _, err := s.File.ReadAt(b, int64(pos+lenWidth)); err != nil {
return nil, err
}
return b, nil
}
Read(pos uint64)返回存储在给定位置的记录。首先刷新缓冲区,以防我们读取缓冲区尚未刷新到磁盘的记录。第一步找出需要读取多少字节才能获得整个记录,然后获取并返回记录。注意编译器分配的字节切片不会离开在堆栈中声明的函数。当值的生命周期超过函数调用的生命周期时(例如,如果您返回该值),会发生变量逃逸,也就是存储到堆内存中。
下面在Read方法下面增加一个ReadAt方法:
func (s *store) ReadAt(p []byte, off int64) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.buf.Flush(); err != nil {
return 0, err
}
return s.File.ReadAt(p, off)
}
ReadAt(p []byte, off int64)方法,从off指定的偏移量开始读取长度len(p)字节数据。该方法实现了io.ReadAt接口。
最后添加Close方法:
func (s *store) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.buf.Flush() //刷新缓存保存到文件
if err != nil {
return err
}
return s.File.Close() //最后关闭文件
}
Close方法在关闭文件时,将缓冲区数据刷新到文件中。
接下来,我们对上面的代码进行测试。在log目录中创建store_test.go文件,添加以下单元测试代码:
package log
import (
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require" //用于单元测的第三方包
)
var (
write = []byte("hello world")
width = uint64(len(write)) + lenWidth
)
func TestStoreAppendRead(t *testing.T) {
f, err := ioutil.TempFile("", "store_append_read_test")
require.NoError(t, err)
defer os.Remove(f.Name())
s, err := newStore(f)
require.NoError(t, err)
testAppend(t, s)
testRead(t, s)
testReadAt(t, s)
s, err = newStore(f)
require.NoError(t, err)
testRead(t, s)
}
在这个测试中,我们创建了一个带有临时文件的store对象,并调用两个帮助函数来测试对store对象追加和读取内容。然后我们再次创建store对象并测试从中读取记录,验证我们的服务在重新启动后能恢复原有的状态。
在TestStoreAppendRead函数之后,添加这些帮助函数:
func testAppend(t *testing.T, s *store) {
t.Helper()
for i := uint64(1); i < 4; i++ {
n, pos, err := s.Append(write)
require.NoError(t, err)
require.Equal(t, pos+n, width*i)
}
}
func testRead(t *testing.T, s *store) {
t.Helper()
var pos uint64
for i := uint64(1); i < 4; i++ {
read, err := s.Read(pos)
require.NoError(t, err)
require.Equal(t, write, read)
pos += width
}
}
func testReadAt(t *testing.T, s *store) {
t.Helper()
off := int64(0)
for i := uint64(1); i < 4; i++ {
b := make([]byte, lenWidth)
n, err := s.ReadAt(b, off)
require.NoError(t, err)
require.Equal(t, lenWidth, n)
off += int64(n)
size := enc.Uint64(b)
b = make([]byte, size)
n, err = s.ReadAt(b, off)
require.NoError(t, err)
require.Equal(t, write, b)
require.Equal(t, int(size), n)
off += int64(n)
}
}
在testReadAt后面增添Close方法到测试代码:
func TestStoreClose(t *testing.T) {
f, err := ioutil.TempFile("", "store_append_read_test")
require.NoError(t, err)
defer os.Remove(f.Name())
s, err := newStore(f)
require.NoError(t, err)
_, _, err = s.Append(write)
require.NoError(t, err)
f, beforeSize, err := openFile(f.Name())
require.NoError(t, err)
err = s.Close()
require.NoError(t, err)
_, afterSize, err := openFile(f.Name())
require.NoError(t, err)
require.True(t, afterSize > beforeSize)
}
func openFile(name string) (file *os.File, size int64, err error) {
f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, 0, err
}
fi, err := f.Stat()
if err != nil {
return nil, 0, err
}
fmt.Println(name)
return f, fi.Size(), nil
}
如果这些测试通过的话,我们的日志包已经可以追加和读取持久划记录了。
总结:
本文主要实现了提交日志服务中的两个底层核心功能,日志的存储和读取。我们对每个功能都进行了单元测试。