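A Look at Flink's CsvReader

This article takes a look at Flink's CsvReader: how it is created, and how the records it reads travel through a batch job at runtime.

Example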

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
​
        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");
​
        csvInput.map(new MapFunction<RecordDto, RecordDto>() {
            @Override
            public RecordDto map(RecordDto value) throws Exception {
                LOGGER.info("execute map:{}",value);
                TimeUnit.SECONDS.sleep(5);
                return value;
            }
        }).print();

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
     * define parameters and field types and will eventually produce the DataSet that corresponds to
     * the read and parsed CSV input.
     *
     * @param filePath The path of the CSV file.
     * @return A CsvReader that can be used to configure the CSV input.
     */
    public CsvReader readCsvFile(String filePath) {
        return new CsvReader(filePath, this);
    }
  • Here a CsvReader is created from the given filePath

CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java

    public CsvReader(String filePath, ExecutionEnvironment executionContext) {
        this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
    }
​
    public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");
        Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
​
        this.path = filePath;
        this.executionContext = executionContext;
    }
​
    /**
     * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
     * must be public or able to set value. The type information for the fields is obtained from the type class.
     *
     * @param pojoType The class of the target POJO.
     * @param pojoFields The fields of the POJO which are mapped to CSV fields.
     * @return The DataSet representing the parsed CSV data.
     */
    public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
        Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
        Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
​
        final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
        if (!(ti instanceof PojoTypeInfo)) {
            throw new IllegalArgumentException(
                "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
        }
        final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;
​
        CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);
​
        configureInputFormat(inputFormat);
​
        return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
    }
  • CsvReader provides the pojoType method, which maps the CSV data to a Java type and returns a Flink DataSource; the DataSource is created with a PojoCsvInputFormat and a PojoTypeInfo

Task

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......
​
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
​
            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------
​
            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;
​
            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
​
            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
​
            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);
​
            // run the invokable
            invokable.invoke();
​
            //......
    }
}
  • Task's run method calls invokable.invoke(); in this example the invokable is a DataSourceTask

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java

    @Override
    public void invoke() throws Exception {
        // --------------------------------------------------------------------
        // Initialize
        // --------------------------------------------------------------------
        initInputFormat();
​
        LOG.debug(getLogString("Start registering input and output"));
​
        try {
            initOutputs(getUserCodeClassLoader());
        } catch (Exception ex) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
                    ex.getMessage(), ex);
        }
​
        LOG.debug(getLogString("Finished registering input and output"));
​
        // --------------------------------------------------------------------
        // Invoke
        // --------------------------------------------------------------------
        LOG.debug(getLogString("Starting data source operator"));
​
        RuntimeContext ctx = createRuntimeContext();
​
        final Counter numRecordsOut;
        {
            Counter tmpNumRecordsOut;
            try {
                OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
                ioMetricGroup.reuseInputMetricsForTask();
                if (this.config.getNumberOfChainedStubs() == 0) {
                    ioMetricGroup.reuseOutputMetricsForTask();
                }
                tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                tmpNumRecordsOut = new SimpleCounter();
            }
            numRecordsOut = tmpNumRecordsOut;
        }
        
        Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
​
        if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat) this.format).setRuntimeContext(ctx);
            LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
            ((RichInputFormat) this.format).openInputFormat();
            LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
        }
​
        ExecutionConfig executionConfig = getExecutionConfig();
​
        boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
​
        LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        
        final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
        
        try {
            // start all chained tasks
            BatchTask.openChainedTasks(this.chainedTasks, this);
            
            // get input splits to read
            final Iterator<InputSplit> splitIterator = getInputSplits();
            
            // for each assigned input split
            while (!this.taskCanceled && splitIterator.hasNext())
            {
                // get start and end
                final InputSplit split = splitIterator.next();
​
                LOG.debug(getLogString("Opening input split " + split.toString()));
                
                final InputFormat<OT, InputSplit> format = this.format;
            
                // open input format
                format.open(split);
    
                LOG.debug(getLogString("Starting to read input from split " + split.toString()));
                
                try {
                    final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
​
                    if (objectReuseEnabled) {
                        OT reuse = serializer.createInstance();
​
                        // as long as there is data to read
                        while (!this.taskCanceled && !format.reachedEnd()) {
​
                            OT returned;
                            if ((returned = format.nextRecord(reuse)) != null) {
                                output.collect(returned);
                            }
                        }
                    } else {
                        // as long as there is data to read
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            OT returned;
                            if ((returned = format.nextRecord(serializer.createInstance())) != null) {
                                output.collect(returned);
                            }
                        }
                    }
​
                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(getLogString("Closing input split " + split.toString()));
                    }
                } finally {
                    // close. We close here such that a regular close throwing an exception marks a task as failed.
                    format.close();
                }
                completedSplitsCounter.inc();
            } // end for all input splits
​
            // close the collector. if it is a chaining task collector, it will close its chained tasks
            this.output.close();
​
            // close all chained tasks letting them report failure
            BatchTask.closeChainedTasks(this.chainedTasks, this);
​
        }
        catch (Exception ex) {
            // close the input, but do not report any exceptions, since we already have another root cause
            try {
                this.format.close();
            } catch (Throwable ignored) {}
​
            BatchTask.cancelChainedTasks(this.chainedTasks);
​
            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
​
            if (ex instanceof CancelTaskException) {
                // forward canceling exception
                throw ex;
            }
            else if (!this.taskCanceled) {
                // drop exception, if the task was canceled
                BatchTask.logAndThrowException(ex, this);
            }
        } finally {
            BatchTask.clearWriters(eventualOutputs);
            // --------------------------------------------------------------------
            // Closing
            // --------------------------------------------------------------------
            if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                ((RichInputFormat) this.format).closeInputFormat();
                LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
            }
        }
​
        if (!this.taskCanceled) {
            LOG.debug(getLogString("Finished data source operator"));
        }
        else {
            LOG.debug(getLogString("Data source operator cancelled"));
        }
    }
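  • In its invoke method, DataSourceTask keeps pulling records as long as neither taskCanceled nor format.reachedEnd() is true: it calls format.nextRecord(...), passing a fresh serializer.createInstance() on every call (or a single reused instance when object reuse is enabled), and hands every non-null result to output.collect(returned)
  • The format here is a CsvInputFormat (specifically a PojoCsvInputFormat), but the nextRecord and reachedEnd methods that actually run are the ones inherited from DelimitedInputFormat
  • PojoCsvInputFormat extends the abstract class CsvInputFormat, which extends the abstract class GenericCsvInputFormat, which in turn extends the abstract class DelimitedInputFormat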
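
The read loop described above can be approximated in plain Java. This is only a sketch under stated assumptions, not Flink code: SimpleFormat, LineFormat and drain are hypothetical names, and the "format" reads from an in-memory list instead of a file split. It shows the two points that matter: a null return from nextRecord means "skip this record" rather than "stop", and the loop only ends once reachedEnd() becomes true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified stand-in for the loop in DataSourceTask.invoke (hypothetical names, no Flink dependency).
class ReadLoopSketch {

    /** Minimal analogue of an input format: pull records until reachedEnd() is true. */
    interface SimpleFormat<T> {
        boolean reachedEnd();
        T nextRecord(T reuse); // may return null for a skipped record
    }

    /** Reads from an in-memory list; lines starting with '#' are skipped. */
    static final class LineFormat implements SimpleFormat<StringBuilder> {
        private final Iterator<String> lines;
        private boolean end;

        LineFormat(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public boolean reachedEnd() {
            return end;
        }

        @Override
        public StringBuilder nextRecord(StringBuilder reuse) {
            if (!lines.hasNext()) {
                end = true; // like DelimitedInputFormat, the end flag is set by a failed read
                return null;
            }
            String line = lines.next();
            if (line.startsWith("#")) {
                return null; // skipped record; the caller keeps looping
            }
            reuse.setLength(0);
            return reuse.append(line);
        }
    }

    /** Pulls all records and forwards the non-null ones; returns how many were emitted. */
    static <T> int drain(SimpleFormat<T> format, Supplier<T> factory, boolean objectReuse, Consumer<T> out) {
        int emitted = 0;
        T reuse = objectReuse ? factory.get() : null;
        while (!format.reachedEnd()) {
            // with object reuse one instance is recycled, otherwise a new one is created per call
            T target = objectReuse ? reuse : factory.get();
            T returned = format.nextRecord(target);
            if (returned != null) {
                out.accept(returned);
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        int n = drain(new LineFormat(Arrays.asList("a,1", "# comment", "b,2")),
                StringBuilder::new, true, sb -> collected.add(sb.toString()));
        System.out.println(n + " records: " + collected);
    }
}
```

Note that with object reuse enabled the consumer must copy what it needs (here via toString()), because the same instance is overwritten by the next record.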

DelimitedInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java

    /**
     * The default read buffer size = 1MB.
     */
    private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
​
    private transient byte[] readBuffer;
​
    private int bufferSize = -1;
​
    private void initBuffers() {
        this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
​
        if (this.bufferSize <= this.delimiter.length) {
            throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
        }
​
        if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
            this.readBuffer = new byte[this.bufferSize];
        }
        if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
            this.wrapBuffer = new byte[256];
        }
​
        this.readPos = 0;
        this.limit = 0;
        this.overLimit = false;
        this.end = false;
    }
​
    /**
     * Checks whether the current split is at its end.
     * 
     * @return True, if the split is at its end, false otherwise.
     */
    @Override
    public boolean reachedEnd() {
        return this.end;
    }
    
    @Override
    public OT nextRecord(OT record) throws IOException {
        if (readLine()) {
            return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
        } else {
            this.end = true;
            return null;
        }
    }
​
    /**
     * Fills the read buffer with bytes read from the file starting from an offset.
     */
    private boolean fillBuffer(int offset) throws IOException {
        int maxReadLength = this.readBuffer.length - offset;
        // special case for reading the whole split.
        if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
            int read = this.stream.read(this.readBuffer, offset, maxReadLength);
            if (read == -1) {
                this.stream.close();
                this.stream = null;
                return false;
            } else {
                this.readPos = offset;
                this.limit = read;
                return true;
            }
        }
        
        // else ..
        int toRead;
        if (this.splitLength > 0) {
            // if we have more data, read that
            toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
        }
        else {
            // if we have exhausted our split, we need to complete the current record, or read one
            // more across the next split.
            // the reason is that the next split will skip over the beginning until it finds the first
            // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
            // previous split.
            toRead = maxReadLength;
            this.overLimit = true;
        }
​
        int read = this.stream.read(this.readBuffer, offset, toRead);
​
        if (read == -1) {
            this.stream.close();
            this.stream = null;
            return false;
        } else {
            this.splitLength -= read;
            this.readPos = offset; // position from where to start reading
            this.limit = read + offset; // number of valid bytes in the read buffer
            return true;
        }
    }
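  • DelimitedInputFormat.nextRecord first calls readLine() to locate the next record in currBuffer; if there is one, it calls the readRecord method implemented by the subclass CsvInputFormat, passing currBuffer, currOffset and currLen. If there is none, it sets end to true, which is what reachedEnd() reports
  • readLine() calls fillBuffer whenever it needs more bytes. fillBuffer uses splitLength (the remaining length of the FileInputSplit being read) and maxReadLength to determine toRead, then reads up to toRead bytes from the file into readBuffer starting at offset; readLine() then sets currBuffer, currOffset and currLen. Once the split is exhausted (splitLength is no longer positive), fillBuffer keeps reading past the split boundary and sets overLimit so that the current record can be completed
  • readBuffer is allocated in initBuffers according to bufferSize. bufferSize is initialized to -1 and is set to 4 * 1024 inside getStatistics (which only samples the file); whenever bufferSize is <= 0, initBuffers falls back to DEFAULT_READ_BUFFER_SIZE, which is 1024 * 1024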
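
To make the interplay between the fixed-size buffer and records that span several buffer fills more concrete, here is a stripped-down sketch. It is not the Flink implementation: BufferedLineSketch is a hypothetical class, it assumes a single-byte '\n' delimiter and UTF-8 text, it uses a ByteArrayOutputStream where Flink uses its wrapBuffer, and it ignores split boundaries (splitLength, overLimit) entirely.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified reader: fixed-size buffer, '\n' delimiter, no split handling.
class BufferedLineSketch {
    private final InputStream stream;
    private final byte[] readBuffer;
    private int readPos; // next unread position in readBuffer
    private int limit;   // number of valid bytes in readBuffer

    BufferedLineSketch(InputStream stream, int bufferSize) {
        if (bufferSize <= 1) {
            throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
        }
        this.stream = stream;
        this.readBuffer = new byte[bufferSize];
    }

    /** Refills the buffer from the stream; returns false once the stream is exhausted. */
    private boolean fillBuffer() {
        try {
            int read = stream.read(readBuffer, 0, readBuffer.length);
            if (read == -1) {
                return false;
            }
            readPos = 0;
            limit = read;
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the next record without its delimiter, or null when the input is exhausted. */
    String readLine() {
        ByteArrayOutputStream partial = new ByteArrayOutputStream(); // holds bytes carried across refills
        while (true) {
            if (readPos >= limit && !fillBuffer()) {
                // end of input: emit a trailing record that had no final delimiter, if any
                return partial.size() > 0 ? decode(partial) : null;
            }
            int start = readPos;
            while (readPos < limit && readBuffer[readPos] != '\n') {
                readPos++;
            }
            partial.write(readBuffer, start, readPos - start);
            if (readPos < limit) {
                readPos++; // consume the delimiter
                return decode(partial);
            }
            // delimiter not found in this buffer: loop, refill, and keep appending
        }
    }

    private static String decode(ByteArrayOutputStream bytes) {
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = "hello\nworld\n".getBytes(StandardCharsets.UTF_8);
        // a 4-byte buffer forces both records to span more than one fill
        BufferedLineSketch reader = new BufferedLineSketch(new ByteArrayInputStream(data), 4);
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            System.out.println(line);
        }
    }
}
```

The deliberately tiny buffer in main is only there to exercise the carry-over path; with the 1 MB default described above, most CSV lines fit in a single fill.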

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java

    @Override
    public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
        /*
         * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
         */
        // Found window's end line, so find carriage return before the newline
        if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            //reduce the number of bytes so that the Carriage return is not taken as data
            numBytes--;
        }
​
        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            //check record for comments
            boolean isComment = true;
            for (int i = 0; i < commentPrefix.length; i++) {
                if (commentPrefix[i] != bytes[offset + i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                this.commentCount++;
                return null;
            }
        }
​
        if (parseRecord(parsedValues, bytes, offset, numBytes)) {
            return fillRecord(reuse, parsedValues);
        } else {
            this.invalidLineCount++;
            return null;
        }
    }
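  • CsvInputFormat's readRecord method handles one raw record: it drops a trailing carriage return when the line delimiter is a line break, returns null for lines that start with the comment prefix, and then calls parseRecord to parse the raw bytes into parsedValues (an Object[]). If parsing succeeds, it calls the subclass's fillRecord method (here PojoCsvInputFormat) to copy parsedValues into the reuse object, which is the object DataSourceTask passed to format.nextRecord (for example serializer.createInstance()); otherwise it increments invalidLineCount and returns null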
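
The three steps of readRecord (trim '\r', skip comments, parse fields) can be sketched without Flink as follows. This is an illustration under assumptions, not the library's parser: ReadRecordSketch and its parameters are hypothetical, fields are split naively on a single delimiter character with no quoting support, and every field stays a String instead of being converted to a typed value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical, simplified analogue of CsvInputFormat.readRecord (no quoting, no typed parsing).
class ReadRecordSketch {

    /** Returns the parsed fields, or null if the line is a comment or has the wrong field count. */
    static String[] readRecord(byte[] bytes, int offset, int numBytes,
                               byte[] commentPrefix, char fieldDelimiter, int expectedFields) {
        // Windows line endings: do not treat the carriage return as data
        if (numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            numBytes--;
        }

        // skip the record if it starts with the comment prefix
        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            boolean isComment = true;
            for (int i = 0; i < commentPrefix.length; i++) {
                if (commentPrefix[i] != bytes[offset + i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                return null;
            }
        }

        // naive field parsing; limit -1 keeps trailing empty fields
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        String[] fields = line.split(Pattern.quote(String.valueOf(fieldDelimiter)), -1);
        return fields.length == expectedFields ? fields : null; // null marks an invalid line
    }

    public static void main(String[] args) {
        byte[] comment = "#".getBytes(StandardCharsets.UTF_8);
        for (String raw : new String[] {"Phelps,USA\r", "# header", "only-one-field"}) {
            byte[] bytes = raw.getBytes(StandardCharsets.UTF_8);
            System.out.println(Arrays.toString(readRecord(bytes, 0, bytes.length, comment, ',', 2)));
        }
    }
}
```

As in the original method, a null return does not end the read; the caller simply moves on to the next line.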

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
​
    //......
​
    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
​
        pojoFields = new Field[pojoFieldNames.length];
​
        Map<String, Field> allFields = new HashMap<String, Field>();
​
        findAllFields(pojoTypeClass, allFields);
​
        for (int i = 0; i < pojoFieldNames.length; i++) {
            pojoFields[i] = allFields.get(pojoFieldNames[i]);
​
            if (pojoFields[i] != null) {
                pojoFields[i].setAccessible(true);
            } else {
                throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
            }
        }
    }
​
    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                pojoFields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
            }
        }
        return reuse;
    }
​
    //......
}
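  • PojoCsvInputFormat's open method is called for each input split when the plan is executed (DataSourceTask.invoke calls format.open(split)); it resolves the required Field objects via reflection up front and makes them accessible, throwing a RuntimeException if a configured field name does not exist in the POJO class
  • The fillRecord method merely uses reflection to set each of the parsedValues on the POJO
  • If the reflective set fails with an IllegalAccessException, it is caught and rethrown wrapped in a RuntimeException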
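
The "resolve fields once, then set them per record" pattern can be tried out in isolation. The sketch below is an assumption-laden illustration rather than Flink's code: FillRecordSketch and the Medal POJO are hypothetical, and resolveFields only looks at fields declared directly on the class, whereas the findAllFields call in the excerpt above is given the POJO class and a map to fill.

```java
import java.lang.reflect.Field;

// Hypothetical illustration of the reflection pattern used by PojoCsvInputFormat.
class FillRecordSketch {

    /** Example POJO; the field names and types are made up for this sketch. */
    static class Medal {
        private String country;
        private int gold;

        String getCountry() { return country; }
        int getGold() { return gold; }
    }

    /** Done once (compare open): look up each named field and make it accessible. */
    static Field[] resolveFields(Class<?> type, String... names) {
        Field[] fields = new Field[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                fields[i] = type.getDeclaredField(names[i]);
                fields[i].setAccessible(true);
            } catch (NoSuchFieldException e) {
                throw new RuntimeException(
                        "There is no field called \"" + names[i] + "\" in " + type.getName(), e);
            }
        }
        return fields;
    }

    /** Done per record (compare fillRecord): copy the parsed values into the target object. */
    static <T> T fillRecord(T reuse, Field[] fields, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                fields[i].set(reuse, parsedValues[i]); // an Integer is unwrapped for an int field
            } catch (IllegalAccessException e) {
                throw new RuntimeException(
                        "Parsed value could not be set in POJO field \"" + fields[i].getName() + "\"", e);
            }
        }
        return reuse;
    }

    public static void main(String[] args) {
        Field[] fields = resolveFields(Medal.class, "country", "gold");
        Medal medal = fillRecord(new Medal(), fields, new Object[] {"USA", 8});
        System.out.println(medal.getCountry() + " " + medal.getGold());
    }
}
```

Because the order of the names passed to resolveFields determines which parsed value lands in which field, the order of the field names given to pojoType has to match the column order of the CSV file.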

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
    private final Collector<OUT> collector;
    private final Counter numRecordsOut;
​
    public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
        this.collector = collector;
        this.numRecordsOut = numRecordsOut;
    }
​
    @Override
    public void collect(OUT record) {
        this.numRecordsOut.inc();
        this.collector.collect(record);
    }
​
    @Override
    public void close() {
        this.collector.close();
    }
}
  • The collector wrapped here is an org.apache.flink.runtime.operators.chaining.ChainedMapDriver

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

    @Override
    public void collect(IT record) {
        try {
            this.numRecordsIn.inc();
            this.outputCollector.collect(this.mapper.map(record));
        } catch (Exception ex) {
            throw new ExceptionInChainedStubException(this.taskName, ex);
        }
    }
  • This first calls the mapper's map method to run the map logic, then calls outputCollector.collect to send the result onward
  • The outputCollector here is a CountingCollector, and the collector it wraps is an org.apache.flink.runtime.operators.shipping.OutputCollector
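
The resulting chain (counting wrapper, chained map, counting wrapper, final output) is a plain decorator chain and can be sketched without Flink. All types below are hypothetical stand-ins: the local Collector interface replaces Flink's, MapCollector plays the role of ChainedMapDriver, ListSink plays the role of OutputCollector, and an AtomicLong replaces the metrics Counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-ins for the collector chain described above (no Flink dependency).
class CollectorChainSketch {

    interface Collector<T> {
        void collect(T record);
    }

    /** Counts every record, then forwards it unchanged (role of CountingCollector). */
    static final class CountingCollector<T> implements Collector<T> {
        private final Collector<T> delegate;
        private final AtomicLong numRecordsOut;

        CountingCollector(Collector<T> delegate, AtomicLong numRecordsOut) {
            this.delegate = delegate;
            this.numRecordsOut = numRecordsOut;
        }

        @Override
        public void collect(T record) {
            numRecordsOut.incrementAndGet();
            delegate.collect(record);
        }
    }

    /** Applies the user function, then forwards the result (role of ChainedMapDriver). */
    static final class MapCollector<I, O> implements Collector<I> {
        private final Function<I, O> mapper;
        private final Collector<O> out;

        MapCollector(Function<I, O> mapper, Collector<O> out) {
            this.mapper = mapper;
            this.out = out;
        }

        @Override
        public void collect(I record) {
            out.collect(mapper.apply(record));
        }
    }

    /** End of the chain; rejects null records like OutputCollector does. */
    static final class ListSink<T> implements Collector<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            if (record == null) {
                throw new NullPointerException("The system does not support records that are null.");
            }
            records.add(record);
        }
    }

    public static void main(String[] args) {
        AtomicLong sourceOut = new AtomicLong();
        AtomicLong mapOut = new AtomicLong();
        ListSink<String> sink = new ListSink<>();

        // source output -> counting -> map -> counting -> sink
        Collector<String> output = new CountingCollector<>(
                new MapCollector<String, String>(String::toUpperCase,
                        new CountingCollector<>(sink, mapOut)),
                sourceOut);

        output.collect("phelps");
        output.collect("bolt");
        System.out.println(sink.records + " source=" + sourceOut + " map=" + mapOut);
    }
}
```

Each collect call runs the whole chain synchronously in the calling thread, which is consistent with the example's map function (with its 5 second sleep) being invoked directly from the data source's read loop.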

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java

    /**
     * Collects a record and emits it to all writers.
     */
    @Override
    public void collect(T record)  {
        if (record != null) {
            this.delegate.setInstance(record);
            try {
                for (RecordWriter<SerializationDelegate<T>> writer : writers) {
                    writer.emit(this.delegate);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
            }
        }
        else {
            throw new NullPointerException("The system does not support records that are null."
                                + "Null values are only supported as fields inside other objects.");
        }
    }
  • This rejects null records and otherwise calls emit on every RecordWriter to send the record out

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
  • channelSelector.selectChannels returns the target channels to send the record to; the channelSelector here is an OutputEmitter

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java

    @Override
    public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
        switch (strategy) {
        case FORWARD:
            return forward();
        case PARTITION_RANDOM:
        case PARTITION_FORCED_REBALANCE:
            return robin(numberOfChannels);
        case PARTITION_HASH:
            return hashPartitionDefault(record.getInstance(), numberOfChannels);
        case BROADCAST:
            return broadcast(numberOfChannels);
        case PARTITION_CUSTOM:
            return customPartition(record.getInstance(), numberOfChannels);
        case PARTITION_RANGE:
            return rangePartition(record.getInstance(), numberOfChannels);
        default:
            throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
        }
    }
​
    private int[] forward() {
        return this.channels;
    }
  • The strategy here is FORWARD, so selectChannels simply returns the pre-allocated channels array
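
To illustrate how a strategy switch like this translates into target channels, here is a small self-contained sketch. It is not OutputEmitter: ChannelSelectorSketch and its Strategy enum are hypothetical, only three strategies are modelled, and it assumes that forwarding always targets channel 0 and that round-robin starts at channel 0, neither of which is shown in the excerpt above.

```java
import java.util.Arrays;

// Hypothetical, reduced model of channel selection; the strategy names are simplified.
class ChannelSelectorSketch {

    enum Strategy { FORWARD, ROUND_ROBIN, BROADCAST }

    private final Strategy strategy;
    private final int[] forwardChannels = new int[] {0}; // assumption: forward always uses channel 0
    private int nextChannel = -1;

    ChannelSelectorSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    /** Returns the indices of the channels the next record should be written to. */
    int[] selectChannels(int numberOfChannels) {
        switch (strategy) {
            case FORWARD:
                return forwardChannels; // same pre-allocated array on every call
            case ROUND_ROBIN:
                nextChannel = (nextChannel + 1) % numberOfChannels;
                return new int[] {nextChannel};
            case BROADCAST:
                int[] all = new int[numberOfChannels];
                for (int i = 0; i < numberOfChannels; i++) {
                    all[i] = i;
                }
                return all;
            default:
                throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
        }
    }

    public static void main(String[] args) {
        ChannelSelectorSketch forward = new ChannelSelectorSketch(Strategy.FORWARD);
        ChannelSelectorSketch robin = new ChannelSelectorSketch(Strategy.ROUND_ROBIN);
        System.out.println(Arrays.toString(forward.selectChannels(4)));
        System.out.println(Arrays.toString(robin.selectChannels(3)));
        System.out.println(Arrays.toString(robin.selectChannels(3)));
    }
}
```

A caller then loops over the returned indices, the way RecordWriter.emit loops over the result of selectChannels and calls sendToTarget for each one.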

Summary

  • The inputFormat created by CsvReader.pojoType is a PojoCsvInputFormat. Its main method is fillRecord, which uses reflection to populate the POJO. The actual reading happens in DelimitedInputFormat's readLine method, which calls fillBuffer; fillBuffer uses splitLength (the remaining length of the FileInputSplit being read) and maxReadLength to determine toRead, then reads up to toRead bytes from the file into readBuffer starting at offset
  • In its invoke method, DataSourceTask loops calling format.nextRecord and passes each non-null record to output.collect, until taskCanceled is set or format.reachedEnd() returns true
  • The output here is a CountingCollector that delegates to a ChainedMapDriver. ChainedMapDriver applies the map function to each record and passes the result to another CountingCollector, which delegates to an OutputCollector; OutputCollector uses RecordWriter to emit the data
