A look at how JPA batch operations are implemented

This article examines how JPA (via Hibernate) implements batch operations.

The save method

SessionImpl.persist

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void persist(String entityName, Object object) throws HibernateException {
        firePersist( new PersistEvent( entityName, object, this ) );
    }

    private void firePersist(PersistEvent event) {
        errorIfClosed();
        checkTransactionSynchStatus();
        checkNoUnresolvedActionsBeforeOperation();
        for ( PersistEventListener listener : listeners( EventType.PERSIST ) ) {
            listener.onPersist( event );
        }
        checkNoUnresolvedActionsAfterOperation();
    }

This fires the persist event.
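No SQL is issued at this point: the persist listener only queues an entity insert action, which is drained later at flush time. A minimal sketch of that deferral, with all class and method names hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the save/flush split: persist() only queues an action,
// and flush() is the point where SQL actually runs.
public class DeferredPersistSketch {
    private final List<String> actionQueue = new ArrayList<>();
    private final List<String> executedSql = new ArrayList<>();

    public void persist(String entity) {
        actionQueue.add(entity); // queue an insert action, no SQL yet
    }

    public void flush() {
        for (String entity : actionQueue) {
            executedSql.add("insert " + entity); // SQL happens only here
        }
        actionQueue.clear();
    }

    public int queuedActions() { return actionQueue.size(); }
    public int executedStatements() { return executedSql.size(); }
}
```

After two persist() calls the queue holds two actions and no statements have run; flush() drains the queue and executes both.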

The flush method

SessionImpl.flush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void flush() throws HibernateException {
        errorIfClosed();
        checkTransactionSynchStatus();
        if ( persistenceContext.getCascadeLevel() > 0 ) {
            throw new HibernateException( "Flush during cascade is dangerous" );
        }
        FlushEvent flushEvent = new FlushEvent( this );
        for ( FlushEventListener listener : listeners( EventType.FLUSH ) ) {
            listener.onFlush( flushEvent );
        }
        delayedAfterCompletion();
    }

DefaultFlushEventListener.onFlush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultFlushEventListener.java

    /** Handle the given flush event.
     *
     * @param event The flush event to be handled.
     * @throws HibernateException
     */
    public void onFlush(FlushEvent event) throws HibernateException {
        final EventSource source = event.getSession();
        final PersistenceContext persistenceContext = source.getPersistenceContext();

        if ( persistenceContext.getNumberOfManagedEntities() > 0 ||
                persistenceContext.getCollectionEntries().size() > 0 ) {

            try {
                source.getEventListenerManager().flushStart();

                flushEverythingToExecutions( event );
                performExecutions( source );
                postFlush( source );
            }
            finally {
                source.getEventListenerManager().flushEnd(
                        event.getNumberOfEntitiesProcessed(),
                        event.getNumberOfCollectionsProcessed()
                );
            }

            postPostFlush( source );

            if ( source.getFactory().getStatistics().isStatisticsEnabled() ) {
                source.getFactory().getStatisticsImplementor().flush();
            }
        }
    }

This calls performExecutions.

AbstractFlushingEventListener.performExecutions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractFlushingEventListener.java

    /**
     * Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot
     * be violated: <ol>
     * <li> Inserts, in the order they were performed
     * <li> Updates
     * <li> Deletion of collection elements
     * <li> Insertion of collection elements
     * <li> Deletes, in the order they were performed
     * </ol>
     *
     * @param session The session being flushed
     */
    protected void performExecutions(EventSource session) {
        LOG.trace( "Executing flush" );

        // IMPL NOTE : here we alter the flushing flag of the persistence context to allow
        //        during-flush callbacks more leniency in regards to initializing proxies and
        //        lazy collections during their processing.
        // For more information, see HHH-2763
        try {
            session.getJdbcCoordinator().flushBeginning();
            session.getPersistenceContext().setFlushing( true );
            // we need to lock the collection caches before executing entity inserts/updates in order to
            // account for bi-directional associations
            session.getActionQueue().prepareActions();
            session.getActionQueue().executeActions();
        }
        finally {
            session.getPersistenceContext().setFlushing( false );
            session.getJdbcCoordinator().flushEnding();
        }
    }

This calls session.getActionQueue().executeActions();

ActionQueue.executeActions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java

    /**
     * Perform all currently queued actions.
     * 
     * @throws HibernateException error executing queued actions.
     */
    public void executeActions() throws HibernateException {
        if ( hasUnresolvedEntityInsertActions() ) {
            throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
        }

        for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
            ExecutableList<?> l = listProvider.get( this );
            if ( l != null && !l.isEmpty() ) {
                executeActions( l );
            }
        }
    }

    /**
     * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
     * 
     * @param list The list of Executable elements to be performed
     *
     * @throws HibernateException
     */
    private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
        // todo : consider ways to improve the double iteration of Executables here:
        //        1) we explicitly iterate list here to perform Executable#execute()
        //        2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
        try {
            for ( E e : list ) {
                try {
                    e.execute();
                }
                finally {
                    if( e.getBeforeTransactionCompletionProcess() != null ) {
                        if( beforeTransactionProcesses == null ) {
                            beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
                        }
                        beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
                    }
                    if( e.getAfterTransactionCompletionProcess() != null ) {
                        if( afterTransactionProcesses == null ) {
                            afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
                        }
                        afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
                    }
                }
            }
        }
        finally {
            if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
                // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
                // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
                // unexpected.
                Set<Serializable> propertySpaces = list.getQuerySpaces();
                invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
            }
        }

        list.clear();
        session.getJdbcCoordinator().executeBatch();
    }

Here e.execute() is invoked inside the for loop, while session.getJdbcCoordinator().executeBatch() is invoked after the loop, once the finally block has run. This matches the calling pattern of the JDBC Statement's executeBatch: we can expect e.execute() to perform the addBatch operation, and executeBatch() to be triggered early whenever a full batch accumulates.
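This pattern can be sketched in isolation: each action's execute() only contributes a statement to the current batch, a full batch triggers an implicit execution, and one final executeBatch() after the loop flushes the remainder. All names in this sketch are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ActionQueue/Batch interplay: execute(action) stands in for
// e.execute() -> addToBatch(), and the trailing executeBatch() stands in for
// session.getJdbcCoordinator().executeBatch() after the loop.
public class ActionQueueSketch {
    private final List<String> batch = new ArrayList<>();
    private final List<List<String>> executedBatches = new ArrayList<>();
    private final int batchSize;

    public ActionQueueSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // e.execute(): adds to the current batch, executing it implicitly when full
    public void execute(String action) {
        batch.add(action);
        if (batch.size() == batchSize) {
            executeBatch();
        }
    }

    // executeBatch(): flushes whatever is still pending
    public void executeBatch() {
        if (!batch.isEmpty()) {
            executedBatches.add(new ArrayList<>(batch));
            batch.clear();
        }
    }

    public List<List<String>> executedBatches() {
        return executedBatches;
    }
}
```

With batchSize = 2 and three actions, this yields two executions: one full batch of two and a trailing batch of one.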

EntityInsertAction.execute

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/action/internal/EntityInsertAction.java

    @Override
    public void execute() throws HibernateException {
        nullifyTransientReferencesIfNotAlready();

        final EntityPersister persister = getPersister();
        final SessionImplementor session = getSession();
        final Object instance = getInstance();
        final Serializable id = getId();

        final boolean veto = preInsert();

        // Don't need to lock the cache here, since if someone
        // else inserted the same pk first, the insert would fail

        if ( !veto ) {

            persister.insert( id, getState(), instance, session );
            PersistenceContext persistenceContext = session.getPersistenceContext();
            final EntityEntry entry = persistenceContext.getEntry( instance );
            if ( entry == null ) {
                throw new AssertionFailure( "possible non-threadsafe access to session" );
            }

            entry.postInsert( getState() );

            if ( persister.hasInsertGeneratedProperties() ) {
                persister.processInsertGeneratedProperties( id, instance, getState(), session );
                if ( persister.isVersionPropertyGenerated() ) {
                    version = Versioning.getVersion( getState(), persister );
                }
                entry.postUpdate( instance, getState(), version );
            }

            persistenceContext.registerInsertedKey( persister, getId() );
        }

        final SessionFactoryImplementor factory = session.getFactory();

        if ( isCachePutEnabled( persister, session ) ) {
            final CacheEntry ce = persister.buildCacheEntry(
                    instance,
                    getState(),
                    version,
                    session
            );
            cacheEntry = persister.getCacheEntryStructure().structure( ce );
            final EntityRegionAccessStrategy cache = persister.getCacheAccessStrategy();
            final Object ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() );

            final boolean put = cacheInsert( persister, ck );

            if ( put && factory.getStatistics().isStatisticsEnabled() ) {
                factory.getStatisticsImplementor().secondLevelCachePut( cache.getRegion().getName() );
            }
        }

        handleNaturalIdPostSaveNotifications( id );

        postInsert();

        if ( factory.getStatistics().isStatisticsEnabled() && !veto ) {
            factory.getStatisticsImplementor().insertEntity( getPersister().getEntityName() );
        }

        markExecuted();
    }

This calls the persister's insert method.

AbstractEntityPersister.insert

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java

    public void insert(Serializable id, Object[] fields, Object object, SessionImplementor session) {
        // apply any pre-insert in-memory value generation
        preInsertInMemoryValueGeneration( fields, object, session );

        final int span = getTableSpan();
        if ( entityMetamodel.isDynamicInsert() ) {
            // For the case of dynamic-insert="true", we need to generate the INSERT SQL
            boolean[] notNull = getPropertiesToInsert( fields );
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, notNull, j, generateInsertString( notNull, j ), object, session );
            }
        }
        else {
            // For the case of dynamic-insert="false", use the static SQL
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, getPropertyInsertability(), j, getSQLInsertStrings()[j], object, session );
            }
        }
    }

insert

    /**
     * Perform an SQL INSERT.
     * <p/>
     * This for is used for all non-root tables as well as the root table
     * in cases where the identifier value is known before the insert occurs.
     */
    protected void insert(
            final Serializable id,
            final Object[] fields,
            final boolean[] notNull,
            final int j,
            final String sql,
            final Object object,
            final SessionImplementor session) throws HibernateException {

        if ( isInverseTable( j ) ) {
            return;
        }

        //note: it is conceptually possible that a UserType could map null to
        //      a non-null value, so the following is arguable:
        if ( isNullableTable( j ) && isAllNull( fields, j ) ) {
            return;
        }

        if ( LOG.isTraceEnabled() ) {
            LOG.tracev( "Inserting entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
            if ( j == 0 && isVersioned() ) {
                LOG.tracev( "Version: {0}", Versioning.getVersion( fields, this ) );
            }
        }

        // TODO : shouldn't inserts be Expectations.NONE?
        final Expectation expectation = Expectations.appropriateExpectation( insertResultCheckStyles[j] );
        // we can't batch joined inserts, *especially* not if it is an identity insert;
        // nor can we batch statements where the expectation is based on an output param
        final boolean useBatch = j == 0 && expectation.canBeBatched();
        if ( useBatch && inserBatchKey == null ) {
            inserBatchKey = new BasicBatchKey(
                    getEntityName() + "#INSERT",
                    expectation
            );
        }
        final boolean callable = isInsertCallable( j );

        try {
            // Render the SQL query
            final PreparedStatement insert;
            if ( useBatch ) {
                insert = session
                        .getJdbcCoordinator()
                        .getBatch( inserBatchKey )
                        .getBatchStatement( sql, callable );
            }
            else {
                insert = session
                        .getJdbcCoordinator()
                        .getStatementPreparer()
                        .prepareStatement( sql, callable );
            }

            try {
                int index = 1;
                index += expectation.prepare( insert );

                // Write the values of fields onto the prepared statement - we MUST use the state at the time the
                // insert was issued (cos of foreign key constraints). Not necessarily the object's current state

                dehydrate( id, fields, null, notNull, propertyColumnInsertable, j, insert, session, index, false );

                if ( useBatch ) {
                    session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch();
                }
                else {
                    expectation.verifyOutcome(
                            session.getJdbcCoordinator()
                                    .getResultSetReturn()
                                    .executeUpdate( insert ), insert, -1
                    );
                }

            }
            catch (SQLException e) {
                if ( useBatch ) {
                    session.getJdbcCoordinator().abortBatch();
                }
                throw e;
            }
            finally {
                if ( !useBatch ) {
                    session.getJdbcCoordinator().getResourceRegistry().release( insert );
                    session.getJdbcCoordinator().afterStatementExecution();
                }
            }
        }
        catch (SQLException e) {
            throw getFactory().getSQLExceptionHelper().convert(
                    e,
                    "could not insert: " + MessageHelper.infoString( this ),
                    sql
            );
        }

    }

With useBatch true, session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch() is called; the inserBatchKey here is com.example.domain.DemoUser#INSERT.

JdbcCoordinatorImpl.getBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/internal/JdbcCoordinatorImpl.java

    @Override
    public Batch getBatch(BatchKey key) {
        if ( currentBatch != null ) {
            if ( currentBatch.getKey().equals( key ) ) {
                return currentBatch;
            }
            else {
                currentBatch.execute();
                currentBatch.release();
            }
        }
        currentBatch = batchBuilder().buildBatch( key, this );
        return currentBatch;
    }
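
Note that only one batch can be current at a time: requesting a different BatchKey first executes and releases the pending batch, which is why interleaving inserts of different entity types defeats batching. A toy model of that key-switch rule, with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of JdbcCoordinatorImpl.getBatch: a request for a different key
// forces the pending batch to execute before a new one is started.
public class CurrentBatchSketch {
    private String currentKey;
    private int pending;
    private final List<String> executions = new ArrayList<>();

    public void addToBatch(String key) {
        if (currentKey != null && !currentKey.equals(key)) {
            executions.add(currentKey + ":" + pending); // execute + release
            pending = 0;
        }
        currentKey = key;
        pending++;
    }

    public List<String> executions() { return executions; }
}
```

Interleaving two entity types (A, B, A, B) yields three premature executions of one statement each, while ordering the inserts by entity (A, A, B, B) keeps the batch intact.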

BatchingBatch.addToBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java

    @Override
    public void addToBatch() {
        try {
            currentStatement.addBatch();
        }
        catch ( SQLException e ) {
            LOG.debugf( "SQLException escaped proxy", e );
            throw sqlExceptionHelper().convert( e, "could not perform addBatch", currentStatementSql );
        }
        statementPosition++;
        if ( statementPosition >= getKey().getBatchedStatementCount() ) {
            batchPosition++;
            if ( batchPosition == batchSize ) {
                notifyObserversImplicitExecution();
                performExecution();
                batchPosition = 0;
                batchExecuted = true;
            }
            statementPosition = 0;
        }
    }

Once a full batch has accumulated, performExecution is invoked.
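The two counters can be modeled in isolation: statementPosition counts the statements belonging to one logical row (an entity spanning several tables needs several statements per row), batchPosition counts complete rows, and execution fires once batchSize rows have accumulated. The class name below is hypothetical:

```java
// Toy model of the counter logic in BatchingBatch.addToBatch.
public class BatchCounterSketch {
    private final int statementsPerRow; // getKey().getBatchedStatementCount()
    private final int batchSize;
    private int statementPosition;
    private int batchPosition;
    private int executions;

    public BatchCounterSketch(int statementsPerRow, int batchSize) {
        this.statementsPerRow = statementsPerRow;
        this.batchSize = batchSize;
    }

    public void addToBatch() {
        statementPosition++;
        if (statementPosition >= statementsPerRow) {
            batchPosition++;                  // one complete row queued
            if (batchPosition == batchSize) {
                executions++;                 // stands in for performExecution()
                batchPosition = 0;
            }
            statementPosition = 0;
        }
    }

    public int executions() { return executions; }
}
```

With one statement per row and a batch size of 3, seven addToBatch() calls produce two executions, with one row still pending for the final flush.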

performExecution

    private void performExecution() {
        LOG.debugf( "Executing batch size: %s", batchPosition );
        try {
            for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
                try {
                    final PreparedStatement statement = entry.getValue();
                    final int[] rowCounts;
                    try {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
                        rowCounts = statement.executeBatch();
                    }
                    finally {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
                    }
                    checkRowCounts( rowCounts, statement );
                }
                catch ( SQLException e ) {
                    abortBatch();
                    throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
                }
            }
        }
        catch ( RuntimeException re ) {
            LOG.unableToExecuteBatch( re.getMessage() );
            throw re;
        }
        finally {
            batchPosition = 0;
        }
    }

Here you can see statement.executeBatch() being invoked.

Summary

  • JPA's save method first adds the data to the action queue.
  • At flush time, the insert actions build up the statement batches, which are only performed once a full batch accumulates.
  • JPA's batch operation is thus a wrapper over the JDBC Statement's addBatch and executeBatch; see ActionQueue.executeActions for the details.

The underlying pattern is as follows:

    public void jdbcBatchOperationTemplate(List<Employee> data){
        String sql = "insert into employee (name, city, phone) values (?, ?, ?)";

        Connection conn = null;
        PreparedStatement pstmt = null;

        final int batchSize = 1000;
        int count = 0;

        try{
            conn = dataSource.getConnection();
            pstmt = conn.prepareStatement(sql);

            for (Employee item: data) {
                pstmt.setString(1,item.getName());
                pstmt.setString(2,item.getCity());
                pstmt.setString(3,item.getPhone());

                //add to the batch
                pstmt.addBatch();

                //flush in small batches to avoid OOM
                if(++count % batchSize == 0) {
                    pstmt.executeBatch();
                }
            }

            pstmt.executeBatch(); //submit the remaining rows

        }catch (SQLException e){
            e.printStackTrace();
        }finally {
            DbUtils.closeQuietly(pstmt);
            DbUtils.closeQuietly(conn);
        }
    }

The only difference is that JPA submits all the data to the action queue at save time, and the final flush triggers addBatch and executeBatch operations like those above. With @GeneratedValue(strategy = GenerationType.AUTO), the database is queried for an id before each save adds its entry to the action queue. So to batch-insert 1000 rows, save makes 1000 round trips to fetch their ids before the actions are queued; then, at flush time, the 1000 queued actions are executed in batches. This corresponds to calling the template method above with a data parameter of 1000 Employee objects that already have ids. Each id fetch issues a statement like:

    select
        nextval ('hibernate_sequence')
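
Note that Hibernate only batches when told to. Assuming a Spring Boot setup, these are the settings commonly combined with the mechanics above (the property names are standard Hibernate settings passed through Spring's spring.jpa.properties prefix; the value 50 is just an example):

```properties
# Enable JDBC batching; without this Hibernate does not batch at all
spring.jpa.properties.hibernate.jdbc.batch_size=50
# Group inserts/updates by entity type so the current batch is not
# executed prematurely every time the statement changes
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
```

Insert batching is silently disabled for IDENTITY id generation, since the generated id must be returned per row; with a sequence generator, the per-row nextval round trips shown above can be reduced by a larger allocationSize on @SequenceGenerator.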

doc

  • Spring Data JPA: Batch insert for nested entities
  • Spring JPA Hibernate - JpaRepository Insert (Batch)

Originally published on the WeChat official account 码匠的流水账 (geek_luandun).

Original publication date: 2018-01-28
