1、20.1 Spring Integration主要概念介绍Spring Integration所有组件都是所有组件都是 Spring 管理的对象,它提供了各类消息端点实现对消管理的对象,它提供了各类消息端点实现对消息的处理,图息的处理,图20-1给出了一个典型消息端点给出了一个典型消息端点(Message Endpoint)的逻辑处理过程。)的逻辑处理过程。20.1.1消息的构建构建消息有两种方式,一种使用消息接口的实现类构建消息有两种方式,一种使用消息接口的实现类GenericMessage构建,构建,其构造方法有两个,一其构造方法有两个,一个是仅含消息负载参数,另一个则包括消息负载和消个是
2、仅含消息负载参数,另一个则包括消息负载和消息头两个参数。消息头为一个息头两个参数。消息头为一个Map对象,存放关于消对象,存放关于消息的各种元数据。消息负载是任何可持久化的息的各种元数据。消息负载是任何可持久化的Java对对象。含消息头参数的构造方法具体格式如下:象。含消息头参数的构造方法具体格式如下:GenericMessage(T payload,Map headers);另一种使用另一种使用MessageBuilder工具类。例如,以下创建工具类。例如,以下创建一个字符串消息:一个字符串消息:Message m=MessageBuilder.withPayload(hello).buil
3、d();20.1.2 消息通道消息通道(消息通道(MessageChannel)是传送消息的部件,)是传送消息的部件,消息生产者发送消息到通道,消息消费者从通道接收消息生产者发送消息到通道,消息消费者从通道接收消息。消息。MessageChannel接口定义如下:接口定义如下:public interface MessageChannel String getName();boolean send(Message message);boolean send(Message message,long timeout);MessageChannel有有2个子接口,分别是个子接口,分别是Pollab
4、leChannel和和SubscribableChannel。接口PollableChannel 接口接口PollableChannel用于点对点通道形式中,能够缓存消用于点对点通道形式中,能够缓存消息,且消费者可以在任意时间处理消息,缓冲的优势在于它能够息,且消费者可以在任意时间处理消息,缓冲的优势在于它能够调节接入消息流量,从而防止系统负荷过载。调节接入消息流量,从而防止系统负荷过载。接口接口PollableChannel的定义如下:的定义如下:public interface PollableChannel extends MessageChannel Message receive()
5、;Message receive(long timeout);ListMessage clear();ListMessage purge(Messageselector selector);SubscribableChannel接口另一个是另一个是SubscribableChannel接口,用于发布接口,用于发布/订阅形式的消息通信,无需缓存,其中含支持消息的订阅形式的消息通信,无需缓存,其中含支持消息的订阅订阅/取消的取消的subscribe和和unsubscribe方法,它们均方法,它们均需要提供需要提供MessageHandler类型的消息处理器作为参类型的消息处理器作为参数。数。接口接
6、口SubscribableChannel的定义如下:的定义如下:public interface SubscribableChannel extends MessageChannel boolean subscribe(MessageHandler handler);boolean unsubscribe(MessageHandler handler);点对点和发布订阅通道(1)点对点形式通道)点对点形式通道 点对点通道只有一个接收者能收到消息,常用点对点通道包括直接点对点通道只有一个接收者能收到消息,常用点对点通道包括直接通道(通道(DirectChannel)、队列通道()、队列通道(Qu
7、eueChannel)、优先级通道)、优先级通道(PriorityChannel)。队列通道支持消息缓冲队列,按)。队列通道支持消息缓冲队列,按FIFO规则处理规则处理消息,而优先级通道允许指定消息的优先顺序。直接通道定义形式为消息,而优先级通道允许指定消息的优先顺序。直接通道定义形式为(2)发布)发布/订阅形式通道订阅形式通道 发布订阅形式通道(发布订阅形式通道(PublishSubscribeChannel)用于发布)用于发布/订阅订阅模式的通信,发布者通过该通道广播消息,所有订阅者将接收到消息。模式的通信,发布者通过该通道广播消息,所有订阅者将接收到消息。发布订阅形式通道的定义形式为发布
8、订阅形式通道的定义形式为 任何订阅者必须是一个任何订阅者必须是一个MessageHandler类型的对象,其中含有类型的对象,其中含有handleMessage方法实现消息处理逻辑。方法实现消息处理逻辑。20.1.3 消息端点Message EndpointMessage Endpoint类型类型使用说明使用说明消息转换器(消息转换器(TransformerTransformer)进行消息内容或结构的变换。如:对象到串、对象到进行消息内容或结构的变换。如:对象到串、对象到jsonjson、对象到、对象到mapmap等转换器。等转换器。过滤器过滤器(Filter)(Filter)一般用于发布订阅
9、模式,决定是否消息可传递到通道。一般用于发布订阅模式,决定是否消息可传递到通道。路由器(路由器(RouterRouter)负责决定输入消息将由哪些通道接收。这种决定一般基负责决定输入消息将由哪些通道接收。这种决定一般基于消息的内容或消息头部中的可用元数据。于消息的内容或消息头部中的可用元数据。消息分离器(消息分离器(SplitterSplitter)将消息分解为多个消息发送到相应通道。每条消息都包将消息分解为多个消息发送到相应通道。每条消息都包含与一个具体项相关的数据。含与一个具体项相关的数据。消息聚合器(消息聚合器(AggregatorAggregator)将多个消息组合为一个消息。将多个消
10、息组合为一个消息。服务激活器(服务激活器(Service Service activatoractivator)是最典型的消息端点,是消息通道和服务实例的接口,是最典型的消息端点,是消息通道和服务实例的接口,服务实例从输入通道获取消息,对消息进行分析加工处服务实例从输入通道获取消息,对消息进行分析加工处理,并将响应消息发送给输出通道。理,并将响应消息发送给输出通道。通道适配器(通道适配器(Channel Channel adapteradapter)实现消息源或消息目标与消息系统的连接。根据适配器实现消息源或消息目标与消息系统的连接。根据适配器是提供消息还是接受消息可分为是提供消息还是接受消息
11、可分为inboundinbound和和outboundoutbound两两大类。大类。Spring IntegrationSpring Integration提供有提供有FileFile、HTTPHTTP、JDBC JDBC、JMS JMS、FTPFTP、MAILMAIL等众多主流适配器。等众多主流适配器。20.2 应用消息处理流程配置 2.服务处理程序package chapter20;import java.io.File;public class DirList public File list(String sdir)/列某目录下文件列某目录下文件File d=new File(sdi
12、r);return d.listFiles();public void output(String jsonfiles)/输出输出Json串串System.out.println(jsonfiles);3.测试程序public class test public static void main(String a)ApplicationContext context=new ClassPathXmlApplicationContext(config.xml);MessageChannel input=(MessageChannel)context.getBean(channel1);input
13、.send(new GenericMessage(F:mp3);20.3 使用注解定义消息端点Componentpublic class DirList ServiceActivator(id=service1,inputChannel=channel1,outputChannel=channel2)/用注解定义服务激活器用注解定义服务激活器public File list(String sdir)File d=new File(sdir);return d.listFiles();ServiceActivator(id=service2,inputChannel=pubsub-channel
14、)public void output(String jsonfiles)System.out.println(jsonfiles);20.4 网络教学中用户星级计算处理样例1星级计算的服务处理程序星级计算的服务处理程序public class ComputerLevel Resource(name=jdbcTemplate)private JdbcTemplate jdbcTemplate;/用于访问数据库用于访问数据库public Map computerlevel(String user)Map userlevel=new HashMap();userlevel.put(user,use
15、r);userlevel.put(level,(int)(Math.random()*6);/计算星级简化为计算星级简化为随机产生随机产生return userlevel;2消息处理流程的配置文件 3.应用程序中给流程的输入通道发送消息触发流程处理ApplicationContext ct=RequestContextUtils.getWebApplicationContext(request);MessageChannel inputchannel=(MessageChannel)ct.getBean(userinfo);String user=request.getRemoteUser();inputchannel.send(new GenericMessage(user);/给给userinfo通道发消息通道发消息