`
zhangwei_david
  • 浏览: 468777 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Spring 之 JMS 监听JMS消息

    博客分类:
  • JMS
 
阅读更多

    在调用JMS消息消费者的receive()方法接收消息时,调用线程在消息可用之前一直阻塞。该线程出了等待还是等待,无所事事。这样的消息接收是同步消息接收,因为只用等到消息到达才能接收线程的工作。

     有同步的消息接收就有异步的消息接收,异步的消息接收就是注册一个消息监听器,该消息监听器必须实现javax.jms.MessageListener接口,当消息到达时将调用onMessage()方法,以消息作为方法的参数。

 

    原生的接口是javax.jms.MessageListener,除了这个原生的接口外,Spring 还提供了 SessionAwareMessageListener和MessageListenerAdapter.

 

/**
 *
 * @author zhangwei_david
 * @version $Id: MessageListener.java, v 0.1 2015年1月31日 下午9:06:02 zhangwei_david Exp $
 */
public class MailMessageListener implements MessageListener {

    private MessageConverter messageConverter;

    /**
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message msg) {
        try {
            System.out.println("on message:" + messageConverter.fromMessage(msg));
        } catch (JMSException e) {
        }
    }

    /**
     * Setter method for property <tt>messageConverter</tt>.
     *
     * @param messageConverter value to be assigned to property messageConverter
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

}

     Spring 中提供了多种消息监听器的容器,常用的容器有SimpleMessageListenerContainer 和DefaultMessageListenerContainer。SimpleMessageListenerContainer 是一个最简单的容器,不提供事务的支持,DefaultMessageListenerContainer是默认的容器实现,支持事务。

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ToStringBase.java, v 0.1 2015年2月2日 下午7:41:52 zhangwei_david Exp $
 */
public class ToStringBase {

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }

}

 

定义一个基本的对象

/**
 *
 * @author zhangwei_david
 * @version $Id: Mail.java, v 0.1 2015年2月2日 下午7:25:24 zhangwei_david Exp $
 */
public class Mail extends ToStringBase {

    /**id**/
    private String mailId;

    private String from;

    private String to;

    private String content;

    /**
     * Getter method for property <tt>mailId</tt>.
     *
     * @return property value of mailId
     */
    public String getMailId() {
        return mailId;
    }

    /**
     * Setter method for property <tt>mailId</tt>.
     *
     * @param mailId value to be assigned to property mailId
     */
    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    /**
     * Getter method for property <tt>from</tt>.
     *
     * @return property value of from
     */
    public String getFrom() {
        return from;
    }

    /**
     * Setter method for property <tt>from</tt>.
     *
     * @param from value to be assigned to property from
     */
    public void setFrom(String from) {
        this.from = from;
    }

    /**
     * Getter method for property <tt>to</tt>.
     *
     * @return property value of to
     */
    public String getTo() {
        return to;
    }

    /**
     * Setter method for property <tt>to</tt>.
     *
     * @param to value to be assigned to property to
     */
    public void setTo(String to) {
        this.to = to;
    }

    /**
     * Getter method for property <tt>content</tt>.
     *
     * @return property value of content
     */
    public String getContent() {
        return content;
    }

    /**
     * Setter method for property <tt>content</tt>.
     *
     * @param content value to be assigned to property content
     */
    public void setContent(String content) {
        this.content = content;
    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ProducerImpl.java, v 0.1 2015年1月31日 下午8:25:36 zhangwei_david Exp $
 */
@Component
public class ProducerImpl implements Producer {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     */
    @Transactional
    public void send(Mail mail) {
        System.out.println("sende->" + mail);
        jmsTemplate.convertAndSend(mail);

    }
}

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop" 
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/shcema/jms
		http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop 
		http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		">
	<aop:aspectj-autoproxy />

	<context:annotation-config />
	<context:component-scan base-package="com.cathy.demo.jms.*" />
	<!-- connectionFactory -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616"/>
	</bean>
	<!-- mailMessage converter -->
	<bean id="mailMessageConverter" class="com.cathy.demo.jms.convert.MailMessageConverter"/>
	<!-- jmsTemplate -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="defaultDestination" ref="topic"/>
		<property name="receiveTimeout" value="60000"/>
		<property name="pubSubDomain" value="true"/>
		<property name="sessionTransacted" value="true"/>   
		<property name="messageConverter" ref="mailMessageConverter"/>
	</bean>
	<!--
	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="message.queue"/>
	</bean>
	-->
	<!-- 主题 -->
	<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="notifyTopic"/>
	</bean>
	<bean id="defaultMessageListener" class="com.cathy.demo.jms.listener.MailMessageListener">
		<property name="messageConverter" ref="mailMessageConverter"/>
	</bean>
	<!-- 消息接收监听器用于异步接收消息-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="topic"/>
        <property name="sessionTransacted" value="true"/>
        <property name="messageListener" ref="defaultMessageListener"/>  
   </bean>
   
</beans>

 

/**
 *
 * @author zhangwei_david
 * @version $Id: Sender.java, v 0.1 2015年1月31日 下午8:47:18 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
public class SenderAndReciver {
    @Autowired
    private Producer producer;

    @Test
    public void testSend() {
        Mail mail = new Mail();
        mail.setMailId("testId");
        mail.setTo("david");
        mail.setFrom("cathy");
        mail.setContent("Hello");
        producer.send(mail);
    }

}

 测试的结果是:

log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]

 

MessageListenerAdapter

 

消息监听器适配器代理消息的处理目标 通过反射监听方法,具有灵活的消息类型转换。允许监听器的方法来对邮件内容类型进行操作,完全独立于JMS API

 

定义一个消息处理目标方法

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ListenerDelegate.java, v 0.1 2015年2月3日 下午2:32:11 zhangwei_david Exp $
 */
public class ListenerDelegate {

    public void handleMessage(@SuppressWarnings("rawtypes") Map map) {
        System.out.println("receive  MapMessage->" + map);
    }
}
 
<bean id="defaultListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
         <constructor-arg ref="listenerDelegate"/>
     </bean>
	
	<!-- 消息接收监听器用于异步接收消息-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="topic"/>
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
        <property name="messageListener" ref="defaultListener"/> 
        <property name="sessionTransacted" value="true"/> 
   </bean>
 执行测试方法的结果是:

 

sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]
receive  MapMessage->{mailId=testId, from=cathy, to=david, content=Hello}
 那么在MessageListenerAdapter中又是如何处理消息的呢?
public void onMessage(Message message, Session session) throws JMSException {
		// 获取代理方法
		Object delegate = getDelegate();
		// 如果代理方法不是当前类
		if (delegate != this) {
			// 如果代理方法是实现SessionAwareMessageListener接口的,直接将消息和session交给代理方法处理
			if (delegate instanceof SessionAwareMessageListener) {
				if (session != null) {
					((SessionAwareMessageListener) delegate).onMessage(message, session);
					return;
				}
				// session为null,且该代理对象未实现MessageListener接口则抛出一个异常
				else if (!(delegate instanceof MessageListener)) {
					throw new javax.jms.IllegalStateException("MessageListenerAdapter cannot handle a " +
							"SessionAwareMessageListener delegate if it hasn't been invoked with a Session itself");
				}
			}
			// 如果代理对象实现了MessageListener接口则将消息交给代理对象处理
			if (delegate instanceof MessageListener) {
				((MessageListener) delegate).onMessage(message);
				return;
			}
		}

		//将消息体转换为一个对象
		Object convertedMessage = extractMessage(message);
		// 获取这个消息的处理方法
		String methodName = getListenerMethodName(message, convertedMessage);
		if (methodName == null) {
			throw new javax.jms.IllegalStateException("No default listener method specified: " +
					"Either specify a non-null value for the 'defaultListenerMethod' property or " +
					"override the 'getListenerMethodName' method.");
		}

		// 反射调用处理方法处理消息
		Object[] listenerArguments = buildListenerArguments(convertedMessage);
		Object result = invokeListenerMethod(methodName, listenerArguments);
		if (result != null) {
			handleResult(result, message, session);
		}
		else {
			logger.trace("No result object given - no result to handle");
		}
	}
 
0
0
分享到:
评论

相关推荐

    weblogic中使用JMS发送和接受消息

    weblogic中使用JMS发送和接受消息(Queue and TOPIC)

    spring-jms-oracle-aq.rar_oracle aq_spring oracle aq_spring oracl

    spring监听oracle AQ队列 Oracle AQ with Spring JMS ( Without Spring JDBC Extension )

    在spring boot中使用jms集成IBM-MQ和TLQ,包含普通队列和主题订阅两种模式,并实现按需加载

    1) 本工程主要演示在SPRING BOOT工程中怎样使用JMS集成IBM-MQ及TLQ两种消息中间件产品 2) 使用SPRING BOOT Conditional机制实现了两种产品按需加载,工程会根据配置文件开关动态加载 3) 实现了普通队列消息发送与...

    JMS之Spring +activeMQ实现消息队列

    NULL 博文链接:https://ihenu.iteye.com/blog/2270078

    JMS与SPRING的整合实例(基于Apache ActiveMQ)

    JMS与SPRING的整合实例(基于Apache ActiveMQ) 定义JMS连接工厂 定义JMS Template 定义消息目的地 定义接收监听器 定义一个JMS话题 定义消费者(接收端) 定义发布者 JAVA核心代码一般由三个部分组成: 监听器...

    JMS之ActiveMQ与Spring整合源码

    cJMS之ActiveMQ与Spring整合源码

    Java笔试题目类型-spring-jms-examples:SpringJMS示例

    [消息监听适配器]( 这三种类型的消息侦听器都使用 async 目录中的示例进行了演示。 运行示例 这些示例旨在使用 Maven 从命令行运行。 消费者应该在一个终端中运行,而生产者应该在一个单独的终端中运行。 下面是运行...

    Spring In Action 使用Spring发送和接收JMS消息

    NULL 博文链接:https://wujiu.iteye.com/blog/2010643

    JMS入门Demo

    在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,如: &lt;bean id="jmsContainer" class="org.springframework.jms....

    JMS完全实例(八个实例)

    含有八个独立运行的程序,分别设计JMS多个方面。 工程使用ActiveMQ,运行工程之前,请阅读doc.txt文档,并运行ActiveMQ

    spring+activeMQ集成

    spring集成activeMQ框架 配置方式(内含三种常见的消息接受监听方式的配置)JMS 配置测试等等

    Spring 2.0 开发参考手册

    19.2. 使用Spring JMS 19.2.1. JmsTemplate 19.2.2. 连接工厂 19.2.3. (消息)目的地管理 19.2.4. 消息侦听容器 19.2.5. 事务管理 19.3. 发送一条消息 19.3.1. 使用消息转换器 19.3.2. SessionCallback 和...

    Spring中文帮助文档

    2.4.4. 异步的JMS 2.4.5. JDBC 2.5. Web层 2.5.1. Spring MVC合理的默认值 2.5.2. Portlet 框架 2.5.3. 基于Annotation的控制器 2.5.4. Spring MVC的表单标签库 2.5.5. 对Tiles 2 支持 2.5.6. 对JSF 1.2支持...

    Spring in Action(第2版)中文版

    10.3.1创建消息监听器 10.3.2编写纯pojomdp 10.4使用基于消息的rpc 10.4.1引入lingo 10.4.2输出服务 10.4.3代理jms 10.5小结 第11章spring和ejb 11.1在spring中置入ejb 11.1.1代理会话bean(ejb2.x) 11.1.2...

    Spring in Action(第二版 中文高清版).part2

    10.3.1 创建消息监听器 10.3.2 编写纯POJO MDP 10.4 使用基于消息的RPC 10.4.1 引入Lingo 10.4.2 输出服务 10.4.3 代理JMS 10.5 小结 第11章 Spring和EJB 11.1 在Spring中置入EJB 11.1.1 代理会话Bean...

    Spring in Action(第二版 中文高清版).part1

    10.3.1 创建消息监听器 10.3.2 编写纯POJO MDP 10.4 使用基于消息的RPC 10.4.1 引入Lingo 10.4.2 输出服务 10.4.3 代理JMS 10.5 小结 第11章 Spring和EJB 11.1 在Spring中置入EJB 11.1.1 代理会话Bean...

    Spring API

    2.4.4. 异步的JMS 2.4.5. JDBC 2.5. Web层 2.5.1. Spring MVC合理的默认值 2.5.2. Portlet 框架 2.5.3. 基于Annotation的控制器 2.5.4. Spring MVC的表单标签库 2.5.5. 对Tiles 2 支持 2.5.6. 对JSF 1.2支持...

    Spring-Reference_zh_CN(Spring中文参考手册)

    2.4.3. 异步的JMS 2.4.4. JDBC 2.5. Web层 2.5.1. Spring MVC的表单标签库 2.5.2. Spring MVC合理的默认值 2.5.3. Portlet 框架 2.6. 其他特性 2.6.1. 动态语言支持 2.6.2. JMX 2.6 .3. 任务规划 2.6.4. 对Java 5...

    Spring攻略(第二版 中文高清版).part1

    10.6 使用BlazeDS和Spring消费面向消息的服务 421 10.6.1 问题 421 10.6.2 解决方案 422 10.6.3 工作原理 422 10.7 将依赖注入带给你的ActionScript客户 434 10.7.1 问题 434 10.7.2 解决方案 434 ...

    spring in action英文版

     第1章 开始Spring之旅  1.1 为什么使用Spring  1.1.1 J2EE开发者的一天  1.1.2 Spring的承诺  1.2 Spring是什么  1.3 开始Spring之旅  1.4 理解反向控制  1.4.1 依赖注入  1.4.2 IoC应用...

Global site tag (gtag.js) - Google Analytics