Codis源码解析——codis-server添加到集群

上一篇,我们成功在集群中添加了proxy。这一篇来讲讲codis-server添加到集群的过程中发生了什么。

第一步,先别急着添加server,而应该是创建分组。创建分组的过程很简单,主要就是校验group的id在不在1~9999这个范围内,如果在的话(以group1为例),就调用zkClient创建路径/codis3/codis-wujiang/group/group-0001,初识创建好的时候,这个group下面是空的。

{
    "id": 1,
    "servers": [],
    "promoting": {},
    "out_of_sync": false
}

接下来,向group中添加codis-server。这个过程中对redis的操作使用了/github/garyburd/redigo这个第三方redis服务。

首先,根据redis的地址新建一个redis客户端

func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) {
    //调用了/github/garyburd/redigo/redis/conn.go,返回的redisClient中包含了connection,redis地址以及上次的使用时间等信息
    c, err := redigo.Dial("tcp", addr, []redigo.DialOption{
        redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)),
        redigo.DialPassword(auth),
        redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout),
    }...)
    if err != nil {
        return nil, errors.Trace(err)
    }
    return &Client{
        conn: c, Addr: addr, Auth: auth,
        LastUse: time.Now(), Timeout: timeout,
    }, nil
}
//RedisClient结构,对于每台redis服务器,都会有多个连接,过期的连接将会被清除
type Client struct {
    conn redigo.Conn
    Addr string
    Auth string

    Database int

    LastUse time.Time
    Timeout time.Duration
}

返回的redisClient结构如下所示

这里写图片描述

下一步,获取槽的信息,并且将server挂到当前group下

func (c *Client) SlotsInfo() (map[int]int, error) {
    //获取当前client的所有slot的信息,由于我们这一步还没有给group分配槽,所以这里得到的reply是一个len和cap都为0的interface切片
    if reply, err := c.Do("SLOTSINFO"); err != nil {
        return nil, errors.Trace(err)
    } else {
        //这里的infos也是len和cap都为0的interface切片
        infos, err := redigo.Values(reply, nil)
        if err != nil {
            return nil, errors.Trace(err)
        }
        slots := make(map[int]int)
        for i, info := range infos {
            p, err := redigo.Ints(info, nil)
            if err != nil || len(p) != 2 {
                return nil, errors.Errorf("invalid response[%d] = %v", i, info)
            }
            slots[p[0]] = p[1]
        }
        //slots也只相当于初始化了结构,但是没有填充数据
        return slots, nil
    }
}
//将server挂在group下,传入的gid就是1,addr是codis-server的地址,datacenter由于没有填写,所以dc是空字符串
func (s *Topom) GroupAddServer(gid int, dc, addr string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    //这个newContext方法我们之前说过很多遍了,就是重新填充上下文中的cache,如果cache为空,就从store中读出填入cache
    ctx, err := s.newContext()
    if err != nil {
        return err
    }

    if addr == "" {
        return errors.Errorf("invalid server address")
    }

    //同一group下重复添加server的校验
    for _, g := range ctx.group {
        for _, x := range g.Servers {
            if x.Addr == addr {
                return errors.Errorf("server-[%s] already exists", addr)
            }
        }
    }

    //从上下文中根据gid取出group的详细信息
    g, err := ctx.getGroup(gid)
    if err != nil {
        return err
    }
    if g.Promoting.State != models.ActionNothing {
        return errors.Errorf("group-[%d] is promoting", g.Id)
    }

    //此时集群中并没有sentinel
    if p := ctx.sentinel; len(p.Servers) != 0 {
        defer s.dirtySentinelCache()
        p.OutOfSync = true
        if err := s.storeUpdateSentinel(p); err != nil {
            return err
        }
    }
    defer s.dirtyGroupCache(g.Id)

    //将新增的codis-server地址追加到当前group的Servers属性后面
    g.Servers = append(g.Servers, &models.GroupServer{Addr: addr, DataCenter: dc})
    //更新zk路径,/codis3/codis-wujiang/group/group-0001下面的文件内容
    return s.storeUpdateGroup(g)
}
//Group结构
type Group struct {
    Id      int            `json:"id"`
    Servers []*GroupServer `json:"servers"`

    Promoting struct {
        Index int    `json:"index,omitempty"`
        State string `json:"state,omitempty"`
    } `json:"promoting"`

    OutOfSync bool `json:"out_of_sync"`
}

下图是ctx, err := s.newContext()这一步取出的ctx,可以看到,此时上下文中group的Servers属性还是空

这里写图片描述

下图是根据id从上下文取出的group添加了codis-server之后的情况,可以看到,此时在当前Group下已经多了刚添加的codis-server地址

这里写图片描述

下一步,我们要给刚刚添加的主codis-server添加一个从codis-server,流程和刚才是一样的,第一次添加的时候,两个都是主,要点击下面那个绿色的类似于设置的按钮,将第二个服务器变成第一个的从服务器

这里写图片描述

//这里传入的addr就是第二台codis-server服务器的地址
func (s *Topom) SyncCreateAction(addr string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    //此时的上下文的group属性中的Servers已经初始化了两个/pkg/models.GroupServer结构
    ctx, err := s.newContext()
    if err != nil {
        return err
    }

    //遍历上下文的group属性的Server,如果地址与传入的addr匹配,就可以找到对应的Group g
    g, index, err := ctx.getGroupByServer(addr)
    if err != nil {
        return err
    }
    if g.Promoting.State != models.ActionNothing {
        return errors.Errorf("group-[%d] is promoting", g.Id)
    }

    if g.Servers[index].Action.State == models.ActionPending {
        return errors.Errorf("server-[%s] action already exist", addr)
    }
    defer s.dirtyGroupCache(g.Id)

    //遍历g中的每个Server的Action.Index,取最大值加一赋值给新增的这个Server
    //因为此时所有Server的Action.Index都是0,所以这里将新增Server的Action.Index设置为1
    g.Servers[index].Action.Index = ctx.maxSyncActionIndex() + 1

    //因为我们这里是把新增的这个6380的机器作为从服务器,所以这里要把它的Action.State设置为pending
    g.Servers[index].Action.State = models.ActionPending
    //更新zk路径
    return s.storeUpdateGroup(g)
}

完成上述动作之后,Group的状态变为

这里写图片描述

并且zk下已经在Group1中注册了两台服务器的信息

这里写图片描述

再看界面,同属于Group1的两台codis-server的主从关系已经很明显

这里写图片描述

启动dashboard时有刷新redis状态的goroutine,在成功添加两台codis-server后,这个goroutine的作用也比较明显了,详见笔者之前的一篇博客Codis源码解析——dashboard的启动(2)

说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/76589129

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值