This article looks at how JPA batch operations are implemented, using the Hibernate 5.0.12 sources.
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java
@Override
public void persist(String entityName, Object object) throws HibernateException {
firePersist( new PersistEvent( entityName, object, this ) );
}
private void firePersist(PersistEvent event) {
errorIfClosed();
checkTransactionSynchStatus();
checkNoUnresolvedActionsBeforeOperation();
for ( PersistEventListener listener : listeners( EventType.PERSIST ) ) {
listener.onPersist( event );
}
checkNoUnresolvedActionsAfterOperation();
}
This fires a persist event, which is handed to every registered PersistEventListener.
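The dispatch in firePersist is a plain observer loop: look up the listeners for an event type, then call each one. The sketch below models only that loop, under simplifying assumptions. EventType is a two-value enum, listeners are Consumer<Object> instances, and the listener just records the entity. It is not Hibernate's EventListenerRegistry, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of event dispatch: the "session" looks up every
// listener registered for an event type and calls each one in turn.
class ListenerDispatchSketch {
    enum EventType { PERSIST, FLUSH }

    private final Map<EventType, List<Consumer<Object>>> listeners = new EnumMap<>(EventType.class);

    void register(EventType type, Consumer<Object> listener) {
        listeners.computeIfAbsent(type, t -> new ArrayList<>()).add(listener);
    }

    // Same shape as firePersist: loop over the listeners registered for PERSIST.
    void persist(Object entity) {
        for (Consumer<Object> listener : listeners.getOrDefault(EventType.PERSIST, List.of())) {
            listener.accept(entity);
        }
    }

    static List<Object> demo() {
        List<Object> queued = new ArrayList<>();
        ListenerDispatchSketch session = new ListenerDispatchSketch();
        // In this sketch the listener only records the entity; it issues no SQL.
        session.register(EventType.PERSIST, queued::add);
        session.persist("user-1");
        session.persist("user-2");
        return queued;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [user-1, user-2]
    }
}
```

Registering several listeners for the same type would invoke them in registration order, which mirrors the for loop in firePersist.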
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java
@Override
public void flush() throws HibernateException {
errorIfClosed();
checkTransactionSynchStatus();
if ( persistenceContext.getCascadeLevel() > 0 ) {
throw new HibernateException( "Flush during cascade is dangerous" );
}
FlushEvent flushEvent = new FlushEvent( this );
for ( FlushEventListener listener : listeners( EventType.FLUSH ) ) {
listener.onFlush( flushEvent );
}
delayedAfterCompletion();
}
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultFlushEventListener.java
/** Handle the given flush event.
*
* @param event The flush event to be handled.
* @throws HibernateException
*/
public void onFlush(FlushEvent event) throws HibernateException {
final EventSource source = event.getSession();
final PersistenceContext persistenceContext = source.getPersistenceContext();
if ( persistenceContext.getNumberOfManagedEntities() > 0 ||
persistenceContext.getCollectionEntries().size() > 0 ) {
try {
source.getEventListenerManager().flushStart();
flushEverythingToExecutions( event );
performExecutions( source );
postFlush( source );
}
finally {
source.getEventListenerManager().flushEnd(
event.getNumberOfEntitiesProcessed(),
event.getNumberOfCollectionsProcessed()
);
}
postPostFlush( source );
if ( source.getFactory().getStatistics().isStatisticsEnabled() ) {
source.getFactory().getStatisticsImplementor().flush();
}
}
}
onFlush calls performExecutions.
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractFlushingEventListener.java
/**
* Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot
* be violated: <ol>
* <li> Inserts, in the order they were performed
* <li> Updates
* <li> Deletion of collection elements
* <li> Insertion of collection elements
* <li> Deletes, in the order they were performed
* </ol>
*
* @param session The session being flushed
*/
protected void performExecutions(EventSource session) {
LOG.trace( "Executing flush" );
// IMPL NOTE : here we alter the flushing flag of the persistence context to allow
// during-flush callbacks more leniency in regards to initializing proxies and
// lazy collections during their processing.
// For more information, see HHH-2763
try {
session.getJdbcCoordinator().flushBeginning();
session.getPersistenceContext().setFlushing( true );
// we need to lock the collection caches before executing entity inserts/updates in order to
// account for bi-directional associations
session.getActionQueue().prepareActions();
session.getActionQueue().executeActions();
}
finally {
session.getPersistenceContext().setFlushing( false );
session.getJdbcCoordinator().flushEnding();
}
}
performExecutions in turn calls session.getActionQueue().executeActions();
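The performExecutions Javadoc says SQL runs in a fixed order: inserts, updates, collection-element deletes, collection-element inserts, then deletes. This holds regardless of the order in which the application queued the work. The sketch below models that idea with an insertion-ordered LinkedHashMap of per-kind lists that is drained front to back. The class, the kind names and the demo_user table are hypothetical, and Hibernate's real EXECUTABLE_LISTS_MAP holds more lists than shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: actions are grouped by kind, and the groups are drained in the
// map's insertion order, independent of the order the application queued them in.
class OrderedActionQueueSketch {
    private final Map<String, List<String>> lists = new LinkedHashMap<>();

    OrderedActionQueueSketch() {
        // Order taken from the performExecutions Javadoc.
        for (String kind : new String[] {"insert", "update", "collectionRemove", "collectionCreate", "delete"}) {
            lists.put(kind, new ArrayList<>());
        }
    }

    void add(String kind, String sql) {
        List<String> list = lists.get(kind);
        if (list == null) {
            throw new IllegalArgumentException("unknown action kind: " + kind);
        }
        list.add(sql);
    }

    // Drains every non-empty list in map order and returns what was "executed".
    List<String> executeActions() {
        List<String> executed = new ArrayList<>();
        for (List<String> list : lists.values()) {
            if (!list.isEmpty()) {
                executed.addAll(list);
                list.clear();
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        OrderedActionQueueSketch queue = new OrderedActionQueueSketch();
        queue.add("delete", "delete from demo_user where id=1");
        queue.add("insert", "insert into demo_user ... (id=2)");
        queue.add("update", "update demo_user ... where id=3");
        // Prints the insert first, then the update, then the delete.
        System.out.println(queue.executeActions());
    }
}
```
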
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java
/**
* Perform all currently queued actions.
*
* @throws HibernateException error executing queued actions.
*/
public void executeActions() throws HibernateException {
if ( hasUnresolvedEntityInsertActions() ) {
throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
}
for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
ExecutableList<?> l = listProvider.get( this );
if ( l != null && !l.isEmpty() ) {
executeActions( l );
}
}
}
/**
* Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
*
* @param list The list of Executable elements to be performed
*
* @throws HibernateException
*/
private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
// todo : consider ways to improve the double iteration of Executables here:
// 1) we explicitly iterate list here to perform Executable#execute()
// 2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
try {
for ( E e : list ) {
try {
e.execute();
}
finally {
if( e.getBeforeTransactionCompletionProcess() != null ) {
if( beforeTransactionProcesses == null ) {
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
}
beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
}
if( e.getAfterTransactionCompletionProcess() != null ) {
if( afterTransactionProcesses == null ) {
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
}
afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
}
}
}
}
finally {
if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
// Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
// We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
// unexpected.
Set<Serializable> propertySpaces = list.getQuerySpaces();
invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
}
}
list.clear();
session.getJdbcCoordinator().executeBatch();
}
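Inside the for loop, e.execute() is called for each action. After the loop, outside the try/finally, the list is cleared and session.getJdbcCoordinator().executeBatch() is called. This matches the usual JDBC addBatch/executeBatch pattern. So we can expect e.execute() to perform the addBatch step, and executeBatch() to fire early whenever a full batch has accumulated.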
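The loop-then-flush shape described above can be simulated without a database. The sketch below is hypothetical: addToBatch stands in for what e.execute() ultimately triggers, and executeBatch stands in for the JdbcCoordinator call after the loop. A counter replaces the real PreparedStatement, and an empty final batch is treated as a no-op. It records the size of each batch that would be sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of deferred, batched execution: each queued action adds one
// row to the current batch, a full batch is sent immediately, and a final executeBatch()
// after the loop sends the remainder.
class DeferredBatchSketch {
    private final int batchSize;
    private int pending = 0; // rows added but not yet sent
    private final List<Integer> executedBatchSizes = new ArrayList<>();

    DeferredBatchSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Stands in for addBatch() plus the "batch is full" check.
    void addToBatch() {
        pending++;
        if (pending == batchSize) {
            executeBatch();
        }
    }

    // Stands in for executeBatch(); an empty batch is a no-op in this sketch.
    void executeBatch() {
        if (pending > 0) {
            executedBatchSizes.add(pending);
            pending = 0;
        }
    }

    static List<Integer> flush(int queuedActions, int batchSize) {
        DeferredBatchSketch batch = new DeferredBatchSketch(batchSize);
        for (int i = 0; i < queuedActions; i++) {
            batch.addToBatch(); // role of e.execute() in ActionQueue#executeActions
        }
        batch.executeBatch();   // role of session.getJdbcCoordinator().executeBatch()
        return batch.executedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(flush(7, 3)); // [3, 3, 1]
    }
}
```

With 7 queued inserts and a batch size of 3, two full batches go out inside the loop. The last row is sent only by the call after the loop, which is why that trailing executeBatch() matters.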
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/action/internal/EntityInsertAction.java
@Override
public void execute() throws HibernateException {
nullifyTransientReferencesIfNotAlready();
final EntityPersister persister = getPersister();
final SessionImplementor session = getSession();
final Object instance = getInstance();
final Serializable id = getId();
final boolean veto = preInsert();
// Don't need to lock the cache here, since if someone
// else inserted the same pk first, the insert would fail
if ( !veto ) {
persister.insert( id, getState(), instance, session );
PersistenceContext persistenceContext = session.getPersistenceContext();
final EntityEntry entry = persistenceContext.getEntry( instance );
if ( entry == null ) {
throw new AssertionFailure( "possible non-threadsafe access to session" );
}
entry.postInsert( getState() );
if ( persister.hasInsertGeneratedProperties() ) {
persister.processInsertGeneratedProperties( id, instance, getState(), session );
if ( persister.isVersionPropertyGenerated() ) {
version = Versioning.getVersion( getState(), persister );
}
entry.postUpdate( instance, getState(), version );
}
persistenceContext.registerInsertedKey( persister, getId() );
}
final SessionFactoryImplementor factory = session.getFactory();
if ( isCachePutEnabled( persister, session ) ) {
final CacheEntry ce = persister.buildCacheEntry(
instance,
getState(),
version,
session
);
cacheEntry = persister.getCacheEntryStructure().structure( ce );
final EntityRegionAccessStrategy cache = persister.getCacheAccessStrategy();
final Object ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() );
final boolean put = cacheInsert( persister, ck );
if ( put && factory.getStatistics().isStatisticsEnabled() ) {
factory.getStatisticsImplementor().secondLevelCachePut( cache.getRegion().getName() );
}
}
handleNaturalIdPostSaveNotifications( id );
postInsert();
if ( factory.getStatistics().isStatisticsEnabled() && !veto ) {
factory.getStatisticsImplementor().insertEntity( getPersister().getEntityName() );
}
markExecuted();
}
execute() calls the persister's insert method.
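The insert method that follows branches on dynamic-insert. Without it, a precomputed SQL string covering every insertable column is used (getSQLInsertStrings). With it, an INSERT is generated per call from the properties that should be inserted (getPropertiesToInsert). The sketch below illustrates only that idea by including just the non-null columns. It is a hypothetical helper, not Hibernate's generateInsertString, and the table and column names are made up.

```java
import java.util.StringJoiner;

// Hypothetical illustration of static vs. dynamic INSERT generation.
class DynamicInsertSketch {
    static String buildInsert(String table, String[] columns, Object[] values, boolean dynamicInsert) {
        StringJoiner cols = new StringJoiner(", ");
        StringJoiner params = new StringJoiner(", ");
        for (int i = 0; i < columns.length; i++) {
            // Static SQL lists every column; the dynamic variant skips null values.
            if (!dynamicInsert || values[i] != null) {
                cols.add(columns[i]);
                params.add("?");
            }
        }
        return "insert into " + table + " (" + cols + ") values (" + params + ")";
    }

    public static void main(String[] args) {
        String[] columns = {"id", "name", "city"};
        Object[] values = {1L, "alice", null};
        System.out.println(buildInsert("demo_user", columns, values, false));
        // insert into demo_user (id, name, city) values (?, ?, ?)
        System.out.println(buildInsert("demo_user", columns, values, true));
        // insert into demo_user (id, name) values (?, ?)
    }
}
```
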
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java
public void insert(Serializable id, Object[] fields, Object object, SessionImplementor session) {
// apply any pre-insert in-memory value generation
preInsertInMemoryValueGeneration( fields, object, session );
final int span = getTableSpan();
if ( entityMetamodel.isDynamicInsert() ) {
// For the case of dynamic-insert="true", we need to generate the INSERT SQL
boolean[] notNull = getPropertiesToInsert( fields );
for ( int j = 0; j < span; j++ ) {
insert( id, fields, notNull, j, generateInsertString( notNull, j ), object, session );
}
}
else {
// For the case of dynamic-insert="false", use the static SQL
for ( int j = 0; j < span; j++ ) {
insert( id, fields, getPropertyInsertability(), j, getSQLInsertStrings()[j], object, session );
}
}
}
/**
* Perform an SQL INSERT.
* <p/>
* This for is used for all non-root tables as well as the root table
* in cases where the identifier value is known before the insert occurs.
*/
protected void insert(
final Serializable id,
final Object[] fields,
final boolean[] notNull,
final int j,
final String sql,
final Object object,
final SessionImplementor session) throws HibernateException {
if ( isInverseTable( j ) ) {
return;
}
//note: it is conceptually possible that a UserType could map null to
// a non-null value, so the following is arguable:
if ( isNullableTable( j ) && isAllNull( fields, j ) ) {
return;
}
if ( LOG.isTraceEnabled() ) {
LOG.tracev( "Inserting entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
if ( j == 0 && isVersioned() ) {
LOG.tracev( "Version: {0}", Versioning.getVersion( fields, this ) );
}
}
// TODO : shouldn't inserts be Expectations.NONE?
final Expectation expectation = Expectations.appropriateExpectation( insertResultCheckStyles[j] );
// we can't batch joined inserts, *especially* not if it is an identity insert;
// nor can we batch statements where the expectation is based on an output param
final boolean useBatch = j == 0 && expectation.canBeBatched();
if ( useBatch && inserBatchKey == null ) {
inserBatchKey = new BasicBatchKey(
getEntityName() + "#INSERT",
expectation
);
}
final boolean callable = isInsertCallable( j );
try {
// Render the SQL query
final PreparedStatement insert;
if ( useBatch ) {
insert = session
.getJdbcCoordinator()
.getBatch( inserBatchKey )
.getBatchStatement( sql, callable );
}
else {
insert = session
.getJdbcCoordinator()
.getStatementPreparer()
.prepareStatement( sql, callable );
}
try {
int index = 1;
index += expectation.prepare( insert );
// Write the values of fields onto the prepared statement - we MUST use the state at the time the
// insert was issued (cos of foreign key constraints). Not necessarily the object's current state
dehydrate( id, fields, null, notNull, propertyColumnInsertable, j, insert, session, index, false );
if ( useBatch ) {
session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch();
}
else {
expectation.verifyOutcome(
session.getJdbcCoordinator()
.getResultSetReturn()
.executeUpdate( insert ), insert, -1
);
}
}
catch (SQLException e) {
if ( useBatch ) {
session.getJdbcCoordinator().abortBatch();
}
throw e;
}
finally {
if ( !useBatch ) {
session.getJdbcCoordinator().getResourceRegistry().release( insert );
session.getJdbcCoordinator().afterStatementExecution();
}
}
}
catch (SQLException e) {
throw getFactory().getSQLExceptionHelper().convert(
e,
"could not insert: " + MessageHelper.infoString( this ),
sql
);
}
}
useBatch is true for the root table (j == 0) when the expectation can be batched. In that case session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch() is called, and inserBatchKey here is com.example.domain.DemoUser#INSERT.
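The batching decision in AbstractEntityPersister#insert boils down to `j == 0 && expectation.canBeBatched()`, with a batch key of the form `<entityName>#INSERT`. The sketch below restates that rule for an entity spanning several tables. It is hypothetical: the class name, the two-table example and the step strings are made up, and it ignores the inverse-table and all-null skips that the real method also performs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of the useBatch rule: only the root table (j == 0) is batched,
// and only when the expectation allows batching; other tables run immediately.
class UseBatchSketch {
    static boolean useBatch(int j, boolean expectationCanBeBatched) {
        return j == 0 && expectationCanBeBatched;
    }

    static List<String> plan(String entityName, int tableSpan, boolean expectationCanBeBatched) {
        List<String> steps = new ArrayList<>();
        for (int j = 0; j < tableSpan; j++) {
            if (useBatch(j, expectationCanBeBatched)) {
                steps.add("table " + j + ": addToBatch under " + entityName + "#INSERT");
            } else {
                steps.add("table " + j + ": executeUpdate immediately");
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        // Hypothetical entity mapped to two tables (for example, joined inheritance).
        plan("com.example.domain.DemoUser", 2, true).forEach(System.out::println);
    }
}
```
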
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/internal/JdbcCoordinatorImpl.java
@Override
public Batch getBatch(BatchKey key) {
if ( currentBatch != null ) {
if ( currentBatch.getKey().equals( key ) ) {
return currentBatch;
}
else {
currentBatch.execute();
currentBatch.release();
}
}
currentBatch = batchBuilder().buildBatch( key, this );
return currentBatch;
}
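getBatch above keeps a single current batch per JdbcCoordinator. Asking for a different key executes and releases the current batch before a new one is built. In practice, interleaving inserts of different entity types splits what could have been one batch into several smaller ones. The sketch below models only that key-switching behaviour and ignores the batch size limit. The class and key names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of JdbcCoordinatorImpl#getBatch: one current batch at a time;
// a request for a different key executes and releases the current batch first.
class BatchKeySwitchSketch {
    private String currentKey;
    private int currentRows;
    private final List<String> executions = new ArrayList<>();

    void addToBatch(String key) {
        if (currentKey != null && !currentKey.equals(key)) {
            execute(); // key changed: flush the current batch early
        }
        currentKey = key;
        currentRows++;
    }

    void execute() {
        if (currentKey != null && currentRows > 0) {
            executions.add(currentKey + " x" + currentRows);
        }
        currentKey = null;
        currentRows = 0;
    }

    static List<String> run(String... keys) {
        BatchKeySwitchSketch coordinator = new BatchKeySwitchSketch();
        for (String key : keys) {
            coordinator.addToBatch(key);
        }
        coordinator.execute(); // final executeBatch() at the end of the flush
        return coordinator.executions;
    }

    public static void main(String[] args) {
        System.out.println(run("User#INSERT", "User#INSERT", "Order#INSERT", "User#INSERT"));
        // [User#INSERT x2, Order#INSERT x1, User#INSERT x1]
    }
}
```

Hibernate provides a `hibernate.order_inserts` setting that sorts queued insert actions, which reduces this kind of key switching.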
hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java
@Override
public void addToBatch() {
try {
currentStatement.addBatch();
}
catch ( SQLException e ) {
LOG.debugf( "SQLException escaped proxy", e );
throw sqlExceptionHelper().convert( e, "could not perform addBatch", currentStatementSql );
}
statementPosition++;
if ( statementPosition >= getKey().getBatchedStatementCount() ) {
batchPosition++;
if ( batchPosition == batchSize ) {
notifyObserversImplicitExecution();
performExecution();
batchPosition = 0;
batchExecuted = true;
}
statementPosition = 0;
}
}
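Once a full batch has accumulated (batchPosition reaches batchSize), performExecution is called. BatchingBatch is only used when hibernate.jdbc.batch_size is greater than 1. Otherwise the batch builder returns a NonBatchingBatch, which executes each statement as soon as it is added.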
private void performExecution() {
LOG.debugf( "Executing batch size: %s", batchPosition );
try {
for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
try {
final PreparedStatement statement = entry.getValue();
final int[] rowCounts;
try {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
rowCounts = statement.executeBatch();
}
finally {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
}
checkRowCounts( rowCounts, statement );
}
catch ( SQLException e ) {
abortBatch();
throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
}
}
}
catch ( RuntimeException re ) {
LOG.unableToExecuteBatch( re.getMessage() );
throw re;
}
finally {
batchPosition = 0;
}
}
This is where statement.executeBatch() is finally invoked.
In plain JDBC, the same pattern looks like this:
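import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;

// EmployeeBatchDao and the Employee interface below are illustrative stand-ins
// for the surrounding code the original snippet assumed.
public class EmployeeBatchDao {
    public interface Employee {
        String getName();
        String getCity();
        String getPhone();
    }

    private final DataSource dataSource;

    public EmployeeBatchDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void jdbcBatchOperationTemplate(List<Employee> data) {
        String sql = "insert into employee (name, city, phone) values (?, ?, ?)";
        final int batchSize = 1000;
        int count = 0;
        // try-with-resources closes the statement, then the connection
        try (Connection conn = dataSource.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {
            for (Employee item : data) {
                pstmt.setString(1, item.getName());
                pstmt.setString(2, item.getCity());
                pstmt.setString(3, item.getPhone());
                // add the current row to the batch
                pstmt.addBatch();
                // send in small batches to avoid running out of memory
                if (++count % batchSize == 0) {
                    pstmt.executeBatch();
                }
            }
            // send the remaining rows
            pstmt.executeBatch();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}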
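The only difference is that with JPA, save only puts each entity into the action queue. The addBatch and executeBatch calls shown above happen later, at flush time. With @GeneratedValue(strategy = GenerationType.AUTO), the database is queried for an id every time save is about to add an entity to the action queue. So to batch-insert 1000 rows, save queries the database 1000 times for ids before queuing the entities. Then, at flush, the 1000 queued inserts are executed in batches. This is equivalent to calling the template method above with a data argument of 1000 Employee objects whose ids are already assigned. Each id lookup appears in the SQL log as: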
select nextval ('hibernate_sequence')
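The scenario above can be put into rough numbers. The sketch below counts database round trips for N inserts under two assumptions. First, there is one `nextval` call per entity, as the log suggests, with no id pooling. Second, there is one executeBatch per full or partial batch at flush time. The batch size of 50 stands in for hibernate.jdbc.batch_size and is an assumed value; the class name is hypothetical.

```java
// Hypothetical back-of-the-envelope estimate of round trips for N inserts.
class RoundTripEstimate {
    // One sequence call per entity, assuming no pooling optimizer.
    static int sequenceCalls(int rows) {
        return rows;
    }

    // One executeBatch() per full or partial batch (ceiling division).
    static int batchExecutions(int rows, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        return (rows + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        int rows = 1000;
        int batchSize = 50; // assumed stand-in for hibernate.jdbc.batch_size
        System.out.println(sequenceCalls(rows) + " nextval calls, "
                + batchExecutions(rows, batchSize) + " executeBatch calls");
        // 1000 nextval calls, 20 executeBatch calls
    }
}
```

Under these assumptions the id lookups, not the inserts, dominate the round-trip count.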