
Converting a Storm topology to a JStorm topology


This post takes the code from https://my.oschina.net/shyloveliyi/blog/785812 and converts it from Storm to JStorm.

1. Converting the dependency

    The original Storm dependency

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>

    becomes:

<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>

2. Switching the imports

    After switching the dependency, the project no longer compiles. Open each source file, delete all of the import statements, and re-import them against the new dependency (Eclipse shortcut: Ctrl+Shift+O). The converted code is listed below.
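
    For reference, the renaming follows a fixed pattern: the org.apache.storm packages of Storm 1.x map one-to-one onto the backtype.storm packages that JStorm 2.1.0 still uses, with the class names unchanged. A minimal sketch of the mapping for the classes used in this project:

// Storm 1.x                                     ->  JStorm 2.1.0
// org.apache.storm.spout.SpoutOutputCollector   ->  backtype.storm.spout.SpoutOutputCollector
// org.apache.storm.task.TopologyContext         ->  backtype.storm.task.TopologyContext
// org.apache.storm.topology.base.BaseRichSpout  ->  backtype.storm.topology.base.BaseRichSpout
// org.apache.storm.topology.base.BaseRichBolt   ->  backtype.storm.topology.base.BaseRichBolt
// org.apache.storm.tuple.Fields                 ->  backtype.storm.tuple.Fields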

    FromMysqlSpout

package scc.storm;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * @ClassName: MysqlSpout
 * @Description: TODO (describe what this class does in one sentence)
 * @author shangchengcai@voole.com
 * @date 2016-11-10 16:26:38
 * 
 */
public class FromMysqlSpout extends BaseRichSpout {
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /*
     * (non-Javadoc) <p>Title: open</p> <p>Description: </p>
     * 
     * @param conf
     * 
     * @param context
     * 
     * @param collector
     * 
     * @see backtype.storm.spout.ISpout#open(java.util.Map, backtype.storm.task.TopologyContext,
     * backtype.storm.spout.SpoutOutputCollector)
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.collector = collector;
        this.context = context;
    }

    /*
     * (non-Javadoc) <p>Title: nextTuple</p> <p>Description: </p>
     * 
     * @see backtype.storm.spout.ISpout#nextTuple()
     */
    public void nextTuple() {
        JdbcUtils jdbcUtils = new JdbcUtils();
        try {
            if (new Random().nextInt(100) > 50) {
                return;
            }
            List<Map<String, Object>> data = jdbcUtils.findModeResult("select * from sds limit 1",
                    new ArrayList<Object>());
            this.collector.emit(new Values(data));
        } catch (SQLException e) {
            e.printStackTrace();
            this.collector.reportError(e);
        }
    }

    /*
     * (non-Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
     * 
     * @param declarer
     * 
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }

}
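
    One side note that is not in the original code: when nextTuple() returns without emitting (as in the random branch above), the framework calls it again immediately, which can busy-loop a worker thread. A common tweak, sketched here as an assumption rather than part of the original spout, is to back off briefly in that branch:

    // Hypothetical tweak inside nextTuple(): sleep instead of spinning
    if (new Random().nextInt(100) > 50) {
        backtype.storm.utils.Utils.sleep(50); // wait 50 ms before the next nextTuple() call
        return;
    }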

ToFileBolt

package scc.storm;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * @ClassName: ToFileBolt
 * @Description: TODO (describe what this class does in one sentence)
 * @author shangchengcai@voole.com
 * @date 2016-11-10 16:44:09
 * 
 */
public class ToFileBolt extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    /*
     * (non-Javadoc) <p>Title: prepare</p> <p>Description: </p>
     * 
     * @param stormConf
     * 
     * @param context
     * 
     * @param collector
     * 
     * @see backtype.storm.task.IBolt#prepare(java.util.Map, backtype.storm.task.TopologyContext,
     * backtype.storm.task.OutputCollector)
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf; // the original read "this.conf = this.conf", which never stored the passed-in config
        this.collector = collector;
        this.context = context;
    }

    /*
     * (non-Javadoc) <p>Title: execute</p> <p>Description: </p>
     * 
     * @param input
     * 
     * @see backtype.storm.task.IBolt#execute(backtype.storm.tuple.Tuple)
     */
    public void execute(Tuple input) {
        List<Map<String, Object>> data = (List<Map<String, Object>>) input.getValueByField("data");
        String outdata = data.toString() + "\r\n";
        // use forward slashes: the bolt runs on the Linux supervisor hosts
        File file = new File("/opt/jstorm/stormtest.txt");
        if (!file.exists()) {
            new File("/opt/jstorm").mkdirs();
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        FileOutputStream fos;
        try {
            fos = new FileOutputStream(file, true);
            fos.write(outdata.getBytes());
            fos.flush();
            fos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*
     * (non-Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
     * 
     * @param declarer
     * 
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: it writes to a file and emits nothing, so no output fields are declared
    }

}
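
    A small caveat on this bolt, added here as a hedged note rather than something from the original article: BaseRichBolt does not ack tuples automatically, so if the spout ever emits with message IDs the tuples will simply time out. The usual pattern inside execute() looks roughly like this:

    try {
        // ... write the tuple to the file as above ...
        this.collector.ack(input);   // mark the tuple as fully processed
    } catch (Exception e) {
        this.collector.fail(input);  // let the spout replay it
    }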

MyTopology

package scc.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * @ClassName: MyTopology
 * @Description: TODO (describe what this class does in one sentence)
 * @author shangchengcai@voole.com
 * @date 2016-11-10 16:52:05
 * 
 */
public class MyTopology {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("frommysql", new FromMysqlSpout());
        builder.setBolt("tofile", new ToFileBolt()).shuffleGrouping("frommysql");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("firstTopo", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("firstTopo");
            cluster.shutdown();
        }
    }
}
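
    The topology above uses the default parallelism of one executor per component. If you want to scale it out, the usual knobs are the parallelism hint on setSpout/setBolt plus the worker count; the numbers below are illustrative, not from the original article (and note that every ToFileBolt executor writes to the same local path on whichever supervisor host it lands on):

    builder.setSpout("frommysql", new FromMysqlSpout(), 2);                       // 2 spout executors
    builder.setBolt("tofile", new ToFileBolt(), 4).shuffleGrouping("frommysql");  // 4 bolt executors
    conf.setNumWorkers(3);                                                        // spread across 3 worker processes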

JdbcUtils

/**   
 *
 * @Title: HiveJdbcTest.java 
 * @Package com.scc.hive 
 * @Description: TODO (describe what this file does in one sentence) 
 * @author scc
 * @date 2016-11-09 10:16:32   
 */
package scc.storm;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 *
 * @ClassName: HiveJdbcTest
 * @Description: TODO (describe what this class does in one sentence)
 * @author scc
 * @date 2016-11-09 10:16:32
 * 
 */
public class JdbcUtils {
    // database username
    private static final String USERNAME = "root";
    // database password
    private static final String PASSWORD = "1234";
    // JDBC driver class
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    // database URL
    private static final String URL = "jdbc:mysql://10.5.3.24:3306/hive";
    private Connection connection;
    private PreparedStatement pstmt;
    private ResultSet resultSet;

    public JdbcUtils() {
        try {
            Class.forName(DRIVER);
            System.out.println("JDBC driver loaded");
            this.getConnection();
        } catch (Exception e) {
            // the original swallowed this exception silently
            e.printStackTrace();
        }
    }

    /**
     * Get the database connection
     * 
     * @return
     */
    public Connection getConnection() {
        try {
            this.connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return this.connection;
    }

    /**
     * Insert, delete, update
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public boolean updateByPreparedStatement(String sql, List<Object> params) throws SQLException {
        boolean flag = false;
        int result = -1;
        this.pstmt = this.connection.prepareStatement(sql);
        int index = 1;
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        result = this.pstmt.executeUpdate();
        flag = result > 0;
        return flag;
    }

    /**
     * Query a single record
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public Map<String, Object> findSimpleResult(String sql, List<Object> params) throws SQLException {
        Map<String, Object> map = new HashMap<String, Object>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery(); // run the query and keep the result set
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int col_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            for (int i = 0; i < col_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                map.put(cols_name, cols_value);
            }
        }
        return map;
    }

    /**
     * Query multiple records
     * 
     * @param sql
     * @param params
     * @return
     * @throws SQLException
     */
    public List<Map<String, Object>> findModeResult(String sql, List<Object> params) throws SQLException {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            Map<String, Object> map = new HashMap<String, Object>();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                map.put(cols_name, cols_value);
            }
            list.add(map);
        }
        // this.connection.close();
        return list;
    }

    /**
     * Query a single record via reflection
     * 
     * @param sql
     * @param params
     * @param cls
     * @return
     * @throws Exception
     */
    public <T> T findSimpleRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
        T resultObject = null;
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            // create an instance of the target class via reflection
            resultObject = cls.newInstance();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                Field field = cls.getDeclaredField(cols_name);
                field.setAccessible(true); // allow writing to the private JavaBean field
                field.set(resultObject, cols_value);
            }
        }
        return resultObject;

    }

    /**
     * Query multiple records via reflection
     * 
     * @param sql
     * @param params
     * @param cls
     * @return
     * @throws Exception
     */
    public <T> List<T> findMoreRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
        List<T> list = new ArrayList<T>();
        int index = 1;
        this.pstmt = this.connection.prepareStatement(sql);
        if (params != null && !params.isEmpty()) {
            for (int i = 0; i < params.size(); i++) {
                this.pstmt.setObject(index++, params.get(i));
            }
        }
        this.resultSet = this.pstmt.executeQuery();
        ResultSetMetaData metaData = this.resultSet.getMetaData();
        int cols_len = metaData.getColumnCount();
        while (this.resultSet.next()) {
            // create an instance of the target class via reflection
            T resultObject = cls.newInstance();
            for (int i = 0; i < cols_len; i++) {
                String cols_name = metaData.getColumnName(i + 1);
                Object cols_value = this.resultSet.getObject(cols_name);
                if (cols_value == null) {
                    cols_value = "";
                }
                Field field = cls.getDeclaredField(cols_name);
                field.setAccessible(true); // allow writing to the private JavaBean field
                field.set(resultObject, cols_value);
            }
            list.add(resultObject);
        }
        return list;
    }

    /**
     * Release database resources
     */
    public void releaseConn() {
        if (this.resultSet != null) {
            try {
                this.resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new JdbcUtils().getConnection());
    }
}
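
    One more hedged note on JdbcUtils, not from the original article: releaseConn() only closes the ResultSet, while the PreparedStatement and the Connection opened on every call are never released, so a long-running spout will leak connections. A fuller cleanup, sketched with the same fields as above, would look roughly like this (and the spout would call releaseConn() after findModeResult()):

    public void releaseConn() {
        // close in reverse order of creation; a failure on one resource should not skip the others
        if (this.resultSet != null) {
            try { this.resultSet.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
        if (this.pstmt != null) {
            try { this.pstmt.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
        if (this.connection != null) {
            try { this.connection.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
    }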

Here is a supplement on how to package and run the topology.

1. Packaging

    Build the project with Maven:

mvn clean package
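
    Because jstorm-core is declared with provided scope, the cluster supplies it at runtime, but the jar you upload still has to bundle the remaining dependencies (for example the MySQL JDBC driver). One way to do that, sketched here as an assumption about your build rather than something from the original article, is the maven-shade-plugin:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>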

2. Running

    Upload the built jar to the server, then run:

# jstorm jar <path to jar> <main class> <arguments, separated by spaces>
jstorm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
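
    The command above is the example from the referenced article. For the topology in this post, the main class is scc.storm.MyTopology and the single argument is the topology name, so the call would look roughly like the following (the jar path and file name are placeholders that depend on your pom):

jstorm jar /path/to/your-topology-1.0.jar scc.storm.MyTopology mytopo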

3. Checking the topology

    One way is to run:

./jstorm list

    The other is to check it on the web UI.
