跳至主要內容

Go 实现 TCP 服务端&客户端并维持心跳

Mayee...大约 5 分钟

前言

最近用 Go 开发了 TCP 的代理服务,经过一周的洗礼,感觉有些悟道,因此决定再巩固一遍,加点料:实现服务端与客户端之间的心跳,并让服务端主动剔除不活跃的客户端。

1. 服务端

服务端代码server.go

package main

import (
	"20220923/internal/enum"
	_ "20220923/internal/log"
	"20220923/internal/model"
	"20220923/internal/packet"
	"context"
	"errors"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"net"
	"os"
	"os/signal"
	"time"
)

// main starts the TCP server and blocks until either the accept loop fails
// or an interrupt signal asks the process to shut down.
func main() {
	server := newTCPServer()

	// Start blocks inside its accept loop, so it must run in the background;
	// otherwise the signal handling below would never be reached.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- server.Start()
	}()

	shutdown := make(chan os.Signal, 1)
	// Only os.Interrupt is registered: os.Kill (SIGKILL) can never be caught
	// by a program, so passing it to signal.Notify has no effect.
	signal.Notify(shutdown, os.Interrupt)

	select {
	case err := <-serveErr:
		// A closed listener is the normal shutdown path, not a failure.
		if err != nil && !errors.Is(err, net.ErrClosed) {
			panic(err)
		}
	case <-shutdown:
		if err := server.Stop(); err != nil {
			zap.S().Errorf("stop server: %v", err)
		}
	}
}

// handler serves a single client connection. It performs the login handshake
// and then runs three cooperating goroutines (reader, heartbeat, writer) until
// one of them fails or the client stops answering heartbeats.
//
// Errors are reported by panicking; the deferred function recovers the panic,
// logs it, and always closes the session before handler returns.
func handler(conn *net.TCPConn) {
	const (
		// heartbeatInterval is how often the server sends a heartbeat.
		heartbeatInterval = 5 * time.Second
		// heartbeatTimeout is slightly more than three heartbeat intervals.
		heartbeatTimeout = 17 * time.Second
	)

	// Wrap the raw connection in a packet session.
	s := packet.NewSession(conn)
	// Recover from panics and always release the connection.
	defer func() {
		if exp := recover(); exp != nil {
			zap.S().Errorf("[recovered] panic: %v\n", exp)
		}
		_ = s.Close()
		zap.S().Warn("disconnect with client")
	}()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Login request: the first packet on a connection must be a login.
	loginPkt, err := s.ReadPacket(ctx)
	if err != nil {
		panic(err)
	}
	loginMsg, err := loginPkt.ReadMessage()
	if err != nil {
		panic(err)
	}

	loginResp := model.NewLoginResponse()
	// sendLoginResp encodes loginResp and writes it to the client, reporting
	// the encoding error as well as the write error.
	sendLoginResp := func() error {
		p := new(packet.Buffer)
		if err := p.WriteMessage(loginResp); err != nil {
			return err
		}
		return s.WritePacket(p)
	}

	// The checked assertion guards against a message whose Type() claims to be
	// a login but whose concrete type is something else.
	login, ok := loginMsg.(*model.Login)
	authorized := ok &&
		loginMsg.Type() == enum.MsgTypeLogin &&
		login.Username.String() == serverUsername &&
		login.Password.String() == serverPassword
	if authorized {
		loginResp.SessionStatus = 0
	} else {
		// 5 is the status this server sends for any rejected login.
		loginResp.SessionStatus = 5
	}
	if err := sendLoginResp(); err != nil {
		panic(err)
	}
	if !authorized {
		return
	}

	// errgroup wraps a WaitGroup: Wait blocks until every goroutine started
	// with Go has returned and yields the first non-nil error. The derived ctx
	// is cancelled as soon as any goroutine returns a non-nil error.
	eg, ctx := errgroup.WithContext(ctx)
	ch := make(chan *packet.Buffer)
	heart := make(chan byte)

	// Read client messages.
	eg.Go(func() error {
		for {
			pkt, err := s.ReadPacket(ctx)
			if err != nil {
				return err
			}
			msg, err := pkt.ReadMessage()
			if err != nil {
				return err
			}
			switch msg.Type() {
			case enum.MsgTypeLogin:
				// A login on an already established session is answered
				// with status 100.
				resp := model.NewLoginResponse()
				resp.SessionStatus = 100
				p := new(packet.Buffer)
				if err := p.WriteMessage(resp); err != nil {
					return err
				}
				// Never block on a send once the group is shutting down,
				// otherwise eg.Wait would hang.
				select {
				case ch <- p:
				case <-ctx.Done():
					return nil
				}
			case enum.MsgTypeHeartBeat:
				// The value is irrelevant; the send only signals liveness.
				select {
				case heart <- 0:
				case <-ctx.Done():
					return nil
				}
			case enum.MsgTypeClientDemo:
				// Response payload.
				m := model.NewServerDemo()
				m.Addr.Set([]byte("127.0.0.1"))
				m.Port = serverPort
				m.Remark.Set([]byte("server response: ok"))
				p := new(packet.Buffer)
				if err := p.WriteMessage(m); err != nil {
					return err
				}
				select {
				case ch <- p:
				case <-ctx.Done():
					return nil
				}
			}
		}
	})

	// Maintain the heartbeat.
	eg.Go(func() error {
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()
		timer := time.NewTimer(heartbeatTimeout)
		defer timer.Stop()

		m := model.NewHeartBeat()
		for {
			select {
			case <-ticker.C:
				p := new(packet.Buffer)
				if err := p.WriteMessage(m); err != nil {
					return err
				}
				select {
				case ch <- p:
				case <-ctx.Done():
					return nil
				}
			case <-heart:
				zap.S().Debug("receive client heartBeat")
				// Restart the timeout window. If the timer already fired,
				// drain its channel so the stale tick is not seen later.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(heartbeatTimeout)
			case <-timer.C:
				return errors.New("client heart timeout")
			case <-ctx.Done():
				return nil
			}
		}
	})

	// Write queued packets back to the client.
	eg.Go(func() error {
		for {
			select {
			case p := <-ch:
				if err := s.WritePacket(p); err != nil {
					return err
				}
			case <-ctx.Done():
				return nil
			}
		}
	})

	if err := eg.Wait(); err != nil {
		panic(err)
	}
}

