首先,我们需要创建一个用来存储数据的表,这里我们创建一个名为“person”的表,包含id、name和age三个字段:
CREATE TABLE person (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT NOT NULL,
PRIMARY KEY (id)
);
接下来,我们需要创建一个用来读取CSV文件数据的ItemReader,这里我们使用FlatFileItemReader来读取CSV文件。在创建FlatFileItemReader时,我们需要指定CSV文件的路径、字段分隔符、字段映射关系等属性。
@Bean
public FlatFileItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("persons.csv"));
reader.setLinesToSkip(1); // 跳过CSV文件的标题行
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames("name", "age");
setDelimiter(",");
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
在这个示例中,我们创建了一个名为“reader”的FlatFileItemReader,它从一个名为“persons.csv”的CSV文件中读取数据创建ItemWriter
接下来,我们需要创建一个用来写入数据到数据库的ItemWriter,这里我们使用JdbcBatchItemWriter来写入数据。在创建JdbcBatchItemWriter时,我们需要指定数据源、SQL语句、参数映射关系等属性。
@Bean
public JdbcBatchItemWriter<Person> writer() {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO person (name, age) VALUES (:name, :age)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
在这个示例中,我们创建了一个名为“writer”的JdbcBatchItemWriter,它使用了dataSource数据源,并指定了一个SQL语句,用于向“person”表中插入数据。在SQL语句中,我们使用了“:name”和“:age”两个占位符,用于映射Item中的数据。
接下来,我们需要创建两个Step,分别用于读取CSV文件和写入数据库。在创建Step时,我们需要指定ItemReader、ItemProcessor、ItemWriter等属性。
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
// do something
return RepeatStatus.FINISHED;
})
.build();
}
在这个示例中,我们创建了两个Step,第一个Step是“step1”,用于读取CSV文件并写入数据库;第二个Step是“step2”,用于执行一些简单的任务。在“step1”中,我们使用了chunk()方法来指定每次读取和处理的数据条数,同时指定了ItemReader、ItemProcessor和ItemWriter。在“step2”中,我们使用了tasklet()方法来指定一个简单的任务,该任务返回RepeatStatus.FINISHED表示执行完成。
最后,我们需要将两个Step组合起来,并创建一个Job来执行整个批处理任务。在创建Job时,我们需要指定Job的名称、版本号和Step的依赖关系。
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.next(step2())
.build();
}
在这个示例中,我们创建了一个名为“job”的Job,使用了RunIdIncrementer来增加运行ID,表示不同的执行实例。Job包含了两个Step,第一个Step是“step1”,第二个Step是“step2”。Job的执行顺序为先执行“step1”,然后再执行“step2”。
最后,我们需要使用JobLauncher来启动批处理任务的执行。
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public void runJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "classpath:input.csv")
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
在这个示例中,我们使用了JobLauncher来启动Job的执行,同时为Job指定了两个参数,分别是“inputFile”和“time”。这些参数可以在Job中使用,以便动态地改变Job的行为。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。