Flag Counter
© 2017 Tao Peng. All rights reserved.

Go语言下使用 nats 消息机制


2016年05月05日

Nats是什么? 或者可能没听过, 但是kafka, rabbitmq大家肯定都知道, 都是大名鼎鼎的消息队列, Nats也是一种消息队列系统. Nats支持多种语言, 本文主要从Go语言版本来使用Nats. 它支持消息的publish/subscribe, request/replay 以及 message queueing.
官方网站在这里: nats.
github-server在这里: gnatsd server.
github-client在这里: nats client.

1. 简单介绍

Nats作为消息组件,负责者组件之间的通讯和交互, 主要有以下几个特点:

• Nats是基于Subject(主题)
• Publisher(发布者)以Subject发布消息
• Subscriber(订阅者)订阅特定Subject并收到相应的消息
• 这种策略下, Publisher和Subscriber不需要相互知道彼此存在, 只要按照订阅的Subject进行发布, Subscriber就能收到消息。


看一下消息的格式, Nats的消息格式很简单, 首先看一下从Publish中摘出来的代码:

    ...

	// 首先封装msg header
	msgh := nc.scratch[:len(_PUB_P_)]
	// 将主题subj封装
	msgh = append(msgh, subj...)
	// 空格分开
	msgh = append(msgh, ' ')
	// 如果reply存在,接在后面
	if reply != "" {
		msgh = append(msgh, reply...)
		msgh = append(msgh, ' ')
	}

	// We could be smarter here, but simple loop is ok,
	// just avoid strconv in fast path
	// FIXME(dlc) - Find a better way here.
	// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
	// 下面这个很有意思: 目的是将data长度放进b中
	// 例如长度是110, 那么b变成0 0 0 0 0 0 0 0 0 '1' '1' '0'
	// 最后三位代表的就是data长度
	var b [12]byte
	var i = len(b)
	if len(data) > 0 {
		for l := len(data); l > 0; l /= 10 {
			i -= 1
			b[i] = digits[l%10]
		}
	} else {
		i -= 1
		b[i] = digits[0]
	}
	// 将data长度写入 & CRLF
	msgh = append(msgh, b[i:]...)
	msgh = append(msgh, _CRLF_...)

	// FIXME, do deadlines here
    // 写header
	_, err := nc.bw.Write(msgh)
	// 写data
	if err == nil {
		_, err = nc.bw.Write(data)
	}

	...

所有基本流程是先发送Msg-header, 服务器端收到这条指令之后它会转到“等待数据”的状态, 然后等待接收客户端Publish的data数据.至此, Publish的流程就基本是这样.


顺便看一下subscribe过程, 基本的代码如下:

// subj: 订阅主题 queue: 队列名称,如果没有为空 cb: 收到订阅消息后的处理函数 ch: channel缓冲区
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
	if nc == nil {
		return nil, ErrInvalidConnection
	}
	nc.mu.Lock()
	// ok here, but defer is generally expensive
	defer nc.mu.Unlock()
	defer nc.kickFlusher()

	// Check for some error conditions.
	if nc.isClosed() {
		return nil, ErrConnectionClosed
	}

	if cb == nil && ch == nil {
		return nil, ErrBadSubscription
	}
    // 根据参数封装Subscription结构
	sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
	// Set pending limits. 设置pending msg大小限制
	sub.pMsgsLimit = DefaultSubPendingMsgsLimit
	sub.pBytesLimit = DefaultSubPendingBytesLimit

	// If we have an async callback, start up a sub specific
	// Go routine to deliver the messages.
	// 如果有回调函数, 说明是异步, 那么使用回调函数处理就OK, 否则同步处理
	if cb != nil {
		sub.typ = AsyncSubscription
		sub.pCond = sync.NewCond(&sub.mu)
		go nc.waitForMsgs(sub)
	} else {
	    // 否则是通过chan交互
		sub.typ = ChanSubscription
		// ch缓冲区
		sub.mch = ch
	}
    // ssid+1
	sub.sid = atomic.AddInt64(&nc.ssid, 1)
	nc.subs[sub.sid] = sub

	// We will send these for all subs when we reconnect
	// so that we can suppress here.
	// 将订阅请求发送给server
	if !nc.isReconnecting() {
		nc.bw.WriteString(fmt.Sprintf(subProto, subj, queue, sub.sid))
	}
	return sub, nil
}