// TCPServer holds the listening socket of the demo server together with the
// credentials it was constructed with.
type TCPServer struct {
	// lis is the TCP listener that Start accepts connections from and Stop closes.
	lis *net.TCPListener

	// userName and password are the credentials stored at construction time.
	userName, password string
}

// serverUsername and serverPassword are the credentials a client must present
// in its login message.
const serverUsername, serverPassword = "mayee", "mayee"

// serverPort is the TCP port the server listens on; it is also reported back
// to clients in the demo response.
const serverPort = 30001

// newTCPServer opens a TCP listener on serverPort and returns a server
// configured with the built-in credentials.
//
// It panics if the listener cannot be created (for example when the port is
// already in use): a server without a listener cannot do anything useful, and
// failing here reports the real cause instead of a later opaque accept error.
func newTCPServer() *TCPServer {
	lis, err := net.ListenTCP("tcp", &net.TCPAddr{Port: serverPort})
	if err != nil {
		panic(err)
	}
	return &TCPServer{
		lis:      lis,
		userName: serverUsername,
		password: serverPassword,
	}
}

// Start runs the accept loop, handing every accepted connection to handler on
// its own goroutine. It blocks until accepting fails.
//
// It returns nil when the listener has been closed (the normal result of
// calling Stop) and the underlying accept error otherwise.
func (s *TCPServer) Start() error {
	for {
		conn, err := s.lis.AcceptTCP()
		if err != nil {
			// A closed listener means a deliberate shutdown, not a failure.
			if errors.Is(err, net.ErrClosed) {
				return nil
			}
			return err
		}
		go handler(conn)
	}
}

// Stop closes the server's listener and returns the error from that close,
// if any.
func (s *TCPServer) Stop() error {
	if err := s.lis.Close(); err != nil {
		return err
	}
	return nil
}

2. 客户端

客户端代码client.go

package main

import (
	"20220923/internal/enum"
	_ "20220923/internal/log"
	"20220923/internal/model"
	"20220923/internal/packet"
	"context"
	"fmt"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"net"
	"time"
)

