继上一篇:【pulsar学习】pulsar集群部署及可视化监控部署之后,本文介绍如何把pulsar用起来。pulsar的结构主要有租户、namespace、topic,主要涉及到的有数据生产和数据消费。本文主要介绍上述内容的java API接口。

pulsar结构

在这里插入图片描述
pulsar结构如上图所示:多租户,名称空间,topic名称。topic是基本的单元,其定位为persistent://tenant/namespace/topic

  • persistentor non-persistent代表该topic是否是持久化的。持久化topic是消费发布与消费的逻辑端点;非持久化topic应用在仅消费实时发布消息与不需要持久化保证的应用程序,它通过删除持久消息的开销来减少消息发布延迟。
  • tenant为租户名
  • namespace为命名空间
  • topic为topic名

pulsar相关原理将会在下一篇博客介绍(画个饼先)。

下面的代码需要如下添加依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

1 基于pulsar实现Topic的构建操作

1.1 管理租户

package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfo;

import java.util.HashSet;
import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 9:10
 */
// 使用java构建租户
public class CreateTenants {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();
        // 创建租户

        HashSet<String> clusters = new HashSet<>();
        clusters.add("pulsar-cluster");

        HashSet<String> adminRoles = new HashSet<>();
        adminRoles.add("dev");
        TenantInfo config = TenantInfo.builder().allowedClusters(clusters).adminRoles(adminRoles).build();
        
        admin.tenants().createTenant("itcast_pulsar_t", config);
        
        // 3-获取有哪些租户
        System.out.println("获取有哪些租户");
        List<String> tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }

        // 4-删除租户
        admin.tenants().deleteTenant("itcast_pulsar_t");

        // 3-获取有哪些租户
        System.out.println("删除后,获取有那些租户");
        tenants = admin.tenants().getTenants();
        for (String tenant : tenants) {
            System.out.println(tenant);
        }
        admin.close();
    }
}

在这里插入图片描述
在这里插入图片描述

1.2 管理namespace

package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 10:07
 */
public class CreateNamespace {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 创建名称空间
        admin.namespaces().createNamespace("itcast_pulsar_t/itcast_pulsar_2");
        // 获取所有的名称空间
        System.out.println("获取到当前有哪些名称空间:");
        List<String> namespaces = admin.namespaces().getNamespaces("itcast_pulsar_t");
        for (String namespace : namespaces) {
            System.out.println(namespace);
        }
        // 删除名称空间
        admin.namespaces().deleteNamespace("itcast_pulsar_t/itcast_pulsar_n");
        // 关闭资源
        admin.close();
    }
}

在这里插入图片描述

1.3 管理topic

package www.whuhhh.cn;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;

import java.util.List;

/**
 * @author hhhSir
 * @create 2022-06-05 10:20
 */
public class CreateTopic {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        // 1 创建pulsar的admin管理对象
        String serviceURL = "http://hadoop100:8080,hadoop102:8080,hadoop103:8080";
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(serviceURL).build();

        // 2 创建Topic
        // 2.1 创建一个持久化的带分区的Topic
        admin.topics().createPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1",3);
        // 2.2 创建一个非持久化带分区的Topic
        admin.topics().createPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2",3);
        // 2.3 创建一个持久化的不带分区的Topic
        admin.topics().createNonPartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        // 2.4 创建一个非持久化不分区的Topic
        admin.topics().createNonPartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");

        // 3 列出某个名称空间下,所有的topic
        // 3.1 无分区的Topic
        System.out.println("无分区的Topic:");
        List<String> topics = admin.topics().getList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }
        // 3.2 有分区的Topic
        System.out.println("有分区的Topic:");
        topics = admin.topics().getPartitionedTopicList("itcast_pulsar_t/itcast_pulsar_n");
        for (String topic : topics) {
            System.out.println(topic);
        }

        // 4 更新Topic:增加分区数
        admin.topics().updatePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1",5);
        int partitions = admin.topics().getPartitionedTopicMetadata("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").partitions;
        System.out.println("topic的分区数为:" + partitions);

        // 5 删除Topic
        // 5.1 删除没有分区的
        admin.topics().delete("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic3");
        admin.topics().delete("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic4");
        // 5.2 删除有分区的
        admin.topics().deletePartitionedTopic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1");
        admin.topics().deletePartitionedTopic("non-persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic2");
        
        admin.close();
    }
}

在这里插入图片描述
带分区的不会被删除,没带分区的会被删除?

注意:
不管是有分区还是没有分区, 创建topic后,如果没有任何操作, 60s后pulsar会认为此topic是不活动的,会自动进行删除, 以避免生成垃圾数据。
相关配置:

  • Brokerdeleteinactivetopicsenabenabled : 默认值为true 表示是否启动自动删除
  • BrokerDeleteInactiveTopicsFrequencySeconds: 默认为60s 表示检测未活动的时间

