A Look at Flink's SocketClientSink

code4it
Published 2018-12-21 13:13:18
Collected in the column: 码匠的流水账

This article takes a look at Flink's SocketClientSink.

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }
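  • DataStream's writeToSocket method creates a SocketClientSink internally, passing four constructor arguments: hostName, port, schema, and maxNumRetries (0 here). It also sets the sink's parallelism to 1, because multiple instances connecting to the same port would not work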
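
To see what such a sink puts on the wire, the following minimal sketch plays both sides using only the JDK: a local ServerSocket stands in for the receiving server, and a plain client serializes each record to bytes and writes them to the socket, roughly as the sink does with its SerializationSchema. The class name SocketRoundTrip and the serialize helper are hypothetical and not part of Flink. The helper appends a newline only so the receiver can split records; the sink itself writes exactly the bytes the schema returns, so any delimiter has to come from the schema.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SocketRoundTrip {

    // Hypothetical stand-in for SerializationSchema#serialize: one UTF-8 line per record.
    static byte[] serialize(String value) {
        return (value + "\n").getBytes(StandardCharsets.UTF_8);
    }

    // Sends the records over a loopback socket and returns the lines the server side read.
    // Single-threaded, so only suitable for payloads that fit in the socket buffers.
    static List<String> roundTrip(List<String> records) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: let the OS pick a free port
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 Socket accepted = server.accept()) {
                OutputStream out = client.getOutputStream();
                for (String record : records) {
                    out.write(serialize(record));
                }
                out.flush();
                client.shutdownOutput(); // signal end of stream to the reader

                List<String> received = new ArrayList<>();
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(accepted.getInputStream(), StandardCharsets.UTF_8));
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
                return received;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b", "c")));
    }
}
```

In a real job the client half would be replaced by a call such as writeToSocket(hostName, port, schema) on the stream, with a server already listening on that host and port.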

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    private static final int CONNECTION_RETRY_DELAY = 500;


    private final SerializableObject lock = new SerializableObject();
    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;
    private final int maxNumRetries;
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    private int retries;

    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     * @param parameters Configuration.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }


    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * @param value The value to write to the socket.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            outputStream.write(msg);
            if (autoFlush) {
                outputStream.flush();
            }
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                    ". Trying to reconnect..." , e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!

            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                    // first, clean up the old resources
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close output stream from failed write attempt", ee);
                    }
                    try {
                        if (client != null) {
                            client.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close socket from failed write attempt", ee);
                    }

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write
                        outputStream.write(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
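  • SocketClientSink extends RichSinkFunction; its autoFlush property defaults to false
  • The open method calls createConnection to set up the socket connection; if an IOException occurs at this point, it fails fast by rethrowing it. createConnection sets both keepAlive and tcpNoDelay to true on the socket
  • The invoke method first calls schema.serialize to serialize the value and then calls outputStream.write on the socket's stream, flushing immediately if autoFlush is true. If an IOException occurs and maxNumRetries is 0, it fails immediately; otherwise it retries inside the catch block, up to maxNumRetries times (or indefinitely for -1). Each retry closes the old stream and socket, calls createConnection, and calls outputStream.write again. The first retry happens immediately, and each failed retry is followed by a wait of CONNECTION_RETRY_DELAY (500 ms)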
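
The retry rules described above can be sketched in isolation. This is a simplified illustration, not the sink's code: the class RetrySketch, the Attempt interface, and the method retryAfterFailure are hypothetical names, and Thread.sleep replaces the lock.wait(CONNECTION_RETRY_DELAY) call that lets close() wake the real sink early. The isRunning check is also left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class RetrySketch {

    // Hypothetical stand-in for "reconnect and write the message again".
    interface Attempt {
        void run() throws IOException;
    }

    // Called after a first write has already failed. 0 means no retries, a negative
    // value means retry forever, otherwise retry at most maxNumRetries times.
    // Returns the number of retries that were needed.
    static int retryAfterFailure(Attempt attempt, int maxNumRetries, long delayMillis) {
        if (maxNumRetries == 0) {
            throw new UncheckedIOException(new IOException("retries are not enabled"));
        }
        IOException last = null;
        int retries = 0;
        while (maxNumRetries < 0 || retries < maxNumRetries) {
            retries++;
            try {
                attempt.run(); // the first retry runs immediately, without a wait
                return retries;
            } catch (IOException e) {
                last = e;
            }
            try {
                Thread.sleep(delayMillis); // wait only after a failed retry
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new UncheckedIOException(
                new IOException("failed after " + retries + " retries", last));
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two retries and succeeds on the third.
        int used = retryAfterFailure(() -> {
            if (++calls[0] < 3) {
                throw new IOException("server not reachable yet");
            }
        }, 3, 10L);
        System.out.println("succeeded after " + used + " retries");
    }
}
```

Because the loop blocks the calling thread between attempts, a sink built this way stalls the task for the whole retry period, which is the synchronous behaviour the article points out.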

Summary

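  • DataStream's writeToSocket method creates a SocketClientSink internally with maxNumRetries set to 0; because it does not call the constructor that takes an autoflush argument, autoFlush keeps its default of false
  • The socket created in open has keepAlive and tcpNoDelay both set to true; if an IOException occurs during open, the exception is thrown immediately and the task stops
  • The invoke method is fairly simple: it serializes the value with the SerializationSchema and writes it to outputStream. It includes a basic failure retry with a delay of CONNECTION_RETRY_DELAY (500 ms); in this version the retry runs synchronously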
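
The two socket settings mentioned in the summary can be tried out with the JDK alone. In the sketch below, the class ConnectionSketch and the method optionsOnLoopback are hypothetical names used for illustration; connect repeats the same calls as createConnection in the listing. keepAlive asks the TCP stack to probe idle connections, and tcpNoDelay disables Nagle's algorithm so that small writes are sent without waiting to be batched.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class ConnectionSketch {

    // Same steps as createConnection in the listing: connect, then set both options.
    static Socket connect(InetAddress host, int port) throws IOException {
        Socket client = new Socket(host, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);
        return client;
    }

    // Connects to a throwaway loopback server and reports {keepAlive, tcpNoDelay}.
    static boolean[] optionsOnLoopback() {
        try (ServerSocket server = new ServerSocket(0); // port 0: let the OS pick a free port
             Socket client = connect(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
            return new boolean[] {client.getKeepAlive(), client.getTcpNoDelay()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] options = optionsOnLoopback();
        System.out.println("keepAlive=" + options[0] + ", tcpNoDelay=" + options[1]);
    }
}
```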

doc

  • SocketClientSink
Originally published 2018-12-02 on the WeChat official account 码匠的流水账.
