Topology的构建

public class BlackListBolt extends BaseRichBolt{
	private static Logger logger = Logger.getLogger(BlackListBolt.class);
	private OutputCollector collector_;
	private Map<String,List<String>> blacklistMap_ = new ConcurrentHashMap<String,List<String>>();
	
	//实现了从数据库中获取黑名单基础数据的表,加载至内存作为比阿娘blacklistMap_维护 nextTuple()在每个tuple到达时被调用,这里主要实现了车牌在黑名单中的比对。
	public void prepare(Map stormConf,TopologyContext context,OutputCollector collector)
	{
	    collector_ = collector;
	    Connection con = null;
	    Statement jjhmd_statement = null;
	    ResultSet jjhmd_resultSet = null;
	    String jjhmd_queryString = "select ID,CPHID,SFSSBJ,SFCL from JJHMD";
	    try{
                   con = DBUtil.getDataSource().getConnection();
		   jjhmd_statement = con.createStatement();
		   jjhmd_resultSet = jjhmd_statement.executeQuery(jjhmd_queryString);
		   while(jjhmd_resultSet.next()){
			String jjhmd_cphid = jjhmd_resultSet.getString("CPHID");
			String jjhmd_sfssbj = jjhmd_resultSet.getString("SFSSBJ");
			String jjhmd_SFCL = jjhmd_resultSet.getString("SFCL");
			String jjhmd_id = jjhmd_resultSet.getString("ID");
			List<String> temp_info = new ArrayList<String>();
			temp_info.add(jjhmd_sfssbj);
			temp_info.add(jjhmd_SFCL);
			temp_info.add(jjhmd_id);
			blacklistMap_.put(jjhmd_cphid,temp_info);
		}
		jjhmd_resultSet.close();
		jjhmd_statement.close();
		}catch(SQLException e){
		    e.printStackTrace();
		}finally{
		   if(con!=null){
			try{
				con.close();
			   }catch(SQLException e){
				logger.warn("",e);
			   }
		}
            }
       }

	public void execute(Tuple tuple){
	    String no = tuple.getStringByField("no");
	    String location = tuple.getStringByFiled("location");
	    String tpid = tuple.getStringByField(tpidl:);
		try{
                        if(blacklistMap_.containsKey(no)){
			    List<String>temp_info = blacklistMap_.get(no);
			    if(temp_info.get(1).equals("否")){
			        String msg = convertToMsg(tuple);
				conllector_.emit(new Values(msg));
			}
		    }
	 	}catch(Excetption e){
		    logger.error(e.getMessage());
		}finally{
		}
	}
	……
	}
public class BlackListTopology{
     //topicSpout接收来自JMS消息中间件的主题数据,且不设置并行度(这是由topic在JMS协议中的语义决定的)
     public static final String TOPIC_SPOUT = "topic_spout";
     //以随机分组的方式接收来自JmsSpout的数据,并行度被设置为2.
     public static final String BLACKLIST_BOLT = "blacklist_bolt";
     //下面这两个均以随机分组的方式接收来自BlackListBolt的数据,分别向消息中间件和数据库写入计算的结果数据.
     public static final String TOPIC_BOLT = "topic_bolt";
     public static final String DB_BOLT = "db_bolt";

