首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >camel读取目录,根据完成任务结果选择目标

camel读取目录,根据完成任务结果选择目标
EN

Stack Overflow用户
提问于 2017-08-24 22:44:22
回答 2查看 268关注 0票数 0

我需要读取包含csv文件(一个或多个文件)的目录。我使用Camel和springboot,并且我需要将任何完全处理(没有错误)的文件移动到OUT dir,但是如果最后的" to“路由失败(抛出异常),我需要将文件移动到REFUSED dir。

当我尝试我的代码时,camel进入无限循环,永远继续处理同一个文件。

代码语言:javascript
代码运行次数:0
运行
复制
24/08/2017 16:27:57.070 ERROR [Camel (camel-1) thread #0 - file://src/main/resources/data] - org.apache.camel.processor.DefaultErrorHandler: Failed delivery for (MessageId: ID-CAD1652-39380-1503584865077-0-33 on ExchangeId: ID-CAD1652-39380-1503584865077-0-34). Exhausted after delivery attempt: 1 caught: com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route2            ] [route2            ] [file://src/main/resources/data?idempotent=false&move=OUT%2FVB%2F              ] [        10]
[route2            ] [unmarshal1        ] [unmarshal[org.apache.camel.model.dataformat.CsvDataFormat@28f6cf0f]           ] [         1]
[route2            ] [to1               ] [bean:myCsvHandler?method=doHandleCsvDataVB                                    ] [         8]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto
    at com.cadit.handlers.MyCsvHandler.doHandleCsvDataVB(MyCsvHandler.java:172)
    at com.cadit.handlers.MyCsvHandler$$FastClassBySpringCGLIB$$f4b8f70b.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:721)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:656)
    at com.cadit.handlers.MyCsvHandler$$EnhancerBySpringCGLIB$$d81d9e7f.doHandleCsvDataVB(<generated>)
    at sun.reflect.GeneratedMethodAccessor97.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:458)
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:289)
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:262)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:178)
    at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:460)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:227)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:191)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

下面是主要代码:

代码语言:javascript
代码运行次数:0
运行
复制
@Component
public class CamelContextConf extends RouteBuilder{

     static final Logger logger = Logger.getLogger(CamelContextConf.class);

    @Override
    public void configure() throws Exception {
        restConfiguration().component("servlet").dataFormatProperty("prettyPrint", "true") ;
                    CsvDataFormat csv = new CsvDataFormat();
        csv.setDelimiter(";");  
        csv.setSkipHeaderRecord(true);


        from("direct:csvprocessor")
        .streamCaching()
        .from("file:src/main/resources/data?move=OUT/VB/")
        .unmarshal(csv) 
        .to("bean:myCsvHandler?method=doHandleCsvDataVB")
        .onCompletion().onFailureOnly().to("file:src/main/reources/data/REFUSED").end()
        .setBody(constant("OK"))
        .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(200))
        .setHeader(Exchange.CONTENT_TYPE, constant("text/html"));

        logger.info("** Route config ok");

    }

然后,在myCsvHandler ->方法doHandleCsvDataVB中,我显式地抛出了一个异常来测试失败:

代码语言:javascript
代码运行次数:0
运行
复制
@Component
public class MyCsvHandler {


    @Inject
    AFVINCCrudRepository _entityManagerVINC;

    @Inject
    AFFileCrudRepository _entityManagerAfFile;

        @Transactional(propagation = Propagation.REQUIRED , transactionManager="DbTransactionManagerVB")
    public void doHandleCsvDataVB(List<List<String>> csvData) throws FileNotEvaluableException
    {
        //System.out.println("stampo..");
            if (csvData.size() > 0){
            AfFileEntity afbean = new AfFileEntity();
            afbean.setNomeFile("test");
            afbean.setDataImport(new java.sql.Timestamp(System.currentTimeMillis()));
            afbean.setTipoFile("M");
            afbean.setAfStatoFlusso("I");

            _entityManagerAfFile.save(afbean);

            long pkfile = afbean.getId();
            System.out.println("pkfile: " + pkfile);

            int i = 1; VincEntity vincBean = new VincEntity(); 
            System.out.println(csvData.size());
            for (List<String> rows : csvData){

                ..  
                    _entityManagerVINC.save(..);
                }   

                    throw new FileNotEvaluableException("Il file non è nè una ...");

            }

        }

    }

}

save方法循环并继续在db上保存数据。怎么了?

非常感谢。

EN

回答 2

Stack Overflow用户

发布于 2017-08-25 04:02:19

在您的代码中,不能有from In另一个from。

代码语言:javascript
代码运行次数:0
运行
复制
from("direct:csvprocessor")
.streamCaching()
.from("file:src/main/resources/data?move=OUT/VB/")

您的第二个from("file:...")应该使用enricher。http://camel.apache.org/content-enricher.html

不确定,但在您的doHandleCsvDataVB()方法中,我没有看到try() catch()块。您将直接从for循环到抛出异常。

票数 0
EN

Stack Overflow用户

发布于 2020-06-04 21:03:53

我按照下面的方法将文件从一个文件夹移动到另一个文件夹。

代码语言:javascript
代码运行次数:0
运行
复制
@Autowired
FileProcessor fileProcessor;

@Component
public class FileRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file://{{dir.in}}?readLock=changed&include=.*.csv"&preMove={{dir.progress}}&move={{dir.out}}&moveFailed={{dir.error}}")
                    .process(fileProcessor).setId("Poller");
    }
}

polling

  • dir.progress文件的-Source -File目录将从正在进行中移动到正在进行中process

  • readLock=changed -在process

  • readLock=changed期间发生任何错误时,文件将移动到此处-这将在reading

  • include期间锁定文件-这将只读取源目录中的csv文件,即在文件夹中

根据您的需要,您可能有也可能没有处理器。如果你不想要处理器,那么使用下面的代码

代码语言:javascript
代码运行次数:0
运行
复制
from("file://{{dir.in}}?readLock=changed&include=.*.csv"&preMove={{dir.progress}}&move={{dir.out}}&moveFailed={{dir.error}}")
                        .setId("Poller");

请注意,如果您不使用preMove,则会在“in”文件夹中创建默认的.camel文件夹,并将文件的一个副本移动到.camel文件夹中。除非我们手动删除,否则文件将始终存在于目录中。

因此,我建议您在路由中使用preMove。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45864600

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档