[Learning Pulsar] Using the Java API in Pulsar
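Following the previous post, "[Learning Pulsar] Deploying a Pulsar cluster and visual monitoring", this post shows how to put Pulsar to use. Pulsar is organized into tenants, namespaces, and topics, and the main operations are producing and consuming data. This post walks through the Java API for each of these.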
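Pulsar structure
As the figure above shows, Pulsar is organized into tenants, namespaces, and topics. The topic is the basic unit, and it is addressed as persistent://tenant/namespace/topic, where:
- persistent or non-persistent indicates whether the topic is persisted. A persistent topic is the logical endpoint for publishing and consuming messages. A non-persistent topic suits applications that only consume messages published in real time and do not need durability guarantees; it lowers publish latency by removing the overhead of persisting messages.
- tenant is the tenant name
- namespace is the namespace name
- topic is the topic name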
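To make the address format concrete, here is a minimal sketch that splits a full topic name into its four parts. TopicNameParts is a hypothetical helper written for this post, not a class from the Pulsar client, and it uses only the Java standard library.

```java
import java.util.Objects;

/** Hypothetical helper (not part of the Pulsar client) that splits a full topic name. */
final class TopicNameParts {
    final boolean persistent;
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameParts(boolean persistent, String tenant, String namespace, String topic) {
        this.persistent = persistent;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    /** Parses names of the form persistent://tenant/namespace/topic. */
    static TopicNameParts parse(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        int schemeEnd = fullName.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme: " + fullName);
        }
        String scheme = fullName.substring(0, schemeEnd);
        boolean persistent;
        if (scheme.equals("persistent")) {
            persistent = true;
        } else if (scheme.equals("non-persistent")) {
            persistent = false;
        } else {
            throw new IllegalArgumentException("Unknown scheme: " + scheme);
        }
        // Limit of 3 keeps any further '/' inside the topic part.
        String[] parts = fullName.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(persistent, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameParts p = parse("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1");
        // prints: itcast_pulsar_t | itcast_pulsar_n | my-topic1 | persistent=true
        System.out.println(p.tenant + " | " + p.namespace + " | " + p.topic
                + " | persistent=" + p.persistent);
    }
}
```

The tenant and namespace names in main are the ones used by the examples later in this post.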
The principles behind Pulsar will be covered in the next post (a promise for now).
The code below needs the following dependency:
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
1 Building topics with Pulsar
1.1 Managing tenants
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfo;

import java.util.HashSet;
import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 9:10
 */
// Create, list, and delete a tenant from Java
public class CreateTenants {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 1 - Create the Pulsar admin object
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 2 - Create the tenant
        HashSet<String> clusters = new HashSet<>();
        clusters.add("pulsar-cluster");
        HashSet<String> adminRoles = new HashSet<>();
        adminRoles.add("dev");
        TenantInfo config = TenantInfo.builder().allowedClusters(clusters).adminRoles(adminRoles).build();
        admin.tenants().createTenant("itcast_pulsar_t", config);

        // 3 - List the existing tenants
        System.out.println("Existing tenants:");
        List<String> tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }

        // 4 - Delete the tenant.
        // The later examples use this tenant, so skip this step if you want to follow along.
        admin.tenants().deleteTenant("itcast_pulsar_t");

        // 5 - List the tenants again
        System.out.println("Tenants after deletion:");
        tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }

        // 6 - Release resources
        admin.close();
    }
}
1.2 Managing namespaces
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 10:07
 */
public class CreateNamespace {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // Create the Pulsar admin object
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // Create the namespace (the same name that is deleted below and used in later sections)
        admin.namespaces().createNamespace("itcast_pulsar_t/itcast_pulsar_n");

        // List all namespaces of the tenant
        System.out.println("Current namespaces:");
        List<String> namespaces = admin.namespaces().getNamespaces("itcast_pulsar_t");
        for (String namespace : namespaces) {
            System.out.println(namespace);
        }

        // Delete the namespace.
        // The later examples use this namespace, so skip this step if you want to follow along.
        admin.namespaces().deleteNamespace("itcast_pulsar_t/itcast_pulsar_n");

        // Release resources
        admin.close();
    }
}
1.3 Managing topics
package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 10:20
 */
public class CreateTopic {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 1 Create the Pulsar admin object
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 2 Create topics
        // 2.1 A persistent, partitioned topic
        admin.topics().createPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1", 3);
        // 2.2 A non-persistent, partitioned topic
        admin.topics().createPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2", 3);
        // 2.3 A persistent, non-partitioned topic
        admin.topics().createNonPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        // 2.4 A non-persistent, non-partitioned topic
        admin.topics().createNonPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");

        // 3 List all topics in a namespace
        // 3.1 Non-partitioned topics
        System.out.println("Non-partitioned topics:");
        List<String> topics = admin.topics().getList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }
        // 3.2 Partitioned topics
        System.out.println("Partitioned topics:");
        topics = admin.topics().getPartitionedTopicList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }

        // 4 Update a topic: increase the number of partitions
        admin.topics().updatePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1", 5);
        int partitions = admin.topics().getPartitionedTopicMetadata("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").partitions;
        System.out.println("Number of partitions: " + partitions);

        // 5 Delete topics
        // 5.1 Non-partitioned topics
        admin.topics().delete("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        admin.topics().delete("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");
        // 5.2 Partitioned topics
        admin.topics().deletePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1");
        admin.topics().deletePartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2");

        // 6 Release resources
        admin.close();
    }
}
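Partitioned topics are not deleted automatically, while non-partitioned ones are?
Note:
Whether or not a topic is partitioned, if nothing happens to it after creation, Pulsar treats it as inactive after 60 s and deletes it automatically to avoid accumulating garbage data.
Related settings:
- brokerDeleteInactiveTopicsEnabled: defaults to true; whether automatic deletion is enabled
- brokerDeleteInactiveTopicsFrequencySeconds: defaults to 60 s; how often the broker checks for inactive topics
However, in my own experiments the partitioned topics were never deleted, and I could not find any material explaining why.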
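For reference, the two settings above would look roughly like this in the broker configuration file. This is a sketch rather than a copy of any particular broker.conf. The third key is an assumption on my part: as far as I know, some broker versions ship a separate switch for the metadata of inactive partitioned topics that is off by default, which would be consistent with partitioned topics lingering, but check the broker.conf of your own version before relying on it.

```properties
# Whether the broker deletes inactive topics automatically (default: true)
brokerDeleteInactiveTopicsEnabled=true

# How often, in seconds, the broker checks for inactive topics (default: 60)
brokerDeleteInactiveTopicsFrequencySeconds=60

# ASSUMPTION: may not exist in every broker version; verify before use.
# Whether the metadata of an inactive partitioned topic is cleaned up as well.
brokerDeleteInactivePartitionedTopicMetadataEnabled=false
```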
2 Producing data with Pulsar
2.1 Sending data synchronously
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;

/**
 * @author hhhSir
 * @create 2022-06-05 11:14
 */
public class PulsarProducerSyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // Create the producer through the client
        Producer<byte[]> producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();

        // Send messages; send() blocks until the broker acknowledges each one
        int i = 0;
        while (i < 200) {
            producer.send(("Hello Pulsar..." + i).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(10);
            i++;
        }

        // Release resources
        producer.close();
        client.close();
    }
}
2.2 Sending data asynchronously
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;

/**
 * @author hhhSir
 * @create 2022-06-05 14:46
 */
public class PulsarProducerAsyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // Create the producer through the client
        Producer<byte[]> producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();

        // Send messages; sendAsync() returns immediately without waiting for the broker
        int i = 0;
        while (i < 200) {
            producer.sendAsync(("Async send Pulsar..." + i).getBytes(StandardCharsets.UTF_8));
            //Thread.sleep(10);
            i++;
        }

        // Async sends are placed in a buffer first, so closing right away can lose messages.
        // producer.flush() is an alternative that blocks until the buffered messages are sent.
        Thread.sleep(1000);

        // Release resources
        producer.close();
        client.close();
    }
}
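The async code differs from the sync code only in the send call; it also needs a Thread.sleep before closing so that the buffered messages have time to go out.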
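A fixed sleep is a guess at how long the sends will take. Since sendAsync returns a CompletableFuture<MessageId>, another option is to keep the futures and wait for all of them. The sketch below shows only that waiting pattern with the standard library: fakeSendAsync is a hypothetical stand-in for producer.sendAsync, and the string ids are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncSendWaitSketch {
    /** Hypothetical stand-in for producer.sendAsync(...): completes on another thread. */
    static CompletableFuture<String> fakeSendAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> "id-of-" + payload);
    }

    /** Starts every send, then blocks until all of them have completed. */
    static List<String> sendAllAndWait(List<String> payloads) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String payload : payloads) {
            pending.add(fakeSendAsync(payload));
        }
        // join() returns once every future is done and rethrows if any send failed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        List<String> ids = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            ids.add(future.join()); // already completed, so this does not block
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> payloads = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            payloads.add("Async send Pulsar..." + i);
        }
        List<String> ids = sendAllAndWait(payloads);
        // prints: confirmed sends: 200
        System.out.println("confirmed sends: " + ids.size());
    }
}
```

With a real producer the same shape would apply to the futures returned by sendAsync, and the wait ends as soon as the last send is confirmed instead of after a fixed delay.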
2.3 Producing data with a schema
package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
 * @author hhhSir
 * @create 2022-06-05 14:55
 */
public class PulsarProducerSchemaTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // Build an Avro schema from the User POJO and create a typed producer
        AvroSchema<User> schema = AvroSchema.of(SchemaDefinition.<User>builder().withPojo(User.class).build());
        Producer<User> producer = client.newProducer(schema).topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();

        // Send messages; the schema serializes each User object
        int i = 0;
        while (i < 200) {
            User user = new User();
            user.setName("Zhang San");
            user.setAge(i);
            producer.send(user);
            i++;
        }
        Thread.sleep(10000);

        // Release resources
        producer.close();
        client.close();
    }
}
The User class is:
package www.whuhhh.cn;

import java.io.Serializable;

/**
 * @author hhhSir
 * @create 2022-06-05 14:58
 */
public class User implements Serializable {
    private String name;
    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
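Then consume with the code from section 3.2 of this post (start the consumer first, then run the producer, and the printed data will appear).
As you can see, message.getValue() returns the User object itself, so printing it directly gives the result of User.toString().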
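3 Consuming data with Pulsar
3.1 Consuming data synchronously
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;

import java.nio.charset.StandardCharsets;

/**
 * @author hhhSir
 * @create 2022-06-05 11:26
 */
public class PulsarConsumerTest {
    public static void main(String[] args) throws PulsarClientException {
        // Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // Create the consumer through the client
        Consumer<byte[]> consumer = client.newConsumer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").subscriptionName("my_subscription").subscribe();

        // Receive messages in a loop; receive() blocks until a message arrives
        while (true) {
            Message<byte[]> message = consumer.receive();
            try {
                // Decode with the same charset the producer used
                System.out.println("Message: " + new String(message.getData(), StandardCharsets.UTF_8));
                // Acknowledge so the broker does not redeliver the message
                consumer.acknowledge(message);
            } catch (Exception e) {
                // Processing failed: ask the broker to redeliver the message later
                consumer.negativeAcknowledge(message);
            }
        }
    }
}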
3.2 Consuming data with a schema
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
 * @author hhhSir
 * @create 2022-06-05 15:08
 */
public class PulsarConsumerSchemaTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1. Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // 2. Create a typed consumer through the client, using the same Avro schema as the producer
        Consumer<User> consumer = client.newConsumer(AvroSchema.of(SchemaDefinition.<User>builder().withPojo(User.class).build()))
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .subscribe();

        // 3. Receive messages in a loop
        while (true) {
            Message<User> message = consumer.receive();
            try {
                System.out.println("Message: " + message.getValue() + ", age: " + message.getValue().getAge());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}
The User data consumed here is produced by the code in section 2.3.
3.3 Consuming data in batches
package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/**
 * @author hhhSir
 * @create 2022-06-05 15:38
 */
public class PulsarConsumerBatchTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1. Get the Pulsar client object
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        // 2. Create the consumer through the client
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        // Maximum number of messages in one batch; the default is -1 (no limit)
                        .maxNumMessages(100)
                        // Maximum total size in bytes of one batch (not of a single message); the default is 10 * 1024 * 1024
                        .maxNumBytes(1024 * 1024)
                        // Maximum time to wait for a batch to fill; the default is 100 ms
                        .timeout(200, TimeUnit.MILLISECONDS)
                        .build())
                .subscribe();

        // 3. Receive batches in a loop
        while (true) {
            Messages<byte[]> messages = consumer.batchReceive(); // read one batch
            for (Message<byte[]> message : messages) {
                try {
                    System.out.println("Batch message: " + new String(message.getData(), StandardCharsets.UTF_8));
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(message);
                }
            }
        }
    }
}
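With the batch method, pay attention to how the three policy parameters are used: a batch is returned as soon as any one of them is reached.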
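To see how the count and byte limits interact, here is a simplified model of cutting a batch from a queue of pending messages. It is a sketch under my own assumptions, not the client's actual implementation: takeBatch is a hypothetical method, the timeout is left out to keep the result deterministic, and a limit of zero or less is treated as "no limit".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BatchLimitSketch {
    /**
     * Moves messages from the head of pending into a batch and stops before a
     * limit would be exceeded. A limit <= 0 means "no limit". At least one
     * message is taken when any are pending, even if it alone exceeds maxNumBytes.
     */
    static List<byte[]> takeBatch(Deque<byte[]> pending, int maxNumMessages, long maxNumBytes) {
        List<byte[]> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!pending.isEmpty()) {
            byte[] next = pending.peekFirst();
            boolean countFull = maxNumMessages > 0 && batch.size() + 1 > maxNumMessages;
            boolean bytesFull = maxNumBytes > 0 && batchBytes + next.length > maxNumBytes;
            if (!batch.isEmpty() && (countFull || bytesFull)) {
                break;
            }
            batch.add(pending.pollFirst());
            batchBytes += next.length;
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<byte[]> pending = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            pending.add(("msg" + i).getBytes(StandardCharsets.UTF_8)); // 4 bytes each
        }
        int byCount = takeBatch(pending, 3, 1024).size(); // count limit reached first
        int byBytes = takeBatch(pending, 100, 10).size(); // byte limit reached first
        int theRest = takeBatch(pending, -1, -1).size();  // no limits: drain the queue
        // prints: 3 2 5
        System.out.println(byCount + " " + byBytes + " " + theRest);
    }
}
```

In the real consumer the timeout is the third exit: if neither limit is reached in time, batchReceive returns whatever has arrived so far.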
Summary
Going slowly is how you go further. Keep it up!