nsq go-client测试

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"sync"
	//"time"
)

// nsq发布消息
func Producer() {
	p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) //新建生产者
	if err != nil {
		panic(err)
	}
	if err := p.Publish("test", []byte("hello NSQ!!!")); err != nil { // 发布消息
		panic(err)
	}
}

// nsq订阅消息
type ConsumerT struct{}

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println(string(msg.Body))
	return nil
}

func Consumer(wg *sync.WaitGroup) {
	c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一个消费者
	//c, err := nsq.NewConsumer("test", "nsq_to_file", nsq.NewConfig()) // 新建一个消费者
	if err != nil {
		wg.Done()
		panic(err)
	}
	c.AddHandler(&ConsumerT{})                                // 添加消息处理
	if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // 建立连接
		wg.Done()
		panic(err)
	}
}

// 主函数
func main() {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	//Producer()
	Consumer(wg)
	//time.Sleep(time.Second * 30)
	wg.Wait()
}

关于 liuzhantao

北京互联网求职群:168047123
此条目发表在 nsq 分类目录。将固定链接加入收藏夹。