     public static void main(String[] args) throws Exception{
        ConfigUtil cu = ConfigUtil.getInstance();
        
        JmsProvider jmsTopicProvbider_source = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port+")",cu.getMessage_sb_topic(),"","");
        
	
	//消息中间件的IP地址、端口和主题名称,都是在配置文件中维护的,此处通过ConfigUtil对象从配置文件中获取的。
        JmsSpout topicSpout = new JmsSpout();
        topicSpout.setJmsProvider(jmsTopicProvider_source);
        topicSpout.setJmsTupleProducer(new SB_Beijing_TupleProducer());
        topicSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        topicSpout.setDistributed(false);
        JmsProvider jmsTopicProvider_target = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port()+")",cu.getMessage_ijhmdbj_topic(),"","")
        JmsBolt topicBolt = new JmsBolt();
        topicBolt.setJmsProvider(jmsTopicProvider_target);
        topicBolt.setJmsMessageProducer(new JsonMessageProducer());
	topicBolt.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

	TopologyBuilder builder = new ToplogyBuilder();
	builder.setSpout(TOPIC_SPOUT,topicSpout;
	builder.setBolt(BLACKLIST_BOLT,new BlackListBolt(),2).shuffleGrouping(TOPIC_SPOUT);
	builder.setBolt(TOPIC_BOLT,topicBolt,1).shuffleGrouping(BLACKLIST_BOLT);
	RegisterBlackCarBolt dbBolt = new RegisterBlackCarBolt();
	builder.setBolt(DB_BOLT,dbBolt,1).shuffleGrouping(BLACKLIST_BOLT);

	Config conf = new Config();
	conf.setNumWorkers(2)
	if(args.length >0){
	conf.setDebug(false);
	StormSubmitter.submitTopology(args[0],conf,builder.createTopology());
	}else{
	conf.setDebug(true);
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("storm-traffic-blcaklist",conf,builder.createTopology());
	Utils.sleep(6000000);
	cluster.killTopology("storm-traffic-blacklist");
	cluster.shutdown();
	}
     }
}
topicBolt是类JmsBolt的对象,它以随机分组的方式,也接受来自BlackListBolt的数据,即黑名单检索的即时结果,然后向消息中间件写入计算的结果数据

    public class JmsBolt extends BaseRichBolt{
	private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
	private Connection connection;
	……
	public void prepare(Map stormConf,TopologyContext context,OutputCollector collector){
	   if(this.jmsProvider == null || this.producer == null){
		throw new IllegalStateException("JMS Provider and MessageProducer not set.");
	   }
	   this.collector = collector;
	    try{
		   ConnectionFactory cf = this.jmsProvider.connectionFactory();
		   Destination dest = this.jmsProvider.destination();
	           this.connection = cf.createConnection();
		   this.session = connection.createSeesion(this.jmsTransactional,this.jmsAcknowledgeMode);
		   this.messageProducer = session.createProducer(dest);
		    connection.start();
		}
		catch(Exception e){
		    LOG.warn("Error creating JMS connection.",e);
		}
	}

	public void execute(Tuple input){
		try{
			Message msg = this.producer.toMessage(this.session,input);
			if(msg!=null){
			   if(msg.getJMSDestination()!=null){
			      this.messageProducer.sen(msg.getJMSDestination(),msg);
			}else{
				this.messageProducer.send(msg);
			     }
			}
			if(this.autoAck){
				LOG.debug("ACKing tuple:"+input);
				this.collector.ack(intput);
			}
		    }catch(JMSException e){
			    LOG.warn("Failing tuple:" + input + "Exception:" + e);
			    this.collector.fail(input);
			}
		    }
			……
		}
JmsSpout类继承了BaseRichSpout类并实现了MessageListener接口。作为JMS的客户端,JmsSpout实现了MessageListener接口,这里分析一下该接口声明的方法onMessage().方法onMessage()在Jms消息中间件向它推送一个消息时这里的实现是将得到的消息放入缓存队列queue对象中.

public class JmsSpout extends BaseRichSpout implements MessageListener{
    private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
    private LinkedBlockingQueue<Message>queue;
    private ConcurrentHashMap<String,Message>pendingMessages;
    ……

    public void onMessage(Message msg){
	try{
	      LOG.debug("Queuing msg ["+msg.getJMSMessageID()+"]");
	    }catch(JMSException e){
	    }
		this.queue.offer(msg);
	   }

