博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring整合JMS-基于activeMQ实现(二)
阅读量:7050 次
发布时间:2019-06-28

本文共 9458 字,大约阅读时间需要 31 分钟。

Spring整合JMS-基于activeMQ实现(二)
1、消息监听器
     在Spring整合JMS的应用中我们在定义消息监听器的时候一共能够定义三种类型的消息监听器,各自是
MessageListener
SessionAwareMessageListener
MessageListenerAdapter
1.1 MessageListener
     MessageListener是
最原始的消息监听器(javax.jms.MessageListener),它是JMS规范中定义的一个接口。定义了一个omMessage方法。仅仅接收一个message參数
     
public 
class 
ConsumerMessageListener 
implements 
MessageListener{
    
public 
void 
onMessage(Message message) {
        
//这里我们知道生产者发送的就是一个纯文本消息。所以这里能够直接进行强制转换,或者直接把onMessage方法的參数改成Message的子类TextMessage
        TextMessage textMessage = (TextMessage)message;
        System. 
out
.println( 
"接收到一个纯文本消息" 
);
        
try 
{
            System. 
out
.println( 
"消息内容是:" 
+ textMessage.getText());
        } 
catch 
(JMSException e) {
            e.printStackTrace();
        }
    }
}
1.2 SessionAwareMessageListener
     sessionAwareMessageListener
是Spring为我们提供的,它不是标准的JMS消息监听器。MessageListener处理接收到的消息时候假设须要返回消息给对方。此时就须要又一次获取connection和session,SessionAwareMessageListener的设计就是为了
方便我们在接收到消息后发送一个回复的消息,onMessage方法中接收两个參数,一个Message。一个发送消息的Session。(看红色新增部分)
     
<
beans 
xmlns
= 
"http://www.springframework.org/schema/beans" 
xmlns:aop
= 
"http://www.springframework.org/schema/aop"
       
xmlns:tx
= 
"http://www.springframework.org/schema/tx" 
xmlns:xsi
= 
"http://www.w3.org/2001/XMLSchema-instance"
       
xmlns:context
= 
"http://www.springframework.org/schema/context" 
xmlns:jms
= 
"http://www.springframework.org/schema/jms"
       
xsi:schemaLocation
= 
"
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-3.1.xsd "
>
      
<
bean 
id 
=
"connectionFactory" 
class
= 
"org.springframework.jms.connection.CachingConnectionFactory" 
>
             
<
property 
name 
=
"targetConnectionFactory"
>
                   
<
bean 
class
= 
"org.apache.activemq.ActiveMQConnectionFactory" 
>
                         
<
property 
name 
=
"brokerURL"
>
                               
<
value 
>
tcp://localhost:61616 
</
value 
>
                         
</
property 
>
                   
</
bean 
>
             
</
property 
>
             
<
property 
name 
=
"sessionCacheSize" 
value
= 
"1" 
/>
      
</
bean 
>
      
<!-- Spring jmsTemplate queue -->
      
<
bean 
id 
=
"jmsTemplate" 
class
= 
"org.springframework.jms.core.JmsTemplate" 
>
             
<
property 
name 
=
"connectionFactory" 
ref
= 
"connectionFactory"
></
property 
>
             
<
property 
name 
=
"defaultDestinationName" 
value
= 
"subject"
></
property 
>
             
<
property 
name 
=
"deliveryPersistent" 
value
= 
"true"
></
property 
>
             
<
property 
name 
=
"pubSubDomain" 
value
=
"false"
></ 
property
> 
<!-- false p2p,true topic -->
             
<
property 
name 
=
"sessionAcknowledgeMode" 
value
= 
"1"
></
property 
>
             
<
property 
name 
=
"explicitQosEnabled" 
value
= 
"true"
></
property 
>
             
<
property 
name 
=
"timeToLive" 
value
=
"604800000"
></ 
property
>
      
</
bean 
>
      
<!-- 配置Queue,当中value为Queue名称->start -->
      
<
bean 
id 
= 
"testQueue" 
class 
= 
"org.apache.activemq.command.ActiveMQQueue" 
>
             
<
constructor-arg 
index 
= 
"0" 
value 
=
"${pur.test.add}" 
/>
      
</
bean 
>
     
      
<bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" >
             
<constructor-arg index = "0" value= "queue.liupeng.sessionaware" />
      </bean >
 
<!-- 配置Queue,当中value为Queue名称->end -->
     
     
       
<!-- 注入AMQ的实现类属性(JmsTemplate和Destination) -->
       
<
bean 
id 
= 
"amqQueueSender" 
class 
= 
"com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender" 
>
            
<
property 
name 
= 
"jmsTemplate" 
ref
=
"jmsTemplate" 
></
property 
>
            
<
property 
name 
= 
"testQueue" 
ref
=
"testQueue" 
></
property 
>
            
<property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property >
       
</
bean 
>
      
       
<!-- 消息发送必用的发送类 -->
       
