本文通过小的实际示例介绍Spring Integration(SI)的核心概念。Spring Integration提供了许多功能强大的组件,这些组件可以极大地增强企业架构内系统和流程的互连互通。

它实现了一些优秀且常用的设计模式,帮助开发人员避免从头设计自己的模式。我们将探讨SI如何在企业级应用程序中实现的特定需求,以及为什么它比一些替代方案更合适。我们还将介绍一些有力工具,进一步简化SI应用程序开发。

增加相关依赖

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>4.3.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>4.3.5.RELEASE</version>
</dependency>

如果使用Spring boot,则增加spring-boot-starter-integration:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

消息模式

这个库中的基本模式之一是消息传递。该模式以消息为中心——其有效负载为实际传递数据,通过预定义的通道从原始系统或进程转移到一个或多个系统或进程。

该模式是最灵活的集成多个不同系统的方式,其方式如下:

  • 几乎完全解耦涉及到集成的系统;
  • 允许参与集成的系统完全不知道彼此的底层协议、格式或其他实现细节;
  • 支持开发和重用集成中涉及的组件;

消息集成实例

下面通过一个简单示例,实现从源文件夹拷贝文本文件至目标文件夹,配置如下:


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.io.File;

@Configuration
@EnableIntegration
public class SiBaseConfig {
    public final String INPUT_DIR = "D:\\readfile\\in";
    public final String OUTPUT_DIR = "D:\\readfile\\out";
    public final String FILE_PATTERN = "*.txt";

    @Bean(name = "fileChannel")
    public MessageChannel fileChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource sourceReader= new FileReadingMessageSource();
        sourceReader.setDirectory(new File(INPUT_DIR));
        sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
        return sourceReader;
    }

    @Bean
    @ServiceActivator(inputChannel= "fileChannel")
    public MessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
    }
}

上面代码配置了一个service activator,一个集成通道,以及一个入站通道适配器。稍后将详细讲解这些组件。@EnableIntegration注解标识该类为Spring集成配置类。

Spring boot启动类,不用增加内容:


@SpringBootApplication
public class SiLab1Application {
    public static void main(String[] args) {
        SpringApplication.run(SiLab1Application.class, args);
    }
}

运行持续,在源文件夹增加文件,会自动复制到目标文件夹。

下面我们再看一个示例,从控制台输入数据,直接在控制台输出内容:

@Bean(name = "stdChannel")
public MessageChannel inputChannel() {
    return new DirectChannel();
}

@Bean
@InboundChannelAdapter(value = "stdChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<?> stdOutMessageSource() {

    return new ByteStreamReadingMessageSource(System.in, 1024);
}

@Bean
@ServiceActivator(inputChannel= "stdChannel")
public MessageHandler stdInMessageSource() {

    return new ByteStreamWritingMessageHandler(System.out, 1024);
}

首先定义直接通道,然后定义入站适配器,在定义ServiceActivator,分别指定标准输入、输出流。通过上面两个示例,大概了解Spring Integration的用法和能力,下面我们进一步介绍框架主要组件。

Spring集成组件

Message

org.springframework.messaging.Message接口定义了Spring消息,Spring集成上下文中传输数据单元:


public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

接口定义了两个关键元素的访问:

  • 消息头,即键值对容器,用于传输元数据,在org.springframework.messaging.MessageHeaders类中定义。
  • 消息体,实际传输数据的内容,上面示例中为文件。

Channel

Channel是Spring Integration(实际上,EAI)体系结构中的基本管道,消息通过Channel从一个系统中转到另一个系统。

你可以将它视为管道,通过它集成的系统或进程可以向其他系统推送消息(或从其他系统接收消息)。Spring Integration中的通道有多种形式,它们提供了灵活的配置属性,且几乎开箱即用,不需要任何编码。当然如果有自定义需求,也可以使用健壮的框架快速实现。

  • Point-to-Point (P2P)

Point-to-Point (P2P) channels用于在系统或组件间建立 1-to-1 消息通道. 一个组件发布消息到通道,另一个从通道接收。在通道两边仅有一个组件。
我们看到配置通道非常简单,仅需返回DirectChannel实例:

@Bean
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

这里定义了三个通道,并通过各自方法名作为其标识。

  • Publish-Subscribe (Pub-Sub)

Publish-Subscribe (Pub-Sub) 通道用于在系统或组件间建立一对多消息通道,因此我们可以同时给上面三个通道发送消息。

下面实例创建代替P2P的 pub-sub 通道:

@Bean
public MessageChannel pubSubFileChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@InboundChannelAdapter(value = "pubSubFileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

现在,我们已经将入站通道适配器转换为发布到Pub-Sub通道。这将允许我们将从源文件夹读取的文件发送到多个目的地。

桥接(Bridge)

桥接在Spring 集成框架中用于连接两个不能直接相连的通道或适配器。下面实例使用桥接连接Pub-Sub通道到三个不同的P2P的通道,因为Pub-Sub通道和P2P通道不能直接连接:

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

上面代码配置pubSubFileChannel 桥接到 三个 P2P 通道,@BridgeFrom注解用于定义桥接。上述代码可描述为:创建桥接,从pubSubFileChannel通道到ileChannel1, fileChannel2, fileChannel3三个通道,使得从pubSubFileChannel通道的消息可同时传输到三个通道。

Service Activator

Service Activator是任何给定方法标注@ServiceActivator注解的POJO。使得当从入站通道接收到消息时执行POJO的方法,并支持些消息值出战通道。上面示例中从input通道接收文件写入配置的文件夹。

Adapter

Adapter是企业级集成模式的组件,支持给系统或数据源增加插槽适配器,就像在墙上安装插座或其他电子设备。它支持对数据库、FTP服务器和JMS、AMQP等消息系统以及Twitter等社交网络等提供开箱即用的可重用连接。连接到这些系统的需求很常见,因此适配器能够移植、重用。(事实上,适配器种类不多,Spring Integration已经内置大部分常用适配器)。

适配器大致可分类两类:入站和出战。下面通过上面示例分析两类适配器。

入站适配器配置包括:

  • 通过@InboundChannelAdapter注解将bean配置为适配器———参数包括入站通道(在本例中是txt文件)和轮询器,轮询器确定按指定的时间间隔轮询已配置的文件夹。
  • 返回FileReadingMessageSource的标准Spring Bean,这是处理文件系统轮询的具体类实现。

出站适配器用于向外发送消息。Spring Integration提供各种常用的开箱即用的出战适配器。

总结

本文简单演示了基于Java 配置Spring Integration示例,集成应用可以作为单独的应用运行,也可以作为企业级应用的一个部分。虽然它不直接与其他以EAI为中心的产品和模式(如企业服务总线)竞争,但它是一种可行的、轻量级的替代方案,可以解决许多需要构建ESB才能解决的问题。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