TinyKv Project1 Standalone KV
前言
project1还是比较简单的,project2的难道就陡增了。对于project1来说,本文更多的还是讲一些不了解的概念。在每一个project1开始之前,仔细阅读文档。
Project1 StandaloneKV 文档翻译
Project1 StandaloneKV
在这个项目中,我们将会在列族
的支持下建立一个独立的 key/value 存储 gRPC 服务
。Standalone 意味着只有一个节点,而不是一个分布式系统。列族(CF, Column family)是一个类似 key 命名空间的术语,即同一个 key 在不同列族中的值是不同的。
你可以简单地将多个 CF 视为独立的小型数据库。CF 在 Project4 中被用来支持事务模型。
该服务支持四个基本操作:Put/Delete/Get/Scan,它维护了一个简单的 key/value Pairs 的数据库,其中 key and value 都是字符串。
- Put:替换数据库中指定 CF 的某个 key 的 value。
- Delete:删除指定 CF 的 key 的 value。
- Get:获取指定 CF 的某个 key 的当前值。
- Scan:获取指定 CF 的一系列 key 的 current value。
该项目可以分为2个步骤,包括:
- 实现一个独立的存储引擎。
- 实现原始的 key/value 服务处理程序。
代码
gRPC 服务在 kv/main.go 中被初始化,它包含一个 tinykv.Server,它提供了名为 TinyKv 的 gRPC 服务 。它由 proto/proto/tinykvpb.proto 中的 protocol-buffer 定义,rpc 请求和响应的细节被定义在 proto/proto/kvrpcpb.proto 中。
一般来说,不需要改变 proto 文件,因为所有必要的字段都已经被定义了。但如果仍然需要改变,可以修改 proto 文件并运行 make proto 来更新 proto/pkg/xxx/xxx.pb.go 中生成的 go 相关代码。
此外,Server 依赖于一个 Storage
,这是一个需要为独立存储引擎实现的接口
,位于 kv/storage/standalone_storage/standalone_storage.go 中。一旦 StandaloneStorage 中实现了接口 Storage
,就可以用它实现Server 的原始 key/value 服务。
实现独立的存储引擎
第一个任务是实现 badger key/value 的 API。gRPC 服务依赖于一个在 kv/storage/storage.go 中定义的 Storage。在这种情况下,独立的存储引擎只是由两个方法提供的 badger key/value API。
type Storage interface {
// 省略其他的代码
Write(ctx *kvrpcpb.Context, batch []Modify) error
Reader(ctx *kvrpcpb.Context) (StorageReader, error)
}
-
Write:应该提供一种方式,将一系列的修改应用到内部状态,
内部状态是一个 badger 的实例。
-
Reader:应该
返回一个 StorageReader
,支持 key/value 的单点读取和快照扫描的操作。 -
现在不需要考虑 kvrpcpb.Context,它在后面的项目中使用。
提示:
- 使用 badger.Txn 来实现 Reader 函数,因为 badger 提供的事务处理程序可以提供 keys 和 values 的一致快照。
- Badger 没有给出对 CF 的支持。engine_util 包(kv/util/engine_util)通过给 keys 添加前缀来模拟 CF。例如,一个属于特定 CF 的 key 被存储为 $ { cf } _ $ { key } 。它封装了 badger 以提供对 CF 的操作,还提供了许多有用的辅助函数。所以应该通过 engine_util 提供的方法进行所有读写操作。阅读 util/engine_util/doc.go 可以了解更多。
- TinyKV使用了 badger 原始版本的分支,并进行了一些修正,所以应该使用 github.com/Connor1996/badger 而不是 github.com/dgraph-io/badger。
- 不要忘记为 badger.Txn 调用 Discard(),并在销毁前关闭所有迭代器。
实现服务处理程序
这个项目的最后一步是用实现的存储引擎
来构建原始的 key/value 服务处理程序
,包括 RawGet / RawScan / RawPut / RawDelete。处理程序已经定义好了,只需要在 kv/server/raw_api.go 中补充实现。一旦完成,记得运行 make project1 以通过测试用例。
文档的重点内容
列族(CF Column family)
列族(CF Column family):在文档的开始,所说的列族到底是什么意思?
Column Family,也叫 CF,这个概念从 HBase 中来,就是将多个列合并为一个CF进行管理。这样读取一行数据时,你可以按照 CF 加载列,不需要加载所有列(通常同一个CF的列会保存在同一个文件中,所以这样有很高的效率)。此外因为同一列的数据格式相同,你可以针对某种格式采用高效的压缩算法。
default_wxf
default_www
default_xxx
write_wxf
write_www
write_xxx
lock_wxf
lock_www
lock_xxx
从上面的样例可以看出,CF的本质,就是key的前缀,就是一个字符串,起命名空间的作用。
func KeyWithCF(cf string, key []byte) []byte {
return append([]byte(cf+"_"), key...)
}
独立存储引擎
-
Write:应该提供一种方式,将一系列的修改应用到内部状态,
内部状态是一个 badger 的实例。
-
Reader:应该
返回一个 StorageReader
,支持 key/value 的单点读取和快照扫描的操作。
看上面两句话的描述,其实核心就是,让我们使用基于badger实现write的操作,至于badger是怎么write的,我们不用管,我们只需要对badger进行一层封装即可。
对于Reader来说,核心就是让我们返回一个StorageReader
接口,我们应该去实现这个接口然后返回,而这个接口如何实现,提示里也说了,使用 badger.Txn
去实现该接口。
实现了这两个接口,那么文档中所说的独立存储引擎
就实现了。
原始的 key/value 服务处理程序
原始的 key/value 服务处理程序(Put/Delete/Get/Scan):其实就是对存储引擎的一层上层封装,有了这层封装,我们可以随意改变底层的存储引擎。使用则无需关注底层细节,只要遵循上层接口使用规范即可。
StandAloneStorage
通过文档中的提示2我们知道,engine_util对badger进行了封装,StandAloneStorage就是要在engine_util的基础上再封装一层。
// Engines keeps references to and data for the engines used by unistore.
// All engines are badger key/value databases.
// the Path fields are the filesystem path to where the data is stored.
type Engines struct {
// Data, including data which is committed (i.e., committed across other nodes) and un-committed (i.e., only present
// locally).
Kv *badger.DB
KvPath string
// Metadata used by Raft.
Raft *badger.DB
RaftPath string
}
那么我们就再封装一层即可,为什么这里要加一个conf?因为我们创建的时候需要用到conf里面的参数,因为不知道后续会不会用到这个config,所以先保存起来。
type StandAloneStorage struct {
// Your Data Here (1).
engine *engine_util.Engines
conf *config.Config
}
func NewStandAloneStorage(conf *config.Config) *StandAloneStorage {
// Your Code Here (1).
dbPath := conf.DBPath
kvPath := path.Join(dbPath, "kv")
raftPath := path.Join(dbPath, "raft")
kvDB := engine_util.CreateDB(kvPath, false)
//project1没用到raft
raftDB := engine_util.CreateDB(raftPath, true)
//func CreateDB(path string, raft bool) *badger.DB
//func NewEngines(kvEngine, raftEngine *badger.DB, kvPath, raftPath string) *Engines
return &StandAloneStorage{
engine: engine_util.NewEngines(kvDB, raftDB, kvPath, raftPath),
conf: conf,
}
}
StandAloneStorage是一个实例,它应该去实现接口,即Storage接口。那么我们下面重点关注Write和Reader。
type Storage interface {
Start() error
Stop() error
Write(ctx *kvrpcpb.Context, batch []Modify) error
Reader(ctx *kvrpcpb.Context) (StorageReader, error)
}
Write
去看看Modify类型是什么
Write(ctx *kvrpcpb.Context, batch []Modify) error
Modify 本质上代表着Put和Delete两种操作,通过下面的函数可以看到,它是通过断言来区分Put还是Delete的。一个Modify对应一个kv,所以可以看到上面Write接口中,Modify是一个切片,那么对于Write,我们需要遍历range这个切片进行操作。
// Modify is a single modification to TinyKV's underlying storage.
type Modify struct {
Data interface{}
}
type Put struct {
Key []byte
Value []byte
Cf string
}
type Delete struct {
Key []byte
Cf string
}
func (m *Modify) Key() []byte {
switch m.Data.(type) {
case Put:
return m.Data.(Put).Key
case Delete:
return m.Data.(Delete).Key
}
return nil
}
func (m *Modify) Value() []byte {
if putData, ok := m.Data.(Put); ok {
return putData.Value
}
return nil
}
func (m *Modify) Cf() string {
switch m.Data.(type) {
case Put:
return m.Data.(Put).Cf
case Delete:
return m.Data.(Delete).Cf
}
return ""
}
提示2中说了,engine_util包中提供了方法进行所有的读写操作,所以现在去看看提供了什么api供我们使用。
可以看到提供了两个函数给我们使用,看一下源码其实也能发现,在真实存储的时候,key被加了前缀:cf_key
.
func PutCF(engine *badger.DB, cf string, key []byte, val []byte) error {
return engine.Update(func(txn *badger.Txn) error {
return txn.Set(KeyWithCF(cf, key), val)
})
}
func DeleteCF(engine *badger.DB, cf string, key []byte) error {
return engine.Update(func(txn *badger.Txn) error {
return txn.Delete(KeyWithCF(cf, key))
})
}
在了解了engine_util提供的方法,以及Modify的含义后,Write怎么实现其实就迎刃而解了。
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
// Your Code Here (1).
//func PutCF(engine *badger.DB, cf string, key []byte, val []byte)
var err error
for _, m := range batch {
key, val, cf := m.Key(), m.Value(), m.Cf()
if _, ok := m.Data.(storage.Put); ok {
err = engine_util.PutCF(s.engine.Kv, cf, key, val)
} else {
err = engine_util.DeleteCF(s.engine.Kv, cf, key)
}
if err != nil {
return err
}
}
return nil
}
Reader
Reader函数需要我们返回一个StorageReader接口,那么我们就需要去实现一个StorageReader接口的结构体,来看看实现它需要哪些函数
Reader(ctx *kvrpcpb.Context) (StorageReader, error)
type StorageReader interface {
// When the key doesn't exist, return nil for the value
GetCF(cf string, key []byte) ([]byte, error)
IterCF(cf string) engine_util.DBIterator
Close()
}
看一下engine_util包给我们提供了什么函数,可以看到从txn中读取或者遍历的方法不用自己写,直接使用api即可,那么上面的GetCF和IterCF就是对下面两个函数的封装。
// get value
val, err := engine_util.GetCFFromTxn(txn, cf, key)
func GetCFFromTxn(txn *badger.Txn, cf string, key []byte) (val []byte, err error)
// get iterator
iter := engine_util.NewCFIterator(cf, txn)
func NewCFIterator(cf string, txn *badger.Txn) *BadgerIterator
可以看到engine_util提供的GetCFFromTxn和NewCFIterator两个函数都需要badger.Txn
,那么这个Txn是什么呢?其实是一个事务。获取badger.Txn的函数engine_util并未给出,需要直接调用badger.DB.NewTransaction函数。
txn *badger.Txn
//update为真表示Put/Delete两个写操作,为假表示Get/Scan两个读操作。
//NewTransaction creates a new transaction.
func (db *DB) NewTransaction(update bool) *Txn
所以不难发现,想要实现StorageReader接口,那么结构体中就要包含badger.Txn,继而去调用engine_util提供的api。在这里,可以把这个字段放在StandAloneStorage中;不过我还是把它拆到新的结构体里面了。
为什么呢?在上面的Write函数中,我们仅仅传入的是*badger.DB
,其内部也是有*badger.Txn
的,但是Write对事务进行了屏蔽
。在StorageReader接口的两个函数中,也没有事务的身影,但是我们因为是读,所以需要用到新的读事务,只能去创建一个。
综合来看,这些接口的封装,很明显就是不想让我们去过多的使用事务的,所以干脆把其放在一个新结构体里面return掉好了。
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
// Your Code Here (1).
//func (db *DB) NewTransaction(update bool) *Txn
//For read-only transactions, set update to false.
txn := s.engine.Kv.NewTransaction(false)
return NewStandAloneStorageReader(txn), nil
}
//实现 type StorageReader interface 接口
type StandAloneStorageReader struct {
KvTxn *badger.Txn
}
func NewStandAloneStorageReader(txn *badger.Txn) *StandAloneStorageReader {
return &StandAloneStorageReader{
KvTxn: txn,
}
}
func (s *StandAloneStorageReader) GetCF(cf string, key []byte) ([]byte, error) {
val, err := engine_util.GetCFFromTxn(s.KvTxn, cf, key)
//StandAloneStorage是底层的一个系统,这里认为not found不是err
//将not found屏蔽掉
//上层不认为not found是err
if err == badger.ErrKeyNotFound {
return nil, nil
}
return val, err
}
func (s *StandAloneStorageReader) IterCF(cf string) engine_util.DBIterator {
//func NewCFIterator(cf string, KvTxn *badger.Txn) *BadgerIterator
return engine_util.NewCFIterator(cf, s.KvTxn)
}
func (s *StandAloneStorageReader) Close() {
s.KvTxn.Discard()
}
Server
前面以及说过了,原始的 key/value 服务处理程序(Put/Delete/Get/Scan):其实就是对存储引擎的一层上层封装,有了这层封装,我们可以随意改变底层的存储引擎。使用则无需关注底层细节,只要遵循上层接口使用规范即可。
// Server is a TinyKV server, it 'faces outwards', sending and receiving messages from clients such as TinySQL.
type Server struct {
storage storage.Storage
//...
}
这样做我觉得最大的好处就是,底层的存储引擎无论怎么换,对上层都是无感的。这让我想到了mysql的innodb和myisam,都是无感的。在project1中,我们实现的是单机的存储引擎,所以在后续的project中,我们就可以直接换掉它。
func (server *Server) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error)
func (server *Server) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error)
func (server *Server) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error)
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error)
这4个接口都是对上面的StandAloneStorage中写的接口的再次封装。
- RawGet: 如果获取不到,返回时要标记 reply.NotFound=true。
- RawPut:把单一的 put 请求用 storage.Modify 包装成一个数组,一次性写入。
- RawDelete:和 RawPut 同理。
- RawScan:通过 reader 获取 iter,从 StartKey 开始,同时注意 limit。
需要特别注意的一点是:RawGet: 如果获取不到,返回时要标记 reply.NotFound=true
我所理解的Scan,是线性扫描。那么转入请求中的Limit,就是限制多少个。首先根据Seek定位到第一个key,然后向后读Limit个,同时要注意下一个的合法性,具体见代码。
// The functions below are Server's Raw API. (implements TinyKvServer).
// Some helper methods can be found in sever.go in the current directory
// 上层允许err时返回nil结构体
// RawGet return the corresponding Get response based on RawGetRequest's CF and Key fields
func (server *Server) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
// Your Code Here (1).
reader, err := server.storage.Reader(req.Context)
if err != nil {
return nil, err
}
val, err := reader.GetCF(req.Cf, req.Key)
if err != nil {
return nil, err
}
resp := &kvrpcpb.RawGetResponse{
Value: val,
NotFound: false,
}
if val == nil {
resp.NotFound = true
}
return resp, nil
}
// RawPut puts the target data into storage and returns the corresponding response
func (server *Server) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
// Your Code Here (1).
// Hint: Consider using Storage.Modify to store data to be modified
put := storage.Put{
Key: req.Key,
Value: req.Value,
Cf: req.Cf,
}
batch := storage.Modify{Data: put}
err := server.storage.Write(req.Context, []storage.Modify{batch})
if err != nil {
return nil, err
}
return &kvrpcpb.RawPutResponse{}, nil
}
// RawDelete delete the target data from storage and returns the corresponding response
func (server *Server) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
// Your Code Here (1).
// Hint: Consider using Storage.Modify to store data to be deleted
del := storage.Delete{
Key: req.Key,
Cf: req.Cf,
}
batch := storage.Modify{Data: del}
err := server.storage.Write(req.Context, []storage.Modify{batch})
if err != nil {
return nil, err
}
return &kvrpcpb.RawDeleteResponse{}, nil
}
// RawScan scan the data starting from the start key up to limit. and return the corresponding result
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
// Your Code Here (1).
// Hint: Consider using reader.IterCF
reader, err := server.storage.Reader(req.Context)
if err != nil {
return nil, err
}
var Kvs []*kvrpcpb.KvPair
limit := req.Limit
iter := reader.IterCF(req.Cf)
defer iter.Close()
for iter.Seek(req.StartKey); iter.Valid(); iter.Next() {
item := iter.Item()
key := item.Key()
val, _ := item.Value()
Kvs = append(Kvs, &kvrpcpb.KvPair{
Key: key,
Value: val,
})
limit--
if limit == 0 {
break
}
}
resp := &kvrpcpb.RawScanResponse{
Kvs: Kvs,
}
return resp, nil
}
单元测试
pass咯~~~
root@wxf:/tinykv# make project1
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/server -run 1
=== RUN TestRawGet1
--- PASS: TestRawGet1 (0.85s)
=== RUN TestRawGetNotFound1
--- PASS: TestRawGetNotFound1 (1.05s)
=== RUN TestRawPut1
--- PASS: TestRawPut1 (2.91s)
=== RUN TestRawGetAfterRawPut1
--- PASS: TestRawGetAfterRawPut1 (3.94s)
=== RUN TestRawGetAfterRawDelete1
--- PASS: TestRawGetAfterRawDelete1 (3.69s)
=== RUN TestRawDelete1
--- PASS: TestRawDelete1 (1.54s)
=== RUN TestRawScan1
--- PASS: TestRawScan1 (1.04s)
=== RUN TestRawScanAfterRawPut1
--- PASS: TestRawScanAfterRawPut1 (1.03s)
=== RUN TestRawScanAfterRawDelete1
--- PASS: TestRawScanAfterRawDelete1 (0.88s)
=== RUN TestIterWithRawDelete1
--- PASS: TestIterWithRawDelete1 (1.08s)
PASS
ok github.com/pingcap-incubator/tinykv/kv/server 18.027s
root@wxf:/tinykv#