本文共 34302 字,大约阅读时间需要 114 分钟。
Spring-batch学习总结(四)
一.ItemWriter简介
1.对于read读取数据时是以一个item为单位的循环读取,而对于writer写入数据则是以chunk为单位,一块一块的进行写入
2.例(我们举一个小例子来认识其writer原理):
代码:
OutOverViewApplication
package com.dhcc.batch.batchDemo.output.outview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the ItemWriter overview demo.
 * {@code @EnableBatchProcessing} switches on the Spring Batch infrastructure
 * so that the job defined in this package can be launched at startup.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutOverViewApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutOverViewApplication.class, args);
    }
}
OutputViewItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.outview;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Job configuration for the writer overview demo: 100 in-memory strings are
 * read one item at a time and handed to the writer in chunks of 10.
 */
@Configuration
public class OutputViewItemWriterConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("OutputViewItemWriter")
    private ItemWriter<String> outputViewItemWriter;

    /** Single-step job that runs {@link #OutputViewItemWriterStep3()}. */
    @Bean
    public Job OutputViewItemWriterJob3() {
        return jobBuilderFactory.get("OutputViewItemWriterJob3")
                .start(OutputViewItemWriterStep3())
                .build();
    }

    /** Chunk-oriented step: the writer is invoked once per 10 items read. */
    @Bean
    public Step OutputViewItemWriterStep3() {
        return stepBuilderFactory.get("OutputViewItemWriterStep3")
                .<String, String>chunk(10)
                .reader(listViewItemRead())
                .writer(outputViewItemWriter)
                .build();
    }

    /** Step-scoped reader over 100 generated sample strings. */
    @Bean
    @StepScope
    public ListItemViewReader<String> listViewItemRead() {
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            dataList.add("my name is zhongqiujie" + i);
        }
        return new ListItemViewReader<>(dataList);
    }
}
ListItemViewReader
package com.dhcc.batch.batchDemo.output.outview;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

/**
 * Minimal {@link ItemReader} that hands out the elements of an in-memory list
 * one at a time.
 *
 * <p>The class is deliberately not {@code final}: it is exposed as a
 * {@code @StepScope} bean by its concrete type, which requires a subclass proxy.
 *
 * @param <T> type of the items read
 */
public class ListItemViewReader<T> implements ItemReader<T> {

    private final Iterator<T> iterator;

    /**
     * @param data items to read, in iteration order
     */
    public ListItemViewReader(List<T> data) {
        this.iterator = data.iterator();
    }

    /**
     * Returns the next item, or {@code null} once the list is exhausted
     * ({@code null} is the end-of-input signal of the {@link ItemReader} contract).
     */
    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (iterator.hasNext()) {
            return this.iterator.next();
        } else {
            return null;
        }
    }
}
OutputViewItemWriter
package com.dhcc.batch.batchDemo.output.outview;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

/**
 * Demo writer that prints the size of every chunk it receives followed by
 * each item, making the chunk-at-a-time write behaviour visible on the console.
 */
@Component("OutputViewItemWriter")
public class OutputViewItemWriter implements ItemWriter<String> {

    /**
     * Prints the chunk size and then every item of the chunk.
     *
     * @param items the items of the current chunk
     */
    @Override
    public void write(List<? extends String> items) throws Exception {
        System.out.println("writer chunk size is :" + items.size());
        for (String item : items) {
            System.out.println("writer data is:" + item);
        }
    }
}
运行结果:
二.将数据写入到数据库
1.在spring batch中为我们提供了许多将数据写入到数据库中的writer
(1)Neo4jItemWriter;
(2)MongoItemWriter;
..........
2.此处我们只学习JdbcBatchItemWriter
例:我们先在数据库中建立数据表alipaytrando,结构如下:
接下来我们将项目中的springbatchtest2文件读出并写入到数据库表alipaytrando中
Springbatchtest2文件结构如下:
开始写代码:
AlipayTranDo
package com.dhcc.batch.batchDemo.output.db.entity;

/**
 * Plain bean holding one transaction record. Every field is kept as a
 * {@code String}; the property names match the column names and the named
 * SQL parameters used by the reader and writer configurations.
 */
public class AlipayTranDo {

