我有一个rest服务,它向队列发送消息,这些消息被路由到文件中:
from("test-jms:queue:test.queue").to("file://test");另外,我在端点上有一个事件驱动的使用者。现在,只有在使用了消息时才会将其写入日志:
final Consumer consumer = endpoint.createConsumer(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = exchange.getIn().getBody(String.class);
LOG.info("Message processed: " + message);
}
});一切都很顺利。在/test文件夹中,我将为收到的每一条消息获得一个新文件,此外,使用者还会创建一个附加了.camelLock的标记文件。使用readLock=none选项可以阻止使用者按预期的方式创建这些标记文件。
但是,无论是消息文件还是标记文件在消费后都不会被删除。在我的消费者实现中,我是不是遗漏了什么?
发布于 2014-08-11 07:22:18
正如克劳斯易卜生( Claus )所指出的,这里的关键是做好UnitOfWork (UoW)。现在我的事件驱动消费者看起来是这样的:
final Consumer consumer = endpoint.createConsumer(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = exchange.getIn().getBody(String.class);
LOG.info("Message processed: " + message);
ConsumerTemplate consumerTemplate = camelContext.createConsumerTemplate();
consumerTemplate.doneUoW(exchange);
}
});此外,在创建端点时必须使用delete=true选项:
Endpoint endpoint = camelContext.getEndpoint("file://test?delete=true");发布于 2014-08-08 12:16:08
当您使用内联处理器手动创建这样的使用者时,您需要手动执行Exchange的UoW操作,以便触发删除/移动文件等的工作。
exchange.getUnitOfWork().done(exchange);您还可以尝试用UnitOfWorkProducer包装您的处理器,这将为您完成UnitOfWork的操作。
https://stackoverflow.com/questions/25201462
复制相似问题