前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-mysql源-esSink

Flink-mysql源-esSink

作者头像
用户5927264
发布2023-03-16 19:45:48
2880
发布2023-03-16 19:45:48
举报
文章被收录于专栏:OSChinaOSChina

1实体对象

代码语言:javascript
复制
package com.shi.mysqlEsTest;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * 电站信息ES对象
 *
 * @author shiye
 * @create 2023-02-21 11:26
 */
public class PowerStationInfo implements Serializable {
    /**
     * 电站id
     */
    private Long ps_id;

    /**
     * 电站名称
     */
    private String ps_name;

    /**
     * 电站图片链接
     */
    private String images;

    /**
     * 电站类型:
     */
    private Integer ps_type;
    /**
     * 电站在线离线状态:
     */
    private Integer ps_status;
    /**
     * 装机功率
     */
    private Double design_capacity;
    /**
     * 实时功率
     */
    private Double current_power;
    /**
     * 累计发电量
     */
    private Double today_energy;
    /**
     * 等效小时
     */
    private Double equivalent_hour;
    /**
     * 电站地址
     */
    private String ps_location;

    /**
     * 电站PR
     */
    private String pr_scale;
    /**
     * 瞬时辐照
     */
    private Double radiation;
    /**
     * 日辐射量
     */
    private Double daily_irradiation;

    /**
     * 电站有效标识
     */
    private Integer valid_flag;
    /**
     * 离线时间
     */
    private Date offline_time;
    /**
     * 行政区划code (最底级区划一般为区级区划)
     */
    private Integer division_code;
    /**
     * 经度-WGS84格式
     * 117.18748542291920
     * 纬度-WGS84格式
     * 31.81492717972597
     */
    private String[] lon_lat;
    /**
     * 当前电站的 业主信息
     */
    private List<OrgUserInfo> orgUserInfo_array;

    /**
     * 设备sn集合
     */
    private String[] sn_array;
    /**
     * 关注用户集合
     */
    private long[] userid_array;

    static class OrgUserInfo{
        /**
         * 电站的分享类型:
         */
        private Integer share_type;

        /**
         * 组织id
         */
        private Long org_id;

        /**
         * 用户id
         */
        private Long user_id;

        /**
         * 业主联系方式 - 手机号码
         */
        private String moble_tel;
        /**
         * 业主联系方式 - 邮箱
         */
        private String email;

        public Integer getShare_type() {
            return share_type;
        }

        public void setShare_type(Integer share_type) {
            this.share_type = share_type;
        }

        public Long getOrg_id() {
            return org_id;
        }

        public void setOrg_id(Long org_id) {
            this.org_id = org_id;
        }

        public Long getUser_id() {
            return user_id;
        }

        public void setUser_id(Long user_id) {
            this.user_id = user_id;
        }

        public String getMoble_tel() {
            return moble_tel;
        }

        public void setMoble_tel(String moble_tel) {
            this.moble_tel = moble_tel;
        }

        public String getEmail() {
            return email;
        }

        public void setEmail(String email) {
            this.email = email;
        }

        @Override
        public String toString() {
            return "OrgUserInfo{" +
                    "share_type=" + share_type +
                    ", org_id=" + org_id +
                    ", user_id=" + user_id +
                    ", moble_tel='" + moble_tel + '\'' +
                    ", email='" + email + '\'' +
                    '}';
        }
    }

    public List<OrgUserInfo> getOrgUserInfo_array() {
        return orgUserInfo_array;
    }

    public void setOrgUserInfo_array(List<OrgUserInfo> orgUserInfo_array) {
        this.orgUserInfo_array = orgUserInfo_array;
    }

    public Long getPs_id() {
        return ps_id;
    }

    public void setPs_id(Long ps_id) {
        this.ps_id = ps_id;
    }

    public String getPs_name() {
        return ps_name;
    }

    public void setPs_name(String ps_name) {
        this.ps_name = ps_name;
    }

    public String getImages() {
        return images;
    }

    public void setImages(String images) {
        this.images = images;
    }

    public Integer getPs_type() {
        return ps_type;
    }

    public void setPs_type(Integer ps_type) {
        this.ps_type = ps_type;
    }

    public Integer getPs_status() {
        return ps_status;
    }

    public void setPs_status(Integer ps_status) {
        this.ps_status = ps_status;
    }

    public Double getDesign_capacity() {
        return design_capacity;
    }

    public void setDesign_capacity(Double design_capacity) {
        this.design_capacity = design_capacity;
    }

    public Double getCurrent_power() {
        return current_power;
    }

    public void setCurrent_power(Double current_power) {
        this.current_power = current_power;
    }

    public Double getToday_energy() {
        return today_energy;
    }