基本过程就是封装好”subj: 订阅主题 queue: 队列名称,如果没有为空 cb: 收到订阅消息后的处理函数 ch: channel缓冲区”这些参数, 然后发送给server.


2. Nats使用API

####1). 最基本的publish和subscribe 首先看一下基本的代码:

// 订阅者
package main

import (
	"flag"
	"log"
	"runtime"
	"strings"

	"github.com/nats-io/nats"
)

func usage() {
	log.Fatalf("Usage: nats-sub [-s server] [--tls] [-t] <subject> \n")
}

func printMsg(m *nats.Msg, i int) {
	log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, string(m.Data))
}

func main() {
	var urls = flag.String("s", "nats://0.0.0.0:4222", "The nats server URLs (separated by comma)")
	var tls = flag.Bool("tls", false, "Use Secure Connection")

	log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	args := flag.Args()
	if len(args) < 1 {
		usage()
	}

	// 解析可选项
	opts := nats.DefaultOptions
	opts.Servers = strings.Split(*urls, ",")
	for i, s := range opts.Servers {
		opts.Servers[i] = strings.Trim(s, " ")
	}
	opts.Secure = *tls

	// 连接到gnatsd
	nc, err := opts.Connect()
	if err != nil {
		log.Fatalf("Can't connect: %v\n", err)
	}

	// 订阅的subject
	subj, i := args[0], 0

	// 订阅主题, 当收到subject时候执行后面的func函数
	// 返回值subsubscription的实例
	//
	sub, _ := nc.Subscribe(subj, func(msg *nats.Msg) {
		i += 1
		printMsg(msg, i)
	})

	// 下面设置自动"反注册", 当达到一定的数量后就自动执行"发注册"
	const MAX_WANTED = 3
	sub.AutoUnsubscribe(MAX_WANTED)

	log.Printf("Listening on [%s]\n", subj)

	runtime.Goexit()
}