    private String tranId;
    private String channel;
    private String tranType;
    private String counterparty;
    private String goods;
    private String amount;
    private String isDebitCredit;
    private String state;

    public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
            String amount, String isDebitCredit, String state) {
        super();
        this.tranId = tranId;
        this.channel = channel;
        this.tranType = tranType;
        this.counterparty = counterparty;
        this.goods = goods;
        this.amount = amount;
        this.isDebitCredit = isDebitCredit;
        this.state = state;
    }

    public String getTranId() {
        return tranId;
    }

    public void setTranId(String tranId) {
        this.tranId = tranId;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public String getTranType() {
        return tranType;
    }

    public void setTranType(String tranType) {
        this.tranType = tranType;
    }

    public String getCounterparty() {
        return counterparty;
    }

    public void setCounterparty(String counterparty) {
        this.counterparty = counterparty;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getAmount() {
        return amount;
    }

    public void setAmount(String amount) {
        this.amount = amount;
    }

    public String getIsDebitCredit() {
        return isDebitCredit;
    }

    public void setIsDebitCredit(String isDebitCredit) {
        this.isDebitCredit = isDebitCredit;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    /** Renders all fields as {@code name='value'} pairs for console output. */
    @Override
    public String toString() {
        return "AlipayTranDO{" +
                "tranId='" + tranId + '\'' +
                ", channel='" + channel + '\'' +
                ", tranType='" + tranType + '\'' +
                ", counterparty='" + counterparty + '\'' +
                ", goods='" + goods + '\'' +
                ", amount='" + amount + '\'' +
                ", isDebitCredit='" + isDebitCredit + '\'' +
                ", state='" + state + '\'' +
                '}';
    }
}
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.db.util;import org.springframework.batch.item.file.mapping.FieldSetMapper;import org.springframework.batch.item.file.transform.FieldSet;import org.springframework.validation.BindException;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;public class AlipayTranDoFileMapper implements FieldSetMapper{ @Override public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException { return new AlipayTranDo(fieldSet.readString("tranId") , fieldSet.readString("channel") ,fieldSet.readString("tranType") , fieldSet.readString("counterparty") , fieldSet.readString("goods") ,fieldSet.readString("amount") , fieldSet.readString("isDebitCredit") , fieldSet.readString("state") ); }}
OutputItemWriterDBApplication
package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "write to database" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterDBApplication.class, args);
    }
}
OutputItemWriterDBConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;@Configurationpublic class OutputItemWriterDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("outputDBItemReader") private ItemReader outputDBItemReader; @Autowired @Qualifier("outputDBItemWriter") private ItemWriter outputDBItemWriter; @Autowired private MyProcess myProcess; @Bean public Job OutputItemWriterDBJob2() { return jobBuilderFactory.get("OutputItemWriterDBJob2").start(OutputItemWriterDBStep2()).build(); } @Bean public Step OutputItemWriterDBStep2() { return stepBuilderFactory.get("OutputItemWriterDBStep2").chunk(50) .reader(outputDBItemReader) .processor(myProcess) .writer(outputDBItemWriter) .build(); }}
OutputItemWriterDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;@Configurationpublic class OutputItemWriterDBItemReaderConfiguration { @Bean public FlatFileItemReaderoutputDBItemReader(){ FlatFileItemReader reader=new FlatFileItemReader (); reader.setEncoding("UTF-8"); reader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv")); reader.setLinesToSkip(5); DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer(); tokenizer.setNames(new String[] {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"} ); DefaultLineMapper lineMapper=new DefaultLineMapper (); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper()); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; }}
MyProcess
package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Pass-through processor: prints every item to the console and forwards it
 * to the writer unchanged.
 */
@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {

    @Override
    public AlipayTranDo process(AlipayTranDo item) throws Exception {
        System.out.println(item);
        return item;
    }
}
OutputItemWriterDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;

import javax.sql.DataSource;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Defines the JDBC batch writer that inserts {@link AlipayTranDo} items into
 * the {@code alipaytrando} table.
 */