// main connects to the server, logs in, and then runs two goroutines: one
// that receives server messages (answering heartbeats) and one that sends a
// demo message every two seconds and flushes queued replies.
//
// After the session exists, failures are reported by panicking on the main
// goroutine; the deferred function recovers, logs the error and closes the
// session.
func main() {
	conn, err := net.DialTimeout("tcp", ":30001", time.Second*2)
	if err != nil {
		panic(err)
	}
	s := packet.NewSession(conn)
	defer func() {
		if exp := recover(); exp != nil {
			zap.S().Errorf("[recovered] panic: %v\n", exp)
		}
		_ = s.Close()
		zap.S().Warn("disconnect with server")
	}()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Login.
	login := model.NewLogin()
	login.Username.Set([]byte("mayee"))
	login.Password.Set([]byte("mayee"))
	loginPkt := new(packet.Buffer)
	if err := loginPkt.WriteMessage(login); err != nil {
		panic(err)
	}
	if err := s.WritePacket(loginPkt); err != nil {
		panic(err)
	}
	loginRespPkt, err := s.ReadPacket(ctx)
	if err != nil {
		panic(err)
	}
	message, err := loginRespPkt.ReadMessage()
	if err != nil {
		panic(err)
	}
	// The checked assertion covers a message whose Type() and concrete type
	// disagree, in addition to a plainly wrong message type.
	response, ok := message.(*model.LoginResponse)
	if message.Type() != enum.MsgTypeLoginResponse || !ok {
		panic("login response type error")
	}
	if response.SessionStatus != 0 {
		panic(fmt.Errorf("login response %d", response.SessionStatus))
	}

	eg, ctx := errgroup.WithContext(ctx)
	ch := make(chan *packet.Buffer)

	// Receive messages.
	eg.Go(func() error {
		for {
			pkt, err := s.ReadPacket(ctx)
			if err != nil {
				return err
			}
			m, err := pkt.ReadMessage()
			if err != nil {
				return err
			}
			switch m.Type() {
			case enum.MsgTypeHeartBeat:
				zap.S().Debug("receive server heartBeat")
				// Answer the heartbeat. The message has to be written into
				// the packet again: per the original author's note, after
				// ReadMessage the packet's internal count is 0, and sending
				// it as-is makes the server's ReadMessage fail with io.EOF.
				if err := pkt.WriteMessage(m); err != nil {
					return err
				}
				// Do not block on the send once the group is shutting down,
				// otherwise eg.Wait would never return.
				select {
				case ch <- pkt:
				case <-ctx.Done():
					return nil
				}
			case enum.MsgTypeServerDemo:
				demo, ok := m.(*model.ServerDemo)
				if !ok {
					return fmt.Errorf("server demo message has unexpected type %T", m)
				}
				zap.S().Info(demo.String())
			}
		}
	})

	// Send messages.
	eg.Go(func() error {
		demo := model.NewClientDemo()
		t := time.NewTicker(time.Second * 2)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				p := new(packet.Buffer)
				if err := p.WriteMessage(demo); err != nil {
					return err
				}
				// Return the error instead of panicking: a panic on this
				// goroutine would not be caught by main's deferred recover
				// and would crash the whole process.
				if err := s.WritePacket(p); err != nil {
					return err
				}
			case p := <-ch:
				if err := s.WritePacket(p); err != nil {
					return err
				}
			case <-ctx.Done():
				return nil
			}
		}
	})

	if err = eg.Wait(); err != nil {
		panic(err)
	}
}

3. 实现效果

3.1. 客户端正常

client 端输出:

[2022-09-26 22:59:34.355]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:36.354]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:37.356]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 22:59:38.355]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:40.351]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:42.356]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 22:59:42.356]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:44.352]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:46.350]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:47.357]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 22:59:48.348]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:50.348]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:52.350]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 22:59:52.350]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 22:59:54.354]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}

server 端输出:

[2022-09-26 22:59:37.356]       [DEBUG] [server/server.go:146]  receive client heartBeat
[2022-09-26 22:59:42.356]       [DEBUG] [server/server.go:146]  receive client heartBeat
[2022-09-26 22:59:47.357]       [DEBUG] [server/server.go:146]  receive client heartBeat
[2022-09-26 22:59:52.350]       [DEBUG] [server/server.go:146]  receive client heartBeat

客户端与服务端保持正常通信。

3.2. 客户端失活

client 端输出:

[2022-09-26 23:00:56.907]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:00:58.909]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:00:59.910]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 23:01:00.897]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:02.906]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:04.901]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 23:01:04.901]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:06.899]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:08.909]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:09.898]       [DEBUG] [client/client.go:76]   receive server heartBeat
[2022-09-26 23:01:10.900]       [DEBUG] [client/client.go:84]   {"MsgSize":41,"MsgType":100,"Addr":127.0.0.1,"Port":30001,"Remark":server response: ok}
[2022-09-26 23:01:11.915]       [ERROR] [client/client.go:23]   [recovered] panic: EOF

[2022-09-26 23:01:11.915]       [WARN]  [client/client.go:26]   disconnect with server

server 端输出:

[2022-09-26 23:01:11.900]       [ERROR] [server/server.go:38]   [recovered] panic: client heart timeout

[2022-09-26 23:01:11.915]       [WARN]  [server/server.go:41]   disconnect with client

服务端连续发起三次心跳客户端没有回应后,服务端断开与客户端的连接。


Tip:本文完整示例代码已上传至 Gitee。