现在的服务器,只需要舍得堆配置,单台服务器接入100W连接问题也不大,但是总有避免单点故障或其他场景的需要,我们要把网关设计成允许多节点的服务。由此也会衍生出一系列如客户端状态管理、消息的路由处理等问题需要处理,下面是一个简单的分布式TCP接入网关实现和说明。

组件介绍

TCP接入服务

  • 提供TCP接入接口
  • 心跳检测和连接管理

会话管理服务

  • 维护设备连接状态
  • 记录设备接入节点状态

消息路由服务

  • 消息路由和装发
  • 设备离线后的消息保存

设备接入流程

设备A -> 网关A -> 会话管理服务 

消息流程

设备A -> 网关A -> 消息解码 -> 会话管理服务 -> 消息路由服务 -> 网关B -> 设备B

离线消息

设备A -> 网关A  -> 会话管理服务 -> 消息路由服务 -> 离线消息-> 设备A

代码实现

TCP接入服务

// Conn tcp连接
type Conn struct {
	conn        net.Conn
	msgs        chan []byte
	ctx         context.Context
	cancel      context.CancelFunc
	once        sync.Once
	onMsg       func(c *Conn, msg []byte)
	onCloseConn func(c *Conn)
	Uid         string
}

// TcpServer tcp服务
type TcpServer struct {
	Port        int                       // 端口号
	OnConnClose func(c *Conn)             // 连接关闭调用函数
	OnMsg       func(c *Conn, msg []byte) // 接收到消息调用函数
}

// Start 启动服务
func (t *TcpServer) Start(c context.Context) error {
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", t.Port))
	if err != nil {
		log.Println(err)
		return err
	}
	for {
		accept, err := listen.Accept()
		if err != nil {
			log.Println("[tcp] connect error: ", err)
			continue
		}

		conn := Conn{
			conn:        accept,
			msgs:        make(chan []byte, 10),
			onMsg:       t.OnMsg,
			onCloseConn: t.OnConnClose,
		}
		conn.ctx, conn.cancel = context.WithCancel(c)
		log.Println("[tcp] connect success", conn.conn.RemoteAddr())
		// 启动读写 goroutine
		go conn.write()
		go conn.read()
	}
}
func (c *Conn) Write(msg []byte) {
	c.msgs <- msg
}

// close 关闭连接 释放资源,并且回调关闭连接的回调函数
func (c *Conn) close() {
	c.once.Do(func() {
		c.onCloseConn(c)
		c.cancel()
		close(c.msgs)
		_ = c.conn.Close()
	})
}
func (c *Conn) write() {
	for {
		msg := <-c.msgs:
		_, err := c.conn.Write(msg)
	}
}
func (c *Conn) read() {
	for {
		scanner := bufio.NewReader(c.conn)
		text, err := scanner.ReadString('\n')
		if err != nil {
			log.Println("[tcp] read message error: ", err)
			c.close()
			return
		}
		if text == "" {
			continue
		}
		c.onMsg(c, []byte(text))
	}
}

会话管理服务

这里只是列了接口,实际可以按需实现,如使用redis的Hset保存客户端状态

// SessionMgt 会话管理
type SessionMgt interface {
	// FindSession 查找用户对应的节点
	FindSession(clientId string) (string, error)
	// SetSession  设置用户对应的节点
	SetSession(clientId string, node string) error
	// DeleteSession 离线或者断开连接时删除
	DeleteSession(user string)
}

消息路由服务

这里只是列了接口,实际可以按需实现

  • 使用redis的List作为消息队列和离校消息存储
  • 或者可以使用RPC调用其他节点暴露的接口
// Message 消息
type Message struct {
	Type MessageType
	From string
	To   string
	Msg  string
}

// MsgRepo 消息存储
type MsgRepo interface {
	PutOfflineMsg(msg Message) error             // 存储离线消息
	GetOfflineMsg(userId string) []Message       // 获取离线消息
	SendMsg(node string, msg Message) error      // 发送消息
	ConsumeMsg(node string, f func(msg Message)) // 消费消息
}

网关节点

// GatewayNode 网关节点
type GatewayNode struct {
	Node              string // 节点名称
	localSessionTable sync.Map
	TcpServer         *TcpServer
	SessionMgt        SessionMgt
	MsgRepo           MsgRepo
}

func (d *GatewayNode) Start(c context.Context) error {
	go d.consumeMsg()
	return d.TcpServer.Start(c)
}

// 消费消息
func (d *GatewayNode) consumeMsg() {
	d.MsgRepo.ConsumeMsg(d.Node, func(msg Message) {
		switch msg.Type {
		case MessageTypeSend:
			// 发送消息 ,如果在线,那么就直接发送
			if user, ok := d.localSessionTable.Load(msg.To); ok {
				user.(*Conn).Write(msg.ToByte())
			} else {
				// 如果不在线,那么就重新路由消息
				d.routeMsg(msg)
			}
		}
	})
}

// routeMsg 路由消息
// 通过会话管理器,查找用户连接的节点
// 往节点推送消息
// 如果不在线,那么就存储到离线队列中
func (d *GatewayNode) routeMsg(msg Message) {
	if val, ok := d.localSessionTable.Load(msg.To); ok {
		conn := val.(*Conn)
		conn.Write(msg.ToByte())
	}
	node, err := d.SessionMgt.FindSession(msg.To)
	if err != nil {
		return
	}
	if node == "" {
		err = d.MsgRepo.PutOfflineMsg(msg)
	} else {
		err = d.MsgRepo.SendMsg(node, msg)
	}
	log.Println("路由消息", err)
}

// OnMsg 消息处理
func (d *GatewayNode) OnMsg(c *Conn, bs []byte) {
	log.Println("收到消息", string(bs))
	var msg Message
	err := json.Unmarshal(bs, &msg)
	if err != nil {
		return
	}
	if msg.Type == MessageTypeRegister {
		c.Uid = msg.From
		d.localSessionTable.Store(msg.From, c)
		err := d.SessionMgt.SetSession(msg.From, d.Node)
		if err != nil {
			log.Println("设置用户节点失败", err)
			return
		}
       		// 注册成功后,获取离线消息
		offlineMsg := d.MsgRepo.GetOfflineMsg(msg.From)
		for _, message := range offlineMsg {
			d.routeMsg(message)
		}
	} else if msg.Type == MessageTypeSend {
		d.routeMsg(msg)
	}
}

// OnConnClose 连接关闭处理
func (d *GatewayNode) OnConnClose(c *Conn) {
	d.localSessionTable.Delete(c.Uid)
	d.SessionMgt.DeleteSession(c.Uid)
}

下面是运行效果,为了演示加了简单的窗口效果,可以看到两个客户端连接到不同的服务端,都能接收到对方的消息。
tcp_gateway_node