@Configuration
public class OutputItemWriterDBItemWriterConfiguration {

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the writer. The named parameters in the INSERT statement
     * ({@code :tranId}, {@code :channel}, ...) are resolved from the bean
     * properties of each item by the parameter source provider.
     */
    @Bean
    public JdbcBatchItemWriter<AlipayTranDo> outputDBItemWriter() {
        JdbcBatchItemWriter<AlipayTranDo> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql(
                "insert into alipaytrando"
                        + "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values"
                        + "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state) ");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}
运行结果:
观察控制台可得我们的项目运行成功,接下来我们再到数据库中观察数据是否成功插入
发现表中数据已经插入成功
三.将数据写入到普通文件中
1.FlatFileItemWriter可以将任何一个类型为T的对象数据写入到普通文件中
2.例:我们将数据库中的alipaytrando中的数据读出并且写入到普通文件中
接下来我们开始编写代码:
实体类AlipayTranDo与上一个例子一样,我们不再重复展示
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.flatfile;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps one row of the {@code alipaytrando} result set to an
 * {@code AlipayTranDo}, reading every column as a string by column name.
 */
public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {

    @Override
    public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new AlipayTranDo(
                rs.getString("tranId"),
                rs.getString("channel"),
                rs.getString("tranType"),
                rs.getString("counterparty"),
                rs.getString("goods"),
                rs.getString("amount"),
                rs.getString("isDebitCredit"),
                rs.getString("state"));
    }
}
AlipayTranDoLineAggregator
package com.dhcc.batch.batchDemo.output.flatfile;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class AlipayTranDoLineAggregator implements LineAggregator{ //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(AlipayTranDo alipayTranDo) { try { return mapper.writeValueAsString(alipayTranDo); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
FlatFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FlatFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("flatFileOutputFromDBItemReader") private ItemReader flatFileOutputFromDBItemReader; @Autowired @Qualifier("flatFileOutputFromDBItemWriter") private ItemWriter flatFileOutputFromDBItemWriter; @Bean public Job FlatFileOutputFromDBJob() { return jobBuilderFactory.get("FlatFileOutputFromDBJob").start(FlatFileOutputFromDBStep()).build(); } @Bean public Step FlatFileOutputFromDBStep() { return stepBuilderFactory.get("FlatFileOutputFromDBStep").chunk(100) .reader(flatFileOutputFromDBItemReader).writer(flatFileOutputFromDBItemWriter).build(); }}
FlatFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Defines the paging JDBC reader that loads {@code AlipayTranDo} rows from
 * the {@code alipaytrando} table, ordered by {@code tranId}.
 */
@Configuration
public class FlatFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<AlipayTranDo> flatFileOutputFromDBItemReader() {
        JdbcPagingItemReader<AlipayTranDo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // data source to query
        // JDBC fetch-size hint for the driver; the page size of the paging
        // query is a separate setting (setPageSize) and is left at its default.
        reader.setFetchSize(100);
        reader.setRowMapper(new AlipayTranDoFileMapper()); // maps each row to an AlipayTranDo

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state"); // columns to select
        queryProvider.setFromClause("from alipaytrando"); // table to query

        // Sort keys: the paging query is ordered by tranId, ascending.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("tranId", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}
FlatFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;

import java.io.File;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

/**
 * Defines the flat-file writer that stores each {@code AlipayTranDo} as one
 * JSON line in {@code D:\alipayTranDo.data}.
 */
@Configuration
public class FlatFileOutputFromDBItemWriterConfiguration {

    /**
     * Builds the writer.
     *
     * @throws Exception if the writer configuration is rejected by
     *         {@code afterPropertiesSet()}; the failure is propagated so the
     *         application context does not start with a half-configured writer
     */
    @Bean
    public FlatFileItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter() throws Exception {
        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<>();
        // NOTE(review): the target is a hard-coded Windows drive path — adjust on other systems.
        File path = new File("D:" + File.separator + "alipayTranDo.data").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new AlipayTranDoLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }
}
OutputItemWriterFlatFileApplication
package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "database to flat file" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterFlatFileApplication.class, args);
    }
}
运行结果:
控制台显示文件读取写入成功,我们根据文件地址,观察写入后的普通文件
四.将数据写入到xml文件中
1.将数据写入到xml文件中,我们必须用到StaxEventItemWriter;
2.我们也会用到XStreamMarshaller来序列化对象
例:我们将数据库表alipaytrando中的数据写入到本地磁盘中
代码(此处我们只展示writer,用来写入的类,其他的均与上一个例子相同):
XMLFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.xmlfile;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Defines the StAX writer that stores {@code AlipayTranDo} items as XML in
 * {@code D:\alipayTranDo.xml}.
 */