    public void setToday_energy(Double today_energy) {
        this.today_energy = today_energy;
    }

    public Double getEquivalent_hour() {
        return equivalent_hour;
    }

    public void setEquivalent_hour(Double equivalent_hour) {
        this.equivalent_hour = equivalent_hour;
    }

    public String getPs_location() {
        return ps_location;
    }

    public void setPs_location(String ps_location) {
        this.ps_location = ps_location;
    }

    public String getPr_scale() {
        return pr_scale;
    }

    public void setPr_scale(String pr_scale) {
        this.pr_scale = pr_scale;
    }

    public Double getRadiation() {
        return radiation;
    }

    public void setRadiation(Double radiation) {
        this.radiation = radiation;
    }

    public Double getDaily_irradiation() {
        return daily_irradiation;
    }

    public void setDaily_irradiation(Double daily_irradiation) {
        this.daily_irradiation = daily_irradiation;
    }

    public Integer getValid_flag() {
        return valid_flag;
    }

    public void setValid_flag(Integer valid_flag) {
        this.valid_flag = valid_flag;
    }

    public Date getOffline_time() {
        return offline_time;
    }

    public void setOffline_time(Date offline_time) {
        this.offline_time = offline_time;
    }

    public Integer getDivision_code() {
        return division_code;
    }

    public void setDivision_code(Integer division_code) {
        this.division_code = division_code;
    }

    public String[] getLon_lat() {
        return lon_lat;
    }

    public void setLon_lat(String[] lon_lat) {
        this.lon_lat = lon_lat;
    }

    public String[] getSn_array() {
        return sn_array;
    }

    public void setSn_array(String[] sn_array) {
        this.sn_array = sn_array;
    }

    public long[] getUserid_array() {
        return userid_array;
    }

    public void setUserid_array(long[] userid_array) {
        this.userid_array = userid_array;
    }

    @Override
    public String toString() {
        return "PowerStationInfo{" +
                "ps_id=" + ps_id +
                ", ps_name='" + ps_name + '\'' +
                ", images='" + images + '\'' +
                ", ps_type=" + ps_type +
                ", ps_status=" + ps_status +
                ", design_capacity=" + design_capacity +
                ", current_power=" + current_power +
                ", today_energy=" + today_energy +
                ", equivalent_hour=" + equivalent_hour +
                ", ps_location='" + ps_location + '\'' +
                ", pr_scale='" + pr_scale + '\'' +
                ", radiation=" + radiation +
                ", daily_irradiation=" + daily_irradiation +
                ", valid_flag=" + valid_flag +
                ", offline_time=" + offline_time +
                ", division_code=" + division_code +
                ", lon_lat=" + Arrays.toString(lon_lat) +
                ", orgUserInfo_array=" + orgUserInfo_array +
                ", sn_array=" + Arrays.toString(sn_array) +
                ", userid_array=" + Arrays.toString(userid_array) +
                '}';
    }
}

2.源获取数据

代码语言:javascript
复制
package com.shi.mysqlEsTest;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author shiye
 * @create 2023-02-21 11:54
 */
public class PowerStationSourceFromMysql extends RichSourceFunction<Long> {

    private PreparedStatement ps = null;
    private Connection connection = null;
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
    String username = "root";
    String password = "1234";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //加载驱动
        Class.forName(driver);
        connection = DriverManager.getConnection(url, username, password);

    }

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        String sql = "SELECT t1.ps_id FROM power_station t1 ORDER BY t1.ps_id";
        ps = connection.prepareStatement(sql);
        ResultSet resultSet = ps.executeQuery();

        while (resultSet.next()) {
            Long ps_id = resultSet.getLong("ps_id");
            sourceContext.collect(ps_id);
        }

    }

    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.map处理数据

代码语言:javascript
复制
package com.shi.mysqlEsTest;

import com.mysql.jdbc.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @author shiye
 * @create 2023-02-22 14:56
 */
