Spring Integration sftp 专栏详解

摘要:
Spring Integration提供了三种方法来支持SFTP服务器上的文件发送和接收:JSch支持连接配置上的多个通道的操作。propertyname=“host”value=“localhost”/>META-INF/keys/sftpTest“/>bean>以下是在springboot模式下定义的SessionFactory:

Spring Integration Sftp 文件传送

目前在国内项目开发中,使用Spring Integration技术的比较少,尤其是中文的参考文献和项目案例,更是罕有。鉴于此,本文详细介绍spring integration sftp模块在Sftp服务器和本地服务器之间文件的传送。
SFTP(Secure File Transfer Protocol) 安全文件传送协议,允许文件通过流的形式在网络之间进行传输。文件传输的过程涉及到两个重要的因素,安全渠道(secure channel,如SSH)以及SFTP联接身份的识别(SFTP session)。Spring Integration提供三种方式来支持文件在SFTP服务器的发送和接收:Inbound Channel Adapter,Outbound Channel Adapter,Outbound Gateway。

几个概念:

(1)SFTP Session Factory
在配置SFTP adapters之前,需要配置SFTP Session Factory;Spring Integration提供了如下xml和spring boot的定义方式。
每次使用 SFTP adapter,都需要Session Factory会话对象,一般情况,都会创建一个新的SFTP会话。同时还提供了Session的缓存功能。Spring integration中的Session Factory是依赖于JSch库来提供

JSch支持在一个连接配置上多个channel的操作。原生的JSch技术开发,在打开一个channel操作之前,需要建立Session的连接。同样的,默认情况,Spring Integration为每一个channel操作使用单独的物理连接。在3.0版本发布之后,Cache Session Factory 出现 (CachingSessionFactory),将Session Factory包装在缓存中,支持Session共享,可以在一个连接上支持多个JSch Channel的操作。如果缓存被重置,在最后一次channel关闭之后,才会断开连接。

Spring Integration sftp 专栏详解第1张

下面是XML方式定义的Session Factory的bean:

<beans:bean  
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="songhj"/>
</beans:bean>

 下面是使用springboot方式定义的Session Factory:

@Bean
public SessionFactory<LsEntry> sftpSessionFactory(){
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setUser("songhj");
    factory.setHost("127.0.0.1");
    factory.setPort("22");
    factory.setPassword("password");
    return factory;
}

(2)SFTP Session Caching

在3.0之前的版本中,会话默认情况下是自动缓存的。可以使用cache-sessions属性禁用自动缓存,但是该解决方案没有提供配置其他会话缓存属性的方法。例如,您不能限制缓存会话的数量。
3.0版本之后,提供了一个CachingSessionFactory。它提供了sessionCacheSize和sessionWaitTimeout属性。控制缓存中维护多少活动会话(默认情况下是无界的)。如果已经达到了sessionCacheSize阈值,那么任何获取另一个会话的尝试都将被阻塞,直到其中一个缓存会话可用,或者直到会话的等待时间过期(默认等待时间是Integer.MAX_VALUE)。sessionWaitTimeout属性启用该值的配置。缓存会话,然后将其包装在CachingSessionFactory的实例中。

<bean  
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>
<bean  
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <property name="sessionCacheSize" value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

 从Spring Integration version 3.0开始,CachingConnectionFactory提供了resetCache()方法。当被调用时,所有空闲会话将立即关闭,而正在使用的会话将在返回到缓存时关闭。当使用isSharedSession=true时,通道是关闭的,而共享会话仅在最后一个通道关闭时才关闭。对会话的新请求将根据需要建立新的会话。

(3)RemoteFileTemplate

 从Spring Integration 3.0开始,通过SftpSession对象提供了一个新类Remote File Template。提供了Sftp文件发送、检索、删除和重命名文件的方法。此外,还提供了一个执行方法,允许调用者在会话上执行多个操作。在所有情况下,模板负责可靠地关闭会话。
SftpRemoteFileTemplate继承Remote File Template,可以很容易的实现对SFTP文件的发送(包括文件追加,替换等),检索,删除和重命名。