@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {

    /**
     * Builds the writer: items are marshalled by XStream as
     * {@code <alipayTranDo>} elements under the root tag {@code <alipaytrandos>}.
     *
     * @throws Exception if the writer configuration is rejected by {@code afterPropertiesSet()}
     */
    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        // Alias the element name to the entity class so the fully-qualified
        // class name is not used as the tag.
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);

        // NOTE(review): the target is a hard-coded Windows drive path — adjust on other systems.
        File path = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }
}
运行结果:
根据地址观察写入后的xml文件
五.将数据写入到多文件
1.将数据写入多个文件,我们使用CompositeItemWriter<T>或者使用ClassifierCompositeItemWriter<T>
2.例(1):我们将数据表alipaytrando中的数据分别写入到xml文件和json文件中
此处我们只展示writer(其余代码与上例相同):
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.composit;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Defines a JSON writer, an XML writer and a composite writer that sends
 * every {@code AlipayTranDo} item to both of them.
 */
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    /**
     * Writer that stores each item as one JSON line in {@code D:\alipayTranDo1.json}.
     *
     * @throws Exception if the writer configuration is rejected by
     *         {@code afterPropertiesSet()}; propagated instead of being
     *         swallowed so a half-configured writer is never returned
     */
    @Bean
    public FlatFileItemWriter<AlipayTranDo> jsonFileItemWriter() throws Exception {
        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<>();
        File path = new File("D:" + File.separator + "alipayTranDo1.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new AlipayTranDoLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Writer that stores items as {@code <alipayTranDo>} elements under the
     * root tag {@code <alipaytrandos>} in {@code D:\alipayTranDo1.xml}.
     */
    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "alipayTranDo1.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /** Composite writer: each chunk is passed first to the XML writer, then to the JSON writer. */
    @Bean
    public CompositeItemWriter<AlipayTranDo> alipayTranDoFileOutputFromDBItemWriter() throws Exception {
        CompositeItemWriter<AlipayTranDo> itemWriter = new CompositeItemWriter<>();
        itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(), jsonFileItemWriter()));
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
}
运行结果:
观察文件:Json:Xml:3.例(2):我们将同一个文件进行分类写入:首先我们观察数据库表person_buf的数据结构(数据总数是10001):我们的目标是将数据从数据库读出按照id的奇偶分别写入不同类型的文件中
接下来上代码:
Person
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.Date;

/**
 * Mutable bean for one row of the {@code person_buf} table.
 */
public class Person {

    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    /** Creates an empty person; all fields start as {@code null}. */
    public Person() {
    }

