Spring Batch in Spring MVC
Today I have implemented Spring Batch in my Spring MVC project for importing data from CSV file to database. There are plenty of tutorials on the internet but I met a lot of problems during the implementation, which were not mentioned in those tutorials. I will list here all the problems.
1. Exceptions
1-1. Problem 1
You can not use your custom autowired service inside ItemProcessor.
1-1-1. Solution
Try to use your SpringContextUtils.getBean
public class SkuDataProcessor implements ItemProcessor<CustomSku, ProductSku> { private static Logger logger = Logger.getLogger( SkuDataProcessor.class ); @Override public ProductSku process( final CustomSku inSku ) throws Exception { SkuImportService skuImportService = (SkuImportService) SpringContextUtils.getBean( "skuImportService" ); skuImportService.importSkuFromCSV( inSku ); |
@Service("skuImportService") public class SkuImportServiceImpl implements SkuImportService { private static Logger logger = Logger.getLogger( SkuImportServiceImpl.class ); @Autowired private BrandService brandService; ...... |
1-2. Problem 2
I have added all the data checking and importing in ItemProcessor, so I do not need to insert new record into the database(just do nothing here), so I tried to set a readable SQL in ItemWriter like this:
@Bean public ItemWriter<ProductSku> csvSkuWriter() { JdbcBatchItemWriter<ProductSku> writer = new JdbcBatchItemWriter<ProductSku>(); writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<ProductSku>() ); writer.setSql( "select :id" ); writer.setDataSource( dataSource ); writer.afterPropertiesSet(); return writer; } @Bean public Step stepImport() { return stepBuilderFactory.get( "stepImport" ) .<CustomSku, ProductSku>chunk( 50 ) .reader( csvSkuReader() ) .processor( skuDataProcessor() ) .writer( csvSkuWriter() ) .build(); } |
org.springframework.dao.DataIntegrityViolationException: PreparedStatementCallback; SQL [select ?Can not issue executeUpdate() for SELECTs; nested exception is java.sql.BatchUpdateException: Can not issue executeUpdate() for SELECTs |
1-2-1. Solution:
Create a new class implement ItemWriter:
import java.util.List; import net.mobabel.nova.model.shop.ProductSku; import org.springframework.batch.item.ItemWriter; public class NoOpItemWriter implements ItemWriter<ProductSku> { public void write( List<? extends ProductSku> items ) throws Exception { //do nothing } } |
@Bean public NoOpItemWriter noOpWriter() { return new NoOpItemWriter(); } @Bean public Step stepImport() { return stepBuilderFactory.get( "stepImport" ) .<CustomSku, ProductSku>chunk( 50 ) .reader( csvSkuReader() ) .processor( skuDataProcessor() ) .writer( noOpWriter() ) .build(); } |
1-3. Problem 3
javax.validation.ValidationException: HV000183: Unable to initialize 'javax.el.ExpressionFactory'. Check that you have the EL dependencies on the classpath, or use ParameterMessageInterpolator instead |
I have imported spring-batch-core in maven, but this is not enough, you need the following two libraries.
<dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> </dependency> <dependency> <groupId>javax.el</groupId> <artifactId>javax.el-api</artifactId> </dependency> <dependency> <groupId>org.glassfish.web</groupId> <artifactId>javax.el</artifactId> </dependency> |
1-4. Problem 4
org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?]; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'nova_classic_local_core.batch_job_instance' doesn't exist |
1-4-1. Reason:
Spring Batch requires some tables in your database for job execution. But in Spring MVC those tables can not be generated automatically. In Spring Boot yes, if you enable spring.batch.initialize-schema=ALWAYS in your application.properties.
The meta table’s scripts are stored in the spring-batch-core.jar
, you need to create it manually.
I have tried this solution from https://www.mkyong.com/spring-batch/spring-batch-metadata-tables-are-not-created-automatically/ to create tables automatically but does not work.
This is my spring-batch.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" depends-on="dataSource"> <property name="dataSource" ref="dataSource" /> <property name="transactionManager" ref="transactionManager" /> <property name="databaseType" value="mysql" /> </bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <jdbc:initialize-database data-source="dataSource"> <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" /> <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" /> </jdbc:initialize-database> </beans> |
Finally, I create the tables manually. Of course, you can implement this via flyway
1-5. Problem 5
When the job was executed successfully without any exceptions, and you run it again.
1-5-1. Reason:
You will get the “All steps already completed or no steps configured for this job.” message in table batch_job_execution and job quit with exit code NOOP, it might be that you are running the same job with exactly the same parameters.
Please add a timestamp in jobParameters to avoid this problem.
JobParameters jobParameters = new JobParametersBuilder() .addLong( "timestamp", System.currentTimeMillis() ) .toJobParameters(); JobExecution result = null; try { result = jobLauncher.run( skuToDatabaseJob, jobParameters ); } ...... |
ExitStatus es = result.getExitStatus(); if (es.getExitCode().equals( ExitStatus.COMPLETED.getExitCode() )){......} |
2. Code:
/* * Copyright (c) 2016-2017 Mobabel.net * * All rights reserved. */ package net.mobabel.nova.service.shop.custom.batch; import com.alibaba.druid.pool.DruidDataSource; import net.mobabel.nova.core.util.Logger; import net.mobabel.nova.model.shop.ProductSku; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.support.DatabaseType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.core.io.FileSystemResource; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; @Component @EnableBatchProcessing public class SpringBatchProcessingImpl implements SpringBatchProcessing { private static Logger logger = Logger.getLogger( SpringBatchProcessingImpl.class ); @Value( "${ecommerce.config.csv_file}" ) private String fetchFile; @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public DruidDataSource dataSource; @Autowired private JobLauncher jobLauncher; @Autowired private Job skuToDatabaseJob; @Bean public JobRepository jobRepository( DruidDataSource dataSource, PlatformTransactionManager transactionManager ) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource( dataSource ); jobRepositoryFactoryBean.setTransactionManager( transactionManager ); jobRepositoryFactoryBean.setDatabaseType( String.valueOf( DatabaseType.MYSQL ) ); jobRepositoryFactoryBean.setIsolationLevelForCreate( "ISOLATION_SERIALIZABLE" ); //for oracle //jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED"); jobRepositoryFactoryBean.afterPropertiesSet(); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher( DruidDataSource dataSource, PlatformTransactionManager transactionManager ) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository( jobRepository( dataSource, transactionManager ) ); return jobLauncher; } @Bean public FlatFileItemReader<CustomSku> csvSkuReader() { FlatFileItemReader<CustomSku> reader = new FlatFileItemReader<CustomSku>(); reader.setResource( new FileSystemResource( fetchFile ) ); reader.setLinesToSkip( 1 ); reader.setLineMapper( new DefaultLineMapper<CustomSku>() {{ setLineTokenizer( new DelimitedLineTokenizer() {{ setNames( new String[] { "refFromStore", "size", "color", "stock", "BrandName", "styleCode", "officePrice", "name", "skuFromStore", "weight" } ); setDelimiter( ";" ); }} ); setFieldSetMapper( new BeanWrapperFieldSetMapper<CustomSku>() {{ setTargetType( CustomSku.class ); }} ); }} ); return reader; } @Bean ItemProcessor<CustomSku, ProductSku> skuDataProcessor() { SkuDataProcessor skuDataProcessor = new SkuDataProcessor(); return skuDataProcessor; } @Bean public ItemWriter<ProductSku> csvSkuWriter() { JdbcBatchItemWriter<ProductSku> writer = new JdbcBatchItemWriter<ProductSku>(); writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<ProductSku>() ); writer.setSql( "INSERT INTO sh_product_sku (id) VALUES (:id)" ); writer.setDataSource( dataSource ); writer.afterPropertiesSet(); return writer; } @Bean public NoOpItemWriter noOpWriter() { return new NoOpItemWriter(); } // end reader, writer, and processor // begin job info @Bean public Step stepImport() { return stepBuilderFactory.get( "stepImport" ) //Must be 1, otherwise every new item will repeat adding duplicated item, because in the check list, //the duplication checking will be executed in every chunksize frequency .<CustomSku, ProductSku>chunk( 1 ) .reader( csvSkuReader() ) .processor( skuDataProcessor() ) .writer( noOpWriter() ) .build(); } @Bean Job skuToDatabaseJob( SkuJobListener listener ) { return jobBuilderFactory.get( "skuToDatabaseJob" ) .incrementer( new RunIdIncrementer() ) .listener( listener ) .flow( stepImport() ) .end() .build(); } // end job info @Override public void run() throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addLong( "timestamp", System.currentTimeMillis() ) .toJobParameters(); JobExecution result = null; try { result = jobLauncher.run( skuToDatabaseJob, jobParameters ); } catch (JobExecutionAlreadyRunningException e) { logger.error( "", e ); } catch (JobRestartException e) { logger.error( "", e ); } catch (JobInstanceAlreadyCompleteException e) { logger.error( "", e ); } catch (JobParametersInvalidException e) { logger.error( "", e ); } ExitStatus es = result.getExitStatus(); if (es.getExitCode().equals( ExitStatus.COMPLETED.getExitCode() )) { logger.info( "sku database refresh job finished" ); } else if (es.getExitCode().equals( ExitStatus.NOOP.getExitCode() )) { logger.warn( "sku database refresh job noop" ); } else { logger.error( "sku database refresh job failed" ); } } } |
/* * Copyright (c) 2016-2018 Mobabel.net * * All rights reserved. */ package net.mobabel.nova.service.shop.custom.batch; import net.mobabel.nova.core.util.Logger; import net.mobabel.nova.model.shop.ProductSku; import net.mobabel.nova.service.shop.custom.SkuImportService; import net.mobabel.nova.service.utils.SpringContextUtils; import org.springframework.batch.item.ItemProcessor; public class SkuDataProcessor implements ItemProcessor<CustomSku, ProductSku> { private static Logger logger = Logger.getLogger( SkuDataProcessor.class ); @Override public ProductSku process( final CustomSku inSku ) throws Exception { //This is your custom service to check and import records into database SkuImportService skuImportService = (SkuImportService) SpringContextUtils.getBean( "skuImportService" ); skuImportService.importSkuFromCSV( inSku ); //do nothing here final ProductSku outSku = new ProductSku(); outSku.setName( inSku.getName() ); logger.info( "Sku converting (" + inSku + ") into (" + outSku + ")" ); return outSku; } } |
