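I need to read a directory containing CSV files (one or more). I am using Camel with Spring Boot. Any file that is processed completely (without errors) must be moved to the OUT dir, but if the last "to" in the route fails (throws an exception), the file must be moved to the REFUSED dir instead.
When I run my code, Camel goes into an infinite loop and keeps processing the same file forever.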
24/08/2017 16:27:57.070 ERROR [Camel (camel-1) thread #0 - file://src/main/resources/data] - org.apache.camel.processor.DefaultErrorHandler: Failed delivery for (MessageId: ID-CAD1652-39380-1503584865077-0-33 on ExchangeId: ID-CAD1652-39380-1503584865077-0-34). Exhausted after delivery attempt: 1 caught: com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route2 ] [route2 ] [file://src/main/resources/data?idempotent=false&move=OUT%2FVB%2F ] [ 10]
[route2 ] [unmarshal1 ] [unmarshal[org.apache.camel.model.dataformat.CsvDataFormat@28f6cf0f] ] [ 1]
[route2 ] [to1 ] [bean:myCsvHandler?method=doHandleCsvDataVB ] [ 8]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto
at com.cadit.handlers.MyCsvHandler.doHandleCsvDataVB(MyCsvHandler.java:172)
at com.cadit.handlers.MyCsvHandler$$FastClassBySpringCGLIB$$f4b8f70b.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:721)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:656)
at com.cadit.handlers.MyCsvHandler$$EnhancerBySpringCGLIB$$d81d9e7f.doHandleCsvDataVB(<generated>)
at sun.reflect.GeneratedMethodAccessor97.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:458)
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:289)
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:262)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:178)
at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:460)
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:227)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:191)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Here is the main code:
@Component
public class CamelContextConf extends RouteBuilder {

    static final Logger logger = Logger.getLogger(CamelContextConf.class);

    @Override
    public void configure() throws Exception {
        restConfiguration().component("servlet").dataFormatProperty("prettyPrint", "true");

        CsvDataFormat csv = new CsvDataFormat();
        csv.setDelimiter(";");
        csv.setSkipHeaderRecord(true);

        from("direct:csvprocessor")
            .streamCaching()
            .from("file:src/main/resources/data?move=OUT/VB/")
            .unmarshal(csv)
            .to("bean:myCsvHandler?method=doHandleCsvDataVB")
            .onCompletion().onFailureOnly().to("file:src/main/reources/data/REFUSED").end()
            .setBody(constant("OK"))
            .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(200))
            .setHeader(Exchange.CONTENT_TYPE, constant("text/html"));

        logger.info("** Route config ok");
    }
}
Then, in myCsvHandler, in the method doHandleCsvDataVB, I explicitly throw an exception to test the failure case:
@Component
public class MyCsvHandler {

    @Inject
    AFVINCCrudRepository _entityManagerVINC;

    @Inject
    AFFileCrudRepository _entityManagerAfFile;

    @Transactional(propagation = Propagation.REQUIRED, transactionManager = "DbTransactionManagerVB")
    public void doHandleCsvDataVB(List<List<String>> csvData) throws FileNotEvaluableException {
        //System.out.println("stampo..");
        if (csvData.size() > 0) {
            AfFileEntity afbean = new AfFileEntity();
            afbean.setNomeFile("test");
            afbean.setDataImport(new java.sql.Timestamp(System.currentTimeMillis()));
            afbean.setTipoFile("M");
            afbean.setAfStatoFlusso("I");
            _entityManagerAfFile.save(afbean);
            long pkfile = afbean.getId();
            System.out.println("pkfile: " + pkfile);
            int i = 1;
            VincEntity vincBean = new VincEntity();
            System.out.println(csvData.size());
            for (List<String> rows : csvData) {
                ..
                _entityManagerVINC.save(..);
            }
            throw new FileNotEvaluableException("Il file non è nè una ...");
        }
    }
}
The save method keeps looping and continues to save data to the DB. What is wrong?
Thanks a lot.
Answered on 2017-08-24 20:02:19
In your code, you cannot have a from inside another from.
from("direct:csvprocessor")
.streamCaching()
.from("file:src/main/resources/data?move=OUT/VB/")
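Your second from("file:...") should use an enricher instead: http://camel.apache.org/content-enricher.html
I am not sure, but I do not see a try/catch block in your doHandleCsvDataVB() method. You go straight from the for loop to throwing the exception.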
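As a possible alternative to the enricher, here is a minimal route configuration sketch that uses a single file consumer and lets the file component do the moving through its move and moveFailed options. It assumes the same Camel 2.x API and the same myCsvHandler bean as in the question; the class name CsvFileRoute and the route id are hypothetical, and REFUSED is resolved relative to the polled directory. As far as I understand the file component, a file whose exchange fails is otherwise left in the input directory and picked up again on the next poll, which would match the loop described in the question.

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.CsvDataFormat;
import org.springframework.stereotype.Component;

// Hypothetical class name; requires camel-core, camel-csv and Spring on the classpath.
@Component
public class CsvFileRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Same CSV settings as in the question.
        CsvDataFormat csv = new CsvDataFormat();
        csv.setDelimiter(";");
        csv.setSkipHeaderRecord(true);

        // One consumer only: move on success, moveFailed when the exchange fails.
        from("file:src/main/resources/data?move=OUT/VB&moveFailed=REFUSED")
            .routeId("csvFileRoute") // hypothetical id
            .unmarshal(csv)
            .to("bean:myCsvHandler?method=doHandleCsvDataVB");
    }
}
```

With this layout the exception thrown by doHandleCsvDataVB should send the file to data/REFUSED instead of leaving it in place for the next poll. The REST response headers from the question are left out here because a file-triggered route has no HTTP caller to answer.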
Answered on 2020-06-04 13:03:53
I used the approach below to move files from one folder to another.
@Component
public class FileRouteBuilder extends RouteBuilder {

    // Processor invoked for every file that is picked up
    @Autowired
    FileProcessor fileProcessor;

    @Override
    public void configure() throws Exception {
        from("file://{{dir.in}}?readLock=changed&include=.*.csv&preMove={{dir.progress}}&move={{dir.out}}&moveFailed={{dir.error}}")
            .process(fileProcessor).setId("Poller");
    }
}
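Depending on your needs, you may or may not have a processor. If you do not want a processor, use the code below:
from("file://{{dir.in}}?readLock=changed&include=.*.csv&preMove={{dir.progress}}&move={{dir.out}}&moveFailed={{dir.error}}")
    .setId("Poller");
Note that if you do not use preMove, a default .camel folder is created inside the "in" folder and a copy of the file is moved into that .camel folder. The files will stay in that directory unless we delete them manually.
So I suggest using preMove in your route.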
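To make the preMove / move / moveFailed lifecycle concrete, here is a plain-JDK sketch that mimics the three steps with java.nio.file. It is not Camel code and not how Camel is implemented; the class FileLifecycleSketch, its methods, and the folder names progress, out and error are all hypothetical and only illustrate where a file ends up in each case.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Consumer;

// Plain-JDK illustration only; this is NOT Camel's implementation.
final class FileLifecycleSketch {

    // Move the file into targetDir (creating it if needed) and return the new path.
    static Path moveInto(Path file, Path targetDir) {
        try {
            Files.createDirectories(targetDir);
            return Files.move(file, targetDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "preMove" to progressDir, run the processor, then "move" on success
    // or "moveFailed" when the processor throws.
    static Path handle(Path file, Path progressDir, Path outDir, Path errorDir,
                       Consumer<Path> processor) {
        Path inProgress = moveInto(file, progressDir);
        try {
            processor.accept(inProgress);
        } catch (RuntimeException e) {
            return moveInto(inProgress, errorDir);
        }
        return moveInto(inProgress, outDir);
    }

    // Hypothetical demo: creates a temp "in" folder with one CSV file and runs handle().
    static Path demo(boolean failProcessing) {
        try {
            Path base = Files.createTempDirectory("lifecycle");
            Path in = Files.createDirectories(base.resolve("in"));
            Path csv = Files.write(in.resolve("sample.csv"), "a;b\n1;2\n".getBytes());
            return handle(csv, base.resolve("progress"), base.resolve("out"),
                    base.resolve("error"),
                    p -> {
                        if (failProcessing) {
                            throw new IllegalStateException("simulated failure");
                        }
                    });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // path ends in out/sample.csv
        System.out.println(demo(true));  // path ends in error/sample.csv
    }
}
```

In both cases the file leaves the "in" folder before processing starts, so a failing file cannot be picked up again by a later poll.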
https://stackoverflow.com/questions/45864600