main函数中首先配置了一些参数例如urls, tls,分别代表server的地址以及是否使用安全连接. 然后是解析参数, 需要注意, 代码在运行的时候, 格式必须是go run XXX.go [-s server] [--tls] [-t] <subject>, 其中subject是必须的, 代表当前订阅的主题. 然后是填充nats的 options, 这里主要是填充了server的url信息和tls信息. nc, err := opts.Connect()用于连接到nats-server. 接着调用sub, _ := nc.Subscribe(subj, func(msg *nats.Msg)订阅这个主题, 当下次接收到此主题的msg的时候, 执行func函数. 最后sub.AutoUnsubscribe(MAX_WANTED)表示当接收到MAX_WANTED个数msg的时候自动Unsubscribe这个主题. 订阅者程序运行后, 由于调用了runtime.Goexit(), 程序会继续, 等待接收到主题消息.


发布者的代码如下:

 // 发布者
 package main

 import (
 	"flag"
 	"log"
 	"strings"

 	"github.com/nats-io/nats"
 )

 func usage() {
 	log.Fatalf("Usage: nats-pub [-s server (%s)] [--tls] <subject> <msg> \n", "nats://0.0.0.0:4222")
 }

 func main() {
 	// 下面定义连接到serverURL
 	var urls = flag.String("s", "nats://0.0.0.0:4222", "The nats server URLs (separated by comma)")
 	// 是否使用TLS安全传输协议
 	var tls = flag.Bool("tls", false, "Use TLS Secure Connection")

 	// 下面是判断参数
 	log.SetFlags(0)
 	flag.Usage = usage
 	flag.Parse()

 	args := flag.Args()
 	if len(args) < 1 {
 		usage()
 	}

 	// 下面填充nats的一些选项
 	opts := nats.DefaultOptions
 	opts.Servers = strings.Split(*urls, ",")
 	for i, s := range opts.Servers {
 		opts.Servers[i] = strings.Trim(s, " ")
 	}

 	opts.Secure = *tls

 	// 连接到gnatsd
 	nc, err := opts.Connect()
 	if err != nil {
 		log.Fatalf("Can't connect: %v\n", err)
 	}
 	defer nc.Close()

 	// 下面定义subjectmsg
 	subj, msg := args[0], []byte(args[1])

 	// 发布消息
 	nc.Publish(subj, msg)
 	// 刷新缓冲区
 	nc.Flush()

 	log.Printf("Published [%s] : '%s'\n", subj, msg)
 }

执行本程序格式是go run XXX.go subj, 上面的一些参数设置以及解析参数和”订阅者”是一样的, 核心代码是nc.Publish(subj, msg), 用于发布指定主题的消息.


####2). EncodedConn

上面实例的代码存在一个问题, Publisher和Subscriber发送和接收消息都是msg *nats.Msg, 这个是封装好的结构体, 具体如下:

// Msg is a structure used by Subscribers and PublishMsg().
// nats客户端默认的消息传输结构, 字段分别是:
// @1: 主题名
// @2: 回复的主题名(如果回复对方,使用此主题)
// @3: 数据
// @4: 订阅信息
// @5: 获取下一条Msg
type Msg struct {
	Subject string
	Reply   string
	Data    []byte
	Sub     *Subscription
	next    *Msg
}

如果现在就只想要发送一个字符串, 怎么办呢? 那就需要我们使用NewEncodedConn. 先看具体的代码:

// 订阅者
package main

import (
	"flag"
	"log"
	"runtime"
	"strings"

	"github.com/nats-io/nats"
)

func usage() {
	log.Fatalf("Usage: nats-sub [-s server] [--tls] [-t]\n")
}

func printMsg(m string, subj string, i int) {
	log.Printf("[#%d] Received on [%s]: '%s'\n", i, subj, m)
}

func main() {
	// pub一样
	var urls = flag.String("s", "nats://0.0.0.0:4222", "The nats server URLs (separated by comma)")
	var tls = flag.Bool("tls", false, "Use Secure Connection")

	// 解析可选项
	opts := nats.DefaultOptions
	opts.Servers = strings.Split(*urls, ",")
	for i, s := range opts.Servers {
		opts.Servers[i] = strings.Trim(s, " ")
	}
	opts.Secure = *tls

	// 连接到gnatsd
	nc, err := opts.Connect()
	if err != nil {
		log.Fatalf("Can't connect: %v\n", err)
	}

	// 下面定义一个encodeConn
	// 注意!!!!!
	enc, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	//defer enc.Close()
	if err != nil {
		log.Fatal("New encodeConn error: %v\n", err)
	}

	// 订阅主题, 当收到subject时候执行后面的func函数
	enc.Subscribe("person", func(m string) {
		i += 1
		printMsg(m, subj, i)
	})

	// 测试取消订阅
	// ss, _ := enc.Subscribe("person", func(m string)
	// ss.Unsubscribe()

	log.Printf("Listening on [%s]\n", subj)

	runtime.Goexit()
}

上面注意多了enc, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER), EncodedConn可以Publish任何原生类型的go类型数据. 这里的func中接受的是一个string数据, 其实是JSON数据, 具体的看下面的”发布者”代码.


下面看发布者代码:

package main

import (
	"flag"
	"log"
	"strings"
	"time"

	"github.com/nats-io/nats"
)

func usage() {
	log.Fatalf("Usage: nats-pub [-s server (%s)] [--tls]\n", "nats://0.0.0.0:4222")
}

func main() {
	// 下面定义连接到serverURL
	var urls = flag.String("s", "nats://0.0.0.0:4222", "The nats server URLs (separated by comma)")
	// 是否使用TLS安全传输协议
	var tls = flag.Bool("tls", false, "Use TLS Secure Connection")

	// 下面填充nats的一些选项
	opts := nats.DefaultOptions
	opts.Servers = strings.Split(*urls, ",")
	for i, s := range opts.Servers {
		opts.Servers[i] = strings.Trim(s, " ")
	}

	opts.Secure = *tls

	// 连接到gnatsd
	nc, err := opts.Connect()
	if err != nil {
		log.Fatalf("Can't connect: %v\n", err)
	}
	defer nc.Close()

	// 下面定义一个encodeconn
	//
	// 使用NewEncodedConn的好处在于我们可以定义自己的Msg结构体, 不需要使用nats.Msg这个结构体
	// 例如上面我们输入的参数是JSON类型也是OK!
	//
	enc, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	defer enc.Close()
	if err != nil {
		log.Fatal("New encodeConn error: %v\n", err)
	}

	///////////////////
	// 下面测试JSON
	///////////////////
	type Person struct {
		Name string
		Addr string
		Age  int
	}

	perInfo := Person{
		Name:"TAO",
		Addr:"BJ",
		Age:99,
	}

    // 发布JSON消息
	enc.Publish("person", perInfo)
	log.Printf("Published [%s] : '%s'\n", "person", perInfo)
}

