脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - 使用go实现一个超级mini的消息队列的示例代码

使用go实现一个超级mini的消息队列的示例代码

2022-01-24 00:45壮士断臂 Golang

本文主要介绍了使用go实现一个超级mini的消息队列的示例代码,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

前言

趁着有空余时间,就想着撸一个mini的生产-消费消息队列,说干就干了。自己是个javer,这次实现,特意换用了go。没错,是零基础上手go,顺便可以学学go。

前置知识:

  • go基本语法
  • 消息队列概念,也就三个:生产者、消费者、队列

目的

  • 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢
  • 使用双向链表数据结构作为队列
  • 有多个topic可供生产者生成消息和消费者消费消息
  • 支持生产者并发写
  • 支持消费者读,且ok后,从队列删除
  • 消息不丢失(持久化)
  • 高性能(先这样想)

 

设计

整体架构

使用go实现一个超级mini的消息队列的示例代码

协议

通讯协议底层使用tcp,mq是基于tcp自定义了一个协议,协议如下

使用go实现一个超级mini的消息队列的示例代码

type Msg struct {
 Id int64
 TopicLen int64
 Topic string
 // 1-consumer 2-producer 3-comsumer-ack 4-error
 MsgType int64 // 消息类型
 Len int64 // 消息长度
 Payload []byte // 消息
}

Payload使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。Msg承载着生产者生产的消息,消费者消费的消息,ACK、和错误消息,前两者会有负载,而后两者负载和长度都为空

协议的编解码处理,就是对字节的处理,接下来有从字节转为Msg,和从Msg转为字节两个函数

func BytesToMsg(reader io.Reader) Msg {

 m := Msg{}
 var buf [128]byte
 n, err := reader.Read(buf[:])
 if err != nil {
    fmt.Println("read failed, err:", err)
 }
 fmt.Println("read bytes:", n)
 // id
 buff := bytes.NewBuffer(buf[0:8])
 binary.Read(buff, binary.LittleEndian, &m.Id)
 // topiclen
 buff = bytes.NewBuffer(buf[8:16])
 binary.Read(buff, binary.LittleEndian, &m.TopicLen)
 // topic
 msgLastIndex := 16 + m.TopicLen
 m.Topic = string(buf[16: msgLastIndex])
 // msgtype
 buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
 binary.Read(buff, binary.LittleEndian, &m.MsgType)

 buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
 binary.Read(buff, binary.LittleEndian, &m.Len)

 if m.Len <= 0 {
    return m
 }

 m.Payload = buf[msgLastIndex + 16:]
 return m
}

func MsgToBytes(msg Msg) []byte {
 msg.TopicLen = int64(len([]byte(msg.Topic)))
 msg.Len = int64(len([]byte(msg.Payload)))

 var data []byte
 buf := bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.Id)
 data = append(data, buf.Bytes()...)

 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.TopicLen)
 data = append(data, buf.Bytes()...)
 
 data = append(data, []byte(msg.Topic)...)

 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.MsgType)
 data = append(data, buf.Bytes()...)
 
 buf = bytes.NewBuffer([]byte{})
 binary.Write(buf, binary.LittleEndian, msg.Len)
 data = append(data, buf.Bytes()...)
 data = append(data, []byte(msg.Payload)...)

 return data
}

队列

使用container/list,实现先入先出,生产者在队尾写,消费者在队头读取

package broker

import (
 "container/list"
 "sync"
)

type Queue struct {
 len int
 data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
 queue.data.PushBack(msg)
 queue.len = queue.data.Len()
}

func (queue *Queue) poll() Msg{
 if queue.len == 0 {
    return Msg{}
 }
 msg := queue.data.Front()
 return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
 lock.Lock()
 for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
    if msg.Value.(Msg).Id == id {
       queue.data.Remove(msg)
       queue.len = queue.data.Len()
       break
    }
 }
 lock.Unlock()
}

方法offer往队列里插入数据,poll从队列头读取数据素,delete根据消息ID从队列删除数据。这里使用Queue结构体对List进行封装,其实是有必要的,List作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作
delete操作是在消费者消费成功且发送ACK后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加锁(em,加锁是否就一定会影响对性能有较大的影响呢)

broker

broker作为服务器角色,负责接收连接,接收和响应请求

package broker

import (
 "bufio"
 "net"
 "os"
 "sync"
 "time"
)

var topics = sync.Map{}

func handleErr(conn net.Conn)  {
 defer func() {
    if err := recover(); err != nil {
       println(err.(string))
       conn.Write(MsgToBytes(Msg{MsgType: 4}))
    }
 }()
}

func Process(conn net.Conn) {
 handleErr(conn)
 reader := bufio.NewReader(conn)
 msg := BytesToMsg(reader)
 queue, ok := topics.Load(msg.Topic)
 var res Msg
 if msg.MsgType == 1 {
    // comsumer
    if queue == nil || queue.(*Queue).len == 0{
       return
    }
    msg = queue.(*Queue).poll()
    msg.MsgType = 1
    res = msg
 } else if msg.MsgType == 2 {
    // producer
    if ! ok {
       queue = &Queue{}
       queue.(*Queue).data.Init()
       topics.Store(msg.Topic, queue)
    }
    queue.(*Queue).offer(msg)
    res = Msg{Id: msg.Id, MsgType: 2}
 } else if msg.MsgType == 3 {
    // consumer ack
    if queue == nil {
       return
    }
    queue.(*Queue).delete(msg.Id)

 }
 conn.Write(MsgToBytes(res))

}

MsgType等于1时,直接消费消息;MsgType等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic下;MsgType等于3时,代表消费者成功消费,可以

