之前查阅源码啊,性能测试啊调优啊。。基本告一段落,项目也接近尾声,那么整理下spark所有配置参数与优化策略,方便以后开发与配置:
Spark安装配置与代码框架
spark-default.conf 配置
spark.executor.instance 参数,向Yarn申请创建的资源池实例数
spark.executor.cores 参数,每个container中所包含的core数量
spark.executor.memory 参数,每个资源池所具有的内存数
spark.dirver.memory 参数,driver端所占用的资源数
spark.storage.memoryFraction 参数,用于cache数据与计算数据集的内存使用比例
spark.kryoserializer.buffer.max 参数,序列化最大值,默认64M
spark.shuffle.consolidateFiles 参数,shuffle是否合并文件
spark.rdd.compress 参数,rdd是否进行压缩
spark.sql.shuffle.partitions 参数,shuffle过程中所创建的partition个数
spark.reducer.maxSizeInFlight 参数,设置shuffle read task的buffer缓冲大小,它将决定每次数据从远程的executors中拉取大小。这个拉取过程是由5个并行的request,从不同的executor中拉取过来,从而提升了fetch的效率。
spark.shuffle.io.retryWait 参数,每次拉取数据的等待间隔
spark.shuffle.manage 参数,使用hash,同时与参数spark.shuffle.consolidateFiles true并用。因为不需要对中间结果进行排序,同时合并中间文件的个数,从而减少打开文件的性能消耗,在spark2.0.2中不可直接配置hash,会报错,其他优化参数包括:Sort Shuffle、Tungsten Sort,这里我们要根据数据量进行选择,优缺点请参考本博客《Spark Shuffle详细过程》
spark.executor.heartbeatInterval 参数,与driver的通讯间隔,使driver知道executor有木有挂
spark.driver.maxResultSize 参数,所有分区的序列化结果的总大小限制
spark.yarn.am.cores 参数,在yarn-client模式下,申请Yarn App Master所用的CPU核数
spark.master 参数,选用的模式
spark.task.maxFailures 参数,task失败多少次后丢弃job(防止因为网络IO等问题失败,重新拉取)
spark.shuffle.file.buffer 参数,会增大Map任务的写磁盘前的cache缓存
spark-env.sh 配置
export HADOOP_CONF_DIR 参数,配置hadoop所在配置文件路径
export HADOOP_HOME 参数,配置hadoop Client的所在路径
export JAVA_HOME 参数,配置JAVA的环境变量地址
export SPARK_YARN_APP_NAME 参数,配置application的名称
export SPARK_LOG_DIR 参数,配置Spark log的输出路径
export SPARK_PID_DIR 参数,配置spark的Pid输出路径
将hive-site.xml文件放入spark的conf下 修改spark thrift port,使其与hive的thrift的port分离开来,同时配置mysql的数据源,因为hive的meta信息存在mysql中,以及配置meta指定的hdfs路径:
<property> <name>hive.server2.thrift.port</name> <value>10000</value> <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description> </property>
<property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property>
<property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property>
<property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property>
<property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property>
Spark动态资源分配测试:
spark.dynamicAllocation.cachedExecutorIdleTimeout 360000000 如果executor中有数据则不移除
spark.dynamicAllocation.executorIdleTimeout 60s executor空闲时间达到规定值,则将该executor移除
spark.dynamicAllocation.initialExecutors 3 如果所有的executor都移除了,重新请求时启动的初始executor数
spark.dynamicAllocation.maxExecutors 30 能够启动的最大executor数目
spark.dynamicAllocation.minExecutors 1 能够启动的最小executor数目
spark.dynamicAllocation.schedulerBacklogTimeout 1s task等待运行时间超过该值后开始启动executor
spark.dynamicAllocation.enabled True 开启动态参数配置
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 1s 启动executor的时间间隔
启动脚本:
/usr/local/spark-1.6.1/sbin/start-thriftserver.sh \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=30 \
--conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout = 5s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \
--conf spark.dynamicAllocation.initialExecutors=2 \
--conf spark.dynamicAllocation.executorIdleTimeout=60s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=360000000s \
--conf spark.driver.memory=50g
代码框架:
首先,我们引入需要依赖的包括hadoop、spark、hbase等jar包,pom.xml配置如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sparkApp</groupId>
<artifactId>sparkApp</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.10.0</scala.version>
<spring.version>4.0.2.RELEASE</spring.version>
<hadoop.version>2.6.0</hadoop.version>
<jedis.version>2.8.1</jedis.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<!-- SPARK START -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.1</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<!-- SPARK END -->
<!-- HADOOP START -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- HADOOP END -->
<!-- hbase START -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.1.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>1.0.2</version>
</dependency>
<!-- hbase END -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<!-- REDIS START -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- REDIS END -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.26</version>
</dependency>
<!--<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-api-jdo</artifactId>
<version>3.2.6</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-rdbms</artifactId>
<version>3.2.9</version>
</dependency>-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>1.2.1</version>
</dependency>-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<!-- <sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>-->
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
然后将集群中hive-site.xml、hdfs-site.xml、hbase-site.xml引入项目中。
编写HBase公共方法(部分代码):
1 package hbase
2
3 import java.util.{Calendar, Date}
4
5 import org.apache.hadoop.hbase.HBaseConfiguration
6 import org.apache.hadoop.hbase.client.{Result, Scan}
7 import org.apache.hadoop.hbase.filter._
8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
9 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
10 import org.apache.hadoop.hbase.protobuf.ProtobufUtil
11 import org.apache.hadoop.hbase.util.{Base64, Bytes}
12 import org.apache.spark.{Logging, SparkContext}
13 import org.apache.spark.rdd.RDD
14 import org.slf4j.{Logger, LoggerFactory}
15
16 /**
17 * Created by ysy on 2016/11/6.
18 */
19 object HBaseTableHelper extends Serializable {
20
21 val logger: Logger = LoggerFactory.getLogger(HBaseTableHelper.getClass)
22
23 //根据timestramp过滤加载Hbase数据
24 def tableInitByTime(sc : SparkContext,tablename:String,columns :String,fromdate: Date,todate:Date):RDD[(ImmutableBytesWritable,Result)] = {
25 val configuration = HBaseConfiguration.create()
26 configuration.set(TableInputFormat.INPUT_TABLE, tablename)
27
28 val scan = new Scan
29 scan.setTimeRange(fromdate.getTime,todate.getTime)
30 val column = columns.split(",")
31 for(columnName <- column){
32 scan.addColumn("f1".getBytes, columnName.getBytes)
33 }
34 configuration.set(TableInputFormat.SCAN, convertScanToString(scan))
35 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
36 logger.info("-------count-------" + hbaseRDD.count() + "------------------")
37 hbaseRDD
38 }
39
40 def convertScanToString(scan : Scan) = {
41 val proto = ProtobufUtil.toScan(scan)
42 Base64.encodeBytes(proto.toByteArray)
43 }
44
45 //根据时间条件filter数据
46 def tableInitByFilter(sc : SparkContext,tablename : String,columns : String,time : String) : RDD[(ImmutableBytesWritable,Result)] = {
47 val configuration = HBaseConfiguration.create()
48 configuration.set(TableInputFormat.INPUT_TABLE,tablename)
49 val filter: Filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator(time))
50 val scan = new Scan
51 scan.setFilter(filter)
52 val column = columns.split(",")
53 for(columnName <- column){
54 scan.addColumn("f1".getBytes, columnName.getBytes)
55 }
56 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
57 logger.info("-------count-------" + hbaseRDD.count() + "------------------")
58 hbaseRDD
59 }
60
61 def HBaseTableInit(): Unit ={
62
63 }
64
65 def hbaseToHiveTable(): Unit ={
66
67 }
68
69 //前N天的时间戳获取
70 def getPassDays(beforeDay : Int): Date ={
71 val calendar = Calendar.getInstance()
72 var year = calendar.get(Calendar.YEAR)
73 var dayOfYear = calendar.get(Calendar.DAY_OF_YEAR)
74 var j = 0
75 for(i <- 0 to beforeDay){
76 calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - j);
77 if (calendar.get(Calendar.YEAR) < year) {
78 //跨年了
79 j = 1;
80 //更新 标记年
81 year = year + 1;
82 //重置日历
83 calendar.set(year,Calendar.DECEMBER,31);
84 //重新获取dayOfYear
85 dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
86 }else{
87 j = j + 1
88 }
89 }
90 calendar.getTime()
91 }
92
93 //根据startRow与endRow进行过滤
94 def scanHbaseByStartAndEndRow(sc : SparkContext,startRow : String,stopRow : String,tableName : String) : RDD[(ImmutableBytesWritable,Result)] ={
95 val configuration = HBaseConfiguration.create()
96 val scan = new Scan()
97 scan.setCacheBlocks(false)
98 scan.setStartRow(Bytes.toBytes(startRow))
99 scan.setStopRow(Bytes.toBytes(stopRow))
100 val filterList = new FilterList()
101 filterList.addFilter(new KeyOnlyFilter())
102 filterList.addFilter(new InclusiveStopFilter(Bytes.toBytes(stopRow)))
103 scan.setFilter(filterList)
104 configuration.set(TableInputFormat.INPUT_TABLE,tableName)
105 configuration.set(TableInputFormat.SCAN, convertScanToString(scan))
106 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
107 logger.info("-------ScanHbaseCount-------" + hbaseRDD.count() + "------------------")
108 hbaseRDD
109 }
110
111 }
编写hive公共方法(部分代码):
1 package hive
2
3 import org.apache.spark.{Logging, SparkContext}
4 import org.apache.spark.rdd.RDD
5 import org.apache.spark.sql.Row
6 import org.apache.spark.sql.hive.HiveContext
7 import org.apache.spark.sql.types.{StringType, StructField, StructType}
8
9 /**
10 * Created by uatcaiwy on 2016/11/6.
11 */
12 object HiveTableHelper extends Logging {
13
14 def hiveTableInit(sc:SparkContext): HiveContext ={
15 val sqlContext = new HiveContext(sc)
16 sqlContext
17 }
18
19 def writePartitionTable(HCtx:HiveContext,inputRdd:RDD[Row],tabName:String,colNames:String):Unit ={
20 val schema = StructType(
21 colNames.split(" ").map(fieldName => StructField(fieldName,StringType,true))
22 )
23 val table = colNames.replace(" dt","").split(" ").map(name => name + " String").toList.toString().replace("List(","").replace(")","")
24 val df = HCtx.createDataFrame(inputRdd,schema)
25 df.show(20)
26 logInfo("----------------------------------begin write table-----------------------------------")
27 val temptb = "temp" + tabName
28 HCtx.sql("drop table if exists " + tabName)
29 df.registerTempTable(temptb)
30 HCtx.sql("CREATE EXTERNAL TABLE if not exists " + tabName +" ("+ table+ ") PARTITIONED BY (`dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileIn putFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'")
31 HCtx.sql("set hive.exec.dynamic.partition.mode = nonstrict")
32 HCtx.sql("insert overwrite table " + tabName + " partition(`dt`)" + " select * from " + temptb)
33 }
34 }
读取hdfs文件,有时我们需要根据文件的编码来读取,否则会乱码,并改变编码公共方法:
1 package importSplitFiletoHive
2
3 import org.apache.hadoop.io.{LongWritable, Text}
4 import org.apache.hadoop.mapred.TextInputFormat
5 import org.apache.spark.SparkContext
6 import org.apache.spark.rdd.RDD
7
8 /**
9 * Created by ysy on 2016/12/7.
10 */
11 object changeEncode {
12
13 def changeFileEncoding(sc:SparkContext,path:String,encode : String):RDD[String]={
14 sc.hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],1)
15 .map(p => new String(p._2.getBytes,0,p._2.getLength,encode))
16 }
spark进行xml解析(部分代码):
1 import hive.HiveTableHelper
2 import org.apache.spark.Logging
3 import org.apache.spark.rdd.RDD
4 import org.apache.spark.sql.Row
5 import org.apache.spark.sql.hive.HiveContext
6 import org.apache.spark.sql.types.{StringType, StructField, StructType}
7 import org.slf4j.LoggerFactory
8
9 import scala.xml._
10
11 object xmlParse extends Logging{
12 val schemaString = "column1,column2...."
13 def getPBOC_V1_F1(HCtx:HiveContext,rdd:RDD[String],outputTablename:String):Unit = {
14 val tbrdd = rdd.filter(_.split("\t").length == 9)
15 .filter(_.split("\t")(8) != "RESULTS")
16 .map(data => {
17 val sp = data.split("\t")
18 val dt = sp(5).substring(0, 10).replaceAll("-", "")
19 ((sp(0), sp(1), sp(2), sp(3), sp(4), sp(5), sp(6), "RN"), sp(8), dt)
20 }).filter(_._2 != "")
21 .filter(_._2.split("<").length > 2)
22 .filter(data => !(data._2.indexOf("SingleQueryResultMessage0009") == -1 || data._2.indexOf("ReportMessage") == -1))
23 .map(data => {
24 val xml = if (XML.loadString(data._2) != null) XML.loadString(data._2) else null
25 logDebug("%%%%%%%%%%%%%%%%%%finding xml-1:" + xml + "%%%%%%%%%%%%%%%%%%")
26 val column1 = if ((xml \ "PBOC" \ "TYPE") != null) (xml \ "PBOC" \ "TYPE").text else "null"
27 val column2 = if ((xml \ "HEAD" \ "VER") != null) (xml \ "HEAD" \ "VER").text else "null"
28 val column3 = if ((xml \ "HEAD" \ "SRC") != null) (xml \ "HEAD" \ "SRC").text else "null"
29 val column4 = if ((xml \ "HEAD" \ "DES") != null) (xml \ "HEAD" \ "DES").text else "null"
30 ....
31 (data._1,column1,column2,column3...)
32 })
33 ROW(....)
34 HiveTableHelper.writePartitionTable(HCtx, tbrdd, outputTablename, schemaString)
Redis编码公共方法(部分代码):
1 package redis
2
3 import org.slf4j.{Logger, LoggerFactory}
4 import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
5
6 import scala.collection.mutable.ArrayBuffer
7 import scala.util.Random
8
9 /**
10 * Created by ysy on 2016/11/21.
11 */
12 object RedisClient extends Serializable{
13 val logger: Logger = LoggerFactory.getLogger(RedisClient.getClass)
14 @transient private var jedisPool : JedisPool = null
15 private var jedisPoolList = new ArrayBuffer[JedisPool]
16 private val poolSize = 0
17 makePool
18
19 def makePool() ={
20 val pc = new PropConfig("redis.properties");
21 if(jedisPool == null){
22 val poolConfig : JedisPoolConfig = new JedisPoolConfig
23 poolConfig.setMaxIdle(pc.getProperty("redis.pool.maxActive").toInt)
24 poolConfig.setMaxTotal(pc.getProperty("redis.pool.maxActive").toInt)
25 poolConfig.setMaxWaitMillis(pc.getProperty("redis.pool.maxWait").toInt)
26 poolConfig.setTestOnBorrow(pc.getProperty("redis.pool.testOnBorrow").toBoolean)
27 poolConfig.setTestOnReturn(pc.getProperty("redis.pool.testOnReturn").toBoolean)
28 val hosts = pc.getProperty("redis.pool.servers").split(",")
29 .map(data => data.split(":"))
30 for(host <- hosts){
31 jedisPool = new JedisPool(poolConfig,host(0),host(1).toInt,pc.getProperty("redis.server.timeout").toInt)
32 jedisPoolList += jedisPool
33 }
34 }
35
36 }
37
38
39 def getForString(key : String) : String = {
40 var value = ""
41 if(key != null && !key.isEmpty()){
42 val jedis = getJedis
43 value = jedis.get(key)
44 }
45 value
46 }
47
48 def setForString(key : String,value : String) ={
49 if(key != null && !key.isEmpty){
50 var jedis = getJedis
51 jedis.set(key,value)
52 }else{
53
54 }
55 }
56
57 def zexsist(key : String) : Boolean ={
58 var flag = false
59 if(key != null && !key.isEmpty){
60 val jedis = getJedis
61
62 val resultNum = jedis.zcard(key)
63 if("0".equals(resultNum.toLong)){
64 flag = true
65 }
66 }
67 flag
68 }
69
70 def getJedis() : Jedis ={
71 var ramNub = 0
72 if(poolSize == 1){
73 ramNub = 0
74 }else{
75 val random = new Random
76 ramNub = Math.abs(random.nextInt % 1)
77 }
78 jedisPool = jedisPoolList(ramNub)
79 jedisPool.getResource
80 }
81
82 def returnJedisResource(redis : Jedis): Unit ={
83 if(redis != null){
84 redis.close()
85 }
86 }
87
88 def close: Unit = {
89 for(jedisPool <- jedisPoolList){
90 jedisPool.close
91 }
92 if(jedisPool!=null && !jedisPool.isClosed()){
93 jedisPool.close
94 }else{
95 jedisPool=null
96 }
97 }
98
99 }
详细就不写了,那么完整的工程框架搭建完毕:
随后通过main方法创建sparkContext对象,开始数据分析与处理,在spark路径的bin目录下或者写成脚本文件执行:
./spark-submit --conf spark.ui.port=5566 --name "sparkApp" --master yarn-client --num-executors 3 --executor-cores 2 --executor-memory 10g --class impl.spark /usr/local/spark1.6.1/sparkApp/sparkApp.jar
(注意:这里的配置参数会覆盖spark-default.conf中配置的变量,重新声明spark.ui.port的原因也是因为在同时启动spark的thrfit的时候,提交submit会造成UI占用的问题,至此spark完结)
Hadoop安装配置与MapReduce代码框架
安装:
yum install gcc
yum install gcc-c++
yum install make
yum install autoconfautomake libtool cmake
yum install ncurses-devel
yum install openssl-devel
安装protoc(需用root用户)
tar -xvf protobuf-2.5.0.tar.bz2
cd protobuf-2.5.0
./configure --prefix=/opt/protoc/
make && make install
编译hadoop
mvn clean package -Pdist,native -DskipTests -Dtar
编译完的hadoop在 /home/hadoop/ocdc/hadoop-2.6.0-src/hadoop-dist/target 路径下
配置hosts文件
10.1.245.244 master
10.1.245.243 slave1
命令行输入 hostname master
免密码登录:
执行命令生成密钥: ssh-keygen -t rsa -P ""
进入文件夹cd .ssh (进入文件夹后可以执行ls -a 查看文件)
将生成的公钥id_rsa.pub 内容追加到authorized_keys(执行命令:cat id_rsa.pub >> authorized_keys)
core-site.xml
<configuration>
<!--指定hdfs的nameservice为ns1-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!--指定hadoop数据存放目录-->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/tmp</value>
<description>Abasefor other temporary directories.</description>
</property>
<property>
<name>hadoop.proxyuser.spark.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.spark.groups</name>
<value>*</value>
</property>
</configuration>
<!--指定zookeeper地址-->
<property>
<name>ha.zookeeper.quorum</name>
<value>h4:2181,h5:2181,h6:2181</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:9001</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<!-- 指定nodemanager启动时加载server的方式为shuffle server -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8035</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value>
</property>
<!-- 指定resourcemanager地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>h3</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>16384</value>
</property>
</configuration>
关于MapReduce,1年多前大家都觉得很神秘,其实 就相当于在Map阶段或者Reduce阶段中,进行数据的处理,也可以在Map中读取写入hbase、redis都可以~其实就相当于在MapReduce中写业务处理逻辑,代码如下:
1 public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{
2 //设置常量1,用来形成<word,1>形式的输出
3 private fianll static IntWritable one = new IntWritable(1)
4 private Text word = new Text();
5
6 public void map(LongWritable key,Text value,OutputCollector<Text,output,Reporter reporter) throws IOException{
7 //hadoop执行map函数时为是一行一行的读取数据处理,有多少行,就会执行多少次map函数
8 String line = value.toString();
9 //进行单词的分割,可以多传入进行分割的参数
10 StringTokenizer tokenizer = new StringTokenizer(line);
11 //遍历单词
12 while(tokenizer.hasMoreTokens()){
13 //往Text中写入<word,1>
14 word.set(tokenizer.nextToken());
15 output.collect(word,one);
16 }
17 }
18 }
19 //需要注意的是,reduce将相同key值(这里是word)的value值收集起来,形成<word,list of 1>的形式,再将这些1累加
20 public static class Reduce extends MapReduceBase implements Reducer<Text IntWritable,Text,IntWritable>{
21 public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException{
22 //初始word个数设置
23 int sum = 0;
24 while(values,hasNext()){
25 //单词个数相加
26 sum += value.next().get();
27 }
28 output.collect(key,new IntWritbale(sum));
29 }
30 }
HBASE安装配置与hbase代码框架
由于我使用的是外置的zookeeper,所以这里HBASE_MANAGES_ZK设置为,设置参数:
export JAVA_HOME=/usr/local/yangsy/jdk1.7.0_55
export HBASE_CLASSPATH=/usr/local/hbase-1.0.2/conf
export HBASE_MANAGES_ZK=false
hbase-site.xml
<configuration>
//设置将数据写入hdfs的目录 <property> <name>hbase.rootdir</name> <value>hdfs://master:9000/usr/local/hadoop-2.6.0/hbaseData</value> </property>
//设置hbase的模式为集群模式 <property> <name>hbase.cluster.distributed</name> <value>true</value> </property>
//设置hbase的master端口地址
<property> <name>hbase.master</name> <value>hdfs://master:60000</value> </property>
//HBase Master Web借鉴绑定的默认端口,默认为0.0.0.0
<property> <name>hbase.master.info.port</name> <value>60010</value> </property>
//设置zookeeper的连接地址(必须为基数个)
<property> <name>hbase.zookeeper.property.clientPort</name> <value>2183</value> </property>
//zookeeper的节点
<property> <name>hbase.zookeeper.quorum</name> <value>master,slave1,slave2</value> </property>
//zookeeper数据地址
<property> <name>hbase.zookeeper.property.dataDir</name> <value>/usr/local/zookeeper-3.4.6/data</value> </property>
//zookeeper连接超时时间
<property> <name>zookeeper.session.timeout</name>
<value>60000</value> </property>
</configuration>
这里要注意的是,如果选择外置的zookeeper集群,则需要将zookeeper的zoo.cfg拷贝至HBase的conf下。在启动HBase时,将会自动加载该配置文件。
regionServers中配置regionserver节点的地址
代码结构:
1 package HbaseTest;
2
3 import akka.io.Tcp;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.hbase.*;
6 import org.apache.hadoop.hbase.client.*;
7
8 import java.util.ArrayList;
9 import java.util.List;
10
11 /**
12 * Created by root on 5/30/16.
13 */
14 public class HbaseTest {
15 private Configuration conf;
16 public void init(){
17 conf = HBaseConfiguration.create();
18 }
19
20 public void createTable(){
21 Connection conn = null;
22 try{
23 conn = ConnectionFactory.createConnection(conf);
24 HBaseAdmin hadmin = (HBaseAdmin)conn.getAdmin();
25 HTableDescriptor desc = new HTableDescriptor("TableName".valueOf("yangsy"));
26
27 desc.addFamily(new HColumnDescriptor("f1"));
28 if(hadmin.tableExists("yangsy")){
29 System.out.println("table is exists!");
30 System.exit(0);
31 }else{
32 hadmin.createTable(desc);
33 System.out.println("create table success");
34 }
35 }catch (Exception e){
36 e.printStackTrace();
37 }finally {
38 {
39 if(null != conn){
40 try{
41 conn.close();
42 }catch(Exception e){
43 e.printStackTrace();
44 }
45 }
46 }
47 }
48 }
49
50 public void query(){
51 Connection conn = null;
52 HTable table = null;
53 ResultScanner scan = null;
54 try{
55 conn = ConnectionFactory.createConnection(conf);
56 table = (HTable)conn.getTable(TableName.valueOf("yangsy"));
57
58 scan = table.getScanner(new Scan());
59
60 for(Result rs : scan){
61 System.out.println("rowkey:" + new String(rs.getRow()));
62
63 for(Cell cell : rs.rawCells()){
64 System.out.println("column:" + new String(CellUtil.cloneFamily(cell)));
65
66 System.out.println("columnQualifier:"+new String(CellUtil.cloneQualifier(cell)));
67
68 System.out.println("columnValue:" + new String(CellUtil.cloneValue(cell)));
69
70 System.out.println("----------------------------");
71 }
72 }
73 }catch(Exception e){
74 e.printStackTrace();
75 }finally{
76 try {
77 table.close();
78 if(null != conn) {
79 conn.close();
80 }
81 }catch (Exception e){
82 e.printStackTrace();
83 }
84 }
85 }
86
87 public void queryByRowKey(){
88 Connection conn = null;
89 ResultScanner scann = null;
90 HTable table = null;
91 try {
92 conn = ConnectionFactory.createConnection(conf);
93 table = (HTable)conn.getTable(TableName.valueOf("yangsy"));
94
95 Result rs = table.get(new Get("1445320222118".getBytes()));
96 System.out.println("yangsy the value of rokey:1445320222118");
97 for(Cell cell : rs.rawCells()){
98 System.out.println("family" + new String(CellUtil.cloneFamily(cell)));
99 System.out.println("value:"+new String(CellUtil.cloneValue(cell)));
100 }
101 }catch (Exception e){
102 e.printStackTrace();
103 }finally{
104 if(null != table){
105 try{
106 table.close();
107 }catch (Exception e){
108 e.printStackTrace();
109 }
110 }
111 }
112 }
113
114 public void insertData(){
115 Connection conn = null;
116 HTable hTable = null;
117 try{
118 conn = ConnectionFactory.createConnection(conf);
119 hTable = (HTable)conn.getTable(TableName.valueOf("yangsy"));
120
121 Put put1 = new Put(String.valueOf("1445320222118").getBytes());
122
123 put1.addColumn("f1".getBytes(),"Column_1".getBytes(),"123".getBytes());
124 put1.addColumn("f1".getBytes(),"Column_2".getBytes(),"456".getBytes());
125 put1.addColumn("f1".getBytes(),"Column_3".getBytes(),"789".getBytes());
126
127 Put put2 = new Put(String.valueOf("1445320222119").getBytes());
128
129 put2.addColumn("f1".getBytes(),"Column_1".getBytes(),"321".getBytes());
130 put2.addColumn("f1".getBytes(),"Column_2".getBytes(),"654".getBytes());
131 put2.addColumn("f1".getBytes(),"Column_3".getBytes(),"987".getBytes());
132
133 List<Put> puts = new ArrayList<Put>();
134 puts.add(put1);
135 puts.add(put2);
136 hTable.put(puts);
137 }catch(Exception e){
138 e.printStackTrace();
139 }finally{
140 try {
141 if (null != hTable) {
142 hTable.close();
143 }
144 }catch(Exception e){
145 e.printStackTrace();
146 }
147 }
148 }
149
150 public static void main(String args[]){
151 HbaseTest test = new HbaseTest();
152 test.init();
153 test.createTable();
154 test.insertData();
155 test.query();
156 }
157
158
159 }
Storm安装配置与代码框架
拓扑构造: 编写topology实体类,在构造方法中加入配置参数,序列化等。通过CommandlLine获取启动时的workers与calc数量,最终调用StormSubmitter的submitTopologyWithProgressBar,传入topo的名称,配置项,以及TopologyBuilder实例。
数据接收Spout:
Storm从kafka中获取数据,创建于BasicTopology,其中配置参数:
kafka.brokerZkStr Kafka使用的zookeeper服务器地址
kafka.brokerZkPath 保存offset的zookeeper服务器地址
kafka.offset.zkPort 保存offset的zookeeper端口 默认2181
kafka.offset.zkRoot 保存offset的zookeeper路径 /kafka-offset
stateUpdateIntervalMs 把offset信息写入zookeeper的间隔时间 30000
spout、bolt初始化时,构建对象stormBeanFactory,其后使用getBean方法从BeanFactory中获取对象
Bolt数据处理:
自定义bolt继承自extendsBaseRichBolt,实现它的prepare、declareOutputFileds方法。在prepare方法中,获取StormBeanFactory的配置,加载业务实体类。
Storm配置参数:
dev.zookeeper.path : '/tmp/dev-storm-zookeeper' 以dev.zookeeper.path配置的值作为本地目录,以storm.zookeeper.port配置的值作为端口,启动一个新的zookeeper服务 drpc.childopts: '-Xms768m' drpc.invocations.port 3773 drpc.port : 3772 drpc.queue.size : 128 drpc.request.timeout.secs : 600 drpc.worker.threads : 64 java.library.path : '' logviewer.appender.name : 'A1' logviewer.childopts : '-Xms128m' logviewr.port : 8000 metrics.reporter.register : 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter' nimbus.childopts : '-Xmx1024m -javaagent:/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8649,wireformat31x-true...' nimbus.cleanup.inbox.freq.secs : 600 nimbus.file.copy.exiration.secs : 600
storm.yaml
nimbus.host nimbus的配置地址
nimbus.inbox.jar.expiration.secs 主要负责清理nimbus的inbox文件夹最后一次修改时间,默认3600秒
nimbus.monitor.freq.secs : 120 nimbus.reassign : true 当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改 nimbus.supervisor.timeout.secs : 60 nimbus.task.launch.secs : 120 nimbus.task.timeout.secs : 30 心跳超时时间,超时后nimbus会认为task死掉并重分配给另一个地址 nimbus.thrift.max_buffer_size : 1048576 nimbus.thrift.port : 6627 nimbus.topology.validator : 'backtype.storm.nimbus.DefaultTopologyValidator' storm.cluster.metrics.consumer.register : [{"class" : "org.apache.hadoop.metrics2.sink.storm.Strorm.StormTimeLineMetricsReporter"}] storm.cluster.mode : 'distributed' storm.local.dir : '/storage/disk1/hadoop/storm' storm.local.mode.zmq : false storm.log.dir : '/var/log/storm' storm.messaging.netty.buffer_size : 5242880 为每次批量发送的Tuple 序列化之后的Task Message 消息的大小 storm.messaging.netty.client_worker_threads : 10 指定netty服务器工作线程数量 storm.messaging.netty.max_retries : 60 指定最大重试次数 storm.messaging.netty.max_wait_ms : 2000 指定最大等待时间(毫秒) storm.messaging.netty.min_wait_ms : 100 指定最小等待时间(毫秒) storm.messaging.netty.server_worker_threads : 10 指定netty服务器工作线程数量 storm.messaging.transport : 'backtype.storm.messaging.netty.Context' 指定传输协议 storm.thrift.transport : 'backtype.storm.security.auth.SimpleTransportPlugin' storm.zookeeper.connection.timeout : 15000 storm.zookeeper.port : 2181 storm.zookeeper.retry.interval : 1000 storm.zookeeper.retry.intervalceiling.millis : 30000 storm.zookeeper.retry.times : 5 storm.zookeeper.root : '/storm' ZooKeeper中Storm的根目录位置 storm.zookeeper.servers : ['',''.....] zookeeper服务器列表 storm.zookeeper.session.timeout : 20000 客户端连接ZooKeeper超时时间 supervisor.childopts : 在storm-deploy项目中使用,用来配置supervisor守护进程的jvm选项 supervisor.heartbeat.frequency.secs : 5 supervisor心跳发送频率(多久发送一次) supervisor.monitor.frequency.secs : 3 supervisor检查worker心跳的频率 supervisor.slots.ports : [6700,6701,....] supervisor上能够运行workers的端口列表.每个worker占用一个端口,且每个端口只运行一个worker.通过这项配置可以调整每台机器上运行的worker数.(调整slot数/每机) supervisor.worker.start.timeout.secs : 120 supervisor.worker.timeout.secs : 30 supervisor中的worker心跳超时时间,一旦超时supervisor会尝试重启worker进程. task.heartbeat.frequency.secs : 3 task汇报状态心跳时间间隔 task.refresh.poll.secs : 10 ask与其他tasks之间链接同步的频率.(如果task被重分配,其他tasks向它发送消息需要刷新连接).一般来讲,重分配发生时其他tasks会理解得到通知。该配置仅仅为了防止未通知的情况。 topology.acker.executors : null topology.builtin.metrics.bucket.size.secs : 60 topology.debug : false topology.disruptor.wait.strategy : 'com.lmax.disruptor.BlockingWaitStrategy' topology.enable.message.timeouts : true topology.error.throttle.interval.secs : 10 topology.executor.receive.buffer.size : 1024 topology.executor.send.buffer.size : 1024 topology.fall.back.on.java.serialization : true topology.kryo.factory : 'backtype.storm.serialization.DefaultKryoFactory' topology.max.error.report.per.interval : 5 topology.max.spout.pending : null 一个spout task中处于pending状态的最大的tuples数量.该配置应用于单个task,而不是整个spouts或topology. topology.max.task.parallelism : null 在一个topology中能够允许的最大组件并行度.该项配置主要用在本地模式中测试线程数限制. topology.message.timeout.secs : 30 topology中spout发送消息的最大处理超时时间.如果一条消息在该时间窗口内未被成功ack,Storm会告知spout这条消息失败。而部分spout实现了失败消息重播功能。 topology.metrics.aggregate.metric.evict.secs : 5 topology.metrics.aggregate.per.worker : true topology.metrics.consumer.register : topology.metrics.expand.map.type : true topology.metrics.metric.name.separator : ',' topology.optimize : true topology.skip.missing.kryo.registrations : false Storm是否应该跳过它不能识别的kryo序列化方案.如果设置为否task可能会装载失败或者在运行时抛出错误. 
topology.sleep.spout.wait.strategy.time.ms : 1 topology.spoout.wait.strategy : 'backtype.storm.spout.SleeSpoutWaitStrategy' topology.state.synchronization.timeout.secs : 60 topology.stats.sample.rate : 0.05 topology.tick.tuple.freq.secs : null topology.transfer.buffer.size : 1024 topology.trident.batch.emit.interval.millis : 500 topology.tuple.serializer : 'backtype.storm.serialization.types.ListDelegateSerializer' topology.worker.childopts : null topology.worker.shared.thread.pool.size : 4 topology.workers : 40 执行该topology集群中应当启动的进程数量.每个进程内部将以线程方式执行一定数目的tasks.topology的组件结合该参数和并行度提示来优化性能 transactional.zookeeper.port : null transactional.zookeeper.root : '/transactional' transactional.zookeeper.servers : null ui.childopts : '-Xmx2048m' ui.filter : null ui.port : 8744 Storm UI的服务端口 worker.childopts : worker.heartbeet.frequency.secs : 1 zmq.hwm : 0 zmq.linger.millis : 5000 当连接关闭时,链接尝试重新发送消息到目标主机的持续时长.这是一个不常用的高级选项,基本上可以忽略. zmq.threads : 1 每个worker进程内zeromq通讯用到的线程数
storm代码框架总结:
基类topology,BasicTopology.java
1 import java.io.UnsupportedEncodingException;
2 import java.math.BigDecimal;
3 import java.util.Date;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7
8 import org.apache.commons.cli.CommandLine;
9 import org.apache.commons.cli.CommandLineParser;
10 import org.apache.commons.cli.DefaultParser;
11 import org.apache.commons.cli.HelpFormatter;
12 import org.apache.commons.cli.Options;
13 import org.apache.commons.lang3.StringUtils;
14 import org.apache.storm.guava.base.Preconditions;
15 import org.edm.storm.topo.util.DelayKafkaSpout;
16 import org.edm.storm.topo.util.StormBeanFactory;
17 import org.joda.time.DateTime;
18
19 import storm.kafka.BrokerHosts;
20 import storm.kafka.KafkaSpout;
21 import storm.kafka.SpoutConfig;
22 import storm.kafka.ZkHosts;
23 import backtype.storm.Config;
24 import backtype.storm.LocalCluster;
25 import backtype.storm.StormSubmitter;
26 import backtype.storm.topology.TopologyBuilder;
27 import backtype.storm.tuple.Fields;
28
29 public abstract class BasicTopology {
30
31 public static final String HASH_TAG = "hashTag";
32
33 public static final Fields HASH_FIELDS = new Fields(HASH_TAG);
34
35 protected Options options = new Options();
36
37 protected StormBeanFactory stormBeanFactory;
38
39 protected Config config = new Config();
40
41 protected String configFile;
42
43 public BasicTopology(){
44 config.setFallBackOnJavaSerialization(false);
45
46 config.setSkipMissingKryoRegistrations(false);
47 config.registerSerialization(Date.class);
48 config.registerSerialization(BigDecimal.class);
49 config.registerSerialization(HashMap.class);
50 config.registerSerialization(Map.class);
51
52 options.addOption("name", true, "拓扑运行时名称");
53 options.addOption("conf", false, "配置文件路径");
54 options.addOption("workers", true, "虚拟机数量");
55 }
56
57 protected void setupConfig(CommandLine cmd) throws UnsupportedEncodingException{
58 //配置文件名称
59 String confLocation = cmd.getOptionValue("conf",getConfigName());
60 //创建stormBeanFactory
61 stormBeanFactory = new StormBeanFactory(confLocation);
62 Map<String,Object> stormConfig = stormBeanFactory.getBean("stormConfig",Map.class);
63 Preconditions.checkNotNull(stormConfig);
64 config.putAll(stormConfig);
65 config.put(StormBeanFactory.SPRING_BEAN_FACTORY_XML, stormBeanFactory.getXml());
66 //先默认加载,然后再加载命令行
67 String numWorkers = cmd.getOptionValue("workers");
68 if(numWorkers != null){
69 config.setNumWorkers(Integer.parseInt(numWorkers));
70 }else{
71 config.setNumWorkers(getNumWorkers());
72 }
73 config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180);
74 config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2000);
75 config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
76 config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
77 config.put(Config.TOPOLOGY_ACKER_EXECUTORS, config.get(Config.TOPOLOGY_WORKERS));
78 }
79
80 @SuppressWarnings("unchecked")
81 protected SpoutConfig getSpoutConfig(String topic){
82 String brokerZkStr = (String)config.get("kafka.brokerZkStr");
83 String brokerZkPath = (String)config.get("kafka.brokerZkPath");
84
85 List<String> zkServers = (List<String>)config.get("kafka.offset.zkServers");
86 Integer zkPort = Integer.parseInt(String.valueOf(config.get("kafka.offset.zkPort")));
87 String zkRoot = (String)config.get("kafka.offset.zkRoot");
88 String id = StringUtils.join(getTopoName(),"-",topic);
89 BrokerHosts kafkaBrokerZk = new ZkHosts(brokerZkStr, brokerZkPath);
90 SpoutConfig spoutConfig = new SpoutConfig(kafkaBrokerZk, topic, zkRoot, id);
91 spoutConfig.zkServers = zkServers;
92 spoutConfig.zkPort = zkPort;
93 spoutConfig.zkRoot = zkRoot;
94 spoutConfig.stateUpdateIntervalMs = 30000;
95 return spoutConfig;
96 }
97
98 //创建kafkaspout
99 public KafkaSpout getKafkaSpout(String topic){
100 SpoutConfig spoutConfig = getSpoutConfig(topic);
101 return new DelayKafkaSpout(spoutConfig);
102 }
103
104
105 /**
106 * 拓扑可部署多次,但从kafka获取数据,做唯一次过滤等用的
107 *
108 * @return
109 */
110 public abstract String getTopoName();
111
112 public abstract String getConfigName();
113
114 public abstract int getNumWorkers();
115
116 public void registerKryo(Config config){
117
118 }
119
120 public abstract void addOptions(Options options);
121
122 public abstract void setupOptionValue(CommandLine cmd);
123
124 public abstract void createTopology(TopologyBuilder builder);
125
126 public void runLocat(String[] args) throws Exception{
127 CommandLineParser parser = new DefaultParser();
128 CommandLine cmd = parser.parse(options, args);
129 HelpFormatter formatter = new HelpFormatter();
130 formatter.printHelp("topology", options);
131 setupConfig(cmd);
132
133 config.setDebug(true);
134 config.setNumWorkers(1);
135 TopologyBuilder builder = new TopologyBuilder();
136 createTopology(builder);
137 LocalCluster cluster = new LocalCluster();
138 String topoName = cmd.getOptionValue("name",
139 StringUtils.join(getTopoName(), "-", new DateTime().toString("yyyyMMdd-HHmmss")));
140 cluster.submitTopology(topoName,config,builder.createTopology());
141
142 }
143
144 public void run(String args[]) throws Exception{
145 CommandLineParser parser = new DefaultParser();
146 CommandLine cmd = parser.parse(options, args);
147 HelpFormatter formatter = new HelpFormatter();
148 formatter.printHelp("topology", options);
149 setupConfig(cmd);
150 setupOptionValue(cmd);
151
152 TopologyBuilder builder = new TopologyBuilder();
153 createTopology(builder);
154 String topoName = cmd.getOptionValue("name",StringUtils.join(getTopoName(),new DateTime().toString("yyyyMMdd-HHmmss")));
155 StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.createTopology());
156 }
157 }
用于实现数据流spout,bolt的业务实现Topology:
1 import org.apache.commons.cli.CommandLine;
2 import org.apache.commons.cli.Options;
3
4 import storm.kafka.KafkaSpout;
5
6 import backtype.storm.topology.TopologyBuilder;
7
8 import com.practice.impl.BasicTopology;
9
10 public class LbsCalcTopology extends BasicTopology{
11
12 private int spoutParallelism = 1;
13
14 private int onceParallelism = 1;
15
16 private int calculateParallelism = 1;
17
18
19 @Override
20 public void addOptions(Options options) {
21 options.addOption("spout",true,"spoutParallelism");
22 options.addOption("once",true,"onceParallelism");
23 options.addOption("calc",true,"calculateParallelism");
24 }
25
26 @Override
27 public void setupOptionValue(CommandLine cmd) {
28 spoutParallelism = Integer.parseInt(cmd.getOptionValue("spout","1"));
29 onceParallelism = Integer.parseInt(cmd.getOptionValue("once","1"));
30 calculateParallelism = Integer.parseInt(cmd.getOptionValue("calc","1"));
31 }
32
33 @Override
34 public void createTopology(TopologyBuilder builder) {
35 KafkaSpout kafkaSpout = getKafkaSpout("lbs");
36 LbsOnceBolt exactlyOnceBolt = new LbsOnceBolt(getTopoName(),"lbs","lbs");
37 LbsCalcBolt calculateBolt = new LbsCalcBolt();
38
39 builder.setSpout("kafkaSpout", kafkaSpout,spoutParallelism);
40 builder.setBolt("onceParallelism", exactlyOnceBolt,onceParallelism).shuffleGrouping("kafkaSpout");
41 builder.setBolt("calculateBolt", calculateBolt, calculateParallelism).fieldsGrouping("onceParallelism", BasicTopology.HASH_FIELDS);
42 }
43
44 @Override
45 public String getTopoName() {
46 return "lbs";
47 }
48
49 @Override
50 public String getConfigName() {
51 return "lbs-topology.xml";
52 }
53
54 @Override
55 public int getNumWorkers() {
56 return 3;
57 }
58
59 public static void main(String args[]) throws Exception{
60 LbsCalcTopology topo = new LbsCalcTopology();
61 topo.run(args);
62 }
63 }
具体业务时间bolt:
1 import java.util.Map;
2
3 import backtype.storm.task.OutputCollector;
4 import backtype.storm.task.TopologyContext;
5 import backtype.storm.topology.OutputFieldsDeclarer;
6 import backtype.storm.topology.base.BaseRichBolt;
7 import backtype.storm.tuple.Tuple;
8
9 public class LbsCalcBolt extends BaseRichBolt{
10
11 @Override
12 public void execute(Tuple input) {
13 try{
14 String msg = input.getString(0);
15 if(msg != null){
16 System.out.println(msg);
17 }
18 }catch(Exception e){
19 e.printStackTrace();
20 }
21 }
22
23 @Override
24 public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
25 // TODO Auto-generated method stub
26
27 }
28
29 @Override
30 public void declareOutputFields(OutputFieldsDeclarer arg0) {
31 // TODO Auto-generated method stub
32
33 }
34
35 }
Kafka安装配置与代码框架
1、主要配置参数说明 -- 日志数据存储目录 log.dirs = /storage/disk17/kafka-logs -- Kafka server端口 port = 6667 -- 强制切分日志文件的间隔时间(切分的文件大小由log.segment.bytes决定,默认1073741824 bytes) log.roll.hours = 168 -- 日志文件保留时间 log.retention.hours = 168 -- 每次flush到磁盘的最大消息数量,数量太大会让写磁盘时间过长(IO阻塞),导致客户端发生延迟 log.flush.interval.messages=10000 -- 每次flush消息的间隔时间 log.flush.scheduler.interval.ms=3000 -- zookeeper 连接超时时间 zookeeper.connection.timeout.ms = 20000 -- zookeeper 集群的节点地址,多个以,号分隔 zookeeper.connect=127.0.0.1:2181 -- zooKeeper集群中leader和follower之间的同步时间 zookeeper.sync.time.ms=2000 -- 是否允许自动创建主题 auto.create.topics.enable=true -- 默认复制因子数量 default.replication.factor=1 -- 默认分区数量 num.partitions=1 2、消息以及日志数据的存储策略 kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除,日志文件将会根据broker中的配置要求,保留一定的时间之后删除,比如log.retention.hours=168,那么七天后,文件会被清除,无论其中的消息是否被消费。kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开销。 对于consumer而言,它需要保存消费消息的offset(如果不指定则存储在默认的zookeeper目录kafak-broker),对于offset的保存和使用,由consumer来控制。当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费。事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值。 kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息由zookeeper保存。因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。 3、常用操作命令 3.1、创建消息主题(kafka topic) ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --partitions 2 --replication-factor 3 --create --zookeeper zookeeper 服务端地址 --topic 消息主题名称 --partitions 分区数量(如果不指定数量则由Kafka的配置决定) --replication-factor 复制因子数量(副本数量,如果不指定数量则由Kafka的配置决定) --create 创建主题 3.2、删除消息主题 * 慎用,这条命令只是删除zookeeper中主题的元数据,日志文件并不会删除,不建议直接删除日志文件。如需删除,安全的措施是停止kafka集群服务,然后执行这条命令再删除日志文件,最后恢复服务,恢复服务后执行--describe命令(3.3)检查主题是否已经移除! ./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper 127.0.0.1:2181 --topic topicName 3.3、查看消息主题 ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --describe --zookeeper zookeeper 服务端地址 --topic 消息主题名称 ----describe 主题信息 命令将输出如下信息,如topicName主题有2个分区,复制因子为3,以及主题所在的leader。 Topic:topicName PartitionCount:2 ReplicationFactor:3 Configs: Topic: topicName Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: topicName Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 3.4、其他命令 -- 列出所有的主题 ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list -- 修改主题分区数量 ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --partitions 4 --alter
Redis安装配置与代码框架
Jettry配置与代码框架
Jetty服务一共启动8个线程进行数据的接入。存储于bLOCKINGqUEUE中,进行消费,启启动多个线程的作用在于并发消费接入的报文数据。同时,同一事件启动多个服务,通过nginx进行负载均衡。
一、Jetty服务的启动
1、创建ApplicationContext,通过AbstractApplicationContext创建。 AbstractApplicationContext applicationContext = new FileSystemXmlApplicationContext(configXmlFile);
2、创建PlatformMBeanServer MBeanServer mbs = java.lang.management.ManagementFactory.getPlatformMBeanServer();
3、注册ApplicationServer
ObjectName mbeanName = getApplicationObjectName();
ApplicationServer applicationServer = new ApplicationServer();
....
mbs.registerMBean(applicationServer,mbeanNamer);
二、Jettry服务的停止 ......
HIVE安装配置与代码框架
Rstudio与Spark安装配置与代码框架
Nginx配置
user www-data; #运行用户 worker_processes 1; #启动进程,通常设置成和cpu的数量相等 error_log /var/log/nginx/error.log; #全局错误日志及PID文件 pid /var/run/nginx.pid;#PID文件
#工作模式及连接数上限 events { use epoll; #epoll是多路复用IO(I/O Multiplexing)中的一种方式,但是仅用于linux2.6以上内核,可以大大提高nginx的性能 worker_connections 1024;#单个后台worker process进程的最大并发链接数 # multi_accept on; }
#设定http服务器,利用它的反向代理功能提供负载均衡支持 http { #设定mime类型,类型由mime.type文件定义 include /etc/nginx/mime.types; default_type application/octet-stream; #设定日志格式 access_log /var/log/nginx/access.log;
#sendfile 指令指定 nginx 是否调用 sendfile 函数(zero copy 方式)来输出文件,对于普通应用, #必须设为 on,如果用来进行下载等应用磁盘IO重负载应用,可设置为 off,以平衡磁盘与网络I/O处理速度,降低系统的uptime. sendfile on; #tcp_nopush on;
#连接超时时间 #keepalive_timeout 0; keepalive_timeout 65; tcp_nodelay on; #开启gzip压缩 gzip on; gzip_disable "MSIE [1-6]\.(?!.*SV1)";
#设定请求缓冲 client_header_buffer_size 1k; large_client_header_buffers 4 4k;
include /etc/nginx/conf.d/*.conf; include /etc/nginx/sites-enabled/*;
#设定负载均衡的服务器列表 upstream mysvr { #weigth参数表示权值,权值越高被分配到的几率越大 #本机上的Squid开启3128端口 server 192.168.8.1:3128 weight=5; server 192.168.8.2:80 weight=1; server 192.168.8.3:80 weight=6; }
server { #侦听80端口 listen 80; #定义使用www.xx.com访问 server_name www.xx.com;(虚拟主机ip)
#设定本虚拟主机的访问日志 access_log logs/www.xx.com.access.log main;
#默认请求 location / { root /root; #定义服务器的默认网站根目录位置 index index.php index.html index.htm; #定义首页索引文件的名称
fastcgi_pass www.xx.com; fastcgi_param SCRIPT_FILENAME $document_root/$fastcgi_script_name; include /etc/nginx/fastcgi_params; }
# 定义错误提示页面 error_page 500 502 503 504 /50x.html; location = /50x.html { root /root; }
#静态文件,nginx自己处理 location ~ ^/(images|javascript|js|css|flash|media|static)/ { root /var/www/virtual/htdocs; #过期30天,静态文件不怎么更新,过期可以设大一点,如果频繁更新,则可以设置得小一点。 expires 30d; } #PHP 脚本请求全部转发到 FastCGI处理. 使用FastCGI默认配置. location ~ \.php$ { root /root; fastcgi_pass 127.0.0.1:9000; fastcgi_index index.php; fastcgi_param SCRIPT_FILENAME /home/www/www$fastcgi_script_name; include fastcgi_params; } #设定查看Nginx状态的地址 location /NginxStatus { stub_status on; access_log on; auth_basic "NginxStatus"; auth_basic_user_file conf/htpasswd; } #禁止访问 .htxxx 文件 location ~ /\.ht { deny all; } } }
Sentry配置
Hue配置(3.11版本)
hue安装 rpm -ivh hue-* --force --nodeps rpm -ivh sentry-* --force --nodeps
hue 启动 /usr/lib/hue/build/env/bin/hue runserver ip:port > /var/log/hue/hue-start.out 2> /var/log/hue/hue-error.out &
spark livy 集成 hue sqlited is locked desktopdefault_hdfs_superuserhadoopHDFS管理用户desktophttp_host10.10.4.125Hue Web Server所在主机/IPdesktophttp_port8000Hue Web Server服务端口desktopserver_userhadoop运行Hue Web Server的进程用户desktopserver_grouphadoop运行Hue Web Server的进程用户组desktopdefault_useryanjunHue管理员
1. 新建hue用户 #usradd hue 将包放到/home/hue/底下(可以自己选择位置) #rpm -ivh hue-* --force --nodeps 安装完成之后,hue的目录在/usr/lib/hue,配置文件目录在/etc/hue/conf/hue.ini 2. 配置hue(/etc/hue/conf/hue.ini) [beeswax] # Host where HiveServer2 is running. # If Kerberos security is enabled, use fully-qualified domain name (FQDN). hive_server_host=172.19.189.50(hiveserver2的地址) # Port where HiveServer2 Thrift server runs on. hive_server_port=10010(hiveserver2端口) # Hive configuration directory, where hive-site.xml is located hive_conf_dir=/etc/conf(hive配置文件路径) # Timeout in seconds for thrift calls to Hive service server_conn_timeout=120(连接hive超时时间) # Choose whether to use the old GetLog() thrift call from before Hive 0.14 to retrieve the logs. # If false, use the FetchResults() thrift call from Hive 1.0 or more instead. ## use_get_log_api=false # Limit the number of partitions that can be listed. list_partitions_limit=10000(不限制时查询界面上list的partition数据) # The maximum number of partitions that will be included in the SELECT * LIMIT sample query for partitioned tables. query_partitions_limit=10(查询语句中最多查询10个partition) # A limit to the number of cells (rows * columns) that can be downloaded from a query # (e.g. - 10K rows * 1K columns = 10M cells.) # A value of -1 means there will be no limit. download_cell_limit=10000000(download最大量) # Hue will try to close the Hive query when the user leaves the editor page. # This will free all the query resources in HiveServer2, but also make its results inaccessible. close_queries=true(关闭界面时直接断开该用户与hiveserver2的连接) # Thrift version to use when communicating with HiveServer2. # New column format is from version 7. thrift_version=7(默认值,不建议改会报错) [[yarn_clusters]] [[[default]]] # Enter the host on which you are running the ResourceManager resourcemanager_host= (RM地址) # The port where the ResourceManager IPC listens on resourcemanager_port=8032(RM端口) # Whether to submit jobs to this cluster submit_to=True # Resource Manager logical name (required for HA) logical_name=ActiveResourceManager(HA,给activeRM命名) # Change this if your YARN cluster is Kerberos-secured ## security_enabled=false # URL of the ResourceManager API resourcemanager_api_url=http://172.19.xxx.xx:8088(RM URL) # URL of the ProxyServer API ## proxy_api_url=http://localhost:8088 # URL of the HistoryServer API history_server_api_url=http://172.19.xx.xx:19888 # URL of the Spark History Server #spark_history_server_url=http://172.30.xx.xx:18088 # In secure mode (HTTPS), if SSL certificates from YARN Rest APIs # have to be verified against certificate authority ## ssl_cert_ca_verify=True # HA support by specifying multiple clusters. # Redefine different properties there. # e.g. [[[ha]]] # Resource Manager logical name (required for HA) logical_name=StandbyResourceManager(RM的HA从节点name) # Un-comment to enable submit_to=True # URL of the ResourceManager API resourcemanager_api_url=http://172.19.189.50:8088(RM从节点地址) [hadoop] # Configuration for HDFS NameNode # ------------------------------------------------------------------------ [[hdfs_clusters]] # HA support by using HttpFs [[[default]]] # Enter the filesystem uri fs_defaultfs=hdfs://hdp(HDFS的filesystem名字) # NameNode logical name. logical_name=MasterNamenode(NN name) # Use WebHdfs/HttpFs as the communication mechanism. # Domain should be the NameNode or HttpFs host. # Default port is 14000 for HttpFs. 
webhdfs_url=http://172.19.189.50:50070/webhdfs/v1(hue的获取hdfs的nn HA是通过WebHDFS来切换的,这个地址是启动Webhdfs的地址) # Change this if your HDFS cluster is Kerberos-secured ## security_enabled=false # In secure mode (HTTPS), if SSL certificates from YARN Rest APIs # have to be verified against certificate authority ## ssl_cert_ca_verify=True # Directory of the Hadoop configuration hadoop_conf_dir=/etc/hadoop/conf(hadoop配置文件路径) [desktop] # Set this to a random string, the longer the better. # This is used for secure hashing in the session store. ## secret_key= # Execute this script to produce the Django secret key. This will be used when # 'secret_key' is not set. ## secret_key_script= # Webserver listens on this address and port http_host=0.0.0.0 http_port=8888 # Time zone name time_zone=Asia/Shanghai # Enable or disable Django debug mode. #django_debug_mode=false # Enable or disable database debug mode. #database_logging=false # Whether to send debug messages from JavaScript to the server logs. #send_dbug_messages=true # Enable or disable backtrace for server error http_500_debug_mode=true # Enable or disable memory profiling. ## memory_profiler=false # Server email for internal error messages ## django_server_email='hue@localhost.localdomain' # Email backend ## django_email_backend=django.core.mail.backends.smtp.EmailBackend # Webserver runs as this user server_user=hue server_group=hadoop(hue界面登录用户) # This should be the Hue admin and proxy user default_user=hue(hue管理员用户) # This should be the hadoop cluster admin default_hdfs_superuser=hue(hue在hdfs上的管理员用户) [[database]] #engine=sqlite3 #name=/var/lib/hue/desktop.db # Database engine is typically one of: # postgresql_psycopg2, mysql, sqlite3 or oracle. # Note that for sqlite3, 'name', below is a path to the filename. For other backends, it is the database name # Note for Oracle, options={"threaded":true} must be set in order to avoid crashes. # Note for Oracle, you can use the Oracle Service Name by setting "host=" and "port=" and then "name=<host>:<port>/<service_name>". # Note for MariaDB use the 'mysql' engine. engine=mysql host=172.30.115.60 port=3306 user=hue password=hue name=hue (hue的元数据存储位置) # Execute this script to produce the database password. This will be used when 'password' is not set. ## password_script=/path/script ## name=desktop/desktop.db ## options={} [librdbms] # The RDBMS app can have any number of databases configured in the databases # section. A database is known by its section name # (IE sqlite, mysql, psql, and oracle in the list below). [[databases]] # sqlite configuration. ## [[[sqlite]]] # Name to show in the UI. ## nice_name=SQLite # For SQLite, name defines the path to the database. ## name=/tmp/sqlite.db # Database backend to use. ## engine=sqlite # Database options to send to the server when connecting. # https://docs.djangoproject.com/en/1.4/ref/databases/ ## options={} # mysql, oracle, or postgresql configuration. [[[mysql]]](开启mysql) # Name to show in the UI. nice_name="HUE DB"(界面显示mysql的名字) # For MySQL and PostgreSQL, name is the name of the database. # For Oracle, Name is instance of the Oracle server. For express edition # this is 'xe' by default. name=hue(连接的mysql数据库) # Database backend to use. This can be: # 1. mysql # 2. postgresql # 3. oracle engine=mysql(连接引擎) # IP or hostname of the database to connect to. host=172.30.115.60(mysql地址) # Port the database server is listening to. Defaults are: # 1. MySQL: 3306 # 2. PostgreSQL: 5432 # 3. 
port=3306 (port)
# Username to authenticate with when connecting to the database.
user=hue (user for the database connection)
# Password matching the username to authenticate with when
# connecting to the database.
password=hue (password for the database connection)
# Database options to send to the server when connecting.
# https://docs.djangoproject.com/en/1.4/ref/databases/
## options={}
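Before moving on to the source changes, it can help to confirm that the Thrift endpoint configured under [beeswax] (here the Spark Thrift Server on 172.19.189.50:10010) actually accepts queries. A minimal sketch using PyHive, assuming the pyhive package is installed; host, port, and user are the values from hue.ini above:

from pyhive import hive

# Connect to the same HiveServer2-compatible endpoint that Hue's [beeswax] section points at
conn = hive.Connection(host='172.19.189.50', port=10010, username='hue')
cursor = conn.cursor()
cursor.execute('SHOW DATABASES')
print(cursor.fetchall())
cursor.close()
conn.close()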
Modifying the Hue source code
1. /usr/lib/hue/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py
Add a method that selects which query_server to call:
def _get_db(self, snippet):
  LOG.info('log by ck get_db')
  if snippet['type'] == 'hive':
    name = 'beeswax'
  elif snippet['type'] == 'impala':
    name = 'impala'
  else:
    name = 'spark-sql'
  return dbms.get_by_type(self.user, 'data', query_server=get_query_server_config_for_meta(name=name))
Modify the execute method (the part that runs the computation):
@query_error_handler
def execute(self, notebook, snippet):
  db = self._get_db(snippet)
  statement = self._get_current_statement(db, snippet)
  session = self._get_session(notebook, snippet['type'])
  query = self._prepare_hql_query(snippet, statement['statement'], session)
  try:
    if statement.get('statement_id') == 0:
      db.use(query.database)
    handle = db.client.query(query)
  except QueryServerException, ex:
    raise QueryError(ex.message, handle=statement)
  # All good
  server_id, server_guid = handle.get()
  response = {
    'secret': server_id,
    'guid': server_guid,
    'operation_type': handle.operation_type,
    'has_result_set': handle.has_result_set,
    'modified_row_count': handle.modified_row_count,
    'log_context': handle.log_context,
  }
  response.update(statement)
  return response
2. /usr/lib/hue/apps/beeswax/src/beeswax/server/dbms.py
Add the query-server lookup used for query execution (change the IP and port below to the Spark Thrift Server's):
def get_query_server_config_for_meta(name='beeswax', server=None):
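  # Variant of Hue's get_query_server_config(): for non-Impala requests the host/port
  # are hard-coded to the Spark Thrift Server below instead of being read from
  # HIVE_SERVER_HOST/HIVE_SERVER_PORT, so 'hive' snippets end up executed by Spark SQL.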
if name == 'impala':
from impala.dbms import get_query_server_config as impala_query_server_config
query_server = impala_query_server_config()
else:
kerberos_principal = hive_site.get_hiveserver2_kerberos_principal(HIVE_SERVER_HOST.get())
query_server = {
'server_name': 'beeswax', # Aka HiveServer2 now
'server_host': '172.30.115.58',#HIVE_SERVER_HOST.get(),
'server_port': '10010',#HIVE_SERVER_PORT.get(),
'principal': kerberos_principal,
'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
'protocol': 'https' if hiveserver2_use_ssl() else 'http',
'host': '172.30.115.58',#HIVE_SERVER_HOST.get(),
'port': hive_site.hiveserver2_thrift_http_port(),
'end_point': hive_site.hiveserver2_thrift_http_path()
},
'transport_mode': 'http' if hive_site.hiveserver2_transport_mode() == 'HTTP' else 'socket',
'auth_username': AUTH_USERNAME.get(),
'auth_password': AUTH_PASSWORD.get()
}
if name == 'sparksql': # Spark SQL is almost the same as Hive
from spark.conf import SQL_SERVER_HOST as SPARK_SERVER_HOST, SQL_SERVER_PORT as SPARK_SERVER_PORT
query_server.update({
'server_name': 'sparksql',
'server_host': SPARK_SERVER_HOST.get(),
'server_port': SPARK_SERVER_PORT.get()
})
debug_query_server = query_server.copy()
debug_query_server['auth_password_used'] = bool(debug_query_server.pop('auth_password'))
LOG.info("Query Server: %s" % debug_query_server)
return query_server
Add the query caches and their locks:
DBMS_CACHE = {}
DBMS_CACHE_LOCK = threading.Lock()
DBMS_META_CACHE = {}
DBMS_META_CACHE_LOCK = threading.Lock()
DBMS_DATA_CACHE = {}
DBMS_DATA_CACHE_LOCK = threading.Lock()
def get(user, query_server=None):
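  # Return a cached DBMS client for this user and query server, creating it on first use.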
global DBMS_CACHE
global DBMS_CACHE_LOCK
if query_server is None:
query_server = get_query_server_config()
DBMS_CACHE_LOCK.acquire()
try:
DBMS_CACHE.setdefault(user.username, {})
if query_server['server_name'] not in DBMS_CACHE[user.username]:
# Avoid circular dependency
from beeswax.server.hive_server2_lib import HiveServerClientCompatible
if query_server['server_name'] == 'impala':
from impala.dbms import ImpalaDbms
from impala.server import ImpalaServerClient
DBMS_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
else:
from beeswax.server.hive_server2_lib import HiveServerClient
DBMS_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
return DBMS_CACHE[user.username][query_server['server_name']]
finally:
DBMS_CACHE_LOCK.release()
def get_by_type(user, type, query_server=None):
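  # Same pattern as get(), but with separate caches for 'meta' (metadata lookups)
  # and data/query connections, so the two kinds of traffic get their own clients.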
global DBMS_META_CACHE
global DBMS_META_CACHE_LOCK
global DBMS_DATA_CACHE
global DBMS_DATA_CACHE_LOCK
if query_server is None:
query_server = get_query_server_config()
if type == 'meta':
DBMS_META_CACHE_LOCK.acquire()
try:
DBMS_META_CACHE.setdefault(user.username, {})
if query_server['server_name'] not in DBMS_META_CACHE[user.username]:
# Avoid circular dependency
from beeswax.server.hive_server2_lib import HiveServerClientCompatible
if query_server['server_name'] == 'impala':
from impala.dbms import ImpalaDbms
from impala.server import ImpalaServerClient
DBMS_META_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
else:
from beeswax.server.hive_server2_lib import HiveServerClient
DBMS_META_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
return DBMS_META_CACHE[user.username][query_server['server_name']]
finally:
DBMS_META_CACHE_LOCK.release()
else:
DBMS_DATA_CACHE_LOCK.acquire()
try:
DBMS_DATA_CACHE.setdefault(user.username, {})
if query_server['server_name'] not in DBMS_DATA_CACHE[user.username]:
# Avoid circular dependency
from beeswax.server.hive_server2_lib import HiveServerClientCompatible
if query_server['server_name'] == 'impala':
from impala.dbms import ImpalaDbms
from impala.server import ImpalaServerClient
DBMS_DATA_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
else:
from beeswax.server.hive_server2_lib import HiveServerClient
DBMS_DATA_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])
return DBMS_DATA_CACHE[user.username][query_server['server_name']]
finally:
DBMS_DATA_CACHE_LOCK.release()
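get(), get_by_type() and the three cache/lock pairs above all implement the same idea: one client per (user, server_name), built lazily under a lock. A condensed sketch of that pattern for reference only; the helper name and the build_client callback are illustrative and not part of the Hue code:

import threading

_CACHE = {}                     # {username: {server_name: client}}
_CACHE_LOCK = threading.Lock()

def get_cached_client(user, query_server, build_client):
  # Build the client once per (user, server_name); the lock only guards the first creation.
  with _CACHE_LOCK:
    per_user = _CACHE.setdefault(user.username, {})
    name = query_server['server_name']
    if name not in per_user:
      per_user[name] = build_client(query_server, user)
    return per_user[name]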
class ="btn fileToolbarBtn delete-link" title="${_('Delete forever')}" data-bind="enable: selectedFiles().length > 0, click: deleteSelected" > < i class ="fa fa-bolt" > < / i > ${_('Delete forever')} < / button >
< !-- / ko -->
< ul
class ="dropdown-menu" >
< li > < a
href = "#"
class ="delete-link" title="${_('Delete forever')}" data-bind="enable: selectedFiles().length > 0, click: deleteSelected" > < i class ="fa fa-bolt" > < / i > ${_('Delete forever')} < / a > < / li >
< / ul > 4. /usr/lib/hue/apps/filebrowser/src/filebrowser/templates/listdir_components.mako < li > < a href = "#" class ="delete-link" title="${_('Delete forever')}" data-bind="enable: $root.selectedFiles().length > 0, click: $root.deleteSelected" > < i class ="fa fa-fw fa-bolt" > < / i > ${_('Delete forever')} < / a > < / li > 5. /usr/lib/hue/apps/filebrowser/src/filebrowser/views.py request.fs.do_as_user(request.user, request.fs.rmtree, arg['path'], True) # the old code was ('skip_trash' in request.GET) 修改hdfs配置 hdfs-site.xml文件 修改:fs.permissions.umask-mode 000 (hue建表用户为web登录用户,在用spark做计算引擎时会在hdfs的表目录中创建文件,后台spark的用户为hive无法写入) 添加:添加hue用户为hdfs代理用户(superuser) hadoop.proxyuser.hue.hosts * hadoop.proxyuser.hue.groups *
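A quick way to confirm the proxy-user setup is to hit WebHDFS (the interface Hue's file browser uses, configured as webhdfs_url above) as the hue user while impersonating another user via the doas parameter. A minimal sketch with the requests library; the host and port come from this setup, and 'someuser' stands in for any real cluster user:

import requests

# List the HDFS root as user 'hue' impersonating 'someuser' through WebHDFS.
# If the hadoop.proxyuser.hue.* settings are active, this returns FileStatuses;
# otherwise the NameNode rejects the impersonation attempt.
url = 'http://172.19.189.50:50070/webhdfs/v1/'
params = {'op': 'LISTSTATUS', 'user.name': 'hue', 'doas': 'someuser'}
r = requests.get(url, params=params)
print(r.status_code)
print(r.json())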