
A look at how JPA batch operations are implemented

code4it
Published 2018-09-17 15:50:04
Column: 码匠的流水账

This article looks into how JPA's batch operations are implemented, tracing through Hibernate 5.0.12.Final.

The save method

SessionImpl.persist

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void persist(String entityName, Object object) throws HibernateException {
        firePersist( new PersistEvent( entityName, object, this ) );
    }

    private void firePersist(PersistEvent event) {
        errorIfClosed();
        checkTransactionSynchStatus();
        checkNoUnresolvedActionsBeforeOperation();
        for ( PersistEventListener listener : listeners( EventType.PERSIST ) ) {
            listener.onPersist( event );
        }
        checkNoUnresolvedActionsAfterOperation();
    }

This fires a PersistEvent. No SQL is issued at this point; for the sequence-generated ids used here, the default persist listener only queues an EntityInsertAction into the session's ActionQueue (see the summary at the end).
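
Assuming the save here is a Spring Data JPA repository save (as the references at the end suggest), it simply delegates to the EntityManager. A minimal sketch of that delegation, using a hypothetical DAO rather than the actual Spring Data source:

// A minimal sketch (hypothetical DemoUserDao, simplified "is new" check): a new
// entity is handed to persist, an existing one to merge. persist does not issue
// SQL here; it only fires the PersistEvent traced above and queues an insert action.
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import org.springframework.transaction.annotation.Transactional;

public class DemoUserDao {

    @PersistenceContext
    private EntityManager em;

    @Transactional
    public DemoUser save(DemoUser entity) {
        if (entity.getId() == null) {   // simplified "is new" check
            em.persist(entity);         // fires PersistEvent, queues the insert
            return entity;
        }
        return em.merge(entity);
    }
}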

The flush method

SessionImpl.flush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void flush() throws HibernateException {
        errorIfClosed();
        checkTransactionSynchStatus();
        if ( persistenceContext.getCascadeLevel() > 0 ) {
            throw new HibernateException( "Flush during cascade is dangerous" );
        }
        FlushEvent flushEvent = new FlushEvent( this );
        for ( FlushEventListener listener : listeners( EventType.FLUSH ) ) {
            listener.onFlush( flushEvent );
        }
        delayedAfterCompletion();
    }

DefaultFlushEventListener.onFlush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultFlushEventListener.java

    /** Handle the given flush event.
     *
     * @param event The flush event to be handled.
     * @throws HibernateException
     */
    public void onFlush(FlushEvent event) throws HibernateException {
        final EventSource source = event.getSession();
        final PersistenceContext persistenceContext = source.getPersistenceContext();

        if ( persistenceContext.getNumberOfManagedEntities() > 0 ||
                persistenceContext.getCollectionEntries().size() > 0 ) {

            try {
                source.getEventListenerManager().flushStart();

                flushEverythingToExecutions( event );
                performExecutions( source );
                postFlush( source );
            }
            finally {
                source.getEventListenerManager().flushEnd(
                        event.getNumberOfEntitiesProcessed(),
                        event.getNumberOfCollectionsProcessed()
                );
            }

            postPostFlush( source );

            if ( source.getFactory().getStatistics().isStatisticsEnabled() ) {
                source.getFactory().getStatisticsImplementor().flush();
            }
        }
    }

After flushEverythingToExecutions has translated the pending entity and collection state into executable actions, performExecutions is called here.

AbstractFlushingEventListener.performExecutions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractFlushingEventListener.java

    /**
     * Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot
     * be violated: <ol>
     * <li> Inserts, in the order they were performed
     * <li> Updates
     * <li> Deletion of collection elements
     * <li> Insertion of collection elements
     * <li> Deletes, in the order they were performed
     * </ol>
     *
     * @param session The session being flushed
     */
    protected void performExecutions(EventSource session) {
        LOG.trace( "Executing flush" );

        // IMPL NOTE : here we alter the flushing flag of the persistence context to allow
        //        during-flush callbacks more leniency in regards to initializing proxies and
        //        lazy collections during their processing.
        // For more information, see HHH-2763
        try {
            session.getJdbcCoordinator().flushBeginning();
            session.getPersistenceContext().setFlushing( true );
            // we need to lock the collection caches before executing entity inserts/updates in order to
            // account for bi-directional associations
            session.getActionQueue().prepareActions();
            session.getActionQueue().executeActions();
        }
        finally {
            session.getPersistenceContext().setFlushing( false );
            session.getJdbcCoordinator().flushEnding();
        }
    }

This in turn calls session.getActionQueue().executeActions().

ActionQueue.executeActions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java

    /**
     * Perform all currently queued actions.
     * 
     * @throws HibernateException error executing queued actions.
     */
    public void executeActions() throws HibernateException {
        if ( hasUnresolvedEntityInsertActions() ) {
            throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
        }

        for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
            ExecutableList<?> l = listProvider.get( this );
            if ( l != null && !l.isEmpty() ) {
                executeActions( l );
            }
        }
    }

    /**
     * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
     * 
     * @param list The list of Executable elements to be performed
     *
     * @throws HibernateException
     */
    private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
        // todo : consider ways to improve the double iteration of Executables here:
        //        1) we explicitly iterate list here to perform Executable#execute()
        //        2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
        try {
            for ( E e : list ) {
                try {
                    e.execute();
                }
                finally {
                    if( e.getBeforeTransactionCompletionProcess() != null ) {
                        if( beforeTransactionProcesses == null ) {
                            beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
                        }
                        beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
                    }
                    if( e.getAfterTransactionCompletionProcess() != null ) {
                        if( afterTransactionProcesses == null ) {
                            afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
                        }
                        afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
                    }
                }
            }
        }
        finally {
            if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
                // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
                // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
                // unexpected.
                Set<Serializable> propertySpaces = list.getQuerySpaces();
                invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
            }
        }

        list.clear();
        session.getJdbcCoordinator().executeBatch();
    }

Inside the for loop, e.execute() is called for each queued action, and after the loop (and its finally block) session.getJdbcCoordinator().executeBatch() is invoked. This matches the usual JDBC PreparedStatement batching pattern: we can expect e.execute() to end up calling addBatch(), executeBatch() to fire automatically whenever a full batch has accumulated, and the trailing executeBatch() here to flush whatever is left in the current batch.
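
On the caller side, the same machinery can be driven in controlled chunks by flushing (and clearing) the persistence context every N entities. A minimal sketch, assuming an EntityManager-based service and the DemoUser entity from this example (class and constant names hypothetical):

// A minimal sketch (hypothetical DemoUserBatchWriter): each flush() runs
// ActionQueue.executeActions(), which drives the addToBatch()/executeBatch()
// cycle analyzed below; clear() keeps the persistence context from growing
// without bound during a large batch insert.
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import org.springframework.transaction.annotation.Transactional;

public class DemoUserBatchWriter {

    @PersistenceContext
    private EntityManager em;

    private static final int CHUNK_SIZE = 50; // align with hibernate.jdbc.batch_size

    @Transactional
    public void saveAll(List<DemoUser> users) {
        for (int i = 0; i < users.size(); i++) {
            em.persist(users.get(i));          // queues an EntityInsertAction
            if ((i + 1) % CHUNK_SIZE == 0) {
                em.flush();                    // executeActions -> addBatch/executeBatch
                em.clear();                    // detach the already-flushed entities
            }
        }
        // whatever is still queued is flushed when the transaction commits
    }
}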

EntityInsertAction.execute

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/action/internal/EntityInsertAction.java

    @Override
    public void execute() throws HibernateException {
        nullifyTransientReferencesIfNotAlready();

        final EntityPersister persister = getPersister();
        final SessionImplementor session = getSession();
        final Object instance = getInstance();
        final Serializable id = getId();

        final boolean veto = preInsert();

        // Don't need to lock the cache here, since if someone
        // else inserted the same pk first, the insert would fail

        if ( !veto ) {

            persister.insert( id, getState(), instance, session );
            PersistenceContext persistenceContext = session.getPersistenceContext();
            final EntityEntry entry = persistenceContext.getEntry( instance );
            if ( entry == null ) {
                throw new AssertionFailure( "possible non-threadsafe access to session" );
            }

            entry.postInsert( getState() );

            if ( persister.hasInsertGeneratedProperties() ) {
                persister.processInsertGeneratedProperties( id, instance, getState(), session );
                if ( persister.isVersionPropertyGenerated() ) {
                    version = Versioning.getVersion( getState(), persister );
                }
                entry.postUpdate( instance, getState(), version );
            }

            persistenceContext.registerInsertedKey( persister, getId() );
        }

        final SessionFactoryImplementor factory = session.getFactory();

        if ( isCachePutEnabled( persister, session ) ) {
            final CacheEntry ce = persister.buildCacheEntry(
                    instance,
                    getState(),
                    version,
                    session
            );
            cacheEntry = persister.getCacheEntryStructure().structure( ce );
            final EntityRegionAccessStrategy cache = persister.getCacheAccessStrategy();
            final Object ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() );

            final boolean put = cacheInsert( persister, ck );

            if ( put && factory.getStatistics().isStatisticsEnabled() ) {
                factory.getStatisticsImplementor().secondLevelCachePut( cache.getRegion().getName() );
            }
        }

        handleNaturalIdPostSaveNotifications( id );

        postInsert();

        if ( factory.getStatistics().isStatisticsEnabled() && !veto ) {
            factory.getStatisticsImplementor().insertEntity( getPersister().getEntityName() );
        }

        markExecuted();
    }

This calls the persister's insert method (AbstractEntityPersister.insert).

AbstractEntityPersister.insert

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java

    public void insert(Serializable id, Object[] fields, Object object, SessionImplementor session) {
        // apply any pre-insert in-memory value generation
        preInsertInMemoryValueGeneration( fields, object, session );

        final int span = getTableSpan();
        if ( entityMetamodel.isDynamicInsert() ) {
            // For the case of dynamic-insert="true", we need to generate the INSERT SQL
            boolean[] notNull = getPropertiesToInsert( fields );
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, notNull, j, generateInsertString( notNull, j ), object, session );
            }
        }
        else {
            // For the case of dynamic-insert="false", use the static SQL
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, getPropertyInsertability(), j, getSQLInsertStrings()[j], object, session );
            }
        }
    }

insert

    /**
     * Perform an SQL INSERT.
     * <p/>
     * This for is used for all non-root tables as well as the root table
     * in cases where the identifier value is known before the insert occurs.
     */
    protected void insert(
            final Serializable id,
            final Object[] fields,
            final boolean[] notNull,
            final int j,
            final String sql,
            final Object object,
            final SessionImplementor session) throws HibernateException {

        if ( isInverseTable( j ) ) {
            return;
        }

        //note: it is conceptually possible that a UserType could map null to
        //      a non-null value, so the following is arguable:
        if ( isNullableTable( j ) && isAllNull( fields, j ) ) {
            return;
        }

        if ( LOG.isTraceEnabled() ) {
            LOG.tracev( "Inserting entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
            if ( j == 0 && isVersioned() ) {
                LOG.tracev( "Version: {0}", Versioning.getVersion( fields, this ) );
            }
        }

        // TODO : shouldn't inserts be Expectations.NONE?
        final Expectation expectation = Expectations.appropriateExpectation( insertResultCheckStyles[j] );
        // we can't batch joined inserts, *especially* not if it is an identity insert;
        // nor can we batch statements where the expectation is based on an output param
        final boolean useBatch = j == 0 && expectation.canBeBatched();
        if ( useBatch && inserBatchKey == null ) {
            inserBatchKey = new BasicBatchKey(
                    getEntityName() + "#INSERT",
                    expectation
            );
        }
        final boolean callable = isInsertCallable( j );

        try {
            // Render the SQL query
            final PreparedStatement insert;
            if ( useBatch ) {
                insert = session
                        .getJdbcCoordinator()
                        .getBatch( inserBatchKey )
                        .getBatchStatement( sql, callable );
            }
            else {
                insert = session
                        .getJdbcCoordinator()
                        .getStatementPreparer()
                        .prepareStatement( sql, callable );
            }

            try {
                int index = 1;
                index += expectation.prepare( insert );

                // Write the values of fields onto the prepared statement - we MUST use the state at the time the
                // insert was issued (cos of foreign key constraints). Not necessarily the object's current state

                dehydrate( id, fields, null, notNull, propertyColumnInsertable, j, insert, session, index, false );

                if ( useBatch ) {
                    session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch();
                }
                else {
                    expectation.verifyOutcome(
                            session.getJdbcCoordinator()
                                    .getResultSetReturn()
                                    .executeUpdate( insert ), insert, -1
                    );
                }

            }
            catch (SQLException e) {
                if ( useBatch ) {
                    session.getJdbcCoordinator().abortBatch();
                }
                throw e;
            }
            finally {
                if ( !useBatch ) {
                    session.getJdbcCoordinator().getResourceRegistry().release( insert );
                    session.getJdbcCoordinator().afterStatementExecution();
                }
            }
        }
        catch (SQLException e) {
            throw getFactory().getSQLExceptionHelper().convert(
                    e,
                    "could not insert: " + MessageHelper.infoString( this ),
                    sql
            );
        }

    }

Here useBatch is true, so session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch() is called; in this example the batch key (the inserBatchKey field) is com.example.domain.DemoUser#INSERT. As the code above shows, batching is only applied to the root table (j == 0) and only when the Expectation can be batched; as the source comment notes, joined inserts and identity-style inserts are not batched.

JdbcCoordinatorImpl.getBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/internal/JdbcCoordinatorImpl.java

    @Override
    public Batch getBatch(BatchKey key) {
        if ( currentBatch != null ) {
            if ( currentBatch.getKey().equals( key ) ) {
                return currentBatch;
            }
            else {
                currentBatch.execute();
                currentBatch.release();
            }
        }
        currentBatch = batchBuilder().buildBatch( key, this );
        return currentBatch;
    }

BatchingBatch.addToBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java

    @Override
    public void addToBatch() {
        try {
            currentStatement.addBatch();
        }
        catch ( SQLException e ) {
            LOG.debugf( "SQLException escaped proxy", e );
            throw sqlExceptionHelper().convert( e, "could not perform addBatch", currentStatementSql );
        }
        statementPosition++;
        if ( statementPosition >= getKey().getBatchedStatementCount() ) {
            batchPosition++;
            if ( batchPosition == batchSize ) {
                notifyObserversImplicitExecution();
                performExecution();
                batchPosition = 0;
                batchExecuted = true;
            }
            statementPosition = 0;
        }
    }

Once a full batch has accumulated (batchPosition reaches batchSize), performExecution is invoked; batchSize here is Hibernate's configured JDBC batch size (hibernate.jdbc.batch_size).
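
A minimal configuration sketch for that setting, assuming a plain Hibernate 5 bootstrap (with Spring Boot the same keys would go under spring.jpa.properties.*; the class name is hypothetical):

// A minimal sketch: the batchSize used by BatchingBatch above comes from
// hibernate.jdbc.batch_size; ordering inserts/updates keeps statements for the
// same table adjacent so that they can share a single batch.
import java.util.Properties;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.cfg.Configuration;

public class BatchSettingsSketch {

    public static Configuration withJdbcBatching() {
        Properties props = new Properties();
        props.put(AvailableSettings.STATEMENT_BATCH_SIZE, "50"); // hibernate.jdbc.batch_size
        props.put(AvailableSettings.ORDER_INSERTS, "true");      // hibernate.order_inserts
        props.put(AvailableSettings.ORDER_UPDATES, "true");      // hibernate.order_updates
        return new Configuration().addProperties(props);
    }
}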

performExecution

    private void performExecution() {
        LOG.debugf( "Executing batch size: %s", batchPosition );
        try {
            for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
                try {
                    final PreparedStatement statement = entry.getValue();
                    final int[] rowCounts;
                    try {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
                        rowCounts = statement.executeBatch();
                    }
                    finally {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
                    }
                    checkRowCounts( rowCounts, statement );
                }
                catch ( SQLException e ) {
                    abortBatch();
                    throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
                }
            }
        }
        catch ( RuntimeException re ) {
            LOG.unableToExecuteBatch( re.getMessage() );
            throw re;
        }
        finally {
            batchPosition = 0;
        }
    }

Here we can see that statement.executeBatch() is finally called, and checkRowCounts verifies the row counts it returns.

Summary

  • JPA's save method first adds the entity to the action queue
  • At flush time, the insert actions build up the statement batch, and the batch is only executed once a full batch has accumulated
  • So JPA's batch operation is ultimately a wrapper over the JDBC statement's addBatch and executeBatch; see ActionQueue.executeActions for the details

The underlying pattern looks like this:

    public void jdbcBatchOperationTemplate(List<Employee> data){
        String sql = "insert into employee (name, city, phone) values (?, ?, ?)";

        Connection conn = null;
        PreparedStatement pstmt = null;

        final int batchSize = 1000;
        int count = 0;

        try{
            conn = dataSource.getConnection();
            pstmt = conn.prepareStatement(sql);

            for (Employee item: data) {
                pstmt.setString(1,item.getName());
                pstmt.setString(2,item.getCity());
                pstmt.setString(3,item.getPhone());

                // add to the batch
                pstmt.addBatch();

                // execute in small batches to avoid running out of memory
                if(++count % batchSize == 0) {
                    pstmt.executeBatch();
                }
            }

            pstmt.executeBatch(); // execute the remaining statements

        }catch (SQLException e){
            e.printStackTrace();
        }finally {
            DbUtils.closeQuietly(pstmt);
            DbUtils.closeQuietly(conn);
        }
    }

The only difference is that with JPA, save first puts all the entities into the action queue, and the final flush then triggers the addBatch/executeBatch cycle similar to the template above. With @GeneratedValue(strategy = GenerationType.AUTO) (backed by a sequence here), every save has to hit the database for the id before the entity can be queued. So to batch-insert 1000 rows, 1000 round trips are made to fetch their ids before the entities enter the action queue, and only at flush time are those 1000 queued inserts executed batch by batch, which is equivalent to calling the template above with a data list of 1000 Employee objects that already have ids. Each id fetch shows up as:

    select
        nextval ('hibernate_sequence')
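
One way to cut down those per-save sequence round trips is an explicit sequence generator with an allocation size, so that Hibernate hands out ids in blocks while the inserts themselves are still batched at flush time. A hedged sketch (hypothetical entity and sequence names):

// A hedged sketch (hypothetical names): with allocationSize = 50 the sequence is
// hit roughly once per 50 ids instead of once per save; the queued inserts are
// still executed through addBatch/executeBatch at flush time.
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.SequenceGenerator;

@Entity
public class DemoUser {

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "demo_user_seq")
    @SequenceGenerator(name = "demo_user_seq", sequenceName = "demo_user_seq", allocationSize = 50)
    private Long id;

    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}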

doc

  • Spring Data JPA: Batch insert for nested entities
  • Spring JPA Hibernate - JpaRepository Insert (Batch)