但是我自己在玩的时候发现有分区的topic一直没有被删除,也没有搜到相关的资料。

2 基于Pulsar实现数据生产

2.1 同步发送数据

package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;

/**
 * @author hhhSir
 * @create 2022-06-05 11:14
 */
public class PulsarProducerSyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者的对象
        Producer<byte[]> producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            producer.send(( "你好 Pulsar..." + i ).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(10);
            i++;
        }
        // 释放资源
        producer.close();
        client.close();
    }
}

在这里插入图片描述

2.2 异步发送数据

package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;

/**
 * @author hhhSir
 * @create 2022-06-05 14:46
 */
public class PulsarProducerAsyncTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者对象
        Producer<byte[]> producer = client.newProducer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            producer.sendAsync(( "异步发送 Pulsar..." + i ).getBytes(StandardCharsets.UTF_8));
            //Thread.sleep(10);
            i++;
        }
        // 如果采用异步发送数据, 由于需要先放置在缓存区中, 如果立即关闭, 会导致无法发送
        Thread.sleep(1000);
        producer.close();
        client.close();
    }
}

异步的代码与同步的就发送那里不一样,另外也要设置thread.sleep的时间

2.3 基于pulsar实现数据生产

package www.whuhhh.cn;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
 * @author hhhSir
 * @create 2022-06-05 14:55
 */
public class PulsarProducerSchemaTest {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // 获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 通过客户端创建生产者对象
        AvroSchema<User> schema = AvroSchema.of(SchemaDefinition.<User>builder().withPojo(User.class).build());
        Producer<User> producer = client.newProducer(schema).topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").create();
        // 发送消息
        int i = 0;
        while (i < 200) {
            User user = new User();
            user.setName("张三");
            user.setAge(i);
            producer.send(user);
            i++;
        }
        Thread.sleep(10000);
        producer.close();
        client.close();
    }
}

其中,user类为

package www.whuhhh.cn;

import java.io.Serializable;

/**
 * @author hhhSir
 * @create 2022-06-05 14:58
 */
public class User implements Serializable {

    private String name;
    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

再用本文3.2中代码进行消费(先运行消费代码,再运行生产者代码,可以看到打印的数据)
在这里插入图片描述
可以看到:message.getValue() 获取到的就是user对象,然后如果直接输出就是user类中toString的结果。

3 基于Pulsar实现数据消费

3.1 同步方式消费数据

package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;

/**
 * @author hhhSir
 * @create 2022-06-05 11:26
 */
public class PulsarConsumerTest {
    public static void main(String[] args) throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();

        Consumer<byte[]> consumer = client.newConsumer().topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1").subscriptionName("my_subscription").subscribe();

        // 循环获取读取
        while (true) {
            Message<byte[]> message = consumer.receive();
            try {
                System.out.println("消息为: " + new String(message.getData()));
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}

3.2 schema方式消费数据

package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;

/**
 * @author hhhSir
 * @create 2022-06-05 15:08
 */
public class PulsarConsumerSchemaTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1、获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 2、通过客户端创建消费者的对象
        Consumer<User> consumer = client.newConsumer(AvroSchema.of(SchemaDefinition.<User>builder().withPojo(User.class).build()))
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .subscribe();
        // 3、循环获取读取
        while (true) {
            Message<User> message = consumer.receive();
            try {
                System.out.println("消息为: " + message.getValue() + ", age: " + message.getValue().getAge());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}

这里获取user数据的方法如2.3所示

3.3 批量处理消费数据

package www.whuhhh.cn;

import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;

/**
 * @author hhhSir
 * @create 2022-06-05 15:38
 */
public class PulsarConsumerBatchTest {
    public static void main(String[] args) throws PulsarClientException {
        // 1、获取pulsar的客户端对象
        ClientBuilder clientBuilder = PulsarClient.builder();
        clientBuilder.serviceUrl("pulsar://hadoop100:6650,hadoop102:6650,hadoop103:6650");
        PulsarClient client = clientBuilder.build();
        // 2、通过客户端创建消费者的对象
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://itcast_pulsar_t/itcast_pulsar_n/my-topic1")
                .subscriptionName("my-subscription")
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        // 设置一次性最大获取多少条消息,默认值为-1
                        .maxNumMessages(100)
                        // 设置每条数据允许的最大的字节大小,默认为10 * 1024 * 1024
                        .maxNumBytes(1024 * 1024)
                        // 设置等待超时时间,默认为100
                        .timeout(200, TimeUnit.MILLISECONDS)
                        .build())
                .subscribe();
        // 3、循环读取获取
        while (true) {
            Messages<byte[]> messages = consumer.batchReceive(); // 批量读取数据
            for (Message<byte[]> message : messages) {
                try{
                    System.out.println("批量消息为: " + new String(message.getData()));
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(message);
                }
            }

        }
    }
}

批量的方法注意参数的使用。

总结

走得慢是为了走的更好,加油!

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