<
bean 
id 
= 
"multiThreadAMQSender" 
class 
=
"com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"
             
init-method
= 
"init"
>
             
<
property 
name 
= 
"jmsTemplate" 
ref
=
"jmsTemplate" 
></
property 
>
             
<
property 
name 
= 
"multiThreadAMQExecutor" 
ref
= 
"multiThreadAMQExecutor" 
></
property 
>
      
</
bean 
>
     
     
<!-- 消息监听器->start --> 
 
          
<
bean 
id 
= 
"consumerMessageListener" 
class
= 
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener" 
/>
          
<!-- 消息监听容器 -->
          
<
bean 
id 
= 
"jmsContainer" 
class
= 
"org.springframework.jms.listener.DefaultMessageListenerContainer" 
>
              
<
property 
name 
= 
"connectionFactory" 
ref
= 
"connectionFactory" 
/> 
 
              
<
property 
name 
= 
"destination" 
ref
= 
"testQueue" 
/> 
<!-- 消费者队列名称,改动 -->
              
<
property 
name 
= 
"messageListener" 
ref
= 
"consumerMessageListener" 
/>
          
</
bean 
>
         
         
<
bean 
id 
= 
"consumerSessionAwareMessageListener" 
class =
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener" 
>
            
<property name ="testQueue" ref="testQueue"/> 
<!-- 接收消息后返回给testQueue队列 --> 
          
</
bean 
>
          
 
< 
bean 
id
= 
"sessionAwareListenerContainer" 
class
= 
"org.springframework.jms.listener.DefaultMessageListenerContainer" 
>
 
              
<
property 
name 
=
"connectionFactory" 
ref
= 
"connectionFactory" 
/>
 
              
<
property 
name 
=
"destination" 
ref
=
"sessionAwareQueue" 
/>
 
              
<
property 
name 
=
"messageListener" 
ref
= 
"consumerSessionAwareMessageListener" 
/> 
 
           
</
bean 
>
 
 
<!-- 消息监听器->end --> 
    
</
beans
>
     
发送消息:
                  
@Resource
    
private 
AMQQueueSender 
amqQueueSender
;
    
private 
static 
final 
Logger 
LOG 
= LoggerFactory.getLogger(AMQController. 
class
);
    
@UvConfig
(method = 
"testQueue"
, description = 
"測试AMQ"
)
    
@RequestMapping
(value = 
"/testQueue"
, method = RequestMethod. 
POST
)
    
@TSPServiceInfo
(name = 
"PUR.NM.
AMQController
.testQueue" 
, description = 
"測试AMQ"
)
    
public 
void 
testQueue(HttpServletRequest request, HttpServletResponse response) {
        
try 
{
            
long 
beginTime = System. currentTimeMillis();
            
LOG
.info( 
"发送開始"
);
            
//amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);
            
amqQueueSender
.sendMessage( 
"test"
, StaticProperty.
TEST_SESSIONAWARE_QUEUE 
);
            
LOG
.info( 
"发送结束,耗时:" 
+(System.currentTimeMillis()-beginTime)+ 
"ms"
);
        } 
catch 
(InterruptedException e) {
            
LOG
.error( 
"測试失败"
, e);
        }
    }
     
接收消息:
          
public 
class 
ConsumerSessionAwareMessageListener 
implements 
SessionAwareMessageListener<TextMessage>{
    
private 
Destination 
testQueue
; 
//返回消息目的队列
    
@Override
    
public 
void 
onMessage(TextMessage message, Session session) 
throws 
JMSException {
        System. 
out
.println( 
"收到一条消息" 
);
        System. 
out
.println( 
"消息内容是:" 
+message.getText());
       
        MessageProducer producer = session.createProducer( 
testQueue
);
        Message txtMessage = session.createTextMessage(
"consumerSessionAwareMessageListener..." 
);
        producer.send(txtMessage);
    }
    
public 
Destination getTestQueue() {
        
return 
testQueue
;
    }
    
public 
void 
setTestQueue(Destination sessionAwareQueue) {
        
this
.
testQueue 
= sessionAwareQueue;
    }
}
    
 打印结果:
收到一条消息
消息内容是:test
接收到一个纯文本消息
消息内容是:consumerSessionAwareMessageListener...
1.3 MessageListenerAdapter
     MessageListenerAdapter
实现了MessageListener接口和SessionAwareMessageListener接口,它的作用主要是将接收到的消息进行类型转换。然后通过反射的形式把它交给一个普通的Java类进行处理。
     MessageListenerAdapter会把接收到的消息做例如以下转换:
          TextMessage转换为String对象
          BytesMessage转换为byte数组
          MapMessage转换为Map对象
          ObjectMessage转换为相应的Serializable对象
     既然前面说到MessageListenerAdapter会把接收到的消息做类型转换再通过反射交给Java类处理,假设真正目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为參数调用它们的onMessage方法。而不会利用反射去调用。以下定义的时候为它指定一个目标类
              