public class SqlMapFunction extends RichMapFunction<Long, PowerStationInfo> {
    private PreparedStatement ps = null;
    private Connection connection = null;
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://10.0.81.151:3306/sungrow?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=Asia/Shanghai&useSSL=false";
    String username = "root";
    String password = "1234";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //加载驱动
        Class.forName(driver);
        connection = DriverManager.getConnection(url, username, password);
    }

    @Override
    public void close() throws Exception {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public PowerStationInfo map(Long ps_id) throws Exception {
        PowerStationInfo psInfo = new PowerStationInfo();

        String sql2 = "SELECT\n" +
                "  t1.ps_id,\n" +
                "  IFNULL(t1.ps_name,'') AS ps_name,\n" +
                "  t1.ps_type,\n" +
                "  t1.ps_location,\n" +
                "  t1.valid_flag,\n" +
                "  t1.division_code,\n" +
                "  t1.longitude,\n" +
                "  t1.latitude,\n" +
                "  IFNULL(t2.`PICTURE_URL`,'') AS images,\n" +
                "  t3.`dev_status` AS ps_status,\n" +
                "  t3.`update_time` AS offline_time,\n" +
                "  t4.`DESIGN_CAPACITY` AS design_capacity,\n" +
                "  t4.`MAX_PR` AS max_pr,\n" +
                "  t4.`min_pr` AS min_pr,\n" +
                "  t5.`p83023` AS p83023,\n" +
                "  t5.`p83033` AS current_power,\n" +
                "  t5.`p83022` AS today_energy,\n" +
                "  t5.`p83024` AS total_energy,\n" +
                "  t5.`p83025` AS equivalent_hour,\n" +
                "  t5.`p83012` AS radiation,\n" +
                "  t5.`p83013` AS daily_irradiation,\n" +
                "  GROUP_CONCAT(DISTINCT t6.`DEVICE_PRO_SN`) AS sn_array,\n" +
                "  GROUP_CONCAT(DISTINCT t8.`user_id`) AS userid_array\n" +
                "FROM\n" +
                "  power_station t1\n" +
                "LEFT JOIN power_station_picture t2 ON t1.ps_id = t2.`PS_ID`\n" +
                "LEFT JOIN power_device_attr t6 ON t1.`PS_ID` = t6.`PS_ID`\n" +
                "LEFT JOIN power_device_status t3 ON t6.uuid = t3.`uuid`\n" +
                "LEFT JOIN power_station_config_ratio t4 ON t1.ps_id = t4.`PS_ID`\n" +
                "LEFT JOIN power_statistical_data t5 ON t1.ps_id = t5.`ps_id`\n" +
                "LEFT JOIN user_station_follow_config t8 ON t1.`PS_ID` = t8.`ps_id`\n" +
                "WHERE t1.`PS_ID` = " + ps_id +
                " GROUP BY t1.`PS_ID`";
        ResultSet resultSet2 = connection.prepareStatement(sql2).executeQuery();

        if (resultSet2.next()) {
            psInfo.setPs_id(ps_id);
            psInfo.setPs_name(resultSet2.getString("ps_name"));
            psInfo.setPs_type(resultSet2.getInt("ps_type"));
            psInfo.setPs_location(resultSet2.getString("ps_location"));
            psInfo.setValid_flag(resultSet2.getInt("valid_flag"));
            psInfo.setValid_flag(resultSet2.getInt("division_code"));
            String longitude = resultSet2.getString("longitude");
            String latitude = resultSet2.getString("latitude");
            String[] lon_lat = new String[]{longitude, latitude};
            psInfo.setLon_lat(lon_lat);
            psInfo.setImages(resultSet2.getString("images"));
            psInfo.setPs_status(resultSet2.getInt("ps_status"));
            Date offlineTime = resultSet2.getDate("offline_time");
            psInfo.setOffline_time(offlineTime);
            psInfo.setDesign_capacity(resultSet2.getDouble("design_capacity"));
            double max_pr = resultSet2.getDouble("max_pr");
            double min_pr = resultSet2.getDouble("min_pr");
            double p83023 = resultSet2.getDouble("p83023");
            if (p83023 == 0) {
                psInfo.setPr_scale("--");
            } else if (p83023 > max_pr) {
                psInfo.setPr_scale(String.format("%.2f", (max_pr * 100)));
            } else if (p83023 < min_pr) {
                psInfo.setPr_scale(String.format("%.2f", (min_pr * 100)));
            } else {
                psInfo.setPr_scale(String.format("%.2f", (p83023 * 100)));
            }

            psInfo.setCurrent_power(resultSet2.getDouble("current_power"));
            psInfo.setToday_energy(resultSet2.getDouble("today_energy"));
            psInfo.setToday_energy(resultSet2.getDouble("total_energy"));
            psInfo.setEquivalent_hour(resultSet2.getDouble("equivalent_hour"));
            psInfo.setRadiation(resultSet2.getDouble("radiation"));
            String snStr = resultSet2.getString("sn_array");
            if (!StringUtils.isNullOrEmpty(snStr)) {
                psInfo.setSn_array(snStr.split(","));
            }
            String useridStr = resultSet2.getString("userid_array");
            if (!StringUtils.isNullOrEmpty(useridStr)) {
                String[] split = resultSet2.getString("userid_array").split(",");
                psInfo.setUserid_array(Arrays.asList(split).stream().mapToLong(Long::parseLong).toArray());
            }

            String sql3 = "SELECT\n" +
                    "\tu.user_id,\n" +
                    "\tu.moble_tel,\n" +
                    "\tu.email,\n" +
                    "\tp.share_type,\n" +
                    "\tu.org_id\n" +
                    "FROM\n" +
                    "power_station_org p\n" +
                    "LEFT JOIN sys_organization s ON p.org_id = s.org_id\n" +
                    "LEFT JOIN sys_user_org o ON o.org_id = s.org_id AND o.share_type = 0\n" +
                    "LEFT JOIN sys_user u ON o.user_id = u.user_id\n" +
                    "WHERE\n" +
                    " p.ps_id = " + ps_id +
                    " AND (p.root_org_id = 499 OR is_installer_ps_org = -1)\n" +
                    " AND s.share_type = 0\n" +
                    " AND p.share_type = 0\n" +
                    "UNION\n" +
                    "SELECT\n" +
                    "    s.user_id,\n" +
                    "    s.moble_tel,\n" +
                    "    s.email,\n" +
                    "    '0' AS share_type,\n" +
                    "    so.org_id\n" +
                    "FROM\n" +
                    "power_station_owner_user_rel pso\n" +
                    "JOIN sys_user s ON s.user_id = pso.user_id\n" +
                    "LEFT JOIN sys_user_org suo ON suo.user_id = pso.user_id\n" +
                    "LEFT JOIN sys_organization so ON so.org_id = suo.org_id\n" +
                    "WHERE pso.ps_id = " + ps_id +
                    " AND suo.is_master_org != -1 " +
                    " AND suo.share_type = 0";
            ResultSet resultSet3 = connection.prepareStatement(sql3).executeQuery();
            List<PowerStationInfo.OrgUserInfo> orgUserInfo_array = new ArrayList<>();
            while (resultSet3.next()) {
                PowerStationInfo.OrgUserInfo orgUserInfo = new PowerStationInfo.OrgUserInfo();
                orgUserInfo.setOrg_id(resultSet3.getLong("org_id"));
                orgUserInfo.setUser_id(resultSet3.getLong("user_id"));
                orgUserInfo.setMoble_tel(resultSet3.getString("moble_tel"));
                orgUserInfo.setEmail(resultSet3.getString("email"));
                orgUserInfo.setShare_type(resultSet3.getInt("share_type"));
                orgUserInfo_array.add(orgUserInfo);
            }
            psInfo.setOrgUserInfo_array(orgUserInfo_array);
        }
        return psInfo;
    }
}

