一、引入spring和ActiveMQ的依赖。
<!-- Version properties referenced by the dependency declarations below. -->
<properties>
<spring.version>5.0.2.RELEASE</spring.version>
<activemq.version>5.15.5</activemq.version>
</properties>
<dependencies>
<!-- JARs required for ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- ActiveMQ connection pooling; the Spring XML files in this tutorial reference org.apache.activemq.pool.PooledConnectionFactory -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- Spring JMS support; the XML files and Java samples use classes from org.springframework.jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring core, same version as the other Spring artifacts -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring application context; the Java samples create a ClassPathXmlApplicationContext -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
</dependencies>
二、整合queue消息模式
1.application-queue.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!-- Queue setup: a pooled connection factory, a queue destination, and a JmsTemplate wired to both. -->
<!-- NOTE: the context and jms namespaces are declared above, but no element in this file uses them. -->
<!-- Connection factory: a PooledConnectionFactory wrapping an ActiveMQConnectionFactory that targets
     tcp://127.0.0.1:61616. maxConnections is set to 100 and stop() is registered as the destroy method. -->
<bean id="jmsFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- Destination: the queue named "spring-active-queue" -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- Spring JmsTemplate used to talk to ActiveMQ; destinationQueue is its default destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<!-- Message converter -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
2.QuenProducer.java消息生产者
public class QuenProducer {
public static void main(String[] args) {
// 创建IOC容器
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-queue.xml");
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
//指定队列名称地址
//jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen"));
// 默认队列
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
//创建消息
TextMessage textMessage = session.createTextMessage("spring整合ActiveMQ-queue");
return textMessage;
}
});
System.out.println("消息发送成功!!!");
}
}
3.QuenConsumer.java消息消费者
/**
 * Sample consumer: receives one message from the default destination of the
 * JmsTemplate bean defined in application-queue.xml and prints its text.
 */
public class QuenConsumer {
    /**
     * Starts the Spring context, receives a single message, prints it, and closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if the text of the received message cannot be read
     */
    public static void main(String[] args) throws JMSException {
        // try-with-resources closes the context after the receive, so the
        // connection factory bean's destroy-method="stop" is actually invoked.
        try (ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:application-queue.xml")) {
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            // To consume from a specific queue instead of the configured default:
            // jmsTemplate.setDefaultDestination(new ActiveMQQueue("hello-quen"));
            Object received = jmsTemplate.receive();
            // receive() returns null when a receive timeout expires, and the queue may hold
            // non-text messages; check the type instead of casting blindly.
            if (received instanceof TextMessage) {
                System.out.println(((TextMessage) received).getText());
            } else {
                System.out.println("未收到文本消息:" + received);
            }
            // Alternative: let the message converter configured on the bean return the payload as an object
            //Object object = jmsTemplate.receiveAndConvert();
            //System.out.println("消息是:"+ object);
        }
    }
}
三、整合Topic消息模式
1.application-topic.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!-- Topic setup: a pooled connection factory, a topic destination, and a JmsTemplate wired to both. -->
<!-- NOTE: the context and jms namespaces are declared above, but no element in this file uses them. -->
<!-- Connection factory: a PooledConnectionFactory wrapping an ActiveMQConnectionFactory that targets
     tcp://127.0.0.1:61616. maxConnections is set to 100 and stop() is registered as the destroy method. -->
<bean id="jmsFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- Destination: the topic named "spring-active-topic" -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- Spring JmsTemplate used to talk to ActiveMQ; destinationTopic is its default destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationTopic"/>
<!-- Message converter -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
2.生产者和消费者的代码和queue模式一样,只需要把加载的配置文件改为application-topic.xml即可。注意:topic消息不会为尚未订阅的非持久订阅者保留,所以需要先启动消费者,再启动生产者。
四、监听者模式
1.application-queue-listener.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!-- Queue setup with a listener: the same factory, queue and JmsTemplate beans as the plain queue
     configuration, plus a message listener and the container that drives it. -->
<!-- NOTE: the context and jms namespaces are declared above, but no element in this file uses them. -->
<!-- Connection factory: a PooledConnectionFactory wrapping an ActiveMQConnectionFactory that targets
     tcp://127.0.0.1:61616. maxConnections is set to 100 and stop() is registered as the destroy method. -->
<bean id="jmsFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- Destination: the queue named "spring-active-queue" -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- Spring JmsTemplate used to talk to ActiveMQ; destinationQueue is its default destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<!-- Message converter -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- Listener bean: an instance of com.wang.listener.MyListener -->
<bean id="myListener" class="com.wang.listener.MyListener"></bean>
<!-- Listener container: uses jmsFactory for connections and hands messages from destinationQueue to myListener -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destination" ref="destinationQueue"/>
<property name="messageListener" ref="myListener"/>
</bean>
</beans>
2.MyListener.java,监听相当于消费者。
public class MyListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听器收到消息,不需要配置消费者了!!!,内容是:"+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3.QuenProducerListener.java 只需要配置生产者就可以了。上面监听相当于消费者。
/**
 * Sample producer for the listener setup: sends one text message through the
 * JmsTemplate bean defined in application-queue-listener.xml.
 */
public class QuenProducerListener {
    /**
     * Starts the Spring context and sends a single text message to the default destination.
     * The context is not closed here; the listener container bean declared in
     * application-queue-listener.xml lives in this same context.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Callback that builds the text message to send
        MessageCreator textCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring整合ActiveMQ-queue");
            }
        };
        // Create the IoC container
        ApplicationContext springContext =
                new ClassPathXmlApplicationContext("classpath:application-queue-listener.xml");
        // To send to a specific queue instead of the configured default, call
        // setDefaultDestination(new ActiveMQQueue("hello-quen")) on the template first.
        springContext.getBean(JmsTemplate.class).send(textCreator);
        System.out.println("消息发送成功!!!");
    }
}
评论