Как обрабатывать логически связанные строки после ItemReader в SpringBatch?

Сценарий

Чтобы упростить задачу, предположим, что у меня есть ItemReader, который возвращает мне 25 строк.

  1. Первые 10 строк принадлежат студенту A

  2. Следующие 5 принадлежат студенту B

  3. и 10 оставшихся принадлежат студенту C

Я хочу объединить их вместе логически, скажем, по studentId и выровнять их чтобы в итоге получить по одной строке на студента.

Проблема

Если я правильно понимаю, установка интервала фиксации на 5 сделает следующее:

  1. Отправит 5 строк в процессор (который объединит их или выполнит любую бизнес-логику, которую я ему скажу).
  2. После Обработчик запишет 5 строк.
  3. Затем он сделает это снова для следующих 5 строк и так далее.

Если это так, то для следующих пяти я должен буду проверить уже написанные, вывести их, объединить с теми, которые я сейчас обрабатываю, и написать их снова.

Лично мне это не нравится.

  1. Как лучше всего справиться с подобной ситуацией в Spring Batch?

Альтернатива

Иногда я чувствую, что гораздо проще написать обычную основную программу Spring JDBC, и тогда у меня будет полный контроль над тем, что я хочу сделать. Однако я хотел воспользоваться преимуществами мониторинга состояния задания в репозитории заданий, возможностью перезапуска, пропуска, задания и шага слушателей.....

Мой код Spring Batch

Мой module-context.xml

   <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1"  >           
            <batch:tasklet transaction-manager="transactionManager" start-limit="100" >             
                 <batch:chunk reader="attendanceItemReader"
                              processor="attendanceProcessor" 
                              writer="attendanceItemWriter" 
                              commit-interval="10" 
                 />

            </batch:tasklet>
        </batch:step>
    </batch:job> 

    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property> 
        <property name="sql"                                                    
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>           
        <property name="rowMapper" ref="attendanceRowMapper"/> 
    </bean> 

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
        <property name="resource" value="file:target/outputs/passthrough.txt"/> 
        <property name="lineAggregator"> 
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
        </property> 
    </bean> 

</beans>

Мои вспомогательные классы для Reader.

PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        ps.setInt(1, 7);

    }

}

и RowMapper

package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();
        dto.setStudentId(rs.getString(STUDENT_ID));
        dto.setStudentName(rs.getString(STUDENT_NAME));
        dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
        dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));

        return dto;
    }
}

Мой процессор

package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        if(map.containsKey(new Integer(dto.getStudentId()))){

            AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());

        }else{
            map.put(new Integer(dto.getStudentId()), dto);
        }
        return map;
    }

}

Мои проблемы из кода выше

В процессоре я создаю HashMap и по мере обработки строк я проверяю, есть ли у меня уже этот студент в карте, если его там нет, я добавляю его. Если он там уже есть, я беру значения, которые меня интересуют, и добавляю их к строке, которую я сейчас обрабатываю.

После этого Spring Batch Framework пишет в файл согласно моей конфигурации

Мой вопрос заключается в следующем:

  1. Я не хочу, чтобы он отправлялся на запись. Я хочу обработать все оставшиеся строки. Как мне сохранить эту карту, которую я создал, в памяти для следующего набора строк, которые должны пройти через этот же процессор? Каждый раз, когда строка обрабатывается через AttendanceProcessor, инициализируется карта. Должен ли я поместить инициализацию Map в статический блок?
18
задан Viriato 17 January 2012 в 13:03
поделиться