Spring Batch是一种用于批处理的框架基于Spring Framework开发,通过读取大量的数据、处理数据和写入大量数据来满足各种类型的企业级批处理需求。
Spring Batch可以很好地处理大量数据,并且提供了丰富的可扩展组件,业务逻辑与框架层的一系列处理步骤的集成也比较简单。
Spring Batch可以很好地支持程序员针对大量的数据,编写代码来执行规范的操作序列,提高开发效率,降低了对于数据库等系统资源访问的影响。
Spring Batch主要包含以下核心概念和组件:
ItemReader
读取出来的数据并进行相应的业务处理。ItemProcessor
处理好后的数据写入到目标存储位置。在进行批处理时需要避免扫描所有的数据,而是应该分批读取并处理数据,这样可以避免对系统资源产生过大压力。对于大数据量的处理任务,建议采取分页处理技术,将大数据量拆分成多个小任务处理,并对每个任务进行分页读取和处理。
@Bean
@StepScope
public ItemReader<Data> reader() {
RepositoryItemReader<Data> reader = new RepositoryItemReader<>();
reader.setRepository(repository);
reader.setMethodName(FIND_DATA_BY_NAME_AND_AGE);
reader.setPageSize(1000);
Map<String, Object> params = new HashMap<>();
params.put("name", "test");
params.put("age", 20);
reader.setParameterValues(params);
return reader;
}
以上例子展示了如何使用Spring Data JPA Repository对数据分页读取,在分页读取时,可以通过setPageSize()指定分页数量。
对于一些经常重复读写的数据可以使用读写缓存,减少读写操作的频率。使用读写缓存能够降低读写磁盘I/O的操作,大大提高批处理数据的处理效率。在Spring Batch中可以通过使用@EnableCaching
来开启缓存。
@Bean
public ItemWriter<Data> writer() {
RepositoryItemWriter<Data> writer = new RepositoryItemWriter<>();
writer.setRepository(repository);
writer.setMethodName(SAVE);
writer.afterPropertiesSet();
return writer;
}
@Bean
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager("data");
}
以上例子展示了如何使用Spring Cache对数据进行缓存,需要在配置类上添加@EnableCaching
注解,并在CacheManager
中指定相应的Cache名称。
在写操作时应该尽量避免一次性提交大量的数据,可以采用行级别的写操作,即将数据分批次进行保存,批量提交,可有效避免内存溢出和减少I/O操作。
@Bean
public ItemWriter<Data> writer(EntityManagerFactory entityManagerFactory) {
JpaItemWriter<Data> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
writer.setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
writer.setTransactionManager(transactionManager);
writer.setFlushBlockSize(5000);
return writer;
}
以上例子展示了如何使用Spring Batch提供的JpaItemWriter对数据进行批量保存,可以通过调整setFlushBlockSize()
方法中指定每批次提交的数据量。
在对大量数据进行处理时可以采用多进程并发处理的方式来提高数据处理速度,主要思想是将大数据集拆分成多个任务,将这些任务分别交给不同的进程处理,利用多核计算机的特性,同时处理多个任务,提高数据处理效率。
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("async-writer");
}
@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor());
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
以上例子展示了如何使用Spring Batch提供的SimpleAsyncTaskExecutor
对数据进行批处理任务的并发处理,进程会被自动分配到可用的CPU核心上执行任务。
在对大量数据进行处理时可以采用多线程并发处理的方式来提高数据处理速度,主要思想是将大数据集拆分成多个任务,利用Java多线程的特性,同时处理多个任务,提高数据处理效率。
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(25);
taskExecutor.setThreadNamePrefix("batch-thread-");
taskExecutor.initialize();
return taskExecutor;
}
@Bean
public SimpleAsyncTaskExecutor jobExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("job-thread");
executor.setConcurrencyLimit(3);
return executor;
}
@Bean
public SimpleJobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(jobExecutor());
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
以上例子展示了如何使用Spring Batch提供的ThreadPoolTaskExecutor
对数据进行批处理任务的并发处理,可以通过调整setCorePoolSize()
、setMaxPoolSize()
和setQueueCapacity()
方法来设定线程池的大小和控制线程数在多大范围内,并使用SimpleAsyncTaskExecutor
来设限同时执行的线程数量。
在进行批处理任务时应该确保输入数据的正确性和读写操作的有效性,通过在批处理启动前进行校验,可以大大提高数据准确性。
@Configuration
public class JobValidateListener {
@Autowired
private Validator validator;
@Autowired
private Job job;
@PostConstruct
public void init() {
JobValidationListener validationListener = new JobValidationListener();
validationListener.setValidator(validator);
job.registerJobExecutionListener(validationListener);
}
}
public class JobValidationListener implements JobExecutionListener {
private Validator validator;
public void setValidator(Validator validator) {
this.validator = validator;
}
@Override
public void beforeJob(JobExecution jobExecution) {
JobParameters parameters = jobExecution.getJobParameters();
BatchJobParameterValidator validator = new BatchJobParameterValidator(parameters);
validator.validate();
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
以上例子展示了如何使用Bean Validation校验批处理任务的输入参数,在beforeJob()
方法中调用自定义的BatchJobParameterValidator
进行输入参数的校验。
在进行批处理任务时应该对每次读取和写入的数据进行校验,以避免不合法的数据写入目标数据存储。
@Bean
public ItemReader<Data> reader() {
JpaPagingItemReader<Data> reader = new JpaPagingItemReader<>();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(1000);
reader.setQueryString(FIND_DATA_BY_NAME_AND_AGE);
Map<String, Object> parameters = new HashMap<>();
parameters.put("name", "test");
parameters.put("age", 20);
reader.setParameterValues(parameters);
reader.setValidationQuery("select count(*) from data where name=#{name} and age=#{age}");
return reader;
}
以上例子展示了如何使用JpaPagingItemReader
来读取数据,并在Reader中进行数据校验,通过设置setValidationQuery()
方法指定校验SQL语句。
在进行批处理任务时应该及时了解任务的执行情况和运行状态,可以使用Spring Boot Actuator进行监控。Spring Boot Actuator提供了丰富的监控指标和API,可以帮助开发人员实时监控批处理任务的运行状况。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
以上例子展示了如何在pom.xml文件中添加spring-boot-starter-actuator
依赖来启用Actuator功能。
在进行批处理任务时可以使用管理控制台来监控任务的执行情况和运行状态,通过在控制台上显示监控指标和任务日志,可以及时发现和处理任务中的异常情况。
@Configuration
public class BatchLoggingConfiguration {
@Bean
public BatchConfigurer configurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
@Override
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Override
public JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(getDataSource());
factory.setTransactionManager(getTransactionManager());
factory.setIsolationLevelForCreate("ISOLATION_DEFAULT");
factory.afterPropertiesSet();
return factory.getObject();
}
};
}
}
以上例子展示了如何使用BatchConfigurer
来记录批处理任务的日志和监控信息,并在管理控制台上显示。可以在程序启动时,使用@EnableBatchProcessing
注解启用批处理功能。同时,可以使用@EnableScheduling
注解来自动启动定时任务。
在我们的项目中需要对用户的购物行为进行数据分析,并将结果存入数据库。由于数据规模较大,并且需要及时更新,因此我们决定采用批处理技术来处理这个问题。
在使用批处理框架进行数据处理时遇到了以下问题:
数据读取效率较低,导致批处理速度较慢;
处理过程中,遇到异常时无法及时发现和处理。
首先修改了数据源的配置使用连接池提高数据读取效率。
<bean id="dataSource"
class="com.alibaba.druid.pool.DruidDataSource"
init-method="init"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<property name="initialSize" value="${druid.initialSize}" />
<property name="minIdle" value="${druid.minIdle}" />
<property name="maxActive" value="${druid.maxActive}" />
<property name="maxWait" value="${druid.maxWait}" />
<property name="timeBetweenEvictionRunsMillis" value="${druid.timeBetweenEvictionRunsMillis}" />
<property name="minEvictableIdleTimeMillis" value="${druid.minEvictableIdleTimeMillis}" />
<property name="validationQuery" value="${druid.validationQuery}" />
<property name="testWhileIdle" value="${druid.testWhileIdle}" />
<property name="testOnBorrow" value="${druid.testOnBorrow}" />
<property name="testOnReturn" value="${druid.testOnReturn}" />
<property name="poolPreparedStatements" value="${druid.poolPreparedStatements}" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="${druid.maxPoolPreparedStatementPerConnectionSize}" />
<property name="filters" value="${druid.filters}" />
</bean>
以上代码展示了我们如何使用阿里巴巴的Druid连接池来优化数据读取效率。
决定采用分片策略来处理大批量数据将批处理任务拆分成多个小任务并发执行,提高处理效率。
@Configuration
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<User, User>chunk(10000)
.reader(reader(null))
.processor(processor())
.writer(writer(null))
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<User, User>chunk(10000)
.reader(reader2(null))
.processor(processor())
.writer(writer2(null))
.taskExecutor(taskExecutor())
.build();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Bean
@StepScope
public JdbcCursorItemReader<User> reader(@Value("#{stepExecutionContext['fromId']}")Long fromId) {
JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT * FROM user WHERE id > ? AND id <= ?");
reader.setPreparedStatementSetter(new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
ps.setLong(1, fromId);
ps.setLong(2, fromId + 10000);
}
});
reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
return reader;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
@StepScope
public JdbcCursorItemReader<User> reader2(@Value("#{stepExecutionContext['fromId']}")Long fromId) {
JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT * FROM user WHERE id > ?");
reader.setPreparedStatementSetter(new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
ps.setLong(1, fromId + 10000);
}
});
reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
return reader;
}
@Bean
public ItemProcessor<User, User> processor() {
return new UserItemProcessor();
}
@Bean
public ItemWriter<User> writer(DataSource dataSource) {
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO user(name, age) VALUES(?, ?)");
writer.setItemPreparedStatementSetter(new UserPreparedStatementSetter());
return writer;
}
@Bean
public ItemWriter<User> writer2(DataSource dataSource) {
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("UPDATE user SET age = ? WHERE name = ?");
writer.setItemPreparedStatementSetter(new UserUpdatePreparedStatementSetter());
return writer;
}
@Bean(destroyMethod="shutdown")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(30);
executor.initialize();
return executor;
}
@Bean
public StepExecutionListener stepExecutionListener() {
return new StepExecutionListenerSupport() {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if(stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED_WITH_SKIPS");
} else {
return ExitStatus.COMPLETED;
}
}
};
}
}
以上代码展示了如何使用分片批处理来处理大批量数据。通过将批处理任务拆分成多个小任务并发执行,提高了批处理效率。
使用监控和异常处理策略来发现并处理批处理任务中出现的异常情况。
@Configuration
public class BatchConfiguration {
...
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<User, User>chunk(10000)
.reader(reader(null))
.processor(processor())
.writer(writer(null))
.taskExecutor(taskExecutor())
.faultTolerant()
.skipPolicy(new UserSkipPolicy())
.retryPolicy(new SimpleRetryPolicy())
.retryLimit(3)
.noRollback(NullPointerException.class)
.listener(stepExecutionListener())
.build();
}
@Bean
public StepExecutionListener stepExecutionListener() {
return new StepExecutionListenerSupport() {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if(stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED_WITH_SKIPS");
} else {
return ExitStatus.COMPLETED;
}
}
};
}
@Bean
public SkipPolicy userSkipPolicy() {
return (Throwable t, int skipCount) -> {
if(t instanceof NullPointerException) {
return false;
} else {
return true;
}
};
}
@Bean
public RetryPolicy simpleRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
return retryPolicy;
}
@Bean
public ItemWriter<User> writer(DataSource dataSource) {
CompositeItemWriter<User> writer = new CompositeItemWriter<>();
List<ItemWriter<? super User>> writers = new ArrayList<>();
writers.add(new UserItemWriter());
writers.add(new LogUserItemWriter());
writer.setDelegates(writers);
writer.afterPropertiesSet();
return writer;
}
public class UserItemWriter implements ItemWriter<User> {
@Override
public void write(List<? extends User> items) throws Exception {
for(User item : items) {
...
}
}
}
public class LogUserItemWriter implements ItemWriter<User> {
@Override
public void write(List<? extends User> items) throws Exception {
for(User item : items) {
...
}
}
@Override
public void onWriteError(Exception exception, List<? extends User> items) {
...
}
}
@Bean
public BatchLoggingConfiguration batchLoggingConfiguration() {
return new BatchLoggingConfiguration();
}
}
以上代码展示了如何使用监控和异常处理策略来发现并处理批处理任务中出现的异常情况。可以使用faultTolerant()
方法来配置容错处理策略,使用skipPolicy()
方法来配置跳过错误记录的策略,使用retryPolicy()
方法来配置重试策略。
使用noRollback()
方法来避免回滚操作。使用CompositeItemWriter
来编写异常处理策略,同时也可以结合实际业务需求来进行异常处理。在进行批处理任务时也可以使用Spring Boot Actuator进行监控。
我们使用以上优化措施进行测试后获得了如下测试结果:
通过本文的分析和实践发现在处理大批量数据时,使用批处理框架是非常有效的。但是在实际应用中还需要考虑如何优化批处理的效率和稳定性可以采用连接池、分片批处理、容错处理、异常处理等方法来优化批处理效率和稳定性。希望本文的内容能够对大家有所帮助。
来源:blog.csdn.net/u010349629/article/details/130673379后端专属技术群
构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!
文明发言,以
交流技术
、职位内推
、行业探讨
为主
广告人士勿入,切勿轻信私聊,防止被骗
加我好友,拉你进群