    /** Creates a fully populated person. */
    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex,
            Float score, Double price) {
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPerDesc() {
        return this.perDesc;
    }

    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    public Date getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getSex() {
        return this.sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Float getScore() {
        return this.score;
    }

    public void setScore(Float score) {
        this.score = score;
    }

    public Double getPrice() {
        return this.price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /** Lists every field as {@code name=value} inside {@code Person [...]}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person [id=");
        text.append(id);
        text.append(", name=").append(name);
        text.append(", perDesc=").append(perDesc);
        text.append(", createTime=").append(createTime);
        text.append(", updateTime=").append(updateTime);
        text.append(", sex=").append(sex);
        text.append(", score=").append(score);
        text.append(", price=").append(price);
        text.append("]");
        return text.toString();
    }
}
PersonLineAggregator
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator{ //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps each row of the query result to a {@link Person}.
 *
 * @author Administrator
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Builds a {@link Person} from the current row.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        // NOTE(review): getDate() yields java.sql.Date, which carries no time of
        // day — if create_time/update_time are DATETIME columns, confirm that
        // dropping the time part is intended.
        // NOTE(review): getInt/getFloat/getDouble return 0 for SQL NULL, so a
        // NULL column is indistinguishable from 0 in the mapped object.
        return new Person(
                rs.getInt("id"),
                rs.getString("name"),
                rs.getString("per_desc"),
                rs.getDate("create_time"),
                rs.getDate("update_time"),
                rs.getString("sex"),
                rs.getFloat("score"),
                rs.getDouble("price"));
    }
}
OutputItemWriterMutipleClassFileApplication
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the classifier-based multi-file output demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutputItemWriterMutipleClassFileApplication.class, args);
    }
}
ClassifierMutipleFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemStream;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ClassifierMutipleFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("mutipleFileOutputFromDBItemReader") private ItemReader mutipleFileOutputFromDBItemReader; @Autowired @Qualifier("alipayTranDoFileOutputFromDBItemWriter") private ItemWriter alipayTranDoFileOutputFromDBItemWriter; @Autowired @Qualifier("jsonFileItemWriter") private ItemStream jsonFileItemWriter; @Autowired @Qualifier("xmlFileItemWriter") private ItemStream xmlFileItemWriter; @Bean public Job mutipleFileOutputFromDBJob1() { return jobBuilderFactory.get("mutipleFileOutputFromDBJob1") .start(mutipleFileOutputFromDBStep1()) .build(); } @Bean public Step mutipleFileOutputFromDBStep1() { return stepBuilderFactory.get("mutipleFileOutputFromDBStep1").chunk(100) .reader(mutipleFileOutputFromDBItemReader) .writer(alipayTranDoFileOutputFromDBItemWriter) .stream(jsonFileItemWriter) .stream(xmlFileItemWriter) .build(); }}
mutipleFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Defines the paging JDBC reader that loads {@link Person} rows from the
 * {@code person_buf} table, ordered by {@code id}.
 */
@Configuration
public class mutipleFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Person> mutipleFileOutputFromDBItemReader() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // data source to query
        // JDBC fetch-size hint for the driver; the page size of the paging
        // query is a separate setting (setPageSize) and is left at its default.
        reader.setFetchSize(100);
        reader.setRowMapper(new PersonRowMapper()); // maps each row to a Person

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
        queryProvider.setFromClause("from person_buf"); // table to query

        // Sort keys: the paging query is ordered by id, ascending.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Defines a JSON writer, an XML writer and a classifying writer that picks
 * one of the two for each {@link Person}.
 */
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    /**
     * Writer that stores each person as one JSON line in {@code D:\person.json}.
     *
     * @throws Exception if the writer configuration is rejected by
     *         {@code afterPropertiesSet()}; propagated instead of being
     *         swallowed so a half-configured writer is never returned
     */
    @Bean
    public FlatFileItemWriter<Person> jsonFileItemWriter() throws Exception {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        File path = new File("D:" + File.separator + "person.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new PersonLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Writer that stores persons as {@code <person>} elements under the root
     * tag {@code <persons>} in {@code D:\person.xml}.
     */
    @Bean
    public StaxEventItemWriter<Person> xmlFileItemWriter() throws Exception {
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("person", Person.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("persons");
        writer.setMarshaller(marshaller);

        File path = new File("D:" + File.separator + "person.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /** Classifying writer: {@link MyWriterClassifier} chooses the target writer per item. */
    @Bean
    public ClassifierCompositeItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter() throws Exception {
        ClassifierCompositeItemWriter<Person> itemWriter = new ClassifierCompositeItemWriter<>();
        itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(), xmlFileItemWriter()));
        return itemWriter;
    }
}
MyWriterClassifier
package com.dhcc.batch.batchDemo.output.mutiple.classifier;import org.springframework.batch.item.ItemWriter;import org.springframework.classify.Classifier;public class MyWriterClassifier implements Classifier> { private ItemWriter jsonWriter; private ItemWriter xmlWriter; /** * */ private static final long serialVersionUID = -2911015707834323846L; public MyWriterClassifier(ItemWriter jsonWriter, ItemWriter xmlWriter) { this.jsonWriter = jsonWriter; this.xmlWriter = xmlWriter; } @Override public ItemWriter classify(Person classifiable) { if (classifiable.getId()%2==0) { return jsonWriter; }else { return xmlWriter; } }}
运行结果:
观察文件:Person.json:(我们可以看出id为偶数的都写在了json文件中)Person.xml:(我们可以看出id为奇数的都写在了xml文件中)转载于:https://blog.51cto.com/13501268/2298822