4.sink

代码语言:javascript
复制
package com.shi.mysqlEsTest;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.security.user.User;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Map;

/**
 * @author shiye
 * @create 2023-02-21 13:55
 */
public class MysqlSourceToEsTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("10.0.81.167", 9201, "http"));
        httpHosts.add(new HttpHost("10.0.81.168", 9201, "http"));
        httpHosts.add(new HttpHost("10.0.81.169", 9201, "http"));

        DataStreamSource<Long> mapDataStreamSource = env.addSource(new PowerStationSourceFromMysql());

        SingleOutputStreamOperator<PowerStationInfo> map = mapDataStreamSource.map(new SqlMapFunction());

        ElasticsearchSink.Builder<PowerStationInfo> esBuilder = new ElasticsearchSink.Builder<PowerStationInfo>(httpHosts, getEsSinkFunction());
        //刷新前最大缓存的操作数。
        esBuilder.setBulkFlushMaxActions(100);
        //刷新前最大缓存的数据量(以兆字节为单位)。
        esBuilder.setBulkFlushMaxSizeMb(100);
        //刷新的时间间隔(不论缓存操作的数量或大小如何)
        esBuilder.setBulkFlushInterval(1000);
        //设置用户名密码
        esBuilder.setRestClientFactory(getRestClientFactory());
        map.addSink(esBuilder.build());

        env.execute();
    }

    /**
     * 定义ES Sink算子
     *
     * @return
     */
    public static ElasticsearchSinkFunction<PowerStationInfo> getEsSinkFunction() {
        ElasticsearchSinkFunction<PowerStationInfo> esSinkFunction = new ElasticsearchSinkFunction<PowerStationInfo>() {
            @Override
            public void process(PowerStationInfo event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                System.out.println("------> " + event);
                String string = JSON.toJSONString(event);

                IndexRequest request = Requests.indexRequest()
                        .index("temp_ps_info2")
                        .id(event.getPs_id().toString())
                        .source(string, XContentType.JSON);// 放入数据json格式

                requestIndexer.add(request);
            }
        };
        return esSinkFunction;
    }

    /**
     * es 用户名密码
     */
    public static RestClientFactory getRestClientFactory() {

        RestClientFactory restClientFactory = new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "1111"));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.disableAuthCaching();
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
            }
        };
        return restClientFactory;
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-02-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档