Apache Camel - 11 - File组件(2)

本文详细介绍Apache Camel File组件的应用,包括如何配置pom.xml引入依赖、设置日志配置、实现文件读取处理及日志输出,同时探讨了单线程与多线程环境下文件处理的区别。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

Apache Camel File组件

Apache Camel相关代码已经上传GitHub,需要的自取:GitHub - Apache Camel 完整Demo

如果觉得还行,麻烦点个Star

 

下面我们看代码实例:

公司因为是纯内网,没法使用Maven,这个pom.xml是我在家中使用的。

贴上,给需要的人。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.test.camel</groupId>
	<artifactId>ApacheCamelTest</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>ApacheCamelTest</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>

		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>

		<!-- camel -->
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-jetty</artifactId>
			<version>2.20.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-test</artifactId>
			<version>2.20.0</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-cxf</artifactId>
			<version>2.20.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-cxf-transport</artifactId>
			<version>2.20.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-bindings-soap</artifactId>
			<version>3.2.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-frontend-jaxws</artifactId>
			<version>3.1.10</version>
		</dependency>

		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-transports-http</artifactId>
			<version>3.1.10</version>
		</dependency>

		<dependency>
			<groupId>org.apache.cxf</groupId>
			<artifactId>cxf-rt-transports-http-jetty</artifactId>
			<version>3.1.10</version>
		</dependency>

		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.20.0</version>
		</dependency>


		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-http</artifactId>
			<version>2.20.0</version>
		</dependency>


		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-http-common</artifactId>
			<version>2.20.0</version>
		</dependency>


		<!-- <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jsonpath</artifactId> 
			<version>2.14.0</version> </dependency> -->

		<!-- Logger -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.15</version>
			<exclusions>
				<exclusion>
					<groupId>com.sun.jmx</groupId>
					<artifactId>jmxri</artifactId>
				</exclusion>
				<exclusion>
					<groupId>com.sun.jdmk</groupId>
					<artifactId>jmxtools</artifactId>
				</exclusion>
				<exclusion>
					<groupId>javax.jms</groupId>
					<artifactId>jms</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.4</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>log4j-over-slf4j</artifactId>
			<version>1.6.1</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.5</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.6</version>
		</dependency>

		<!-- JSON -->
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20171018</version>
		</dependency>

		<dependency>
			<groupId>com.jayway.jsonpath</groupId>
			<artifactId>json-path</artifactId>
			<version>2.2.0</version>
		</dependency>

		<!-- httpclient -->
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			<version>4.5.2</version>
		</dependency>


	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-dependency-plugin</artifactId>
				<executions>
					<execution>
						<id>copy</id>
						<phase>package</phase>
						<goals>
							<goal>copy-dependencies</goal>
						</goals>
						<configuration>
							<outputDirectory>${project.build.directory}/lib</outputDirectory>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

</project>

log4j.properties

#####输出到控制台#####
log4j.rootLogger=DEBUG,CONSOLE,ARKSERVICES
log4j.addivity.org.apache=true
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d] [%t] %p %l - %m%n

#####输出到文件#####
log4j.appender.ARKSERVICES=org.apache.log4j.RollingFileAppender
log4j.appender.ARKSERVICES.File=./log/calem-test.log
log4j.appender.ARKSERVICES.Threshold=INFO
log4j.appender.ARKSERVICES.MaxBackupIndex=5
log4j.appender.ARKSERVICES.MaxFileSize=100MB
log4j.appender.ARKSERVICES.layout=org.apache.log4j.PatternLayout
log4j.appender.ARKSERVICES.layout.ConversionPattern=[%d] [%t] %p %l - %m%n

log4j.logger.org.eclipse.jetty=OFF  

测试代码

package com.camel.file.file_1;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.log4j.PropertyConfigurator;

/**
* 普通Camel-File组件测试
* 
* @author CYX
* @time 2018年1月2日下午2:28:27
*/
public class CamelFileComponent_1 {

