Apache Camel File组件
Apache Camel相关代码已经上传GitHub,需要的自取:GitHub - Apache Camel 完整Demo
如果觉得还行,麻烦点个Star
下面我们看代码实例:
公司因为是纯内网,没法使用Maven,这个pom.xml是我在家中使用的。
贴上,给需要的人。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test.camel</groupId>
<artifactId>ApacheCamelTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ApacheCamelTest</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jetty</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<version>2.20.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cxf</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cxf-transport</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-bindings-soap</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxws</artifactId>
<version>3.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>3.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>3.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-http-common</artifactId>
<version>2.20.0</version>
</dependency>
<!-- <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jsonpath</artifactId>
<version>2.14.0</version> </dependency> -->
<!-- Logger -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20171018</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.2.0</version>
</dependency>
<!-- httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
log4j.properties
#####输出到控制台#####
log4j.rootLogger=DEBUG,CONSOLE,ARKSERVICES
log4j.addivity.org.apache=true
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d] [%t] %p %l - %m%n
#####输出到文件#####
log4j.appender.ARKSERVICES=org.apache.log4j.RollingFileAppender
log4j.appender.ARKSERVICES.File=./log/calem-test.log
log4j.appender.ARKSERVICES.Threshold=INFO
log4j.appender.ARKSERVICES.MaxBackupIndex=5
log4j.appender.ARKSERVICES.MaxFileSize=100MB
log4j.appender.ARKSERVICES.layout=org.apache.log4j.PatternLayout
log4j.appender.ARKSERVICES.layout.ConversionPattern=[%d] [%t] %p %l - %m%n
log4j.logger.org.eclipse.jetty=OFF
测试代码
package com.camel.file.file_1;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.log4j.PropertyConfigurator;
/**
* 普通Camel-File组件测试
*
* @author CYX
* @time 2018年1月2日下午2:28:27
*/
public class CamelFileComponent_1 {
public static void main(String[] args) throws Exception {
// 日志
PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("file:/temp?delay=3000&delete=true&charset=UTF-8").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// 读取文件名
Message fileMessage = exchange.getIn();
System.out.println("文件名 : " + fileMessage);
System.out.println("exchange.toString() : " + exchange.toString());
// 读取文件中的内容
String fileInfomation = fileMessage.getBody(String.class);
System.out.println("文件内容 :" + fileInfomation);
}
}).to("log:CamelFileComponent_1?showExchangeId=true");
}
});
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (CamelFileComponent_1.class) {
CamelFileComponent_1.class.wait();
}
}
}
程序运行之后,向目录中放入一个文件,日志输出
文件名 : 新建文本文档.txt
exchange.toString() : Exchange[ID-CYX-1514877290970-0-1]
文件内容 :爱上就打算打iOS惊呆近三年地哦啊包速度IP
阿喀琉斯弄丢了卡萨诺的
阿喀琉斯弄懂ijasd
卡拉胶三路口等你哦啊接收到
[2018-01-02 15:14:58,489] [Camel (camel-1) thread #1 -file:///temp] INFO org.apache.camel.util.CamelLogger.log(CamelLogger.java:159) - Exchange[Id: ID-CYX-1514877290970-0-1, ExchangePattern: InOnly, BodyType: org.apache.camel.component.file.GenericFile, Body: [Body is file based: GenericFile[E:\temp\新建文本文档.txt]]]
上面的代码很简单。
我们看下关键的路由信息。
/temp:Camel File组件,会定时轮询这个目录中的文件。
delay:延迟时间、下一次轮询的间隔时间
delete:文件成功处理之后,将被删除
charset:字符集
定时从temp目录中轮询,当有文件之后,读取文件,然后数据流入process,进行业务操作,我这里只是打印文件名和文件内容,然后再到log日志输出。
很简单吧...
文章上面,还有一些有趣的配置参数,可以试着用用看....
Camel File组件 线程池
下面我们来看一个有意思的。
在《Apache Camel - 9 - Camel路由条件》中,我们在Camel中使用了线程池...
咱们试试看,在读取文件的时候,增加一个线程池。
package com.camel.file.file_2;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileMessage;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.MulticastDefinition;
import org.apache.commons.logging.Log;
import org.apache.log4j.PropertyConfigurator;
import com.camel.file.file_1.CamelFileComponent_1;
/**
* Camel-file组件测试-线程池
*
* @author CYX
* @time 2018年1月2日下午2:28:54
*/
public class CamelFileComponent_2 {
public static void main(String[] args) throws Exception {
// 日志
PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);
ExecutorService executor = new ThreadPoolExecutor(10, 15, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
MulticastDefinition multicastDefinition = from("file:/temp?delay=3000&delete=true&charset=UTF-8").multicast();
multicastDefinition.setParallelProcessing(true);
multicastDefinition.setExecutorService(executor);
multicastDefinition.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message fileMessage = exchange.getIn();
// log.info("文件名 : " + fileMessage.getBody());
// 读取文件名
System.out.println("文件名 : " + fileMessage.getBody());
// System.out.println("文件内容 : " + fileMessage.getBody(String.class));
}
}).to("log:CamelFileComponent_2?showExchangeId=true");
}
});
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (CamelFileComponent_2.class) {
CamelFileComponent_2.class.wait();
}
}
}
代码逻辑和之前的那个基本是一样的,只是增加了一个线程池。
好奇的有两点,第一,增加线程池之后,.to("log:CamelFileComponent_2?showExchangeId=true")是否是多线程打印的?
咱们来试试看...
咱们向指定的目录中多早一点文件,然后启动程序:
根据日志的线程ID,可以看出,日志输出是多线程输出的。
那process操作的时候是否是多线程操作的呢??
为了看的清除一点,我们修改一下代码:
package com.camel.file.file_2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.MulticastDefinition;
import org.apache.log4j.PropertyConfigurator;
/**
* Camel-file组件测试-线程池
*
* @author CYX
* @time 2018年1月2日下午2:28:54
*/
public class CamelFileComponent_2 {
public static void main(String[] args) throws Exception {
// 日志
PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);
ExecutorService executor = new ThreadPoolExecutor(10, 15, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
MulticastDefinition multicastDefinition = from("file:/temp?delay=3000&delete=true&charset=UTF-8").multicast();
multicastDefinition.setParallelProcessing(true);
multicastDefinition.setExecutorService(executor);
multicastDefinition.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message fileMessage = exchange.getIn();
log.info("文件名 : " + fileMessage.getBody());
// 读取文件名
// System.out.println("文件名 : " + fileMessage.getBody());
// System.out.println("文件内容 : " + fileMessage.getBody(String.class));
}
});//.to("log:CamelFileComponent_2?showExchangeId=true");
}
});
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (CamelFileComponent_2.class) {
CamelFileComponent_2.class.wait();
}
}
}
注意看,process方法中的log是父类提供的!!!
和之前一样,咱们测试一下:
通过日志输出的线程号,可以确定没问题....
另外一个,需要注意的是,使用单线程和多线程读取文件时候要注意的!!!!
在获取文件名和文件内容的时候,单线程和多线程是不一样的。
为了让大家看的更清楚一些,我直接贴图。
大家可以试试看,我这里不再贴日志的图片了。
------------------------------------------------------------------------
package com.camel.server.file.file_1;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.PropertyConfigurator;
/**
* Apache Camel File组件<br>
* copy文件,从一个目录,移动到另一个目录.
*
* @author CYX
*
*/
public class FileCopierWithCamel {
public static void main(String[] args) throws Exception {
// 日志
PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
// 不加延时时间参数,默认为500毫秒.
/**
* 不加任何属性参数,inbox目录中的文件,会被直接移动到outbox目录中.
*/
from("file:./inbox").to("file:outbox");
/**
* noop参数:如果为true,则不会以任何方式移动或删除文件.<br>
* 如果noop = true,Camel也会设置idempotent = true,以避免重复使用同一个文件.<br>
* 简单的说:移动inbox目录中的文件到outbox目录中,inbox中的文件会保留下来并不会被删除.
*/
// from("file:./inbox?noop=true").to("file:outbox");
}
});
context.start();
// 仅为了保证主线程不退出
synchronized (FileCopierWithCamel.class) {
FileCopierWithCamel.class.wait();
}
}
}