现在的服务器,只需要舍得堆配置,单台服务器接入100W连接问题也不大,但是总有避免单点故障或其他场景的需要,我们要把网关设计成允许多节点的服务。由此也会衍生出一系列如客户端状态管理、消息的路由处理等问题需要处理,下面是一个简单的分布式TCP接入网关实现和说明。
组件介绍
TCP接入服务
- 提供TCP接入接口
- 心跳检测和连接管理
会话管理服务
- 维护设备连接状态
- 记录设备接入节点状态
消息路由服务
- 消息路由和装发
- 设备离线后的消息保存
设备接入流程
设备A -> 网关A -> 会话管理服务
消息流程
设备A -> 网关A -> 消息解码 -> 会话管理服务 -> 消息路由服务 -> 网关B -> 设备B
离线消息
设备A -> 网关A -> 会话管理服务 -> 消息路由服务 -> 离线消息-> 设备A
代码实现
TCP接入服务
// Conn tcp连接
type Conn struct {
conn net.Conn
msgs chan []byte
ctx context.Context
cancel context.CancelFunc
once sync.Once
onMsg func(c *Conn, msg []byte)
onCloseConn func(c *Conn)
Uid string
}
// TcpServer tcp服务
type TcpServer struct {
Port int // 端口号
OnConnClose func(c *Conn) // 连接关闭调用函数
OnMsg func(c *Conn, msg []byte) // 接收到消息调用函数
}
// Start 启动服务
func (t *TcpServer) Start(c context.Context) error {
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", t.Port))
if err != nil {
log.Println(err)
return err
}
for {
accept, err := listen.Accept()
if err != nil {
log.Println("[tcp] connect error: ", err)
continue
}
conn := Conn{
conn: accept,
msgs: make(chan []byte, 10),
onMsg: t.OnMsg,
onCloseConn: t.OnConnClose,
}
conn.ctx, conn.cancel = context.WithCancel(c)
log.Println("[tcp] connect success", conn.conn.RemoteAddr())
// 启动读写 goroutine
go conn.write()
go conn.read()
}
}
func (c *Conn) Write(msg []byte) {
c.msgs <- msg
}
// close 关闭连接 释放资源,并且回调关闭连接的回调函数
func (c *Conn) close() {
c.once.Do(func() {
c.onCloseConn(c)
c.cancel()
close(c.msgs)
_ = c.conn.Close()
})
}
func (c *Conn) write() {
for {
msg := <-c.msgs:
_, err := c.conn.Write(msg)
}
}
func (c *Conn) read() {
for {
scanner := bufio.NewReader(c.conn)
text, err := scanner.ReadString('\n')
if err != nil {
log.Println("[tcp] read message error: ", err)
c.close()
return
}
if text == "" {
continue
}
c.onMsg(c, []byte(text))
}
}
会话管理服务
这里只是列了接口,实际可以按需实现,如使用redis的Hset保存客户端状态
// SessionMgt 会话管理
type SessionMgt interface {
// FindSession 查找用户对应的节点
FindSession(clientId string) (string, error)
// SetSession 设置用户对应的节点
SetSession(clientId string, node string) error
// DeleteSession 离线或者断开连接时删除
DeleteSession(user string)
}
消息路由服务
这里只是列了接口,实际可以按需实现
- 使用redis的List作为消息队列和离校消息存储
- 或者可以使用RPC调用其他节点暴露的接口
// Message 消息
type Message struct {
Type MessageType
From string
To string
Msg string
}
// MsgRepo 消息存储
type MsgRepo interface {
PutOfflineMsg(msg Message) error // 存储离线消息
GetOfflineMsg(userId string) []Message // 获取离线消息
SendMsg(node string, msg Message) error // 发送消息
ConsumeMsg(node string, f func(msg Message)) // 消费消息
}
网关节点
// GatewayNode 网关节点
type GatewayNode struct {
Node string // 节点名称
localSessionTable sync.Map
TcpServer *TcpServer
SessionMgt SessionMgt
MsgRepo MsgRepo
}
func (d *GatewayNode) Start(c context.Context) error {
go d.consumeMsg()
return d.TcpServer.Start(c)
}
// 消费消息
func (d *GatewayNode) consumeMsg() {
d.MsgRepo.ConsumeMsg(d.Node, func(msg Message) {
switch msg.Type {
case MessageTypeSend:
// 发送消息 ,如果在线,那么就直接发送
if user, ok := d.localSessionTable.Load(msg.To); ok {
user.(*Conn).Write(msg.ToByte())
} else {
// 如果不在线,那么就重新路由消息
d.routeMsg(msg)
}
}
})
}
// routeMsg 路由消息
// 通过会话管理器,查找用户连接的节点
// 往节点推送消息
// 如果不在线,那么就存储到离线队列中
func (d *GatewayNode) routeMsg(msg Message) {
if val, ok := d.localSessionTable.Load(msg.To); ok {
conn := val.(*Conn)
conn.Write(msg.ToByte())
}
node, err := d.SessionMgt.FindSession(msg.To)
if err != nil {
return
}
if node == "" {
err = d.MsgRepo.PutOfflineMsg(msg)
} else {
err = d.MsgRepo.SendMsg(node, msg)
}
log.Println("路由消息", err)
}
// OnMsg 消息处理
func (d *GatewayNode) OnMsg(c *Conn, bs []byte) {
log.Println("收到消息", string(bs))
var msg Message
err := json.Unmarshal(bs, &msg)
if err != nil {
return
}
if msg.Type == MessageTypeRegister {
c.Uid = msg.From
d.localSessionTable.Store(msg.From, c)
err := d.SessionMgt.SetSession(msg.From, d.Node)
if err != nil {
log.Println("设置用户节点失败", err)
return
}
// 注册成功后,获取离线消息
offlineMsg := d.MsgRepo.GetOfflineMsg(msg.From)
for _, message := range offlineMsg {
d.routeMsg(message)
}
} else if msg.Type == MessageTypeSend {
d.routeMsg(msg)
}
}
// OnConnClose 连接关闭处理
func (d *GatewayNode) OnConnClose(c *Conn) {
d.localSessionTable.Delete(c.Uid)
d.SessionMgt.DeleteSession(c.Uid)
}
下面是运行效果,为了演示加了简单的窗口效果,可以看到两个客户端连接到不同的服务端,都能接收到对方的消息。