看上面的enc.Publish("person", perInfo)发布是自定义的JSON消息, 注意订阅者也能获得相应的JSON数据.


####3). net chan 上面讲的publish/subscribe是属于异步方法, netchan是属于同步方法, 一个Subscriber使用产方法订阅一个主题, 必须要等到消息来到才会 执行下面代码, 否则进行阻塞.


下面看具体的代码:

// 订阅者

// 使用chan来接收数据
recv := make(chan *person)

// 绑定chan和subj
// 相当于是订阅主题subj, 并且数据缓存在recv中
enc.BindRecvChan(subj, recv)

// 等待接收数据, 如果没有数据, 会被阻塞在此处
msg := <-recv

将之前代码的Subscribe换成上面的代码就OK.


再看看发布者代码:

// 发布者
type person struct {
	Name string
	Addr string
	Age  int
}

// 测试使用chan进行通信
snd := make(chan *person)

// 将subj和chan关联
// 相当于是发送subj消息, 发送缓冲区是snd
enc.BindSendChan(subj, snd)

// 构造数据并将数据发送到chan中
me := &person{Name: "TAO", Age: 22, Addr: "BJ"}

// 将数据写入缓冲区
snd <- me

同样将之前代码中的Publish换成上面的代码就OK.
运行上面代码, 就能发现netchan是同步机制.


####4). request/reply

之前已经说过, Nats支持request/reply, 那么这个其实也是建立在”订阅/发布”机制上的, 同样, 此处根据是否是EncodedConn分两部分讲解.


如果 不是 EncodedConn, 那么request/reply基本使用方法如下:

// 代码1:
// Requests
msg, err := nc.Request("test", []byte("test data"), 10*time.Millisecond)

// 代码2:
// Replies
nc.Subscribe("test", func(m *Msg) {
    nc.Publish(m.Reply, []byte("response test data"))
})

代码nc.Request("test", []byte("test data"), 10*time.Millisecond)表明发出一个request请求, 主题是”test”, 数据是”test data”, 并且设置了超时时间 10ms. 这里封装Msg数据的结构体依然使用的默认的nats.Msg.
再看reply代码, 其实本质就是订阅这样一个主题, 但是和之前不一样的是, 此处需要Publish一个回复消息! 回复消息的主题是m.Reply, 这个是server中生成的一个主题, 叫做Inbox, 看源码就清楚了. 这样的话, Request就能接受到数据. 如果在规定时间内没有收到回复, 那就就超时err!


如果 是 EncodedConn, 那么request/reply基本使用方法如下:

// 代码1:
// Requests
var resp string
err := c.Request("test", "test data", &resp, 10*time.Millisecond)

// 代码2:
// Replies
c.Subscribe("test", func(subj, reply string, msg string) {
    c.Publish(reply, "response test data")
})

之前已经说过, 使用EncodedConn可以自己定义Msg结构, 所以此处的Subscribe第二个函数参数可以自己定义参数, 包括reply需要自己定义. 其他的和之前是一样的.


####5). subject通配符

nats-server在管理subject的时候是通过’.’进行分割的, 例如subject可以是”a.b.c”, 或者是”a.b.d.g”之类, server底层是使用tree module分层管理subject. 此处有两个通配符”*“和”>”.


首先看”*”:

nc.Subscribe("aa.*.ccc", func(m *Msg) {
})

是可以匹配”aa.bb.ccc”, 可以匹配”aa..ccc”, 可以匹配”aa.jcakscba.ccc”… BUT, 不能匹配”aa.XXX.YYY.ccc”, 也就是之前说的, ‘*‘虽然可以匹配一切, 但是由于server是通过’.’来管理的, 所以这种匹配是不合理的.


再看”>”:

nc.Subscribe("aa.>", func(m *Msg) {
})

// 不合法
nc.Subscribe("aa.>.gg", func(m *Msg) {
})

’>’匹配后面所有的长度, 并且’>’需要放在通配符最后. 所以上面的第一种匹配的是所有的以”aa.”开头的subject. 第二种不合法, 无法进行匹配.


####6). nats-server集群

集群其实就开启多个server, 然后选择(或者随机选择)一个server进行服务.

// 一堆server地址
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"
// 连接
nc, err := nats.Connect(servers)

代码比较简单, 但是connect的时候可以配上一些options. 具体的options暂时就不多说了.


今晚就先写这么多了, 有时间看看server代码, 看看怎么对subject进行管理的, 到时候再来分享. good night~