跳到主要内容

避坑-5-TCP粘包

一 TCP粘包优雅处理

1.1 粘包产生的原因

在使用TCP协议分高发送数据时,为了提高发送效率,需要对封包进行拼包处理,将小包凑成大包,在TCP层可以节约包头的大小损耗,I/O层的调用损耗也可以有所降低。但此时也容易形成粘包,也就是没有按照预期的大小得到数据,数据包不完整。

![] (tcpread-01.png)

在接收TCP封包时,接收缓冲区的大小与发送过来的TCP传输单元大小不等,这时候会造成两种情况:

  • 接收的数据大于等于接收缓冲区大小时,此时需要将数据复制到用户缓冲,接着读取后面的封包。
  • 接收的数据小于接收缓冲区大小时,此时需要继续等待后续的 TCP 封包。

在go语言的io包中有个函数ReadAtLeast()用来处理封装

func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == EOF {
err = ErrUnexpectedEOF
}
return
}

在else判断中,读取目标已经完结,但是己经读取一些数据,也就是说,没法完成读取任务,发生了不可期望的终结错误。

ReadAtLeast还有个更好的封装:

func ReadFull(r Reader, buf []byte, min int) (n int, err error) {
return ReadAtLeast(r, buf, len(buf))
}

这个函数只需要提供buf接收缓冲区切片,就可以将这个己经分配的buf填充满 。简单地说就是: 给多大空间,填充多少字节,直到填满空间。
使用ReadFull可以优雅地完成对TCP粘包的处理。

1.2 封包发送

封包格式:

  • Size:大小,代表Size后面包体的大小
  • Body:消息的二进制数据

在发送封包时,需要将封包的Size字段从无符号十六位整型转换为字节数组(利用binary包的Write函数):

// 二进制封包格式
type Packet struct {
Size uint16 // 包体大小
Body []byte // 包体数据
}

// 将数据写入dataWriter
func writePacket(dataWriter io.Writer, data []byte) error {

// 准备一个字节数组缓冲
var buf bytes.Buffer

// 将Size写入缓冲
err := binary.Write(&buf, binary.LittleEndian, uint16(len(data)))
if err != nil {
return err
}

// 写入包体数据
_, err = buf.Write(data)
if err != nil {
return err
}

// 获取写入的完整数据
out := buf.Bytes()

// 写入完整数据
_, err = dataWriter.Write(out)
if err != nil {
return err
}

return nil
}

1.3 连接器

连接器可以通过给定的地址和发送次数,不断地通过Socket给地址对应的连接发送封包。本例中,将通过循环构造的转为字符串的数字形成封包。由于速度快,包量大,因此能有效地造成粘包:

func Connector(addr string, sendTimes int) {

// 创建连接
conn, err := net.Dial("tpc", addr)
if err != nil {
fmt.Println("连接错误:", err)
return
}

// 循环请求
for i := 0; i < sendTimes; i++ {
str := strconv.Itoa(i) // 将循环序号转化为字符串
err := writePacket(conn, []byte(str))
if err != nil {
fmt.Println("发送错误:", err)
break
}
}
}

1.4 接受器

连接器( Connector)只能发起到接收器( Acceptor)的连接, 一个接受器能接受多个来源连接。

接受器中侦昕的过程被放到一个独立的goroutine中,保证侦昕的过程不会阻塞其他逻辑的执行。在侦昕器停止时,需要使用 sync.WaitGroup进行同步,确认侦昕过程正常结 束。接受器不停的接受连接,当获取到连接时,为连接创建一个并发的会话处理的goroutine ( handleSession 函数)。

接受器开始侦昕后,使用 Wait()方法等待侦昕结束。当处理粘包的任务完成时俗,AcceptorStop()方法会被调用,结束侦昕,同时退出整个程序。接受器的实现过程如下:

// 接收器
type Acceptor struct {
ls net.Listener // 保存侦听器
wg sync.WaitGroup // 侦听器同步
OnSeesionData func(net.Conn, []byte) bool
}

// 异步开始侦听
func (a *Acceptor) Start(addr string) {
go func(addr string) {
a.wg.Add(1) // 侦听开始时,添加一个任务
defer a.wg.Done()

var err error
a.ls, err = net.Listen("tcp", addr)
if err != nil {
fmt.Println("listen err = ", err)
return
}
// 侦听循环
for {
conn, err := a.ls.Accept() // 新连接没到来时,Accept阻塞
if err != nil {
break
}
go handleSession(conn, a.OnSeesionData)
}
}(addr)
}

// 停止侦听
func (a *Acceptor) Stop() {
a.ls.Close()
}

// 等待侦听完全停止
func (a *Acceptor) Wait() {
a.wg.Wait()
}

func NewAcceptor() *Acceptor {
return &Acceptor{}
}

1.5 封包读取

本例中的 readPacket()函数可以从 dataReader 中读取封包数据,并且将封包的 Size 和 Body 部分分离并返回 Packet 结构:

func readPacket(dataReader io.Reader) (pkt Packet, err error) {
var sizeBuffer = make([]byte, 2) // Size为uint16类型,占据2个字节

// 持续读取Size,直到读到为止
_, err = io.ReadFull(dataReader, sizeBuffer)
if err != nil {
return
}

// 读取数据
sizeReader := bytes.NewReader(sizeBuffer)
err = binary.Read(sizeReader, binary.LittleEndian, &pkt.Size)
if err != nil {
return
}

// 分配包体大小
pkt.Body = make([]byte, pkt.Size)

// 读取包体数据
_, err = io.ReadFull(dataReader, pkt.Body)
return
}

1.6 会话处理

务器在接受一个连接后就会进入会话( Session )处理。会话处理是一个循环,不停地从 Socket 中接收数据并通过 readPacket()函数将数据转换为封包。会话处理会传入一个处理数据的回调,处理好的数据会通过这个回调传送到用户的逻辑回调函数( callback) :

// 连接的会话逻辑
func handleSession(conn net.Conn, callback func(net.Conn, []byte) bool) {
dataReader := bufio.NewReader(conn) // 创建socket读取器
for { // 循环接收数据
pkt, err := readPacket(dataReader)
if err != nil || !callback(conn, pkt.Body) {
// 回调要求退出
conn.Close()
break
}
}
}

1.5 测试粘包处理

func main() {

const TESTCOUNT = 10000 // 测试次数
const addr = "127.0.0.1:3000"

var recCounter int // 接收器

a := NewAcceptor()
a.Start(addr)

a.OnSeesionData = func(conn net.Conn, data []byte) bool {
str := string(data)
n, err := strconv.Atoi(str)
if err != nil || recCounter != n {
panic("failed")
}
recCounter++

if recCounter >= TESTCOUNT {
a.Stop()
return false
}

return true
}

// 连接器不断发送数据
Connector(addr, TESTCOUNT)

// 等待侦听器结束
a.Wait()
}