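A look at flink jdbc's ParameterValuesProvider

This article studies flink jdbc's ParameterValuesProvider and follows how the parameters it produces become input splits that are handed out to parallel tasks.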

ParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java

/**
 * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
 * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
 * implementation.
 */
public interface ParameterValuesProvider {
​
    /** Returns the necessary parameters array to use for query in parallel a table. */
    Serializable[][] getParameterValues();
}
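  • The ParameterValuesProvider interface defines the getParameterValues method, which returns the parameters needed to query a table in parallel. Each row of the returned matrix parameterizes one query, so a single large SQL query is divided into several range queries that can run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider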

GenericParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java

/**
 * This splits generator actually does nothing but wrapping the query parameters
 * computed by the user before creating the {@link JDBCInputFormat} instance.
 */
public class GenericParameterValuesProvider implements ParameterValuesProvider {
​
    private final Serializable[][] parameters;
​
    public GenericParameterValuesProvider(Serializable[][] parameters) {
        this.parameters = parameters;
    }
​
    @Override
    public Serializable[][] getParameterValues(){
        //do nothing...precomputed externally
        return parameters;
    }
​
}
  • GenericParameterValuesProvider does no computation of its own; its getParameterValues method simply returns the matrix that was passed to the constructor
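
Since GenericParameterValuesProvider only wraps a matrix the caller has already computed, the real work is building that matrix. The following is a minimal sketch that does not depend on Flink: it builds one row per parallel query for a hypothetical template `SELECT * FROM BOOKS WHERE author = ? AND year BETWEEN ? AND ?`. The class ParameterMatrixSketch, the method buildRows, and the author and year columns are assumptions made for illustration; in a real job the resulting array would be handed to the GenericParameterValuesProvider constructor.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical helper, not part of Flink: builds the matrix a caller would precompute.
final class ParameterMatrixSketch {

    /** Builds one row (author, fromYear, toYear) per author; each row becomes one split. */
    static Serializable[][] buildRows(String[] authors, int fromYear, int toYear) {
        Serializable[][] rows = new Serializable[authors.length][];
        for (int i = 0; i < authors.length; i++) {
            // The int values are boxed to Integer, which is Serializable.
            rows[i] = new Serializable[]{authors[i], fromYear, toYear};
        }
        return rows;
    }

    public static void main(String[] args) {
        Serializable[][] rows = buildRows(new String[]{"Kafka", "Orwell"}, 1900, 1950);
        System.out.println(Arrays.deepToString(rows));
    }
}
```

Every value in a row has to be of a type that JDBCInputFormat.open knows how to bind (String, Long, Integer and so on, as listed in its instanceof chain further down); any other type ends in an IllegalArgumentException.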

NumericBetweenParametersProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java

/**
 * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
 * The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
 * ranging from minVal up to maxVal.
 *
 * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
 * <PRE>
 *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
 * </PRE>
 *
 * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
 * based on the passed constructor parameters.
 *
 */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {
​
    private final long fetchSize;
    private final long minVal;
    private final long maxVal;
​
    /**
     * NumericBetweenParametersProvider constructor.
     *
     * @param fetchSize the max distance between the produced from/to pairs
     * @param minVal the lower bound of the produced "from" values
     * @param maxVal the upper bound of the produced "to" values
     */
    public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
        checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
        checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
        this.fetchSize = fetchSize;
        this.minVal = minVal;
        this.maxVal = maxVal;
    }
​
    @Override
    public Serializable[][] getParameterValues() {
        double maxElemCount = (maxVal - minVal) + 1;
        int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
        Serializable[][] parameters = new Serializable[numBatches][2];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            long end = start + fetchSize - 1;
            if (end > maxVal) {
                end = maxVal;
            }
            parameters[batchIndex] = new Long[]{start, end};
        }
        return parameters;
    }
​
}
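  • NumericBetweenParametersProvider automatically generates the range parameters for a range query on a numeric primary key (WHERE id BETWEEN ? AND ?). Its constructor takes fetchSize, the minimum value minVal, and the maximum value maxVal; getParameterValues computes numBatches from these values and then fills in the from/to pair of every batch. Note that this fetchSize is the width of each id range and is separate from the fetchSize field of JDBCInputFormat, which is passed to statement.setFetchSize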
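
To make the batching arithmetic concrete, here is a standalone sketch that mirrors the loop in getParameterValues using plain long pairs instead of Serializable[][]. The class BetweenRangesSketch and the method ranges are hypothetical names, and the ceiling is computed with integer arithmetic rather than the Double and Math.ceil combination used in the quoted source.

```java
import java.util.Arrays;

// Hypothetical standalone mirror of the range computation; not the Flink class itself.
final class BetweenRangesSketch {

    /** Returns the inclusive [from, to] pairs covering minVal..maxVal in steps of fetchSize. */
    static long[][] ranges(long fetchSize, long minVal, long maxVal) {
        if (fetchSize <= 0 || minVal > maxVal) {
            throw new IllegalArgumentException("fetchSize must be > 0 and minVal <= maxVal");
        }
        long elemCount = maxVal - minVal + 1;
        // Integer ceiling division: number of batches needed to cover elemCount values.
        int numBatches = (int) ((elemCount + fetchSize - 1) / fetchSize);
        long[][] result = new long[numBatches][];
        long start = minVal;
        for (int i = 0; i < numBatches; i++, start += fetchSize) {
            // The last batch is clipped so that it never exceeds maxVal.
            long end = Math.min(start + fetchSize - 1, maxVal);
            result[i] = new long[]{start, end};
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 ids in batches of 3: three full ranges plus a shorter last one.
        System.out.println(Arrays.deepToString(ranges(3, 1, 10)));
        // prints [[1, 3], [4, 6], [7, 9], [10, 10]]
    }
}
```

With fetchSize 3, minVal 1 and maxVal 10 there are four rows, so a JDBCInputFormat fed with the equivalent provider would create four input splits.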

JDBCInputFormat

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
​
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
​
    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String queryTemplate;
    private int resultSetType;
    private int resultSetConcurrency;
    private RowTypeInfo rowTypeInfo;
​
    private transient Connection dbConn;
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;
    private int fetchSize;
​
    private boolean hasNext;
    private Object[][] parameterValues;
​
    public JDBCInputFormat() {
    }
​
    @Override
    public RowTypeInfo getProducedType() {
        return rowTypeInfo;
    }
​
    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }
​
    @Override
    public void openInputFormat() {
        //called once per inputFormat (on open)
        try {
            Class.forName(drivername);
            if (username == null) {
                dbConn = DriverManager.getConnection(dbURL);
            } else {
                dbConn = DriverManager.getConnection(dbURL, username, password);
            }
            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
                statement.setFetchSize(fetchSize);
            }
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
        }
    }
​
    @Override
    public void closeInputFormat() {
        //called once per inputFormat (on close)
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
        } finally {
            statement = null;
        }
​
        try {
            if (dbConn != null) {
                dbConn.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat couldn't be closed - " + se.getMessage());
        } finally {
            dbConn = null;
        }
​
        parameterValues = null;
    }
​
    /**
     * Connects to the source database and executes the query in a <b>parallel
     * fashion</b> if
     * this {@link InputFormat} is built using a parameterized query (i.e. using
     * a {@link PreparedStatement})
     * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
     * fashion</b> otherwise.
     *
     * @param inputSplit which is ignored if this InputFormat is executed as a
     *        non-parallel source,
     *        a "hook" to the query parameters otherwise (using its
     *        <i>splitNumber</i>)
     * @throws IOException if there's an error during the execution of the query
     */
    @Override
    public void open(InputSplit inputSplit) throws IOException {
        try {
            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    if (param instanceof String) {
                        statement.setString(i + 1, (String) param);
                    } else if (param instanceof Long) {
                        statement.setLong(i + 1, (Long) param);
                    } else if (param instanceof Integer) {
                        statement.setInt(i + 1, (Integer) param);
                    } else if (param instanceof Double) {
                        statement.setDouble(i + 1, (Double) param);
                    } else if (param instanceof Boolean) {
                        statement.setBoolean(i + 1, (Boolean) param);
                    } else if (param instanceof Float) {
                        statement.setFloat(i + 1, (Float) param);
                    } else if (param instanceof BigDecimal) {
                        statement.setBigDecimal(i + 1, (BigDecimal) param);
                    } else if (param instanceof Byte) {
                        statement.setByte(i + 1, (Byte) param);
                    } else if (param instanceof Short) {
                        statement.setShort(i + 1, (Short) param);
                    } else if (param instanceof Date) {
                        statement.setDate(i + 1, (Date) param);
                    } else if (param instanceof Time) {
                        statement.setTime(i + 1, (Time) param);
                    } else if (param instanceof Timestamp) {
                        statement.setTimestamp(i + 1, (Timestamp) param);
                    } else if (param instanceof Array) {
                        statement.setArray(i + 1, (Array) param);
                    } else {
                        //extends with other types if needed
                        throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }
    }
​
    /**
     * Closes all resources used.
     *
     * @throws IOException Indicates that a resource could not be closed.
     */
    @Override
    public void close() throws IOException {
        if (resultSet == null) {
            return;
        }
        try {
            resultSet.close();
        } catch (SQLException se) {
            LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
        }
    }
​
    /**
     * Checks whether all data has been read.
     *
     * @return boolean value indication whether all data has been read.
     * @throws IOException
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }
​
    /**
     * Stores the next resultSet row in a tuple.
     *
     * @param row row to be reused.
     * @return row containing next {@link Row}
     * @throws java.io.IOException
     */
    @Override
    public Row nextRecord(Row row) throws IOException {
        try {
            if (!hasNext) {
                return null;
            }
            for (int pos = 0; pos < row.getArity(); pos++) {
                row.setField(pos, resultSet.getObject(pos + 1));
            }
            //update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }
​
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }
​
    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }
​
    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }
​
    @VisibleForTesting
    PreparedStatement getStatement() {
        return statement;
    }
​
    //......
}
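  • JDBCInputFormat extends RichInputFormat and also implements the ResultTypeQueryable interface
  • The createInputSplits method creates the GenericInputSplit array from parameterValues, one split per row; if parameterValues is null it creates a single split whose totalNumberOfPartitions is 1
  • The getInputSplitAssigner method creates a DefaultInputSplitAssigner from the InputSplit array; the getStatistics method returns its cachedStatistics argument unchanged
  • The openInputFormat method obtains the database connection and prepares the statement; the closeInputFormat method closes the statement and then the database connection
  • The open method receives an inputSplit and uses its split number to pick the matching row of parameterValues, binds those values to the statement, and then calls statement.executeQuery() to obtain the resultSet; the nextRecord method copies the current resultSet row into the Row and advances to the next one; the close method closes the resultSet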
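
The link between createInputSplits and open is just an array index: split number i selects row i of parameterValues. The sketch below shows that mapping without any JDBC or Flink dependency. The class SplitToParametersSketch and both of its methods are hypothetical, and describe only formats a string where the real open method binds the values to a PreparedStatement.

```java
import java.util.Arrays;

// Hypothetical illustration of how a split number selects a parameter row.
final class SplitToParametersSketch {

    /** One split per parameter row, or a single split when no parameters were supplied. */
    static int numberOfSplits(Object[][] parameterValues) {
        return parameterValues == null ? 1 : parameterValues.length;
    }

    /** Describes the query a task would run for the given split number. */
    static String describe(String queryTemplate, Object[][] parameterValues, int splitNumber) {
        if (parameterValues == null) {
            return queryTemplate; // non-parallel case: the template runs as it is
        }
        return queryTemplate + " with " + Arrays.toString(parameterValues[splitNumber]);
    }

    public static void main(String[] args) {
        String template = "SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?";
        Object[][] params = {{1L, 3L}, {4L, 6L}, {7L, 9L}, {10L, 10L}};
        for (int split = 0; split < numberOfSplits(params); split++) {
            System.out.println(describe(template, params, split));
        }
    }
}
```

Because the statement is prepared once in openInputFormat and only re-parameterized in open, one task can process several splits over the same connection.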

InputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplit.java

/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 * 
 * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.</p>
 */
@Public
public interface InputSplit extends Serializable {
    
    /**
     * Returns the number of this input split.
     * 
     * @return the number of this input split
     */
    int getSplitNumber();
}
  • The InputSplit interface defines the getSplitNumber method, which returns the number of the current input split

GenericInputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java

/**
 * A generic input split that has only a partition number.
 */
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {
​
    private static final long serialVersionUID = 1L;
​
    /** The number of this split. */
    private final int partitionNumber;
​
    /** The total number of partitions */
    private final int totalNumberOfPartitions;
    
    // --------------------------------------------------------------------------------------------
​
    /**
     * Creates a generic input split with the given split number.
     * 
     * @param partitionNumber The number of the split's partition.
     * @param totalNumberOfPartitions The total number of the splits (partitions).
     */
    public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }
​
    // --------------------------------------------------------------------------------------------
​
    @Override
    public int getSplitNumber() {
        return this.partitionNumber;
    }
    
    public int getTotalNumberOfSplits() {
        return this.totalNumberOfPartitions;
    }
    
    // --------------------------------------------------------------------------------------------
​
    @Override
    public int hashCode() {
        return this.partitionNumber ^ this.totalNumberOfPartitions;
    }
    
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GenericInputSplit) {
            GenericInputSplit other = (GenericInputSplit) obj;
            return this.partitionNumber == other.partitionNumber &&
                    this.totalNumberOfPartitions == other.totalNumberOfPartitions;
        } else {
            return false;
        }
    }
    
    public String toString() {
        return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
    }
}
  • GenericInputSplit implements the InputSplit interface; its getSplitNumber method returns partitionNumber

InputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplitAssigner.java

/**
 * An input split assigner distributes the {@link InputSplit}s among the instances on which a
 * data source exists.
 */
@PublicEvolving
public interface InputSplitAssigner {
​
    /**
     * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
     * to allow localized assignments.
     * 
     * @param host The host address of split requesting task.
     * @param taskId The id of the split requesting task.
     * @return the next input split to be consumed, or <code>null</code> if no more splits remain.
     */
    InputSplit getNextInputSplit(String host, int taskId);
​
}
  • The InputSplitAssigner interface defines the getNextInputSplit method, which takes two parameters, host and taskId, and returns the next inputSplit, or null if no splits remain

DefaultInputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java

/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
 * simply returns all input splits of an input vertex in the order they were originally computed.
 */
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {
​
    /** The logging object used to report information and errors. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);
​
    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();
​
​
    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }
    
    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }
    
    
    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;
        
        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }
        
        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}
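  • DefaultInputSplitAssigner is the default implementation of InputSplitAssigner; its getNextInputSplit method modifies splits inside a synchronized block, removing and returning the last element, and returns null once the list is empty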
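
One consequence of remove(size - 1) in the quoted code is that splits are handed out starting from the tail of the list. The following standalone sketch reproduces that behaviour with Integer split numbers in place of InputSplit objects; the class ReverseOrderAssignerSketch and its next method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the assigner, using split numbers instead of InputSplit objects.
final class ReverseOrderAssignerSketch {

    private final List<Integer> splits = new ArrayList<>();

    ReverseOrderAssignerSketch(Integer... splitNumbers) {
        splits.addAll(Arrays.asList(splitNumbers));
    }

    /** Returns the next split number, or null once every split has been handed out. */
    Integer next() {
        // Keep the critical section short: only the list mutation is guarded.
        synchronized (splits) {
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        ReverseOrderAssignerSketch assigner = new ReverseOrderAssignerSketch(0, 1, 2, 3);
        Integer split;
        while ((split = assigner.next()) != null) {
            System.out.println("assigned split " + split);
        }
    }
}
```

Removing from the end of an ArrayList avoids shifting the remaining elements, and since each split is an independent query the hand-out order does not affect the result.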

InputFormatSourceFunction

flink-streaming-java_2.11-1.8.0-sources.jar!/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java

@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;
​
    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;
​
    private InputFormat<OUT, InputSplit> format;
​
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;
​
    private volatile boolean isRunning = true;
​
    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }
​
    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
​
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);
​
        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }
​
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {
​
            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }
​
            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());
​
                // for each element we also check if cancel
                // was called by checking the isRunning flag
​
                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();
​
                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }
​
    @Override
    public void cancel() {
        isRunning = false;
    }
​
    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }
​
    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }
​
    private Iterator<InputSplit> getInputSplits() {
​
        return new Iterator<InputSplit>() {
​
            private InputSplit nextSplit;
​
            private boolean exhausted;
​
            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
​
                if (nextSplit != null) {
                    return true;
                }
​
                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
​
                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }
​
            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
​
                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }
​
            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split and caches it until next() is called; the provider is an RpcInputSplitProvider
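
The iterator returned by getInputSplits is a look-ahead adapter: it turns a source that signals exhaustion by returning null into a regular Iterator, fetching at most one element ahead. A minimal generic sketch of that pattern follows. The class LookAheadIteratorSketch and the method fromNullTerminated are hypothetical, and a Supplier stands in for provider.getNextInputSplit.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical generic version of the look-ahead iterator used for input splits.
final class LookAheadIteratorSketch {

    /** Wraps a source that returns null when exhausted into a standard Iterator. */
    static <T> Iterator<T> fromNullTerminated(Supplier<T> source) {
        return new Iterator<T>() {
            private T nextItem;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (nextItem != null) {
                    return true; // already fetched, do not ask the source again
                }
                nextItem = source.get(); // stands in for provider.getNextInputSplit(...)
                if (nextItem == null) {
                    exhausted = true;
                    return false;
                }
                return true;
            }

            @Override
            public T next() {
                if (nextItem == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                T tmp = nextItem;
                nextItem = null;
                return tmp;
            }
        };
    }

    public static void main(String[] args) {
        ArrayDeque<String> remote = new ArrayDeque<>(Arrays.asList("split-2", "split-1", "split-0"));
        Iterator<String> it = fromNullTerminated(remote::poll);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Caching the fetched element matters here because each call to the source is a remote request that consumes a split; calling hasNext() twice must not lose one.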

InputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {
​
    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or <code>null</code> if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
  • The InputSplitProvider interface defines the getNextInputSplit method, which each task calls to obtain the next inputSplit it should process

RpcInputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java

public class RpcInputSplitProvider implements InputSplitProvider {
    private final JobMasterGateway jobMasterGateway;
    private final JobVertexID jobVertexID;
    private final ExecutionAttemptID executionAttemptID;
    private final Time timeout;
​
    public RpcInputSplitProvider(
            JobMasterGateway jobMasterGateway,
            JobVertexID jobVertexID,
            ExecutionAttemptID executionAttemptID,
            Time timeout) {
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
        this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }
​
​
    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);
​
        CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
            jobVertexID,
            executionAttemptID);
​
        try {
            SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());
​
            if (serializedInputSplit.isEmpty()) {
                return null;
            } else {
                return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
            }
        } catch (Exception e) {
            throw new InputSplitProviderException("Requesting the next input split failed.", e);
        }
    }
}
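  • The getNextInputSplit method of RpcInputSplitProvider requests the next input split from the JobMaster through jobMasterGateway.requestNextInputSplit, waits for the result up to the configured timeout, and deserializes it with the user code class loader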

JobMaster.requestNextInputSplit

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    //......
​
    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
            final JobVertexID vertexID,
            final ExecutionAttemptID executionAttempt) {
​
        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
        if (execution == null) {
            // can happen when JobManager had already unregistered this execution upon on task failure,
            // but TaskManager get some delay to aware of that situation
            if (log.isDebugEnabled()) {
                log.debug("Can not find Execution for attempt {}.", executionAttempt);
            }
            // but we should TaskManager be aware of this
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
        }
​
        final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
        }
​
        final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
        }
​
        final LogicalSlot slot = execution.getAssignedResource();
        final int taskId = execution.getVertex().getParallelSubtaskIndex();
        final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
​
        if (log.isDebugEnabled()) {
            log.debug("Send next input split {}.", nextInputSplit);
        }
​
        try {
            final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        } catch (Exception ex) {
            log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
            IOException reason = new IOException("Could not serialize the next input split of class " +
                    nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }
​
    //......
}
  • The requestNextInputSplit method of JobMaster obtains the next input split through splitAssigner.getNextInputSplit(host, taskId), serializes it, and returns it to the requesting RpcInputSplitProvider
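
Between the JobMaster and the task the split travels as a byte array, which is why InputSplit extends Serializable. The sketch below shows such a round trip with plain java.io serialization as a stand-in; it makes no claim about how InstantiationUtil or SerializedInputSplit are implemented. The classes SplitRoundTripSketch and DemoSplit are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical round trip: sender serializes a split, receiver rebuilds it from bytes.
final class SplitRoundTripSketch {

    /** Hypothetical stand-in for GenericInputSplit: a split number plus the total count. */
    static final class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int partitionNumber;
        final int totalNumberOfPartitions;

        DemoSplit(int partitionNumber, int totalNumberOfPartitions) {
            this.partitionNumber = partitionNumber;
            this.totalNumberOfPartitions = totalNumberOfPartitions;
        }
    }

    /** Sender side: turn the chosen split into bytes. */
    static byte[] serialize(Serializable split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize the split", e);
        }
        return bytes.toByteArray();
    }

    /** Receiver side: rebuild the split from the bytes. */
    static DemoSplit deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (DemoSplit) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize the split", e);
        }
    }

    public static void main(String[] args) {
        DemoSplit received = deserialize(serialize(new DemoSplit(2, 4)));
        System.out.println(received.partitionNumber + "/" + received.totalNumberOfPartitions);
    }
}
```

Only the split number crosses the wire for a GenericInputSplit; the parameter matrix itself is already part of the serialized JDBCInputFormat on every task, so the task can look up its row locally.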

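Summary

  • The ParameterValuesProvider interface defines the getParameterValues method, which returns the parameters needed to query a table in parallel; each row of the matrix parameterizes one query, so a large SQL query is divided into several range queries that run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider
  • GenericParameterValuesProvider does no computation of its own; its getParameterValues method returns the matrix passed to the constructor. NumericBetweenParametersProvider automatically generates the range parameters for a range query on a numeric primary key (WHERE id BETWEEN ? AND ?); its constructor takes fetchSize, the minimum value minVal, and the maximum value maxVal, and getParameterValues computes numBatches from these values and then fills in the from/to pair of every batch
  • JDBCInputFormat extends RichInputFormat and also implements the ResultTypeQueryable interface. createInputSplits creates the GenericInputSplit array from parameterValues, and if parameterValues is null it creates a single split whose totalNumberOfPartitions is 1; getInputSplitAssigner creates a DefaultInputSplitAssigner from the InputSplit array; getStatistics returns its cachedStatistics argument. openInputFormat obtains the database connection and prepares the statement; closeInputFormat closes the statement and the database connection. open receives an inputSplit, uses it to pick the query parameters from parameterValues, binds them to the statement, and calls statement.executeQuery() to obtain the resultSet; nextRecord reads rows from the resultSet; close closes the resultSet
  • The InputSplit interface defines the getSplitNumber method, which returns the number of the current input split; GenericInputSplit implements InputSplit, and its getSplitNumber method returns partitionNumber. The InputSplitAssigner interface defines the getNextInputSplit method, which takes host and taskId and returns the next inputSplit; DefaultInputSplitAssigner is its default implementation, and its getNextInputSplit method modifies splits inside a synchronized block, removing and returning the last element
  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split, and the provider is an RpcInputSplitProvider. The InputSplitProvider interface defines the getNextInputSplit method, which each task calls to obtain the inputSplit it should process; RpcInputSplitProvider implements it by requesting the next input split from the JobMaster through jobMasterGateway.requestNextInputSplit; the requestNextInputSplit method of JobMaster obtains the split through splitAssigner.getNextInputSplit(host, taskId) and returns it to the requesting RpcInputSplitProvider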