	public static void main(String[] args) throws Exception {

	// 日志
	PropertyConfigurator.configure("./conf/log4j.properties");
	PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

	// 这是camel上下文对象,整个路由的驱动全靠它了。
	ModelCamelContext camelContext = new DefaultCamelContext();

	// 启动route
	camelContext.start();

	camelContext.addRoutes(new RouteBuilder() {

	@Override
	public void configure() throws Exception {
			from("file:/temp?delay=3000&delete=true&charset=UTF-8").process(new Processor() {

	@Override
	public void process(Exchange exchange) throws Exception {

	// 读取文件名
	Message fileMessage = exchange.getIn();
	System.out.println("文件名 : " + fileMessage);
	System.out.println("exchange.toString() : " + exchange.toString());

	// 读取文件中的内容
	String fileInfomation = fileMessage.getBody(String.class);
	System.out.println("文件内容 :" + fileInfomation);

	}
	}).to("log:CamelFileComponent_1?showExchangeId=true");
	}
	});

	// 通用没有具体业务意义的代码,只是为了保证主线程不退出
	synchronized (CamelFileComponent_1.class) {
			CamelFileComponent_1.class.wait();
	}

	}

}

程序运行之后,向目录中放入一个文件,日志输出

文件名 : 新建文本文档.txt

exchange.toString() : Exchange[ID-CYX-1514877290970-0-1]

文件内容 :爱上就打算打iOS惊呆近三年地哦啊包速度IP

阿喀琉斯弄丢了卡萨诺的

阿喀琉斯弄懂ijasd

卡拉胶三路口等你哦啊接收到