	//从queue对象获取数据,组织为tuple结构后发送(emit);
    public void nextTuple(){
	Message msg = this.queue.poll();
	if(msg == null){
	   Utils.sleep(50);
	}else{
	   LOG.debug("sending tuple:"+msg);
	    try{
	          Values vals = this.tupleProducer.toTuple(msg);
		  if(this.isDurableSubscription()||(msg.getJMSDeliveryMode()!=Session.AUTO_ACKNOWLEDGE)){
			LOG.debug("Requesting acks.");
			this.collector.emit(vals,msg.getJMSMessageID());
			this.pendingMessages.put(msg.getJMSMessageID(),msg);
		  }else{
			this.collector.emit(vals);
		  }catch(JMSException e){
			LOG.warn("Unable to convert JMS message:"+msg);
		  }
	      }
      }

       //在tuple需要被确认处理成功时调用,这里的实现是从中间结果队列pendingMessages移除相应数据项,并对这条消息调用JMS的方法acknowledge()进行确认.
	public void ack(Object msgId){
	    Message msg = this.pendingMessage.remove(msgId);
	    if(msg!=null){
		try{
			msg.acknowledge();
			LOG.debug("JMS Message Scked:"+msgId);
		    }catch(JMSException e){
			LOG.warn("Error acknowldging JMS message:" + msgId,e);
		    }
		 }else{
			LOG.warn("Couldn't acknowledge unknown JMS message ID:"+msgId);
		}
          }
	
	//在tuple需要被确认处理失败时调用,这里的实现是从中间结果队列pendingMessages移除相应数据项,并设置存在失败的标志位.
	public void fail(Object msgId){
	    LOG.warn("Message failed:" + msgId);
	    this.pendingMessages.remove(msgId);
            synchronized(this.recoveryMutex);{
	       this.hasFailures = true;
	     }
        }
    }
	……
}
//dbBolt是类RegisterBlackCarBolt的对象,它以随机分组的方式,接受来自BlackListBolt的数据,也即黑名单检索的即时结果,然后向数据库写入计算的结果数据。
	public class RegisterBlackCarBolt implements IBasicBolt{
	    private static Logger log = Logger.getLogger(RegisterBlackCarBolt.class);
	    private Connection con = null;
	    private String tableName = "JJHMDBJ";

	    private void prepare(Map stormConf,TopologyContext context){
                try{
			con = DBUtil.getDataSource().getConnection();
		    }catch(SQLException el){
			el.printStackTrace();
		    }
	     }

	     public void execute(Tuple input,BasicOutputCollector collector){
		String json = (String)input.getValue(0);
		String[] tupleStrs = json.split(",");
		  try{
			String stmt = "insert into "+tableName+"("+TPID+","+JCDID+","+HMDID+","+CPHID+","+LRSJ+","+primaryKey+","+FQH+")                         values(?,?,?,?,?,?,?)";

			PreparedStatment prepstmt = con.prepareStatement(stmt);
			if(tupleStrs.length==5){
			     prepstmt.setString(1,tupleStrs[0]);
			     prepstmt.setString(2,tupleStrs[1]);
			     prepstmt.setString(3,tupleStrs[2]);
			     prepstmt.setString(4,tupleStrs[3]);
			     prepstmt.setTimestamp(5,new Timestamp((TimeUtil.string2datetime(tupleStrs[4])).getTime()));
			     prepstmt.setInt(6,1);
			     prepstmt.setInt(7,getPartNO(tupleStrs[4]));
		     }else{
			     log.error("tupple attribte size error!");
			   }
     			int r = prepstmt.executeUpdate();
			log.info("insert"+r+" row");
		     }catch(Exception e){
			 e.printStackTrace();
		     }
 		  }
 		……
		}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊flink的log.file配置

flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties

50300
来自专栏开发与安全

《dive into python3》 笔记摘录

0、In Python 2, the / operator usually meant integer division, but you could make...

28500
来自专栏C/C++基础

TinyXML2读取和创建XML文件

TinyXML2是simple、small、efficient C++ XML文件解析库!方便易于使用,是对TinyXML的升级改写!源码见本人上传到CSDN的...

20710
来自专栏Jerry的SAP技术分享

使用SAP云平台 + JNDI访问Internet Service

以Internet Service http://maps.googleapis.com/maps/api/distancematrix/xml?origins...

35430
来自专栏ml

hduoj1073--Online Judge

做道题,并没有太多的技巧,关键在与对Accepted,presented error 和wa的判断,第一步如果两者完全一样,那么很定是AC了 ...

39470
来自专栏向治洪

ssh搭建开发环境

公司一直不是ssh零配置的框架,每次写action都要在applicationcontext和struts里面配置,好麻烦,最近有空,写了一个ssh零配置的框架...

256100
来自专栏Java开发者杂谈

jvm运行时环境属性一览

前言: 在web编程技术内幕中看到一个用apache组件进行文件下载的例子,对于DiskFileUpload类的setRepositoryPath方法,设置临时...

33270
来自专栏码匠的流水账

聊聊flink的SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/data...

23830
来自专栏JadePeng的技术博客

Netty断线重连

Netty断线重连 最近使用Netty开发一个中转服务,需要一直保持与Server端的连接,网络中断后需要可以自动重连,查询官网资料,实现方案很简单,核心思想是...

51650
来自专栏张善友的专栏

MSBUILD 命令行编译的时候请注意msbuild文件名称或路经中空格导致出错

在使用MSBUILD 去编译msbuild文件的时候,如果这个方案或者项目的名称或者路经中间有空格符号,需要把这个方案或者项目整个用引号引起来,否则编译的时候会...

23150

扫码关注云+社区

领取腾讯云代金券