首页 > 文章列表 > Java API 开发中使用 HornetQ 进行消息处理

Java API 开发中使用 HornetQ 进行消息处理

JavaAPI HornetQ 消息处理
214 2023-06-17

Java API 开发中使用 HornetQ 进行消息处理

随着互联网的快速发展,大量的信息交互涌现出来,消息队列成为解决高并发、高可用、异步处理等问题的重要手段。HornetQ 是由 JBoss 开发的基于 JMS 协议的高性能、高可用的开源消息中间件。本文将介绍如何在 Java API 开发中使用 HornetQ 进行消息处理。

一、快速入门

  1. 下载 HornetQ

HornetQ的官网(http://hornetq.apache.org/downloads.html)提供了多种格式的下载包,这里选择 HornetQ-2.4.0.Final-bin.tar.gz。

  1. 安装 HornetQ

下载完成后,将 HornetQ-2.4.0.Final-bin.tar.gz 解压缩到本地文件夹中。

  1. 启动 HornetQ

进入 HornetQ 的 bin 目录,执行以下命令:

  ./run.sh

出现以下日志信息则表示成功启动 HornetQ 服务:

  11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
  11:14:21,986 INFO [JournalStorageManager] Using NIO Journal
  11:14:22,626 INFO [NettyAcceptor] Started Netty Acceptor version #{version}
  11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started

  1. 部署 HornetQ 控制台

将 HornetQ 的 hornetq-console.war 放入 Tomcat 的 webapps 目录下,启动 Tomcat,通过 http://localhost:8080/hornetq-console 访问 HornetQ 控制台。

二、HornetQ 的使用

  1. 发布和接收消息

HornetQ 的发布订阅模式是基于 Topic 的,发布端向某个 Topic 发布消息,而多个接收端可以同时订阅这个 Topic,接收端就可以接收到多个发布端发布的消息。

(1)消息发布端

先创建一个发布端(Publisher)发送消息,代码如下:

public class Publisher {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 发送消息
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.send(destination, "Hello, HornetQ!");

        // 关闭连接
        jmsContext.close();
    }
}

(2)消息接收端

再创建一个接收端(Subscriber)去接收消息,并将消息打印出来,代码如下:

public class Subscriber {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 创建消费者
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        JMSConsumer consumer = jmsContext.createConsumer(destination);

        // 接收消息并打印
        String message = null;
        do {
            message = consumer.receiveBody(String.class, 1000);
            System.out.println("Received message: " + message);
        } while (message != null);

        // 关闭连接
        jmsContext.close();
    }
}

在运行发布端和接收端之后,可以在 HornetQ 控制台上查看发布端发送的消息,如下图所示:

  1. 消息持久化

HornetQ 支持将消息进行持久化存储,这意味着即使 HornetQ 宕机了,也能保证消息不丢失。

(1)发送者

我们需要将消息的持久性设置为 DeliveryMode.PERSISTENT,如下所示:

public class Publisher {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 设定持久性
        JMSProducer producer = jmsContext.createProducer();
        destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 发送消息
        producer.send(destination, "Hello, HornetQ!");

        jmsContext.close();
    }
}

(2)接收者

HornetQ 默认已将消息持久化存储,因此不需要在接收者端进行特定的配置,继续使用上一节中的 Subscriber 类即可。

  1. 集群模式

HornetQ 具有高可用性的特点,可以以集群模式运行,以确保消息的可靠性和高并发性。以下是实现 HornetQ 集群模式的步骤:

(1)拷贝 HornetQ 目录并新建文件夹

将 HornetQ 目录拷贝一份并重命名为 HornetQ2,然后新建一个名为 cluster 的文件夹,并将 HornetQ2 目录下的 data 目录、log 目录、tmp 目录等文件夹全部复制到 cluster 文件夹下。

(2)修改配置文件

在 HornetQ 目录下的 examples/configs/clustered 配置文件夹中,将 hq-configuration.xml 文件及 server0 和 server1 文件夹复制到 HornetQ2 目录中,并按照下面的方式修改 server0 文件夹中的 hornetq-configuration.xml 文件:

  (a)将节点名修改为 server0

  (b)将 cluster-connections 中的 server-username 和 server-password 修改为"guest"

  (c)修改 connector 地址为本机 IP 地址,如 192.168.1.1

  (d)将 jms-configuration 下的 use-ha 设为 true

  如下所示:

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <cluster-password>guest</cluster-password>
    <paging-directory>${data.dir:../data}/paging</paging-directory>
    <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
    <journal-directory>${data.dir:../data}/journal</journal-directory>
    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
    <journal-type>NIO</journal-type>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-file-size>10240</journal-file-size>
    <journal-buffer-timeout>28000</journal-buffer-timeout>
    <journal-max-io>1</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>1628000</page-sync-timeout>
    <global-max-size>100Mb</global-max-size>
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5445"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5545"/>
        </acceptor>
    </acceptors>
    <cluster-connections>
        <cluster-connection name="my-cluster">
            <address>jms</address>
            <connector-ref>netty</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <forward-when-no-consumers>true</forward-when-no-consumers>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
            <static-connectors>
                <connector-ref>netty</connector-ref>
            </static-connectors>
        </cluster-connection>
    </cluster-connections>
    <ha-policy>
        <replication>
            <slave>
                <allow-failback>true</allow-failback>
                <failback-delay>5000</failback-delay>
                <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size>
                <restart-backup>true</restart-backup>
            </slave>
        </replication>
    </ha-policy>
</configuration>

然后再按照同样的方式修改 server1 文件夹中的 hornetq-configuration.xml 文件,其中将 server0 改为 server1。

(3)启动 HornetQ

依次在 HornetQ 和 HornetQ2 的 bin 目录下执行 run.sh 命令启动 HornetQ 进程,此时两个 HornetQ 节点即形成了一个集群,可以通过 HornetQ 控制台查看。

三、总结

通过本文的介绍,我们了解了 HornetQ 的基本使用和集群模式的配置方法。使用 HornetQ 可以方便地解决消息交互的问题,提高系统的健壮性和并发能力。同时,HornetQ 还支持多种消息传递模式、丰富的消息持久化机制和扩展插件等特性,可以根据实际需求进行选择和配置。