Spring Integration sftp 专栏详解第2张

 Spring Integration sftp 专栏详解第3张

 Spring Integration sftp 专栏详解第4张

 (4)SFTP Inbound Channel Adapter

SFTP Inbound Channel Adapter 是一个特殊的监听器,这个认识很重要,它将连接到服务器,并监听远程文件操作事件(例如,创建新文件),并初始化文件传输实例。它的作用实质就是将sftp远程服务器文件发送到本地文件夹下。
下面是XML方式的 sftp inbound channel adapter配置实例:

<int-sftp:inbound-channel-adapter  
     session-factory="sftpSessionFactory"
   channel="requestChannel"
   filename-pattern="*.txt"
   remote-directory="/foo/bar"
   preserve-timestamp="true"
   local-directory="file:target/foo"
   auto-create-local-directory="true"
   local-filename-generator-expression="#this.toUpperCase() + '.a'"
   local-filter="myFilter"
   delete-remote-files="false">
  <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

 从上面的配置可以理解,由inbound-channel-adapter元素创建,同时提供各种属性的值,local-directory:文件被转移的目标文件夹;remote-directory:文件来源远程文件夹目录;session-factory:session 会话工厂;channel:需要的渠道;文件过滤器以及是否删除转移后的文件等属性。

下面是springboot方式配置的Sftp Inbound channel adapter:

@Bean
@InboundChannelAdapter(value = "inboundFileChannel",
        poller = @Poller(cron = "1/10 * * * * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileMessageSource() {
 
    //创建sftpInboundFileSynchronizer,并绑定到message source.
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());

    //自动创建本地文件夹
    source.setAutoCreateLocalDirectory(true);
    source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));
    //设置文件过滤器
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return source;
}

/**
 * 为Inbound-channel-adapter提供bean
 */
@Bean
public DirectChannel inboundFileChannel() {
    return new DirectChannel();
}

/**
 * SftpInboundFileSynchronizer,
 *
 *  同步sftp文件至本地服务器.
 *      <1> 可以放在service中获取bean使用.toLocal方法;
 *      <2> 也可以使用inbound-channel-adapter中,做监控文件服务器的动态。
 *
 * @return SftpInboundFileSynchronizer
 */
@Bean(name = "synFileChannel")
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){
    SftpInboundFileSynchronizer fileSynchronize =
            new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronize.setDeleteRemoteFiles(true);
    fileSynchronize.setPreserveTimestamp(true);
    //!important
    fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
    fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
    //fileSynchronize.setLocalFilenameGeneratorExpression( );
    fileSynchronize.setPreserveTimestamp(true);
    return fileSynchronize;
}

文件名设置:默认情况下,传输的文件将与原始文件具有相同的名称。Inbound channel adapter提供local-filename-generator- Expression属性,允许提供SpEL表达式来生成本地文件的名称。

文件修改时间戳:从Spring Integration 3.0开始,reserve-timestamp属性(默认为false);设置为true时,本地文件修改后的时间戳将设置为从服务器接收到的时间;否则将设置为当前时间。

文件过滤器:filename-pattern属性指定了简单的文件过滤模式。同时还提供了filename-regex属性来指定使用正则表达式(例如filename-regex=".*.test$")来过滤文件,以确定检索哪些远程文件。除了上面的过滤器之外,spring integration还提供了其他筛选器,如AcceptOnceFileListFilter,CompositeFileListFilter,SftpPersistentAcceptOnceFileListFilter,前者的作用是避免同步以前获取的文件。

(5)SFTP Outbound Channel Adapter

Spring Integration的 sftp outbound channel adapter是一个特殊的Message Handler,它将连接到远程sftp服务器目录。并为接收到的Message的payload,初始化一个文件传输渠道。 sftp outbound adapter支持以下几种传输对象:
1)Java.io.File:实际文件对象;
2)byte[]:字节数组
3)Java.lang.String:字符串

下面是基于xml格式配置:

