本文参照https://my.oschina.net/shyloveliyi/blog/785812中代码,进行转换。
1、转换依赖
storm
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
</dependency>
转换成
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
2、切换import
转换依赖后,项目报错,挨个打开代码,将import全部删掉,然后重新导入依赖,eclipse快捷键ctrl+shift+o,以下是切换后的代码:
FromMysqlSpout
package scc.storm;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* @ClassName: MysqlSpout
* @Description: TODO(这里用一句话描述这个类的作用)
* @author shangchengcai@voole.com
* @date 2016年11月10日 下午4:26:38
*
*/
public class FromMysqlSpout extends BaseRichSpout {
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
/*
* (非 Javadoc) <p>Title: open</p> <p>Description: </p>
*
* @param conf
*
* @param context
*
* @param collector
*
* @see org.apache.storm.spout.ISpout#open(java.util.Map, org.apache.storm.task.TopologyContext,
* org.apache.storm.spout.SpoutOutputCollector)
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}
/*
* (非 Javadoc) <p>Title: nextTuple</p> <p>Description: </p>
*
* @see org.apache.storm.spout.ISpout#nextTuple()
*/
public void nextTuple() {
JdbcUtils jdbcUtils = new JdbcUtils();
try {
if (new Random().nextInt(100) > 50) {
return;
}
List<Map<String, Object>> data = jdbcUtils.findModeResult("select * from sds limit 1",
new ArrayList<Object>());
this.collector.emit(new Values(data));
} catch (SQLException e) {
e.printStackTrace();
this.collector.reportError(e);
}
}
/*
* (非 Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
*
* @param declarer
*
* @see org.apache.storm.topology.IComponent#declareOutputFields(org.apache.storm.topology.OutputFieldsDeclarer)
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("data"));
}
}
ToFileBolt
package scc.storm;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* @ClassName: ToFileBolt
* @Description: TODO(这里用一句话描述这个类的作用)
* @author shangchengcai@voole.com
* @date 2016年11月10日 下午4:44:09
*
*/
public class ToFileBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
/*
* (非 Javadoc) <p>Title: prepare</p> <p>Description: </p>
*
* @param stormConf
*
* @param context
*
* @param collector
*
* @see org.apache.storm.task.IBolt#prepare(java.util.Map, org.apache.storm.task.TopologyContext,
* org.apache.storm.task.OutputCollector)
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = this.conf;
this.collector = collector;
this.context = context;
}
/*
* (非 Javadoc) <p>Title: execute</p> <p>Description: </p>
*
* @param input
*
* @see org.apache.storm.task.IBolt#execute(org.apache.storm.tuple.Tuple)
*/
public void execute(Tuple input) {
List<Map<String, Object>> data = (List<Map<String, Object>>) input.getValueByField("data");
String outdata = data.toString() + "\r\n";
File file = new File("\\opt\\jstorm\\stormtest.txt");
if (!file.exists()) {
new File("\\opt\\jstorm").mkdirs();
try {
new File("\\opt\\jstorm\\stormtest.txt").createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
FileOutputStream fos;
try {
fos = new FileOutputStream(file, true);
fos.write(outdata.getBytes());
fos.flush();
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* (非 Javadoc) <p>Title: declareOutputFields</p> <p>Description: </p>
*
* @param declarer
*
* @see org.apache.storm.topology.IComponent#declareOutputFields(org.apache.storm.topology.OutputFieldsDeclarer)
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
MyTopology
package scc.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/**
* @ClassName: MyTopology
* @Description: TODO(这里用一句话描述这个类的作用)
* @author shangchengcai@voole.com
* @date 2016年11月10日 下午4:52:05
*
*/
public class MyTopology {
public static void main(String[] args)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("frommysql", new FromMysqlSpout());
builder.setBolt("tofile", new ToFileBolt()).shuffleGrouping("frommysql");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("firstTopo", conf, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology("firstTopo");
cluster.shutdown();
}
}
}
JdbcUtils
/**
*
* @Title: HiveJdbcTest.java
* @Package com.scc.hive
* @Description: TODO(用一句话描述该文件做什么)
* @author scc
* @date 2016年11月9日 上午10:16:32
*/
package scc.storm;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
* @ClassName: HiveJdbcTest
* @Description: TODO(这里用一句话描述这个类的作用)
* @author scc
* @date 2016年11月9日 上午10:16:32
*
*/
public class JdbcUtils {
// 数据库用户名
private static final String USERNAME = "root";
// 数据库密码
private static final String PASSWORD = "1234";
// 驱动信息
private static final String DRIVER = "com.mysql.jdbc.Driver";
// 数据库地址
private static final String URL = "jdbc:mysql://10.5.3.24:3306/hive";
private Connection connection;
private PreparedStatement pstmt;
private ResultSet resultSet;
public JdbcUtils() {
// TODO Auto-generated constructor stub
try {
Class.forName(DRIVER);
System.out.println("数据库连接成功!");
this.getConnection();
} catch (Exception e) {
}
}
/**
* 获得数据库的连接
*
* @return
*/
public Connection getConnection() {
try {
this.connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return this.connection;
}
/**
* 增加、删除、改
*
* @param sql
* @param params
* @return
* @throws SQLException
*/
public boolean updateByPreparedStatement(String sql, List<Object> params) throws SQLException {
boolean flag = false;
int result = -1;
this.pstmt = this.connection.prepareStatement(sql);
int index = 1;
if (params != null && !params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
this.pstmt.setObject(index++, params.get(i));
}
}
result = this.pstmt.executeUpdate();
flag = result > 0 ? true : false;
return flag;
}
/**
* 查询单条记录
*
* @param sql
* @param params
* @return
* @throws SQLException
*/
public Map<String, Object> findSimpleResult(String sql, List<Object> params) throws SQLException {
Map<String, Object> map = new HashMap<String, Object>();
int index = 1;
this.pstmt = this.connection.prepareStatement(sql);
if (params != null && !params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
this.pstmt.setObject(index++, params.get(i));
}
}
this.resultSet = this.pstmt.executeQuery();// 返回查询结果
ResultSetMetaData metaData = this.resultSet.getMetaData();
int col_len = metaData.getColumnCount();
while (this.resultSet.next()) {
for (int i = 0; i < col_len; i++) {
String cols_name = metaData.getColumnName(i + 1);
Object cols_value = this.resultSet.getObject(cols_name);
if (cols_value == null) {
cols_value = "";
}
map.put(cols_name, cols_value);
}
}
return map;
}
/**
* 查询多条记录
*
* @param sql
* @param params
* @return
* @throws SQLException
*/
public List<Map<String, Object>> findModeResult(String sql, List<Object> params) throws SQLException {
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
int index = 1;
this.pstmt = this.connection.prepareStatement(sql);
if (params != null && !params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
this.pstmt.setObject(index++, params.get(i));
}
}
this.resultSet = this.pstmt.executeQuery();
ResultSetMetaData metaData = this.resultSet.getMetaData();
int cols_len = metaData.getColumnCount();
while (this.resultSet.next()) {
Map<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < cols_len; i++) {
String cols_name = metaData.getColumnName(i + 1);
Object cols_value = this.resultSet.getObject(cols_name);
if (cols_value == null) {
cols_value = "";
}
map.put(cols_name, cols_value);
}
list.add(map);
}
// this.connection.close();
return list;
}
/**
* 通过反射机制查询单条记录
*
* @param sql
* @param params
* @param cls
* @return
* @throws Exception
*/
public <T> T findSimpleRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
T resultObject = null;
int index = 1;
this.pstmt = this.connection.prepareStatement(sql);
if (params != null && !params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
this.pstmt.setObject(index++, params.get(i));
}
}
this.resultSet = this.pstmt.executeQuery();
ResultSetMetaData metaData = this.resultSet.getMetaData();
int cols_len = metaData.getColumnCount();
while (this.resultSet.next()) {
// 通过反射机制创建一个实例
resultObject = cls.newInstance();
for (int i = 0; i < cols_len; i++) {
String cols_name = metaData.getColumnName(i + 1);
Object cols_value = this.resultSet.getObject(cols_name);
if (cols_value == null) {
cols_value = "";
}
Field field = cls.getDeclaredField(cols_name);
field.setAccessible(true); // 打开javabean的访问权限
field.set(resultObject, cols_value);
}
}
return resultObject;
}
/**
* 通过反射机制查询多条记录
*
* @param sql
* @param params
* @param cls
* @return
* @throws Exception
*/
public <T> List<T> findMoreRefResult(String sql, List<Object> params, Class<T> cls) throws Exception {
List<T> list = new ArrayList<T>();
int index = 1;
this.pstmt = this.connection.prepareStatement(sql);
if (params != null && !params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
this.pstmt.setObject(index++, params.get(i));
}
}
this.resultSet = this.pstmt.executeQuery();
ResultSetMetaData metaData = this.resultSet.getMetaData();
int cols_len = metaData.getColumnCount();
while (this.resultSet.next()) {
// 通过反射机制创建一个实例
T resultObject = cls.newInstance();
for (int i = 0; i < cols_len; i++) {
String cols_name = metaData.getColumnName(i + 1);
Object cols_value = this.resultSet.getObject(cols_name);
if (cols_value == null) {
cols_value = "";
}
Field field = cls.getDeclaredField(cols_name);
field.setAccessible(true); // 打开javabean的访问权限
field.set(resultObject, cols_value);
}
list.add(resultObject);
}
return list;
}
/**
* 释放数据库连接
*/
public void releaseConn() {
if (this.resultSet != null) {
try {
this.resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
System.out.println(new JdbcUtils().getConnection());
}
}
这里补充一下打包及运行操作
1、打包
使用maven对项目执行
mvn clean package
2、运行
将打包上传至服务器,然后执行
#jstorm jar jar包地址 main方法主类 参数(空格分割)
jstorm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
3、查看
一种方式是执行
./jstorm list
另一种是在webui界面上查看