
A Custom OSS Sink for Flink

Original · Author: 平常心 · Modified 2021-06-16 10:04:37
Included in the column: 个人总结系列 (Personal Summary Series)

1. Background

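Because the OSS connector described in the previous post threw exceptions and offered limited functionality, I decided to write a custom Sink instead. The main goals are dynamic file naming and efficient batch writes.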

2. Code

import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSS;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClientBuilder;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectRequest;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.AppendObjectResult;
import org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.model.ObjectMetadata;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class OssSink extends RichSinkFunction<String> {

    private static final String OSS_ENDPOINT = "https://oss-xxxx.aliyuncs.com";
    private static final String OSS_ACCESS_KEYID = "xxxxx";
    private static final String OSS_ACCESSKEYSECRET = "xxxx";
    private static final String OSS_BUCKETNAME = "xxxxx";
    // Flush once this many records are buffered.
    private static final int BATCH_SIZE = 100;

    // Records buffered since the last flush.
    private transient List<String> result;

    private transient DateFormat format;
    private transient ObjectMetadata meta;
    private transient OSS ossClient;
    // Object currently being appended to, and the offset for its next append.
    private transient String currentPath;
    private transient long nextPosition;

    @Override
    public void open(Configuration parameters) throws Exception {
        result = new ArrayList<>();

        this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        this.meta = new ObjectMetadata();
        // Set the content type of the uploaded object.
        meta.setContentType("text/plain");
        format = new SimpleDateFormat("yyyyMMdd");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        result.add(value + "\n");
        // TODO: decide whether to buffer batches in a Map or some other structure
        if (result.size() >= BATCH_SIZE) {
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.ossClient != null) {
            // Write out whatever is still buffered before shutting the client down.
            if (result != null && !result.isEmpty()) {
                batchWrite();
                result.clear();
            }
            this.ossClient.shutdown();
        }
        super.close();
    }

    private void batchWrite() {
        StringBuilder sb = new StringBuilder();
        for (String str : result) {
            sb.append(str);
        }
        String day = format.format(new Date());
        String path = "user_event/day=" + day + "/id=${id}/sdk=sa_sdk/user_event.json";
        // A new day means a new object, so appending restarts at offset 0.
        if (!path.equals(currentPath)) {
            currentPath = path;
            nextPosition = 0L;
        }
        AppendObjectRequest appendObjectRequest = new AppendObjectRequest(OSS_BUCKETNAME, path,
                new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8)), meta);
        appendObjectRequest.setPosition(nextPosition);
        AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest);
        nextPosition = appendObjectResult.getNextPosition();
    }
}
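The append-based sink only works if the `position` sent with each `AppendObjectRequest` equals the object's current length; OSS rejects the append otherwise. The sketch below models that rule in memory so it runs without OSS or Flink. `FakeAppendStore` and `AppendWriter` are hypothetical names, not OSS SDK classes. It illustrates why the next position has to be tracked per object path: when the day changes, the path changes, and the new object must start at offset 0.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for appendable OSS objects. Like AppendObject,
// an append succeeds only when `position` equals the object's current length,
// and it returns the position to use for the next append.
class FakeAppendStore {
    private final Map<String, ByteArrayOutputStream> objects = new HashMap<>();

    long append(String path, long position, String data) {
        ByteArrayOutputStream obj = objects.computeIfAbsent(path, k -> new ByteArrayOutputStream());
        if (position != obj.size()) {
            throw new IllegalStateException(
                    "position " + position + " != length " + obj.size() + " for " + path);
        }
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        obj.write(bytes, 0, bytes.length);
        return obj.size();
    }

    String read(String path) {
        ByteArrayOutputStream obj = objects.get(path);
        return obj == null ? null : new String(obj.toByteArray(), StandardCharsets.UTF_8);
    }
}

// Remembers the next append position for each path, so a new path
// (for example a new day=... directory) starts at 0 instead of reusing
// the previous object's offset.
class AppendWriter {
    private final FakeAppendStore store;
    private final Map<String, Long> nextPosition = new HashMap<>();

    AppendWriter(FakeAppendStore store) {
        this.store = store;
    }

    void write(String path, String data) {
        long position = nextPosition.getOrDefault(path, 0L);
        nextPosition.put(path, store.append(path, position, data));
    }
}
```

Positions kept only in memory are lost when the job restarts; the version in section 3 covers that case by reading the existing object's length with `getObjectMetadata` before appending.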

3. Improved version

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        result.add(value);
        final long currentTimeMillis = System.currentTimeMillis();
        // Flush after 100 records or 60 s, whichever comes first (checked only when a record arrives).
        if (result.size() >= 100 || (currentTimeMillis - startTs) >= 60000) {
            startTs = currentTimeMillis;
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        // Write out whatever is still buffered before shutting down.
        if (result != null && !result.isEmpty()) {
            batchWrite();
            result.clear();
        }
        if (this.ossClient != null) {
            this.ossClient.shutdown();
        }
        super.close();
    }

    private void batchWrite() {
        // This version shuts down and rebuilds the OSS client on every batch.
        if (this.ossClient != null) {
            this.ossClient.shutdown();
        }
        this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        LOG.info("{} oss batch start ms {}", Thread.currentThread().getName(), startTs);

        // Group the buffered messages by key ("day^tid"), one line per message.
        Map<String, StringBuilder> batchMap = new HashMap<>();
        for (Tuple2<String, String> value : result) {
            batchMap.computeIfAbsent(value.f0, k -> new StringBuilder())
                    .append(value.f1).append("\n");
        }

        for (Map.Entry<String, StringBuilder> entry : batchMap.entrySet()) {
            final String[] split = entry.getKey().split("\\^");
            String day = split[0];
            String tid = split[1];
            path = "user_event/day=" + day + "/tid=" + tid + "/sdk=sa_sdk/user_event_" + startTs + ".json";

            final byte[] bytes = entry.getValue().toString().getBytes(StandardCharsets.UTF_8);
            AppendObjectRequest appendObjectRequest =
                    new AppendObjectRequest(OSS_BUCKETNAME, path, new ByteArrayInputStream(bytes), meta);
            // Append at the end of the object if it already exists, otherwise start at offset 0.
            if (this.ossClient.doesObjectExist(OSS_BUCKETNAME, path)) {
                final ObjectMetadata metadata = this.ossClient.getObjectMetadata(OSS_BUCKETNAME, path);
                appendObjectRequest.setPosition(metadata.getContentLength());
            } else {
                appendObjectRequest.setPosition(0L);
            }

            final AppendObjectResult appendObjectResult = this.ossClient.appendObject(appendObjectRequest);
            LOG.info("oss position {}", appendObjectResult.getNextPosition());
        }
    }
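The flush rule and the per-key grouping in this version can be isolated into a plain Java class and tested without Flink or OSS. `KeyedBatchBuffer` is a hypothetical helper, not part of the original job; the clock is passed in explicitly so the time-based trigger can be checked deterministically. It flushes at `maxRecords` records or when `maxAgeMs` has elapsed since the last flush, mirroring the `>= 60000` comparison in `invoke`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper isolating the sink's flush rule: flush when `maxRecords`
// records are buffered or `maxAgeMs` has passed since the last flush, then
// group the flushed messages by key ("day^tid"), one message per line.
class KeyedBatchBuffer {
    private final int maxRecords;
    private final long maxAgeMs;
    private final List<Map.Entry<String, String>> buffer = new ArrayList<>();
    private long lastFlushMs;

    KeyedBatchBuffer(int maxRecords, long maxAgeMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
        this.lastFlushMs = nowMs;
    }

    // Buffers one record. Returns the grouped batch if this record triggers
    // a flush, otherwise null.
    Map<String, String> add(String key, String message, long nowMs) {
        buffer.add(new AbstractMap.SimpleImmutableEntry<>(key, message));
        if (buffer.size() < maxRecords && nowMs - lastFlushMs < maxAgeMs) {
            return null;
        }
        lastFlushMs = nowMs;
        return drain();
    }

    // Groups every buffered message by key (newline-terminated) and clears
    // the buffer. Also usable for a final flush from close().
    Map<String, String> drain() {
        Map<String, StringBuilder> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : buffer) {
            grouped.computeIfAbsent(record.getKey(), k -> new StringBuilder())
                    .append(record.getValue()).append('\n');
        }
        buffer.clear();
        Map<String, String> batch = new LinkedHashMap<>();
        grouped.forEach((k, sb) -> batch.put(k, sb.toString()));
        return batch;
    }
}
```

As in the original `invoke`, the time condition is only evaluated when a record arrives, so on an idle stream the tail of the buffer waits for the next record or for `close()`.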

After running in parallel for a while, this version fails with:

org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException: Connection reset by peer: socket write error
[ErrorCode]: SocketException
[RequestId]: Unknown
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createNetworkException(ExceptionFactory.java:71)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.DefaultServiceClient.sendRequestCore(DefaultServiceClient.java:127)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:133)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.writeObjectInternal(OSSObjectOperation.java:897)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.appendObject(OSSObjectOperation.java:185)
	at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.appendObject(OSSClient.java:595)
	at com.am.oss.OssSink.batchWrite(OssSink.java:101)

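So I changed the approach: the OSS client is now created once in open() and reused, the batch size is raised to 1000 records, and each batch is written to a new object with putObject instead of appendObject.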

    @Override
    public void open(Configuration parameters) throws Exception {
        result = new ArrayList<>();
        startTs = System.currentTimeMillis();

        // Create the client once and reuse it for every batch.
        this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        result.add(value);
        final long currentTimeMillis = System.currentTimeMillis();
        // Flush after 1000 records or 60 s, whichever comes first (checked only when a record arrives).
        if (result.size() >= 1000 || (currentTimeMillis - startTs) >= 60000) {
            startTs = currentTimeMillis;
            batchWrite();
            result.clear();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.ossClient != null) {
            // Write out whatever is still buffered. Use a fresh timestamp so this final
            // putObject does not overwrite the object written by the previous batch.
            if (result != null && !result.isEmpty()) {
                startTs = System.currentTimeMillis();
                batchWrite();
                result.clear();
            }
            this.ossClient.shutdown();
        }
        super.close();
    }

    private void batchWrite() {
        if (null == this.ossClient) {
            this.ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, OSS_ACCESS_KEYID, OSS_ACCESSKEYSECRET);
        }
        LOG.info("{} oss batch start ms {}", Thread.currentThread().getName(), startTs);

        // Group the buffered messages by key ("day^tid"), one line per message.
        Map<String, StringBuilder> batchMap = new HashMap<>();
        for (Tuple2<String, String> value : result) {
            batchMap.computeIfAbsent(value.f0, k -> new StringBuilder())
                    .append(value.f1).append("\n");
        }

        for (Map.Entry<String, StringBuilder> entry : batchMap.entrySet()) {
            final String[] split = entry.getKey().split("\\^");
            String day = split[0];
            String tid = split[1];
            // An appendable OSS object cannot exceed 5 GB, so each batch goes to a new
            // object (startTs in the name) written with putObject.
            path = "user_event/day=" + day + "/tid=" + tid + "/sdk=sa_sdk/user_event_" + startTs + ".json";
            final byte[] bytes = entry.getValue().toString().getBytes(StandardCharsets.UTF_8);
            final PutObjectResult putObjectResult =
                    this.ossClient.putObject(OSS_BUCKETNAME, path, new ByteArrayInputStream(bytes));
            LOG.info("oss put {} end, tag {}", path, putObjectResult.getETag());
        }
    }
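Because `putObject` overwrites an existing object with the same name, the per-batch timestamp in the file name is what keeps consecutive batches for the same `day^tid` key apart (`keyBy` sends each key to a single subtask, but two flushes within the same millisecond would still collide). The naming step can be pulled out and tested on its own. `ObjectKeys.forBatch` is a hypothetical helper that reproduces the path format used in `batchWrite` and rejects keys that are not of the form `day^tid`, instead of failing with an `ArrayIndexOutOfBoundsException`.

```java
// Hypothetical helper that builds the object name used by the putObject version:
// user_event/day=<day>/tid=<tid>/sdk=sa_sdk/user_event_<startTs>.json
final class ObjectKeys {
    private ObjectKeys() {}

    static String forBatch(String key, long startTs) {
        // The stream key is "day^tid"; split it into exactly two parts.
        String[] parts = key.split("\\^", 2);
        if (parts.length != 2 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("expected \"day^tid\", got: " + key);
        }
        return "user_event/day=" + parts[0] + "/tid=" + parts[1]
                + "/sdk=sa_sdk/user_event_" + startTs + ".json";
    }
}
```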

The corresponding job code that uses the sink:

final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
        TOPIC_NAME,
        new SimpleStringSchema(),
        props);
// kafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 2 * 24 * 60 * 60 * 1000);
kafkaConsumer.setStartFromLatest();

// Returning null from map() would fail downstream (keyBy would dereference a null record),
// so flatMap is used to drop messages that are not valid JSON or lack a receive time.
final KeyedStream<Tuple2<String, String>, String> streamSource = env.addSource(kafkaConsumer)
        .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
            @Override
            public void flatMap(String jsonData, Collector<Tuple2<String, String>> out) throws Exception {
                if (!isJson(jsonData)) {
                    return;
                }
                // Parse the JSON and extract the receive time and tid.
                JSONObject json = JSONObject.parseObject(jsonData);
                final Long time = json.getLong(RECV_TIME);
                if (time == null) {
                    return;
                }
                String tid = null;
                final JSONObject jsonObject = json.getJSONObject(PROPERTIES);
                if (jsonObject != null && jsonObject.containsKey(TID)) {
                    tid = jsonObject.getString(TID);
                }

                String day = format.format(new Date(time));
                // The sink splits this key on "^", so FLAG is expected to be "^".
                String key = day + FLAG + tid;
                out.collect(new Tuple2<>(key, jsonData));
            }
        })
        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> tuple2) throws Exception {
                return tuple2.f0;
            }
        });

System.out.println("start ----- >");
LOG.info("start ----- >");

streamSource.addSink(new OssSink());
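The `day=` directory a record lands in is computed from `RECV_TIME` with a `SimpleDateFormat` named `format` whose declaration is not shown. `SimpleDateFormat` is not thread-safe (a problem if it is a static field shared by parallel subtasks in one TaskManager), and its result depends on the JVM's default time zone. A minimal sketch using `java.time` instead, assuming `FLAG` is `"^"` and that the day should be computed in an explicit zone; the zone choice and the `DayKeys` class are hypothetical, not from the original job.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical replacement for the shared SimpleDateFormat used in the flatMap step.
// DateTimeFormatter is immutable and thread-safe, and the zone is passed explicitly
// instead of depending on the JVM default.
final class DayKeys {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    private DayKeys() {}

    static String dayOf(long epochMillis, ZoneId zone) {
        return DAY.withZone(zone).format(Instant.ofEpochMilli(epochMillis));
    }

    // Builds the "day^tid" key that the sink later splits on "^".
    static String keyOf(long recvTimeMillis, String tid, ZoneId zone) {
        return dayOf(recvTimeMillis, zone) + "^" + tid;
    }
}
```

The same instant can fall on different days depending on the zone: 2021-06-15T23:59:59.999Z is day `20210615` in UTC but `20210616` in Asia/Shanghai, so fixing the zone keeps the partitioning independent of where the TaskManagers run.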

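Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.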