<int-sftp:outbound-channel-adapter  
				session-factory="sftpSessionFactory"
				channel="inputChannel"
				charset="UTF-8"
				remote-directory="foo/bar"
				remote-filename-generator-expression="payload.getName() + '-foo'"/>

由上面的配置,可以看到outbound channel adapter的属性;可以动态计算文件名或现有目录路径的表达式。目前Spring Integration并没有提供单独的注解来配置此adapter,可以使用@ServiceActivator 定义bean,来实现该配置。

下面是基于Spring boot框架配置,配置了Inbound Channel Adapter。

@SpringBootApplication
public class SftpJavaApplication {
    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }
    //配置Sftp Inbound Channel Adapter.
    //Inbound Channel Adapter实质就是一个服务监控器,监控sftp上服务器上文件的创建。
    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }
    //配置Sftp Outbound Channel Adapter.
    //Outbound Channel Adapter实质上就是一个MessageHandler,接收发送的消息。
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        };
    }
}

 Channel Adapter*
管道适配器,其就是一个端点Endpoint,顾名思义,就是用来适配连接Message channel与其他系统的。Channel Adapter分为inbound和outbound。刚一开始接触spring-integration时对这两个概念的非常不理解,在这里可以简单化,以当前的项目spring-integration为基准,所谓inbound就是从其他系统接收Message,获取资源,如sftp,file,Http等。而outbound就是将资源通过消息管道,发送资源到target目标服务器。

Spring Integration sftp 专栏详解第5张

 Spring Integration sftp 专栏详解第6张

 (6)Service Activator

Service activator的作用时将存在的service注册为一个端点,将该服务实例同spring-integration消息服务连接起来。该端点必须配置input channel,若调用的方法有返回值,还可以配置out channel。

Service activator调用某个服务的操作来处理消息,如在下面的发送文件到sftp服务,就是将SftpMessageHandler来处理channel中的消息。

Spring Integration sftp 专栏详解第7张

Spring-Integration-Sftp实战测试:

一、导入依赖

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-sftp</artifactId>
</dependency>

二、Sftp配置类

@Configuration
@EnableIntegration
public class SpringSftpConfig {
 
    @Autowired
    private SftpProperties sftpProperties;
 
    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpProperties.getHost());
        factory.setPort(sftpProperties.getPort());
        factory.setUser(sftpProperties.getUsername());
        if (StringUtils.isNotEmpty(sftpProperties.getPrivateKey())) {
            factory.setPrivateKey(new ClassPathResource(sftpProperties.getPrivateKey()));
            factory.setPrivateKeyPassphrase(sftpProperties.getPassphrase());
        } else {
            factory.setPassword(sftpProperties.getPassword());
        }
        factory.setAllowUnknownKeys(true);
        CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(factory);
        cachingSessionFactory.setPoolSize(10);
        cachingSessionFactory.setSessionWaitTimeout(10000);
        return cachingSessionFactory;
    }
 
    @Bean
    public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }
 
    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(true);
        fileSynchronizer.setRemoteDirectory(sftpProperties.getRoot());
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt"));
        return fileSynchronizer;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "lsChannel")
    public MessageHandler lsHandler() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "ls", "payload");
        sftpOutboundGateway.setOptions("-dirs"); //配置项
        return sftpOutboundGateway;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "downloadChannel")
    public MessageHandler downloadHandler() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "mget", "payload");
        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setLocalDirectory(new File(sftpProperties.getLocalPath()));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
        return sftpOutboundGateway;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "uploadChannel")
    public MessageHandler uploadHandler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(sftpProperties.getRoot()));
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof File) {
                return ((File) message.getPayload()).getName();
            } else {
                throw new IllegalArgumentException("File expected as payload.");
            }
        });
        return handler;
    }
 
    @MessagingGateway
    public interface SftpGateway {
        @Gateway(requestChannel = "lsChannel")
        List<FileInfo> listFile(String dir);
 
        @Gateway(requestChannel = "downloadChannel")
        List<File> downloadFiles(String dir);
 
        @Gateway(requestChannel = "uploadChannel")
        void uploadFile(File file);
    }
 
}

