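Edit based on suggestions: for brevity, I removed the older code and the longer sections and rewrote the question.
I am trying to build an application (Spring Boot + Spring Batch) that takes the date and configuration information from the command line. As suggested, can I use application properties? The main goal is to use the same job (the job's tasklet) to download different files from different hosts at different times. The properties file would supply the information needed for the download, and the compiled jar should read that information and perform its task.
Main entry point: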
@SpringBootApplication
public class CoreApplication implements ApplicationRunner {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job processJob;

    @Value("${rundate}")
    private String run_date;

    private static final Logger logger = LoggerFactory.getLogger(CoreApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(CoreApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("JobID", System.currentTimeMillis())
                .addString("RunDate", run_date)
                .toJobParameters();
        try {
            jobLauncher.run(processJob, jobParameters);
        } catch (Exception e) {
            logger.error("Exception while running a batch job {}", e.getMessage());
        }
    }
}
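I reorganized the code to take the values for the server, user, and so on from the application.properties file. Please let me know if injecting these properties this way is wrong.
The application.properties file: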
spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
local.directory="/my/local/path/"
file.name="file_name_20200601.csv"
remote.directory="/remote/ftp/location"
remote.host="remotehost"
remote.port=22
remote.user="remoteuser"
private.key.location="/key/file/location"
My batch configuration:
@Configuration
@EnableBatchProcessing
@EnableIntegration
@EnableAutoConfiguration
public class BatchConfiguration {

    private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job ftpJob() {
        return jobBuilderFactory.get("FTP Job")
                .incrementer(new RunIdIncrementer())
                .start(getFilesFromFTPServer())
                .build();
    }

    @Bean
    public Step getFilesFromFTPServer() {
        return stepBuilderFactory.get("Get file from server")
                .tasklet(new RemoteFileInboundTasklet())
                .build();
    }
}
My tasklet:
public class RemoteFileInboundTasklet implements Tasklet {
private Logger logger = LoggerFactory.getLogger(RemoteFileInboundTasklet.class);
@Value("${file.name}")
private String fileNamePattern;
private String clientName;
private boolean deleteLocalFiles = true;
private boolean retryIfNotFound = false;
@Value("${local.directory}")
private String local_directory_value;
private File localDirectory;
private int downloadFileAttempts = 12;
private long retryIntervalMilliseconds = 300000;
@Value("${remote.directory}")
private String remoteDirectory;
@Value("${remote.host}")
private String remoteHost;
@Value("${remote.user}")
private String remoteUser;
@Value("${remote.port}")
private int remotePort;
@Value("${private.key.location}")
private String private_key_file;
public SessionFactory<ChannelSftp.LsEntry> clientSessionFactory() {
DefaultSftpSessionFactory ftpSessionFactory = new DefaultSftpSessionFactory();
ftpSessionFactory.setHost(remoteHost);
ftpSessionFactory.setPort(remotePort);
ftpSessionFactory.setUser(remoteUser);
ftpSessionFactory.setPrivateKey(new FileSystemResource(private_key_file));
ftpSessionFactory.setAllowUnknownKeys(true);
return ftpSessionFactory;
}
private SessionFactory sessionFactory = clientSessionFactory();
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer sftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
sftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
sftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
return sftpInboundFileSynchronizer;
}
private SftpInboundFileSynchronizer ftpInboundFileSynchronizer = sftpInboundFileSynchronizer();
private SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource;
public boolean isDeleteLocalFiles() {
return deleteLocalFiles;
}
public void setDeleteLocalFiles(boolean deleteLocalFiles) {
this.deleteLocalFiles = deleteLocalFiles;
}
public SftpInboundFileSynchronizer getFtpInboundFileSynchronizer() {
return ftpInboundFileSynchronizer;
}
public void setFtpInboundFileSynchronizer(SftpInboundFileSynchronizer ftpInboundFileSynchronizer) {
this.ftpInboundFileSynchronizer = ftpInboundFileSynchronizer;
}
public SessionFactory getSessionFactory() {
return sessionFactory;
}
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
public SftpInboundFileSynchronizingMessageSource getSftpInboundFileSynchronizingMessageSource() {
return sftpInboundFileSynchronizingMessageSource;
}
public void setSftpInboundFileSynchronizingMessageSource(SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
this.sftpInboundFileSynchronizingMessageSource = sftpInboundFileSynchronizingMessageSource;
}
public String getRemoteDirectory() {
return remoteDirectory;
}
public void setRemoteDirectory(String remoteDirectory) {
this.remoteDirectory = remoteDirectory;
}
private SFTPGateway sftpGateway;
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler clientMessageHandler() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(clientSessionFactory(), "mget", "payload");
sftpOutboundGateway.setAutoCreateLocalDirectory(true);
sftpOutboundGateway.setLocalDirectory(new File(local_directory_value));
sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
sftpOutboundGateway.setFilter(new AcceptOnceFileListFilter<>());
return sftpOutboundGateway;
}
private void deleteLocalFiles()
{
if (deleteLocalFiles)
{
localDirectory = new File(local_directory_value);
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
if (CollectionUtils.isNotEmpty(matchingFiles))
{
for (File file : matchingFiles)
{
FileUtils.deleteQuietly(file);
}
}
}
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
deleteLocalFiles();
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
if (retryIfNotFound) {
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
int attemptCount = 1;
while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts) {
logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
Thread.sleep(retryIntervalMilliseconds);
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
attemptCount++;
}
if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0) {
throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
}
}
return RepeatStatus.FINISHED;
}
public String getFileNamePattern() {
return fileNamePattern;
}
public void setFileNamePattern(String fileNamePattern) {
this.fileNamePattern = fileNamePattern;
}
public String getClientName() {
return clientName;
}
public void setClientName(String clientName) {
this.clientName = clientName;
}
public boolean isRetryIfNotFound() {
return retryIfNotFound;
}
public void setRetryIfNotFound(boolean retryIfNotFound) {
this.retryIfNotFound = retryIfNotFound;
}
public File getLocalDirectory() {
return localDirectory;
}
public void setLocalDirectory(File localDirectory) {
this.localDirectory = localDirectory;
}
public int getDownloadFileAttempts() {
return downloadFileAttempts;
}
public void setDownloadFileAttempts(int downloadFileAttempts) {
this.downloadFileAttempts = downloadFileAttempts;
}
public long getRetryIntervalMilliseconds() {
return retryIntervalMilliseconds;
}
public void setRetryIntervalMilliseconds(long retryIntervalMilliseconds) {
this.retryIntervalMilliseconds = retryIntervalMilliseconds;
}
}
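My understanding (please correct me if I am wrong) is that properties from the application.properties file can be injected into the tasklet, as shown above. Then I tried to build the package.
mvn clean package
I get the following error: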
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'getFilesFromFTPServer' threw exception; nested exception is java.lang.IllegalArgumentException: Path must not be null
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
... 122 common frames omitted
Caused by: java.lang.IllegalArgumentException: Path must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.core.io.FileSystemResource.<init>(FileSystemResource.java:80) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at com.my.batch.core.tasklet.RemoteFileInboundTasklet.clientSessionFactory(RemoteFileInboundTasklet.java:78) ~[classes/:na]
at com.my.batch.core.tasklet.RemoteFileInboundTasklet.<init>(RemoteFileInboundTasklet.java:83) ~[classes/:na]
at com.my.batch.core.BatchConfiguration.getFilesFromFTPServer(BatchConfiguration.java:71) ~[classes/:na]
at com.my.batch.core.BatchConfiguration$$EnhancerBySpringCGLIB$$17d8a6d9.CGLIB$getFilesFromFTPServer$1(<generated>) ~[classes/:na]
The line in the code is:
ftpSessionFactory.setPrivateKey(new FileSystemResource(private_key_file));
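It is called via BatchConfiguration.java -> getFilesFromFTPServer.
Does this mean my values from application.properties are not being passed? What do I need to change?
Also, why are the values of the variables checked when compiling or building the jar?
New edit:
I tried declaring my tasklet as a bean in the configuration and built the package again. However, it produced the same error.
The application.properties file after the change: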
spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
local.directory=/my/local/path/
file.name=file_name_20200601.csv
remote.directory=/remote/ftp/location
remote.host=remotehost
remote.port=22
remote.user=remoteuser
private.key.location=/key/file/location
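The revised file above drops the double quotes around the values. In the .properties format, quotes are not delimiters; they become part of the value. The sketch below shows this with plain java.util.Properties; it assumes Spring Boot's own loader treats quotes the same way for .properties files, and the class name QuotedPropertiesDemo is made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class QuotedPropertiesDemo {

    // Parses .properties text and returns the raw value stored under the key.
    public static String load(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // Quoted form, as in the first version of the file.
        String quoted = load("remote.host=\"remotehost\"\n", "remote.host");
        // Unquoted form, as in the revised file.
        String plain = load("remote.host=remotehost\n", "remote.host");

        System.out.println("[" + quoted + "]"); // the quotes are part of the value
        System.out.println("[" + plain + "]");
    }
}
```

With the quoted form, a host name or path would carry literal quote characters into the SFTP session factory, so the unquoted form is the safer choice.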
The tasklet has not changed.
The changed configuration:
@Configuration
@EnableBatchProcessing
@EnableIntegration
@EnableAutoConfiguration
public class BatchConfiguration {

    private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public RemoteFileInboundTasklet remoteFileInboundTasklet() {
        return new RemoteFileInboundTasklet();
    }

    @Bean
    public Job ftpJob() {
        return jobBuilderFactory.get("FTP Job")
                .incrementer(new RunIdIncrementer())
                .start(getFilesFromFTPServer())
                .build();
    }

    @Bean
    public Step getFilesFromFTPServer() {
        return stepBuilderFactory.get("Get file from server")
                .tasklet(remoteFileInboundTasklet())
                .build();
    }
}
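When I try to build the package (mvn clean package), I still get the same error:
Path must not be null.
It is unable to read the properties. Any idea what is going on?
Edit based on a different approach:
I tried to learn more about how to use configuration and found the following approach, which uses the @ConfigurationProperties annotation (How to access a value defined in the application.properties file in Spring).
I created a new FTP configuration class: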
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "ftp")
@Configuration("coreFtpProperties")
public class CoreFtp {
private String host;
private String port;
private String user;
private String passwordKey;
private String localDirectory;
private String remoteDirectory;
private String fileName;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPasswordKey() {
return passwordKey;
}
public void setPasswordKey(String passwordKey) {
this.passwordKey = passwordKey;
}
public String getLocalDirectory() {
return localDirectory;
}
public void setLocalDirectory(String localDirectory) {
this.localDirectory = localDirectory;
}
public String getRemoteDirectory() {
return remoteDirectory;
}
public void setRemoteDirectory(String remoteDirectory) {
this.remoteDirectory = remoteDirectory;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
}
A small change to the application.properties file:
spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
ftp.local_directory=/my/local/path/
ftp.file_name=file_name_20200601.csv
ftp.remote_directory=/remote/ftp/location
ftp.host=remotehost
ftp.port=22
ftp.user=remoteuser
ftp.password_key=/key/file/location
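The underscore-style keys above (ftp.local_directory, ftp.password_key) still reach the camelCase fields of CoreFtp because Spring Boot applies relaxed binding to @ConfigurationProperties classes. The sketch below is a deliberately simplified plain-Java illustration of that idea, not Spring Boot's actual implementation; RelaxedNameDemo, canonical and matches are hypothetical names.

```java
import java.util.Locale;

public class RelaxedNameDemo {

    // Simplified canonical form: drop '-' and '_' and lower-case the rest.
    public static String canonical(String name) {
        return name.replace("-", "").replace("_", "").toLowerCase(Locale.ROOT);
    }

    // True when a property key segment and a Java field name share a canonical form.
    public static boolean matches(String propertyKey, String fieldName) {
        return canonical(propertyKey).equals(canonical(fieldName));
    }

    public static void main(String[] args) {
        System.out.println(matches("local_directory", "localDirectory")); // true
        System.out.println(matches("password_key", "passwordKey"));       // true
        System.out.println(matches("file_name", "fileNamePattern"));      // false
    }
}
```

The last case shows the limit: the key has to name the property itself, so ftp.file_name maps to fileName on CoreFtp and is then passed to the tasklet's fileNamePattern by hand in the configuration.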
In my batch configuration, I made the following changes:
@Configuration
@EnableBatchProcessing
@EnableIntegration
public class BatchConfiguration {
private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private CoreFtp coreFtpProperties;
@Bean
public RemoteFileInboundTasklet remoteFileInboundTasklet() {
RemoteFileInboundTasklet ftpTasklet = new RemoteFileInboundTasklet();
ftpTasklet.setRetryIfNotFound(true);
ftpTasklet.setDownloadFileAttempts(3);
ftpTasklet.setRetryIntervalMilliseconds(10000);
ftpTasklet.setFileNamePattern(coreFtpProperties.getFileName());
ftpTasklet.setRemoteDirectory(coreFtpProperties.getRemoteDirectory());
ftpTasklet.setLocalDirectory(new File(coreFtpProperties.getLocalDirectory()));
ftpTasklet.setSessionFactory(clientSessionFactory());
ftpTasklet.setFtpInboundFileSynchronizer(sftpInboundFileSynchronizer());
ftpTasklet.setSftpInboundFileSynchronizingMessageSource(new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()));
return ftpTasklet;
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer sftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(clientSessionFactory());
sftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
sftpInboundFileSynchronizer.setRemoteDirectory(coreFtpProperties.getRemoteDirectory());
return sftpInboundFileSynchronizer;
}
@Bean(name = "clientSessionFactory")
public SessionFactory<LsEntry> clientSessionFactory() {
DefaultSftpSessionFactory ftpSessionFactory = new DefaultSftpSessionFactory();
ftpSessionFactory.setHost(coreFtpProperties.getHost());
ftpSessionFactory.setPort(Integer.parseInt(coreFtpProperties.getPort()));
ftpSessionFactory.setUser(coreFtpProperties.getUser());
ftpSessionFactory.setPrivateKey(new FileSystemResource(coreFtpProperties.getPasswordKey()));
ftpSessionFactory.setPassword("");
ftpSessionFactory.setAllowUnknownKeys(true);
return ftpSessionFactory;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler clientMessageHandler() {
SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(clientSessionFactory(), "mget", "payload");
sftpOutboundGateway.setAutoCreateLocalDirectory(true);
sftpOutboundGateway.setLocalDirectory(new File(coreFtpProperties.getLocalDirectory()));
sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
sftpOutboundGateway.setFilter(new AcceptOnceFileListFilter<>());
return sftpOutboundGateway;
}
@Bean
public Job ftpJob() {
return jobBuilderFactory.get("FTP Job")
.incrementer(new RunIdIncrementer())
.start(getFilesFromFTPServer())
.build();
}
@Bean
public Step getFilesFromFTPServer() {
return stepBuilderFactory.get("Get file from server")
.tasklet(remoteFileInboundTasklet())
.build();
}
}
Accordingly, my tasklet changed to:
public class RemoteFileInboundTasklet implements Tasklet {
private Logger logger = LoggerFactory.getLogger(RemoteFileInboundTasklet.class);
private String fileNamePattern;
private String clientName;
private boolean deleteLocalFiles = true;
private boolean retryIfNotFound = false;
private File localDirectory;
private int downloadFileAttempts = 12;
private long retryIntervalMilliseconds = 300000;
private String remoteDirectory;
private SessionFactory sessionFactory;
private SftpInboundFileSynchronizer ftpInboundFileSynchronizer;
private SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource;
public boolean isDeleteLocalFiles() {
return deleteLocalFiles;
}
public void setDeleteLocalFiles(boolean deleteLocalFiles) {
this.deleteLocalFiles = deleteLocalFiles;
}
public SftpInboundFileSynchronizer getFtpInboundFileSynchronizer() {
return ftpInboundFileSynchronizer;
}
public void setFtpInboundFileSynchronizer(SftpInboundFileSynchronizer ftpInboundFileSynchronizer) {
this.ftpInboundFileSynchronizer = ftpInboundFileSynchronizer;
}
public SessionFactory getSessionFactory() {
return sessionFactory;
}
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
public SftpInboundFileSynchronizingMessageSource getSftpInboundFileSynchronizingMessageSource() {
return sftpInboundFileSynchronizingMessageSource;
}
public void setSftpInboundFileSynchronizingMessageSource(SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
this.sftpInboundFileSynchronizingMessageSource = sftpInboundFileSynchronizingMessageSource;
}
public String getRemoteDirectory() {
return remoteDirectory;
}
public void setRemoteDirectory(String remoteDirectory) {
this.remoteDirectory = remoteDirectory;
}
private SFTPGateway sftpGateway;
private void deleteLocalFiles()
{
if (deleteLocalFiles)
{
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
if (CollectionUtils.isNotEmpty(matchingFiles))
{
for (File file : matchingFiles)
{
FileUtils.deleteQuietly(file);
}
}
}
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
deleteLocalFiles();
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
if (retryIfNotFound) {
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
int attemptCount = 1;
while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts) {
logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
Thread.sleep(retryIntervalMilliseconds);
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
attemptCount++;
}
if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0) {
throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
}
}
return RepeatStatus.FINISHED;
}
}
Based on the changes above, I was able to compile the code, create the necessary jar, and run the code using the jar.
Posted on 2020-06-02 07:23:30
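You are declaring a bean jobExecutionListener() in which you create new FileSystemResource(config_file_path);. The config_file_path is injected from the job parameters with @Value("#{jobParameters['ConfigFilePath']}"), and job parameters are not available at configuration time, only when the job/step runs. This is called late binding.
So in your case, when Spring tries to create the bean jobExecutionListener(), it tries to inject config_file_path, which is still null at that point: Spring is only creating beans to configure the application context, the job has not run yet, and so the beforeJob method has not been executed. That is why you get a NullPointerException. Adding @JobScope to the jobExecutionListener() bean should fix the problem, but I do not recommend it. You are trying to configure some properties in the wrong way and in the wrong place, so I would fix the design rather than work around the problem by adding an annotation.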
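The difference between resolving a value at configuration time and resolving it when the job runs can be sketched in plain Java. This is only an analogy using java.util.function.Supplier, not how Spring Batch implements @JobScope; LateBindingDemo and resolve are hypothetical names, and the map stands in for the job parameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LateBindingDemo {

    // Returns { value resolved eagerly, value resolved late } for the given path.
    public static String[] resolve(String path) {
        // Stands in for job parameters, which only exist once a job is launched.
        Map<String, String> jobParameters = new HashMap<>();

        // Eager: looked up at "configuration time", before the job has run.
        String eager = jobParameters.get("ConfigFilePath");

        // Late: only the lookup recipe is stored; it runs when get() is called.
        Supplier<String> late = () -> jobParameters.get("ConfigFilePath");

        // The job is "launched" and its parameters become available.
        jobParameters.put("ConfigFilePath", path);

        return new String[] { eager, late.get() };
    }

    public static void main(String[] args) {
        String[] result = resolve("/hypothetical/config.properties");
        System.out.println("eager: " + result[0]); // null, the parameter did not exist yet
        System.out.println("late:  " + result[1]);
    }
}
```

The eager lookup mirrors the singleton bean that is built while the context starts; the deferred lookup mirrors what a job-scoped bean achieves by being created only once a job execution exists.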
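Job parameters are meant for business parameters, not technical details. In your case, runDate is a good candidate for a job parameter, but ConfigFilePath is not. Moreover, since you are using Spring, why inject a file path and then call properties = PropertiesLoaderUtils.loadProperties(resource); and Integer.parseInt(properties.getProperty("remote.port"));? Spring will do that for you if you let it inject the properties where they are needed.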
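I would remove this config_file_path job parameter as well as the job listener, and inject these properties directly into remoteFileInboundTasklet, i.e. as close as possible to where they are needed.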
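Edit: adding a code example
"Could you help me understand where I can declare the tasklet as a bean?"
In your step getFilesFromFTPServer, you are creating the tasklet manually, so dependency injection is not performed. You need to declare the tasklet as a Spring bean for this to work, like this: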
@Bean
public Tasklet myTasklet() {
    return new RemoteFileInboundTasklet();
}

@Bean
public Step getFilesFromFTPServer() {
    return stepBuilderFactory.get("Get file from server")
            .tasklet(myTasklet())
            .build();
}
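One caveat when applying this to the tasklet from the question: declaring it as a bean is not enough while it builds its session factory in a field initializer (private SessionFactory sessionFactory = clientSessionFactory();). Field initializers run as part of construction, whereas @Value fields are populated only after the instance exists, which matches the stack trace going through RemoteFileInboundTasklet.<init>. The plain-Java sketch below mimics that ordering; InitOrderDemo and its members are hypothetical, and the assignment in createAndInject stands in for Spring's injection.

```java
public class InitOrderDemo {

    public static class Tasklet {
        // Stands in for a field Spring would populate via @Value after construction.
        public String privateKeyFile;

        // Field initializer: evaluated during construction, before any injection.
        public final String keySeenAtConstruction = describe(privateKeyFile);

        // Reads the field on demand, e.g. from execute(), after injection is done.
        public String keySeenLater() {
            return describe(privateKeyFile);
        }

        private static String describe(String path) {
            return path == null ? "null" : path;
        }
    }

    // Mimics the container: construct first, then set the value.
    public static Tasklet createAndInject(String path) {
        Tasklet tasklet = new Tasklet();
        tasklet.privateKeyFile = path; // "injection" happens only now
        return tasklet;
    }

    public static void main(String[] args) {
        Tasklet tasklet = createAndInject("/key/file/location");
        System.out.println(tasklet.keySeenAtConstruction); // null
        System.out.println(tasklet.keySeenLater());        // /key/file/location
    }
}
```

Under that assumption, possible fixes are to build the session factory lazily inside execute(), in a @PostConstruct method, or outside the tasklet in the configuration class and pass it in through a setter, which is what the asker's final version does.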
https://stackoverflow.com/questions/62140041