1 介绍
用Spring Batch实现一个简单的需求,将csv文件转换成xml文件。
csv文件如下:record.csv
username, user_id, transaction_date, transaction_amountdevendra, 1234, 31/10/2015, 10000john, 2134, 3/12/2015, 12321robin, 2134, 2/02/2015, 23411
转换成的xml文件如下:record.xml
10000.0 2015-10-31T00:00:00+08:00 1234 devendra 12321.0 2015-12-03T00:00:00+08:00 2134 john 23411.0 2015-02-02T00:00:00+08:00 2134 robin
2 实现
下面用Spring Batch实现,思路很简单,定义一个Reader读record.csv,定义一个Writer写record.xml
2.1 用Java来配置
SpringConfig.java,部分代码
// 数据源@Beanpublic DataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName("org.sqlite.JDBC"); dataSource.setUrl("jdbc:sqlite:repository.sqlite"); return dataSource;}// 事务管理器private PlatformTransactionManager getTransactionManager() { return new ResourcelessTransactionManager();}// 任务仓库private JobRepository getJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource()); factory.setTransactionManager(getTransactionManager()); // JobRepositoryFactoryBean's methods Throws Generic Exception, // it would have been better to have a specific one factory.afterPropertiesSet(); return (JobRepository) factory.getObject();}// 任务加载器public JobLauncher getJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); // SimpleJobLauncher's methods Throws Generic Exception, // it would have been better to have a specific one jobLauncher.setJobRepository(getJobRepository()); jobLauncher.afterPropertiesSet(); return jobLauncher;}
2.2 创建DTO对象
创建一个Transaction.java用来传输record.csv中的每行数据
package com.yysue.batch.demo01.model;import java.util.Date;import javax.xml.bind.annotation.XmlRootElement;@SuppressWarnings("restriction")@XmlRootElement(name = "transactionRecord")public class Transaction { private String username; private int userId; private Date transactionDate; private double amount; // get/set...}
2.3 定义ItemReader
@Beanpublic ItemReaderitemReader() throws UnexpectedInputException, ParseException { // 数据来源 Resource inputCsv = new ClassPathResource("demo01/input/record.csv"); // Spring Batch的内置reader FlatFileItemReader reader = new FlatFileItemReader<>(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(inputCsv); DefaultLineMapper lineMapper = new DefaultLineMapper (); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); // 文件开头需要忽略的行数 reader.setLineMapper(lineMapper); return reader;}
2.4 定义ItemWriter
@Beanpublic ItemWriteritemWriter(Marshaller marshaller) throws MalformedURLException { String userHome = System.getProperty("user.home"); Resource outputXml = new FileSystemResource(userHome + "/output/demo01/record.xml"); StaxEventItemWriter itemWriter = new StaxEventItemWriter (); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(outputXml); return itemWriter;}
2.5 定义Processor
@Beanpublic ItemProcessoritemProcessor() { return new CustomItemProcessor();}
2.6 定义Job和Step
@Beanprotected Step step1(ItemReaderreader, ItemProcessor processor, ItemWriter writer) { return steps.get("step1") . chunk(10) .reader(reader) .processor(processor) .writer(writer) .build();}@Bean(name = "firstBatchJob")public Job job(@Qualifier("step1") Step step1) { return jobs.get("firstBatchJob").start(step1).build();}
2.7 定义主程序
App.java
public static void main(final String[] args) { // Spring Java config final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(SpringConfig.class); context.register(SpringBatchConfig.class); context.refresh(); // Spring xml config // ApplicationContext context = new ClassPathXmlApplicationContext("demo01/spring-batch.xml"); final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); final Job job = (Job) context.getBean("firstBatchJob"); System.out.println("Starting the batch job"); try { final JobExecution execution = jobLauncher.run(job, new JobParameters()); System.out.println("Job Status : " + execution.getStatus()); System.out.println("Job succeeded"); } catch (final Exception e) { e.printStackTrace(); System.out.println("Job failed"); } System.exit(0);}
运行主程序,会读取项目的resources/demo01/input/record.csv文件,在用户目录下生成文件 xml文件为:output/demo01/record.xml
3 总结
本例完整实现