本文主要研究一下flink的SocketClientSink
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
 * Sends every element of this DataStream to a socket as raw bytes, using the given
 * {@link SerializationSchema} to turn each element into a byte array.
 *
 * <p>The underlying {@link SocketClientSink} is created with a retry count of 0, and the
 * resulting sink is pinned to a parallelism of 1.
 *
 * @param hostName
 *            host of the socket
 * @param port
 *            port of the socket
 * @param schema
 *            schema for serialization
 * @return the closed DataStream
 */
@PublicEvolving
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
    final SocketClientSink<T> socketSink = new SocketClientSink<>(hostName, port, schema, 0);
    final DataStreamSink<T> sinkStream = addSink(socketSink);

    // It would not work if multiple instances would connect to the same port
    sinkStream.setParallelism(1);

    return sinkStream;
}
writeToSocket方法内部创建了SocketClientSink,这里传入的maxNumRetries为0,即发送失败后不进行重试
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduces throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    /** Timeout, in milliseconds, passed to {@code lock.wait(...)} between two re-connect attempts. */
    private static final int CONNECTION_RETRY_DELAY = 500;

    /** Monitor guarding the connection state during retries and {@link #close()}. */
    private final SerializableObject lock = new SerializableObject();

    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;

    /** Maximum number of re-connect attempts per failed send; 0 disables retries, -1 means unlimited. */
    private final int maxNumRetries;

    /** Whether the output stream is flushed after every message. */
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    /** Number of re-connect attempts made for the most recent failed send. */
    private int retries;

    /** Cleared by {@link #close()} so that a running retry loop stops. */
    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     *
     * @param parameters Configuration.
     * @throws IOException If the connection to the socket server cannot be established.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }

    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * <p>If the write fails and retries are enabled, the sink synchronously re-connects and
     * re-sends the message. The first retry happens immediately; after every failed retry the
     * calling thread waits on the lock for {@code CONNECTION_RETRY_DELAY} milliseconds (or until
     * notified by {@link #close()}).
     *
     * @param value The value to write to the socket.
     * @throws IOException If the message could not be sent and retries are disabled or exhausted
     *                     while the sink is still running.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            writeMessage(msg);
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '{}' to socket server at {}:{}. Trying to reconnect...",
                    value, hostName, port, e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!
            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
                    // first, clean up the old resources
                    closeFailedConnection();

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write; goes through the same path as the first attempt so
                        // that the auto-flush setting is honored for re-sent messages as well
                        writeMessage(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): {}", retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /** Opens a new socket to the configured host and port and obtains its output stream. */
    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    /** Writes one serialized message to the current output stream, flushing it if auto-flush is enabled. */
    private void writeMessage(byte[] msg) throws IOException {
        outputStream.write(msg);
        if (autoFlush) {
            outputStream.flush();
        }
    }

    /** Closes the stream and socket of a failed connection; close failures are logged, not propagated. */
    private void closeFailedConnection() {
        try {
            if (outputStream != null) {
                outputStream.close();
            }
        }
        catch (IOException ee) {
            LOG.error("Could not close output stream from failed write attempt", ee);
        }

        try {
            if (client != null) {
                client.close();
            }
        }
        catch (IOException ee) {
            LOG.error("Could not close socket from failed write attempt", ee);
        }
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    /** Returns the number of re-connect attempts made for the most recent failed send. */
    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
每次重试失败后会通过lock.wait(CONNECTION_RETRY_DELAY)等待500毫秒(CONNECTION_RETRY_DELAY的值为500)再进行下一次重连,这个版本实现的重试比较简单,是同步进行的。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。