TDMQ/pulsar golang 快速入门教程

架构: pulsar 对比 kafka

kafka

image-20220217152001309

kafka 由 zookeeper 和 broker 集群注册, broker 集群负责计算和储存消息, zookeeper 为注册中心(Kafka2.8就能不依赖zookeeper独立运行了, 部署还是比较方便的)

pulsar

image-20220217153119656

pulsar 比 kafka 的架构更为复杂, 部署也是更加复杂

  • pulsar 是计算储存分离架构, 计算使用 broker 集群(是无状态的) 储存使用 bookeeper 集群, broker 计算要使用 bookeeper 的数据都是要内置 bookeeper 客户端
  • pulsar 的分离架构更具有伸缩性
  • pulsar 至今最新版本还是需要依赖于 zookeeper

pulsar 的 4 种消费模型

  • 独占模式(Exclusive): 一个 topic 只能有一个消费者订阅, 多个消费者订阅就会报错
  • 灾备模式(Failover): 一个 topic 可以多个消费者订阅, 但是只有一个生效, 其他的作为容灾备份使用
  • 共享订阅(Shared): 最常用的订阅模式, 一个 topic 可以被多个消费者订阅, 每个消息轮询发给其中的一个消费者
  • key_shared: 共享订阅 + key 限制, 每个消息只会发送给绑定相同 key 的消费者

这里一般使用共享订阅 shared

TDMQ 使用教程

pulsar 的搭建十分繁琐, 如果搭建单机版本的话就根本发挥不了 pulsar 的高可用的特性

下面的示例是基于 腾讯云的 TDMQ (底层是 pulsar) 共享订阅模式的的消息传输

1. 创建 虚拟消息队列 TDMQ

打开腾讯云控制台 -> 搜索 tdmq -> 进入消息队列 TDMQ -> 集群管理 -> 新建集群

image-20220217161343812

新建的是虚拟集群,按量计费, 做测试开发一般够用的

如果你的开发服务器是再 vpc 内网里面就不建议接入公网, 更安全且拥有更高的带宽. 如果要接入公网你需提交申请

image-20220217161620471

2. 创建命名空间

命名空间 -> 选择当前集群为新建的集群 -> 新建命名空间

image-20220217162005018

  • 输入命名空间的名称

  • 选择消息的 TTL 默认 1 天, TTL 过期了消费者还是没有 ACK 该消息, 消息就会过期

  • 消息保留策略一般就是消费及删除

image-20220217162108458

3. 创建角色并授权

  1. 在 TDMQ Pulsar 版控制台的 角色管理 页面,选择地域和刚刚创建好的集群,单击新建进入新建角色页面。
  2. 填写角色名称和说明,单击提交完成角色创建。
  3. 进入 命名空间 页面,在刚刚创建的命名空间中,单击操作列的配置权限进入命名空间的权限列表。
  4. 在配置权限页面,单击添加角色,将刚刚创建的角色添加进来,分配生产和消费的权限。

image-20220217163929940

在命名空间这里配置权限

image-20220217164028540

添加角色

image-20220217164045798

我们选择刚刚添加的角色, 然后基于 生产消息和消费消息的权限

我们可以只是将 2 个角色分开, 也能防止循环依赖

image-20220217164219200

添加成功

image-20220217164254025

4. 创建 topic

点击 topic -> 新建

image-20220217164515738

  • 设置 topic 名称

  • 消息类型

    各种消息类型区别如下

    image-20220217164441596

  • 分区数: 分区能提升单个 topic 的吞吐量

image-20220217164714310

编写代码: 生产者 Producer

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

var (
	//TODO:角色权限的token
	token = "{token}"
	//TODO: 消息队列的id/命名空间名称/topic名称
	topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)

//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
	//1. client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
		Authentication:    pulsar.NewAuthenticationToken(token),
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	} else {
		fmt.Printf("ok=%#v\n", "ok")
	}
	defer client.Close()
	//------
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})

	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 100; i++ {
		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte("a" + strconv.Itoa(i)),
		})
	}

	defer producer.Close()

	if err != nil {
		fmt.Println("Failed to publish message", err)
	}
	fmt.Println("Published message")
}

编写代码: 消费者 Consumer

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

var (
	//TODO:角色权限的token
	token = "{token}"
	//TODO: 消息队列的id/命名空间名称/topic名称
	topic = "{pulsar-name}/{namespace-name}/{topic-name}"
)

//就是说消费者要先链接才行,
//1.游标的问题
//2.share模式问题
func main() {
	//1. client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "{url}", //TODO: 更换为接入点地址(控制台集群管理页完整复制)
		Authentication:    pulsar.NewAuthenticationToken(token),
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	} else {
		fmt.Printf("ok=%#v\n", "ok")
	}
	defer client.Close()
	//2. consumer -------
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: "{sub-name}", //TODO:消费者的名称,这里可以没有就新建
		//shared 类型
		Type: pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}
}

参数详解

上面的 todo 包含的位置都是需要自己填写删除的

  • {token}: 角色的授权

    image-20220217171636315

  • {pulsar-name}: 虚拟集群的id

    image-20220217172215675

  • {namespace-name}: 命名空间名称

    image-20220217172306758

  • {topic-name}: topic 名称

    image-20220217172404952

  • {url}: 访问地址

    image-20220217172636572

  • {sub-name}: 消费者的名称, 不存在将会新建

    image-20220217173322259

详情查看文档

https://cloud.tencent.com/document/product/1179/44814)

https://pulsar.staged.apache.org/docs/zh-CN

reference

Pulsar真的可以取代Kafka吗 - 知乎 (zhihu.com): https://zhuanlan.zhihu.com/p/370213273

pulsar 官网: https://pulsar.staged.apache.org/docs/zh-CN/standalone-docker

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