其中,SftpProperties是第二篇中的配置类。SessionFactory来创建连接工厂,进行连接操作。SftpRemoteFileTemplate是Spring-integration-sftp进行sftp操作的模板类,相当于jsch的ChannelSftp。但是SftpRemoteFileTemplate的上传下载方法不大好用,是通过InputStream和OutputStream进行操作的。因此这里通过Spring integration的MessageHandler来进行上传下载操作。

@Autowired
private SpringSftpConfig.SftpGateway sftpGateway;
sftpGateway.listFile("/Users/kungfupanda").stream().forEach(System.out::println);
sftpGateway.downloadFiles("/Users/kungfupanda/sftpserver/RELREVMOD_ddmmyyyy.txt").stream().forEach(System.out::println);
sftpGateway.uploadFile(new File("RELREVMOD_ddmmyyyy.txt"));

三、启动集成扫描

在启动类上加上@IntegrationComponentScan注解

@IntegrationComponentScan
@SpringBootApplication
@EnableScheduling
public class CDDAlertApplication{
 
    public static void main(String... args){
        SpringApplication.run(CDDAlertApplication.class, args);
    }
}

sftp.properties配置信息文件

sftp.client.protocol=sftp
sftp.client.host=localhost
sftp.client.port=22
sftp.client.username=kungfupanda
sftp.client.password=xxxxx
sftp.client.root=/Users/kungfupanda
sftp.client.privateKey=
sftp.client.passphrase=
sftp.client.sessionStrictHostKeyChecking=no
sftp.client.sessionConnectTimeout=15000
sftp.client.channelConnectedTimeout=15000
sftp.client.localPath=./_files

免责声明:文章转载自《Spring Integration sftp 专栏详解》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Ice框架简介及Vs2013安装Ice 3.7.0步骤及实例Java 2进制和16进制的转换下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

基于ASP.NET的lucene.net全文搜索(一)

在做项目的时候,需求添加全文搜索,选择了lucene.net方向,调研了一下,基本实现了需求,现在将它分享给大家。理解不深请多多包涵。 在完成需求的时候,查看的大量的资料,本文不介绍详细的lucene.net工程建立,只介绍如何对文档进行全文搜索。对于如何建立lucene.net的工程请大家访问 lucene.net开发。 使用lucene.net搜索分为...

Lattice Diamond与modelsim联合仿真环境设置

http://blog.csdn.net/pianbing/article/details/71565034  也可参考这里:http://www.cnblogs.com/fhyfhy/p/5557855.html lattice 与 modelsim 仿真 笔记Lattice FPGA开发环境在仿真时可以使用modelsim,相比于Diamond自带的A...

批处理命令大全

  1.Echo 命令打开回显或关闭请求回显功能,或显示消息。如果没有任何参数,echo 命令将显示当前回显设置。语法echo [{on|off}] [message]Sample:echo off / echo hello world在实际应用中我们会把这条命令和重定向符号(也称为管道符号,一般用> >> ^)结合来实现输入一些命令到特...

WebService 之 身份验证

  在项目开发,我们经常会使用WebService,但在使用WebService时我们经常会考虑到了WebService是安全问题,很容易想到通过一组用户名与密码来防止非法用户的调用 。 一、NetworkCredential方式  在 System.Net 命名空间中提供了一个NetworkCredential,通过它我们可以在网络中提供一个凭证,只有获...

小程序下拉刷新

  1、通过scroll-view实现   开始用scroll-view组件,通过scroll-view自带的触发下拉刷新、上拉加载事件。   在iOS下,可以正常触发,但在安卓机型下,必须先上滑一段距离再下滑,才能够触发下拉刷新事件,体验不太好。向小程序官方反馈,给我的回答是安卓机型不支持反弹效果。   上图是官网中关于scroll-view的内容,顺...

redhat7 升级openssh openssl

部署telnet,防止ssh启动失败  1、关闭防火墙或者开放23端口  2、安装启动服务,并开启root访问 yum install -y telnet-server.x86_64 yum install -y telnet.x86_64 yum install -y xinetd.x86_64 systemctl enable xinetd.ser...