Simple sampling generally falls into:
RandomSampling - random sampling
StratifiedSampling - stratified sampling
WeightedSampling - weighted sampling
Computation logic
Example:
https://www.cnblogs.com/itboys/p/9801489.html
PySpark example:
https://www.it1352.com/1933988.html
from pyspark.sql.functions import lit

# `spark` is the SparkSession provided by the pyspark shell / spark-submit
data = [(2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)]
df = spark.createDataFrame(data, ["x1","x2","x3"])
df.show()
df.count()
df.groupBy("x1").count().show()
# one sampling fraction per distinct value of x1 (the stratum column)
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
print(fractions)
# {2147481832: 0.8, 214748183: 0.8}
seed = 3
sampled_df = df.stat.sampleBy("x1", fractions, seed)
sampled_df.show()
sampled_df.count()
# 9
sampled_df.groupBy("x1").count().show()
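For comparison, plain random sampling of the same DataFrame uses sample() rather than sampleBy(); a minimal sketch (the fraction and seed are arbitrary):
# simple random sampling: roughly 80% of all rows, regardless of x1
random_df = df.sample(fraction=0.8, seed=3)
random_df.count()
random_df.groupBy("x1").count().show()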
Weighted sampling: a set contains n elements, each with a different weight; we want to draw m elements at random without replacement, where each element's probability of being chosen equals its weight's share of the total weight.
References:
https://www.jianshu.com/p/ece671db9813
https://lotabout.me/2018/Weighted-Random-Sampling/
https://stackoverflow.com/questions/48558131/sampling-with-weight-using-pyspark
https://www.codenong.com/44352986/
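The DataFrame API has no built-in weighted sampling without replacement, but the references above describe the Efraimidis-Spirakis (A-Res) approach: assign every element the key rand() ** (1 / weight) and keep the m elements with the largest keys. A minimal PySpark sketch under that assumption (the items DataFrame, its id/weight columns, and m are made up for illustration):
from pyspark.sql import functions as F
items = spark.createDataFrame(
    [(1, 1.0), (2, 5.0), (3, 2.0), (4, 10.0), (5, 0.5)], ["id", "weight"])
m = 2  # number of elements to draw without replacement
# key = rand() ** (1 / weight); an element wins a spot with probability
# proportional to its weight's share of the total weight
sampled = (items
    .withColumn("key", F.pow(F.rand(seed=42), 1.0 / F.col("weight")))
    .orderBy(F.col("key").desc())
    .limit(m)
    .drop("key"))
sampled.show()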
https://stackoverflow.com/questions/31633117/spark-is-sample-method-on-dataframes-uniform-sampling
There are a few code paths here:
If withReplacement = false && fraction > .4, it uses a souped-up random number generator (rng.nextDouble() <= fraction) and lets that do the work; this seems like it would be pretty uniform.
If withReplacement = false && fraction <= .4, it uses a more complex algorithm (GapSamplingIterator) that, at a glance, also looks like it should be uniform.
If withReplacement = true, it does close to the same thing, except it can produce duplicates, so it would not be as uniform as the first two.
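Given these code paths, sample() draws each row independently, so it is uniform but only approximate in size; a quick check (the exact counts vary with the seed):
df = spark.range(1000)
[df.sample(fraction=0.1, seed=s).count() for s in range(3)]
# each count lands near 100, not exactly 100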
Note in particular: sample does random sampling and is mainly meant for a Dataset, while sampleBy does stratified sampling and is mainly meant for a DataFrame.
Latest Spark Scala documentation:
Older Spark Scala documentation:
PySpark RDD documentation:
PySpark DataFrame documentation:
def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame
Returns a stratified sample without replacement based on the fraction given on each stratum.
val df = spark.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2),
(3, 3))).toDF("key", "value")
val fractions = Map(1 -> 1.0, 3 -> 0.5)
df.stat.sampleBy("key", fractions, 36L).show()
+---+-----+
|key|value|
+---+-----+
| 1| 1|
| 1| 2|
| 3| 2|
+---+-----+
def sample(self, withReplacement=None, fraction=None, seed=None):
"""Returns a sampled subset of this :class:`DataFrame`.
.. versionadded:: 1.3.0
Parameters
----------
withReplacement : bool, optional
Sample with replacement or not (default ``False``).
fraction : float, optional
Fraction of rows to generate, range [0.0, 1.0].
seed : int, optional
Seed for sampling (default a random seed).
Notes
-----
This is not guaranteed to provide exactly the fraction specified of the total
count of the given :class:`DataFrame`.
`fraction` is required and, `withReplacement` and `seed` are optional.
Examples
--------
>>> df = spark.range(10)
>>> df.sample(0.5, 3).count()
7
>>> df.sample(fraction=0.5, seed=3).count()
7
>>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
1
>>> df.sample(1.0).count()
10
>>> df.sample(fraction=1.0).count()
10
>>> df.sample(False, fraction=1.0).count()
10
"""
# For the cases below:
# sample(True, 0.5 [, seed])
# sample(True, fraction=0.5 [, seed])
# sample(withReplacement=False, fraction=0.5 [, seed])
is_withReplacement_set = \
type(withReplacement) == bool and isinstance(fraction, float)
# For the case below:
# sample(fraction=0.5 [, seed])
is_withReplacement_omitted_kwargs = \
withReplacement is None and isinstance(fraction, float)
# For the case below:
# sample(0.5 [, seed])
is_withReplacement_omitted_args = isinstance(withReplacement, float)
if not (is_withReplacement_set
or is_withReplacement_omitted_kwargs
or is_withReplacement_omitted_args):
argtypes = [
str(type(arg)) for arg in [withReplacement, fraction, seed] if arg is not None]
raise TypeError(
"withReplacement (optional), fraction (required) and seed (optional)"
" should be a bool, float and number; however, "
"got [%s]." % ", ".join(argtypes))
if is_withReplacement_omitted_args:
if fraction is not None:
seed = fraction
fraction = withReplacement
withReplacement = None
seed = int(seed) if seed is not None else None
args = [arg for arg in [withReplacement, fraction, seed] if arg is not None]
jdf = self._jdf.sample(*args)
return DataFrame(jdf, self.sql_ctx)
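The argument handling above accepts exactly three call shapes; anything else raises the TypeError. A quick illustration, reusing df = spark.range(10) from the docstring:
df.sample(False, 0.5, 3)           # withReplacement, fraction, seed, all positional
df.sample(fraction=0.5, seed=3)    # withReplacement omitted, keyword call
df.sample(0.5, 3)                  # withReplacement omitted positionally: (fraction, seed)
# df.sample(True) raises TypeError because fraction is missing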
pyspark's sampleBy returns a stratified sample without replacement, based on the fraction given for each stratum:
def sampleBy(self, col, fractions, seed=None):
"""
Returns a stratified sample without replacement based on the
fraction given on each stratum.
.. versionadded:: 1.5.0
Parameters
----------
col : :class:`Column` or str
column that defines strata
.. versionchanged:: 3.0
Added sampling by a column of :class:`Column`
fractions : dict
sampling fraction for each stratum. If a stratum is not
specified, we treat its fraction as zero.
seed : int, optional
random seed
Returns
-------
a new :class:`DataFrame` that represents the stratified sample
Examples
--------
>>> from pyspark.sql.functions import col
>>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
>>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
>>> sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
| 0| 3|
| 1| 6|
+---+-----+
>>> dataset.sampleBy(col("key"), fractions={2: 1.0}, seed=0).count()
33
"""
if isinstance(col, str):
col = Column(col)
elif not isinstance(col, Column):
raise ValueError("col must be a string or a column, but got %r" % type(col))
if not isinstance(fractions, dict):
raise ValueError("fractions must be a dict but got %r" % type(fractions))
for k, v in fractions.items():
if not isinstance(k, (float, int, str)):
raise ValueError("key must be float, int, or string, but got %r" % type(k))
fractions[k] = float(v)
col = col._jc
seed = seed if seed is not None else random.randint(0, sys.maxsize)
return DataFrame(self._jdf.stat().sampleBy(col, self._jmap(fractions), seed), self.sql_ctx)
DataFrame/Dataset to RDD: val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD to DataFrame: // usually pack one row's fields into a tuple, then name the columns in toDF
import spark.implicits._
val testDF = rdd.map {line=>
(line._1,line._2)
}.toDF("col1","col2")
RDD to Dataset: // the key is to define a case class
import spark.implicits._
case class Coltest(col1:String, col2:Int)
val testDS = rdd.map{line=>Coltest(line._1,line._2)}.toDS
Dataset to DataFrame: // this conversion is simple; it just wraps the case class into a Row
import spark.implicits._
val testDF = testDS.toDF
DataFrame to Dataset: // once each column's type is known, use the as method (as still takes the case class, which is the key point) to convert to a Dataset.
import spark.implicits._
case class Coltest … …
val testDS = testDF.as[Coltest]
Note in particular:
When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS are not available.
A trick learned today: converting a DataFrame to a Dataset looks annoying because it seems to require writing a case class on the fly, but in fact that is unnecessary.
You can write it like this:
val df_dataset = df.asInstanceOf[Dataset[_]]
https://gist.github.com/frne/391b809e3528efe6aac718e1a64f4603
https://gist.github.com/yoyama/ce83f688717719fc8ca145c3b3ff43fd