segment结构体包含了日志的存储和索引类型,协调这两者之间的操作。例如,当向segment中添加一条记录时,segment需要将数据写入它的存储区,并在索引文件中添加一条新索引条目。读操作类似,segment需要从索引中查找条目,然后从存储中获取数据。首先,在internal/log目录中创建一个名为segment.go的文件。并增加以下代码:proglog/internal/log/segment.go
package log
import (
"fmt"
"os"
"path"
"github.com/wangmingjunabc/proglog/api/v1"
"google.golang.org/protobuf/proto"
)
type Segment struct {
store *store
index *index
baseOffset, nextOffset uint64
config Config
}
segment需要调用日志的存储和索引文件,因此创建两个指针来分别指向存储文件和索引文件。还需要知道记录的基础偏移量和下一个偏移量,并且要计算出索引的相对偏移量。这里也添加config配置,用于对比存储文件和索引文件大小是否超过限制。
func newSegment(dir string, baseOffset uint64, c Config) (*Segment, error) {
//根据配置文件创建segment对象
s := &Segment{
baseOffset: baseOffset,
config: c,
}
var err error
//创建日志存储文件
storeFile, err := os.OpenFile(path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")),
os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
//创建存储对象
if s.store, err = newStore(storeFile); err != nil {
return nil, err
}
//创建索引文件
indexFile, err := os.OpenFile(path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")),
os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
//创建索引对象
if s.index, err = newIndex(indexFile, c); err != nil {
return nil, err
}
//判断索引文件是否为空
if off, _, err := s.index.Read(-1); err != nil {
s.nextOffset = baseOffset
} else {
s.nextOffset = baseOffset + uint64(off) + 1
}
return s, nil
}
每个存储文件和索引文件都限制了大小,当超过限制就会新建一个segment对象。在打开文件的时候,传入了os.O_CREATE参数,如果文件不存在就新建。os.O_APPEND参数表示写入日志时,会追加到文件末尾。最后,我们设置segment的下一个偏移量,为添加下一个记录做准备。如果索引为空,那么附加到segment的下一个记录将是第一个记录,它的偏移量将是segment的基偏移量。如果索引至少有一个条目,那么这意味着写入的下一个记录的偏移量应该取段末尾的偏移量,这是通过将基偏移量和相对偏移量相加1得到的。
接下来就可以对segment对象添加Append方法:
func (s *Segment) Append(record *log_v1.Record) (offset uint64, err error) {
//获取当前偏移量
cur := s.nextOffset
record.Offset = cur
p, err := proto.Marshal(record)
if err != nil {
return 0, err
}
//将记录写入存储文件对象
_, pos, err := s.store.Append(p)
if err != nil {
return 0, err
}
//将相对偏移量和记录位置写入索引文件
if err = s.index.Write(uint32(s.nextOffset-s.baseOffset), pos); err != nil {
return 0, err
}
s.nextOffset++
return cur, nil
}
Append方法将记录写入segment对象,并返回新追加记录的偏移量。segment通过两个步骤添加一条记录:将记录添加到存储中,然后添加一个索引条目。由于索引偏移量是相对于基数偏移量的,所以我们从基数偏移量(两者都是绝对偏移量)中减去segment的下一个偏移量,以得到记录在segment中的相对偏移量。然后增加下一个偏移量,为后面的追加调用做准备。
然后是为segement添加Read方法:
func (s *Segment) Read(off uint64) (*log_v1.Record, error) {
//根据记录的偏移量读取索引条目
_, pos, err := s.index.Read(int64(off - s.baseOffset))
if err != nil {
return nil, err
}
//根据索引条目存储了记录的位置,读取记录
p, err := s.store.Read(pos)
if err != nil {
return nil, err
}
//对记录反序列化处理
record := &log_v1.Record{}
err = proto.Unmarshal(p, record)
return record, nil
}
Read(off uint64)返回给定偏移量的记录。与Append类似,要读取一条记录,segment必须将绝对偏移量转换为相对偏移量,并获得相关的索引项。一旦有了索引项,segment就可以直接指导了记录在存储中的位置,并读取适当数量的数据。
下面为segment添加IsMaxed方法,返回文件是否超出限制:
func (s *Segment) IsMax() bool {
return s.store.size >= s.config.Segment.MaxStoreBytes ||
s.index.size >= s.config.Segment.MaxIndexBytes
}
IsMaxed返回segement对象中存储文件和索引文件是否达到了最大,要么是写了太多记录到存储文件中,要么是写了太多的索引。如果写了少量的长日志,那么您就会碰到存储文件大小限制;如果您写了大量的小日志,那么您将达到索引文件限制。使用这个方法来判断是否需要创建一个新的segment对象。
为segment添加Remove方法,关闭segment对象文件并删除相应文件,清理磁盘空间。
func (s *Segment) Remove() error {
if err := s.Close(); err != nil {
return err
}
if err := os.Remove(s.store.Name()); err != nil {
return err
}
if err := os.Remove(s.index.Name()); err != nil {
return err
}
return nil
}
最后添加Close方法:
func (s *Segment) Close() error {
if err := s.index.Close(); err != nil {
return err
}
if err := s.store.Close(); err != nil {
return err
}
return nil
}
以上是所有segment对象代码,下面为segment写单元测试。在proglog/internal/log目录中新建segment_test.go文件。
package log
import (
"io"
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
api "github.com/wangmingjunabc/proglog/api/v1"
)
func TestSegment(t *testing.T) {
//创建测试临时目录
dir, _ := ioutil.TempDir("", "segment_test")
defer os.RemoveAll(dir)
//创建记录对象
want := &api.Record{Value: []byte("hello world")}
c := Config{}
c.Segment.MaxStoreBytes = 1024 //存储文件最大字节数
c.Segment.MaxIndexBytes = entWidth * 3 //索引文件最大字节数
//创建segment对象
s, err := newSegment(dir, 16, c)
require.NoError(t, err)
require.Equal(t, uint64(16), s.nextOffset)
require.False(t, s.IsMax())
//添加3条内容相同记录
for i := uint64(0); i < 3; i++ {
off, err := s.Append(want)
require.NoError(t, err)
require.Equal(t, 16+i, off)
got, err := s.Read(off)
require.NoError(t, err)
require.Equal(t, want.Value, got.Value)
}
//再添加一条记录,索引文件会超出最大字节数报错
_, err = s.Append(want)
require.Equal(t, io.EOF, err)
//max index
require.True(t, s.IsMax())
//修改存储文件最大值
c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3)
c.Segment.MaxIndexBytes = 1024
//新建segment对象
s, err = newSegment(dir, 16, c)
require.NoError(t, err)
//超出最大限制
require.True(t, s.IsMax())
err = s.Remove() //删除一条记录
require.NoError(t, err)
s, err = newSegment(dir, 16, c)
require.NoError(t, err)
require.False(t, s.IsMax())
}
我们测试可以添加一个记录到segment中,接着读取添加的记录,并最终达到存储和索引配置的最大字节数。用相同的基偏移量和dir调用newSegment两次检查函数是否从已有的索引和日志文件中加载segment的状态。