<!-- 消息监听适配器 --> 
 
      
<
bean 
id 
=
"messageListenerAdapter" 
class
= 
"org.springframework.jms.listener.adapter.MessageListenerAdapter" 
>
 
               
<
property 
name 
=
"delegate"
>
 
                  
<
bean 
class
= 
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" 
/>
 
              
</
property 
>
 
              
<
property 
name 
=
"defaultListenerMethod" 
value
= 
"receiveMessage"
/>
          
</
bean 
>
          
<
bean 
id 
=
"messageListenerAdapterContainer" 
class
= 
"org.springframework.jms.listener.DefaultMessageListenerContainer" 
>
 
            
<
property 
name 
=
"connectionFactory" 
ref
= 
"connectionFactory" 
/>
 
            
<
property 
name 
=
"destination" 
ref
=
"adapterQueue" 
/>
 
            
<
property 
name 
=
"messageListener" 
ref
= 
"messageListenerAdapter" 
/> 
 
           
</
bean 
>
  
     上面说到,目标处理器是一个普通Java类的时候,Spring将进行类型转换之后的对象通过反射去调用真正的方法,那么Spring是怎样知道该调用哪个方法的呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的。
当没有指定该属性的时候。会默认调用目标处理器的handleMessage方法
     
编写新队列、发送消息同上
    
 接收消息:
          
public 
class 
ConsumerListener{
    
public 
void 
handleMessage(String message) {  
        System. 
out
.println( 
"ConsumerListener通过handleMessage接收到一个纯文本消息。消息内容是:" 
+ message);  
    }  
      
    
public 
void 
receiveMessage(String message) {  
        System.
out
.println(
"ConsumerListener通过receiveMessage接收到一个纯文本消息。消息内容是:" 
+ message);  
    }  
}
     
MessageListenerAdapter的另外一个主要功能就是能够自己主动的发送回复的消息
  1. 方法一:
  2. 、public void sendMessage(Destination destination, final String message) {   
  3.         System.out.println("---------------生产者发送消息-----------------");   
  4.         System.out.println("---------------生产者发了一个消息:" + message);   
  5.         jmsTemplate.send(destination, new MessageCreator() {   
  6.             public Message createMessage(Session session) throws JMSException {   
  7.                 TextMessage textMessage = session.createTextMessage(message);   
  8.                 textMessage.setJMSReplyTo(responseDestination);  //(省略编写其相应的监听器代码) 
  9.                 return textMessage;   
  10.             }   
  11.         });   
  12.     }   
     方法二:
          
<!-- 消息监听适配器 --> 
 
      
<
bean 
id 
=
"messageListenerAdapter" 
class
= 
"org.springframework.jms.listener.adapter.MessageListenerAdapter" 
>
 
               
<
property 
name 
=
"delegate"
>
 
                  
<
bean 
class
= 
"com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" 
/>
 
              
</
property 
>
 
              
<
property 
name 
=
"defaultListenerMethod" 
value
= 
"receiveMessage"
/>
          
</
bean 
>
          
<
bean 
id 
=
"messageListenerAdapterContainer" 
class
= 
"org.springframework.jms.listener.DefaultMessageListenerContainer" 
>
 
            
<
property 
name 
=
"connectionFactory" 
ref
= 
"connectionFactory" 
/>
 
            
<
property 
name 
=
"destination" 
ref
=
"adapterQueue" 
/>
 
            
<
property 
name 
=
"messageListener" 
ref
= 
"messageListenerAdapter" 
/>
                      
  <property name="defaultResponseDestination" ref="defaultResponseQueue"/>   
           
</
bean 
>

转载地址:http://hzpol.baihongyu.com/

你可能感兴趣的文章
js实现图片加载特效(从左到右,百叶窗,从中间到两边)
查看>>
基于matlab的图像处理——高斯噪声&均值滤波,椒盐噪声&中值滤波
查看>>
小程序学习---hello world
查看>>
到数博会看“2017十大黑科技”
查看>>
麦肯锡报告:如果再不转型人工智能,这些行业将被越甩越远
查看>>
Git学习笔记(一)
查看>>
MS CRM 2011 如何从外部连接CRM
查看>>
MySQL读取配置文件的顺序、启动方式、启动原理
查看>>
速度制胜亚特兰大 施耐德电气助Internap与客户共成长
查看>>
重复 HTML 元素
查看>>
非系统数据文件损坏,rman备份恢复
查看>>
BIOS自检
查看>>
利用RIS在网络中实现批量操作系统的安装
查看>>
进程之进程间的8种通信方式
查看>>
史上最全的CSS hack方式一览
查看>>
第三天 入口文件index.php 01
查看>>
构建富互联网应用程序监控工作流和流程(6)
查看>>
动态编译php的gd库
查看>>
开源 .net license tool, EasyLicense !
查看>>
Openssl及加密解密(一)数据加密解密及CA原理
查看>>