A Look at Flink's SocketClientSink

This article takes a look at Flink's SocketClientSink.

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }
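  • DataStream's writeToSocket method creates a SocketClientSink internally, passing four constructor arguments: hostName, port, schema, and maxNumRetries (0 here). It also sets the sink's parallelism to 1, because multiple instances connecting to the same port would not work.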
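
Because the sink connects as a client, something must already be listening on hostName:port before the job starts. The following is a minimal sketch of such a receiver using only the JDK. The class SocketReceiverSketch and its methods are hypothetical helpers, not part of Flink, and the sender thread is only a stand-in for the sink. The sketch assumes the receiver simply collects raw bytes until the client closes the connection, since whether messages carry a delimiter depends on the SerializationSchema in use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink: accepts one client and collects what it sends.
class SocketReceiverSketch {

    // Accepts a single connection and reads raw bytes until the client closes it.
    static byte[] receiveAll(ServerSocket server) throws IOException {
        try (Socket client = server.accept();
             InputStream in = client.getInputStream()) {
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                received.write(buffer, 0, n);
            }
            return received.toByteArray();
        }
    }

    // Starts a listener on a free loopback port, sends one message from a client
    // thread (a stand-in for the sink), and returns what the listener received.
    static String roundTrip(String message) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try (Socket socket = new Socket(loopback, port);
                     OutputStream out = socket.getOutputStream()) {
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            sender.start();
            byte[] bytes = receiveAll(server);
            sender.join();
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello from the sink"));
    }
}
```

In a real setup the listener would run on the host and port passed to writeToSocket, and it would need to be started before the Flink job, since the sink created by writeToSocket does not retry.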

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    private static final int CONNECTION_RETRY_DELAY = 500;


    private final SerializableObject lock = new SerializableObject();
    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;
    private final int maxNumRetries;
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    private int retries;

    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     * @param parameters Configuration.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }


    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * @param value The value to write to the socket.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            outputStream.write(msg);
            if (autoFlush) {
                outputStream.flush();
            }
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                    ". Trying to reconnect..." , e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!

            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                    // first, clean up the old resources
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close output stream from failed write attempt", ee);
                    }
                    try {
                        if (client != null) {
                            client.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close socket from failed write attempt", ee);
                    }

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write
                        outputStream.write(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
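  • SocketClientSink extends RichSinkFunction; its autoFlush property is false unless the five-argument constructor is used.
  • The open method calls createConnection to establish the socket connection; if an IOException occurs at this point, it fails fast by rethrowing it. createConnection sets both keepAlive and tcpNoDelay to true on the socket.
  • The invoke method first calls schema.serialize to serialize the value, then calls outputStream.write on the socket's output stream, flushing immediately if autoFlush is true. If an IOException occurs and maxNumRetries is not 0, it retries right away; the retry logic lives directly in the catch block and is bounded by maxNumRetries (-1 means retry indefinitely). Each retry closes the old stream and socket, calls createConnection, and then calls outputStream.write again. The first retry happens without a delay; after each failed retry the sink waits CONNECTION_RETRY_DELAY (500 ms) before the next one.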
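
The reconnect-and-resend loop described above can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not Flink's implementation: RetryingWriterSketch and its Connector interface are hypothetical names, the Connector stands in for createConnection so the loop can be exercised without a network, Thread.sleep replaces the lock-based wait, and locking, logging, and autoFlush are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified illustration of the reconnect-and-resend loop; not Flink code.
class RetryingWriterSketch {

    // Stand-in for createConnection(): returns a fresh output stream or throws.
    interface Connector {
        OutputStream connect() throws IOException;
    }

    private final Connector connector;
    private final int maxNumRetries;      // 0 = never retry, -1 = retry without limit
    private final long retryDelayMillis;  // the sink uses a fixed 500 ms
    private OutputStream out;
    private int retries;

    RetryingWriterSketch(Connector connector, int maxNumRetries, long retryDelayMillis)
            throws IOException {
        if (maxNumRetries < -1) {
            throw new IllegalArgumentException("maxNumRetries must be >= -1");
        }
        this.connector = connector;
        this.maxNumRetries = maxNumRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.out = connector.connect(); // like open(): a failure here is not retried
    }

    void write(byte[] msg) throws IOException, InterruptedException {
        try {
            out.write(msg);
            return;
        } catch (IOException e) {
            if (maxNumRetries == 0) {
                throw new IOException("Send failed and retries are disabled", e);
            }
        }

        IOException lastException = null;
        retries = 0;
        while (maxNumRetries < 0 || retries < maxNumRetries) {
            closeQuietly();          // drop the broken connection first
            retries++;
            try {
                out = connector.connect();
                out.write(msg);      // resend the same bytes
                return;
            } catch (IOException e) {
                lastException = e;
            }
            Thread.sleep(retryDelayMillis); // wait only after a failed retry
        }
        throw new IOException("Failed after " + retries + " retries", lastException);
    }

    private void closeQuietly() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException ignored) {
            // nothing useful to do with a close failure in this sketch
        }
    }

    // Simulated scenario: broken first connection, one refused reconnect, then success.
    static String demo() throws Exception {
        ByteArrayOutputStream healthy = new ByteArrayOutputStream();
        int[] calls = {0};
        Connector flaky = () -> {
            calls[0]++;
            if (calls[0] == 1) {
                return new OutputStream() {
                    @Override
                    public void write(int b) throws IOException {
                        throw new IOException("simulated broken pipe");
                    }
                };
            }
            if (calls[0] == 2) {
                throw new IOException("simulated connection refused");
            }
            return healthy;
        };
        RetryingWriterSketch writer = new RetryingWriterSketch(flaky, 3, 10);
        writer.write("hello".getBytes(StandardCharsets.UTF_8));
        return writer.retries + ":" + new String(healthy.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In the simulated scenario the initial write fails, the first reconnect is refused, and the second reconnect succeeds, so the message is delivered on the second retry. With maxNumRetries set to 0, as writeToSocket does, the first failure would be rethrown immediately.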

Summary

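  • DataStream's writeToSocket method creates a SocketClientSink internally with maxNumRetries set to 0. Because it does not use the constructor that takes an autoflush argument, autoFlush stays at its default of false.
  • The socket created in open has keepAlive and tcpNoDelay both set to true; if an IOException occurs during open, the exception is thrown and the task stops.
  • The invoke method is fairly simple: it serializes the value with the SerializationSchema and writes it to outputStream. It includes a simple retry on failure with a delay of CONNECTION_RETRY_DELAY (500 ms) between attempts; in this version the retry is performed synchronously.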
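
One detail of the synchronous retry is visible in the source above: the delay is implemented with lock.wait rather than a plain sleep, and close() sets isRunning to false and calls lock.notifyAll, so a pending retry wait can be cut short when the task shuts down. The following is a minimal sketch of that pattern in isolation. CancellableRetryWaitSketch is a hypothetical class, and every "attempt" in it is assumed to fail so that only the wait and wake-up behavior is shown.

```java
// Hypothetical illustration of a retry wait that close() can interrupt; not Flink code.
class CancellableRetryWaitSketch {

    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    // Simulates a retry loop in which every attempt fails.
    // Returns the number of attempts made before close() stopped the loop.
    int retryUntilClosed(long delayMillis) throws InterruptedException {
        int attempts = 0;
        synchronized (lock) {
            while (isRunning) {
                attempts++;              // a reconnect attempt would go here
                lock.wait(delayMillis);  // releases the lock while waiting
            }
        }
        return attempts;
    }

    // Mirrors the shape of close(): flip the flag, then wake any waiting thread.
    void close() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    // Runs the loop with a one-minute delay and closes it from another thread
    // after about 100 ms; returns the elapsed time in milliseconds.
    static long demo() throws InterruptedException {
        CancellableRetryWaitSketch sink = new CancellableRetryWaitSketch();
        long start = System.nanoTime();
        Thread closer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sink.close();
        });
        closer.start();
        sink.retryUntilClosed(60_000);
        closer.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped after " + demo() + " ms");
    }
}
```

Because the waiting thread holds the lock everywhere except inside wait, close() can only acquire the lock while the retry loop is waiting, which is why the notification is not lost.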