[2018-01-02 15:14:58,489] [Camel (camel-1) thread #1 -file:///temp] INFO org.apache.camel.util.CamelLogger.log(CamelLogger.java:159) - Exchange[Id: ID-CYX-1514877290970-0-1, ExchangePattern: InOnly, BodyType: org.apache.camel.component.file.GenericFile, Body: [Body is file based: GenericFile[E:\temp\新建文本文档.txt]]]

上面的代码很简单。

我们看下关键的路由信息。

/temp:Camel File组件,会定时轮询这个目录中的文件。

delay:延迟时间、下一次轮询的间隔时间

delete:文件成功处理之后,将被删除

charset:字符集

定时从temp目录中轮询,当有文件之后,读取文件,然后数据流入process,进行业务操作,我这里只是打印文件名和文件内容,然后再到log日志输出。

很简单吧...

文章上面,还有一些有趣的配置参数,可以试着用用看....

 

Camel File组件 线程池

下面我们来看一个有意思的。

在《Apache Camel - 9 - Camel路由条件》中,我们在Camel中使用了线程池...

咱们试试看,在读取文件的时候,增加一个线程池。

package com.camel.file.file_2;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileMessage;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.MulticastDefinition;
import org.apache.commons.logging.Log;
import org.apache.log4j.PropertyConfigurator;

import com.camel.file.file_1.CamelFileComponent_1;

/**
* Camel-file组件测试-线程池
* 
* @author CYX
* @time 2018年1月2日下午2:28:54
*/
public class CamelFileComponent_2 {

	public static void main(String[] args) throws Exception {

	// 日志
	PropertyConfigurator.configure("./conf/log4j.properties");
	PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

	ExecutorService executor = new ThreadPoolExecutor(10, 15, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

	// 这是camel上下文对象,整个路由的驱动全靠它了。
	ModelCamelContext camelContext = new DefaultCamelContext();

	// 启动route
	camelContext.start();

	camelContext.addRoutes(new RouteBuilder() {

	@Override
	public void configure() throws Exception {

	MulticastDefinition multicastDefinition = from("file:/temp?delay=3000&delete=true&charset=UTF-8").multicast();

	multicastDefinition.setParallelProcessing(true);

	multicastDefinition.setExecutorService(executor);

	multicastDefinition.process(new Processor() {

	@Override
	public void process(Exchange exchange) throws Exception {

	Message fileMessage = exchange.getIn();
//		log.info("文件名 : " + fileMessage.getBody());
	
	// 读取文件名
	System.out.println("文件名 : " + fileMessage.getBody());
//		System.out.println("文件内容 : " + fileMessage.getBody(String.class));

	}
	}).to("log:CamelFileComponent_2?showExchangeId=true");
	}
	});

	// 通用没有具体业务意义的代码,只是为了保证主线程不退出
	synchronized (CamelFileComponent_2.class) {
		CamelFileComponent_2.class.wait();
	}

	}

}

代码逻辑和之前的那个基本是一样的,只是增加了一个线程池。

好奇的有两点,第一,增加线程池之后,.to("log:CamelFileComponent_2?showExchangeId=true")是否是多线程打印的?

咱们来试试看...

咱们向指定的目录中多早一点文件,然后启动程序:

根据日志的线程ID,可以看出,日志输出是多线程输出的。

 

 

那process操作的时候是否是多线程操作的呢??

为了看的清除一点,我们修改一下代码:

package com.camel.file.file_2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.MulticastDefinition;
import org.apache.log4j.PropertyConfigurator;

/**
* Camel-file组件测试-线程池
* 
* @author CYX
* @time 2018年1月2日下午2:28:54
*/
public class CamelFileComponent_2 {

	public static void main(String[] args) throws Exception {

	// 日志
	PropertyConfigurator.configure("./conf/log4j.properties");
	PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

	ExecutorService executor = new ThreadPoolExecutor(10, 15, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

	// 这是camel上下文对象,整个路由的驱动全靠它了。
	ModelCamelContext camelContext = new DefaultCamelContext();

	// 启动route
	camelContext.start();

	camelContext.addRoutes(new RouteBuilder() {

	@Override
	public void configure() throws Exception {

	MulticastDefinition multicastDefinition = from("file:/temp?delay=3000&delete=true&charset=UTF-8").multicast();

	multicastDefinition.setParallelProcessing(true);

	multicastDefinition.setExecutorService(executor);

	multicastDefinition.process(new Processor() {

	@Override
	public void process(Exchange exchange) throws Exception {

	Message fileMessage = exchange.getIn();
	log.info("文件名 : " + fileMessage.getBody());
	
	// 读取文件名
//		System.out.println("文件名 : " + fileMessage.getBody());
//		System.out.println("文件内容 : " + fileMessage.getBody(String.class));

	}
	});//.to("log:CamelFileComponent_2?showExchangeId=true");
	}
	});

	// 通用没有具体业务意义的代码,只是为了保证主线程不退出
	synchronized (CamelFileComponent_2.class) {
		CamelFileComponent_2.class.wait();
	}

	}

}

注意看,process方法中的log是父类提供的!!!

和之前一样,咱们测试一下:

通过日志输出的线程号,可以确定没问题....

另外一个,需要注意的是,使用单线程和多线程读取文件时候要注意的!!!!

在获取文件名和文件内容的时候,单线程和多线程是不一样的。

为了让大家看的更清楚一些,我直接贴图。

大家可以试试看,我这里不再贴日志的图片了。

------------------------------------------------------------------------

package com.camel.server.file.file_1;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.PropertyConfigurator;

/**
 * Apache Camel File组件<br>
 * copy文件,从一个目录,移动到另一个目录.
 * 
 * @author CYX
 *
 */
public class FileCopierWithCamel {

	public static void main(String[] args) throws Exception {

		// 日志
		PropertyConfigurator.configure("./conf/log4j.properties");
		PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

		CamelContext context = new DefaultCamelContext();

		context.addRoutes(new RouteBuilder() {

			@Override
			public void configure() throws Exception {

				// 不加延时时间参数,默认为500毫秒.

				/**
				 * 不加任何属性参数,inbox目录中的文件,会被直接移动到outbox目录中.
				 */
				from("file:./inbox").to("file:outbox");

				/**
				 * noop参数:如果为true,则不会以任何方式移动或删除文件.<br>
				 * 如果noop = true,Camel也会设置idempotent = true,以避免重复使用同一个文件.<br>
				 * 简单的说:移动inbox目录中的文件到outbox目录中,inbox中的文件会保留下来并不会被删除.
				 */
				// from("file:./inbox?noop=true").to("file:outbox");
			}
		});

		context.start();

		// 仅为了保证主线程不退出
		synchronized (FileCopierWithCamel.class) {
			FileCopierWithCamel.class.wait();
		}

	}

}

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值