删除消息

我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic对应的队列里的消息,按协议格式进行序列化,当broker启动时,从文件恢复
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想Kafka和Redis的持久化),这里表示简单实现就好:定时器定时保存

func Save()  {
 ticker := time.NewTicker(60)
 for {
    select {
    case <-ticker.C:
       topics.Range(func(key, value interface{}) bool {
          if value == nil {
             return false
          }
          file, _ := os.Open(key.(string))
          if file == nil {
             file, _ = os.Create(key.(string))
          }
          for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
             file.Write(MsgToBytes(msg.Value.(Msg)))
          }
          _ := file.Close()
          return false
       })
    default:
       time.Sleep(1)
    }
 }
}

有一个问题是,当上面的delete操作时,这里的file文件需不需要跟着delete掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题
下面是启动逻辑

package main

import (
 "awesomeProject/broker"
 "fmt"
 "net"
)

func main()  {
 listen, err := net.Listen("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("listen failed, err:", err)
    return
 }
 go broker.Save()
 for {
    conn, err := listen.Accept()
    if err != nil {
       fmt.Print("accept failed, err:", err)
       continue
    }
    go broker.Process(conn)

 }
}

生产者

package main

import (
 "awesomeProject/broker"
 "fmt"
 "net"
)

func produce() {
 conn, err := net.Dial("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("connect failed, err:", err)
 }
 defer conn.Close()

 msg := broker.Msg{Id: 1102, Topic: "topic-test",  MsgType: 2,  Payload: []byte("我")}
 n, err := conn.Write(broker.MsgToBytes(msg))
 if err != nil {
    fmt.Print("write failed, err:", err)
 }

 fmt.Print(n)
}

消费者

package main

import (
 "awesomeProject/broker"
 "bytes"
 "fmt"
 "net"
)

func comsume() {
 conn, err := net.Dial("tcp", "127.0.0.1:12345")
 if err != nil {
    fmt.Print("connect failed, err:", err)
 }
 defer conn.Close()

 msg := broker.Msg{Topic: "topic-test",  MsgType: 1}

 n, err := conn.Write(broker.MsgToBytes(msg))
 if err != nil {
    fmt.Println("write failed, err:", err)
 }
 fmt.Println("n", n)

 var res [128]byte
 conn.Read(res[:])
 buf := bytes.NewBuffer(res[:])
 receMsg := broker.BytesToMsg(buf)
 fmt.Print(receMsg)

 // ack
 conn, _ = net.Dial("tcp", "127.0.0.1:12345")
 l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
 if e != nil {
    fmt.Println("write failed, err:", err)
 }
 fmt.Println("l:", l)
}

消费者这里ack时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn读取数据,直到结束。思考一下,像RabbitMQ的ack就有自动和手工的ack,如果是手工的ack,必然需要一个新的连接,因为不知道客户端什么时候发送ack,自动的话,当然可以使用同一个连接,but这里就简单创建一条新连接吧

启动

先启动broker,再启动producer,然后启动comsumer,OK,能跑,能实现发送消息到队列,从队列消费消息

 

总结

整体虽然简单,但毕竟是使用go实现的,就是看似一顿操作猛如虎,实质慌如狗。第一时间就被go的gopath和go mod困扰住,后面语法的使用,比如指针,传值传引用等,最头疼的就是类型转换,作为一个javer,使用go进行类型转换,着实被狠狠得虐了一番。

到此这篇关于使用go实现一个超级mini的消息队列的示例代码的文章就介绍到这了,更多相关go mini消息队列内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://juejin.cn/post/7041085344481017887

延伸 · 阅读

精彩推荐
  • Golanggolang如何使用struct的tag属性的详细介绍

    golang如何使用struct的tag属性的详细介绍

    这篇文章主要介绍了golang如何使用struct的tag属性的详细介绍,从例子说起,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看...

    Go语言中文网11352020-05-21
  • Golanggolang 通过ssh代理连接mysql的操作

    golang 通过ssh代理连接mysql的操作

    这篇文章主要介绍了golang 通过ssh代理连接mysql的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    a165861639710342021-03-08
  • Golanggo语言制作端口扫描器

    go语言制作端口扫描器

    本文给大家分享的是使用go语言编写的TCP端口扫描器,可以选择IP范围,扫描的端口,以及多线程,有需要的小伙伴可以参考下。 ...

    脚本之家3642020-04-25
  • GolangGolang通脉之数据类型详情

    Golang通脉之数据类型详情

    这篇文章主要介绍了Golang通脉之数据类型,在编程语言中标识符就是定义的具有某种意义的词,比如变量名、常量名、函数名等等,Go语言中标识符允许由...

    4272021-11-24
  • GolangGolang中Bit数组的实现方式

    Golang中Bit数组的实现方式

    这篇文章主要介绍了Golang中Bit数组的实现方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    天易独尊11682021-06-09
  • Golanggolang的httpserver优雅重启方法详解

    golang的httpserver优雅重启方法详解

    这篇文章主要给大家介绍了关于golang的httpserver优雅重启的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,...

    helight2992020-05-14
  • Golanggolang json.Marshal 特殊html字符被转义的解决方法

    golang json.Marshal 特殊html字符被转义的解决方法

    今天小编就为大家分享一篇golang json.Marshal 特殊html字符被转义的解决方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 ...

    李浩的life12792020-05-27
  • Golanggo日志系统logrus显示文件和行号的操作

    go日志系统logrus显示文件和行号的操作

    这篇文章主要介绍了go日志系统logrus显示文件和行号的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    SmallQinYan12302021-02-02