首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >使用Java添加每行多个任意星火列

使用Java添加每行多个任意星火列
EN

Stack Overflow用户
提问于 2022-09-10 06:32:35
回答 1查看 52关注 0票数 2

我想出了如何获得一个Spark来提前返回已知的固定数量的列。但是,Spark如何返回每个行可能不同的任意数量的列呢?

我在Java17中使用Spark3.3.0,假设我有一个包含100万人的DataFrame。对于每个人,我想查找每年的工资(例如从一个数据库),但每个人可能有不同年份的薪水。如果我知道有2020年和2021年的时间,我会这样做:

代码语言:javascript
运行
AI代码解释
复制
StructType salarySchema = createStructType(List.of(createStructField("salary2020",
    createDecimalType(12, 2), true), createStructField("salary2021",
    createDecimalType(12, 2), true)));
UserDefinedFunction lookupSalariesForId = udf((String id) -> {
  // TODO look up salaries
  return RowFactory.create(salary2020, salary2021);
}, salarySchema).asNondeterministic();

df = df.withColumn("salaries", lookupSalariesForId.apply(col("id")))
    .select("*", "salaries.*");

这是Spark的迂回方式,可以将多个值从UDF加载到单个列中,然后将它们拆分为单独的列。

那么,如果一个人只有2003年和2004年的工资,而另一个人的薪水是2007年、2008年和2009年的,那该怎么办?我想为第一个人创建salary2003salary2004列,然后为第二个人创建salary2007salary2008salary2009列。我要怎么用UDF来实现呢?(我知道如何动态创建一个数组,以便通过RowFactory.create()传回。问题是与UDF模式相关的模式是在UDF逻辑之外定义的。)

还是有什么更好的方法来解决火花呢?我是否应该为每个可能的薪资年创建一个单独的查找DataFrame,只包含个人in和一列,然后以某种方式加入它们,就像我们在关系数据库世界中所做的那样?但是,一个单独的DataFrame会给我什么好处,难道我不会回到原点去构建它吗?当然,我可以在Java中手动构建它,但是我无法从Spark引擎、并行执行器等获得好处。

简而言之,根据每个行的标识符(),为现有的DataFrame中的每一行动态添加任意数量的列的最佳方式是什么?

EN

回答 1

Stack Overflow用户

发布于 2022-09-10 12:29:23

您可以从UDF返回一个映射:

代码语言:javascript
运行
AI代码解释
复制
MapType salarySchema2 = createMapType(StringType, createDecimalType(12, 2));
UserDefinedFunction lookupSalariesForId2 = udf((String id) -> {
    //the Java map containg the result of the UDF
    Map<String, BigDecimal> result = new HashMap<String, BigDecimal>();
    //Generate some random test data
    Random r = new Random();
    int years = r.nextInt(10) + 1; // max 10 years
    int startYear = r.nextInt(5) + 2010;
    for (int i = 0; i < years; i++) {
        result.put("salary" + (startYear + i), new BigDecimal(r.nextDouble() * 1000));
    }
    return result;
}, salarySchema2).asNondeterministic();

df = df.withColumn("salaries2", lookupSalariesForId2.apply(col("id"))).cache();
df.show(false);

输出:

代码语言:javascript
运行
AI代码解释
复制
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |salaries2                                                                                                                                                |
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|1  |{salary2014 -> 333.74}                                                                                                                                   |
|2  |{salary2010 -> 841.83, salary2011 -> 764.24, salary2012 -> 703.35, salary2013 -> 727.06, salary2014 -> 314.52}                                           |
|3  |{salary2012 -> 770.90, salary2013 -> 790.92}                                                                                                             |
|4  |{salary2011 -> 696.24, salary2012 -> 420.56, salary2013 -> 566.10, salary2014 -> 160.99}                                                                 |
|5  |{salary2011 -> 60.59, salary2012 -> 313.57, salary2013 -> 770.82, salary2014 -> 641.90, salary2015 -> 776.13, salary2016 -> 145.28, salary2017 -> 216.02}|
|6  |{salary2011 -> 842.02, salary2012 -> 565.32}                                                                                                             |
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------+

第二行中出现cache的原因是第二部分:使用一些sql函数可以获得映射列中所有键的(sql函数)集合。然后,可以使用该集合为每年创建一个列:

代码语言:javascript
运行
AI代码解释
复制
Collection<String> years = JavaConverters.asJavaCollection((WrappedArray<String>)
        df.withColumn("years", functions.map_keys(col("salaries2")))
                .agg(functions.array_sort(
                        functions.array_distinct(
                                functions.flatten(
                                        functions.collect_set(col("years"))))))
                .first().get(0));

List<Column> salaries2 = years.stream().map((year) -> 
    col("salaries2").getItem(year).alias(year)).collect(Collectors.toList());
salaries2.add(0, col("id"));

df.select(salaries2.toArray(new Column[0])).show();

输出:

代码语言:javascript
运行
AI代码解释
复制
+---+----------+----------+----------+----------+----------+----------+----------+----------+
| id|salary2010|salary2011|salary2012|salary2013|salary2014|salary2015|salary2016|salary2017|
+---+----------+----------+----------+----------+----------+----------+----------+----------+
|  1|      null|      null|      null|      null|    333.74|      null|      null|      null|
|  2|    841.83|    764.24|    703.35|    727.06|    314.52|      null|      null|      null|
|  3|      null|      null|    770.90|    790.92|      null|      null|      null|      null|
|  4|      null|    696.24|    420.56|    566.10|    160.99|      null|      null|      null|
|  5|      null|     60.59|    313.57|    770.82|    641.90|    776.13|    145.28|    216.02|
|  6|      null|    842.02|    565.32|      null|      null|      null|      null|      null|
+---+----------+----------+----------+----------+----------+----------+----------+----------+

收集所有地图中的所有年份可能需要在大型数据集上花费一些时间,因为Spark必须先处理所有UDF调用,然后收集映射键。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73672625

复制
相关文章
对map集合进行排序
今天做统计时需要对X轴的地区按照地区代码(areaCode)进行排序,由于在构建XMLData使用的map来进行数据统计的,所以在统计过程中就需要对map进行排序。
java思维导图
2018/11/30
1.8K0
对map集合进行排序
如何对Scala中集合(Collections)进行排序
文章标题: 《如何对Scala中集合(Collections)进行排序》 本文链接: http://www.iteblog.com/archives/1171 下面是一系列对 Scala 中的Lists、Array进行排序的例子,数据结构的定义如下: // data structures working with val s = List( "a", "d", "F", "B", "e") val n = List(3, 7, 2, 1, 5) val m = Map( -2 -> 5,
Albert陈凯
2018/04/04
1.9K0
使用Comparable和Comparator对Java集合对象进行排序
在现实生活中,我们可能会遇到需要对集合内的对象进行排序的场景,比如,有一个游戏得分排行榜,如先按照分数的高低由高到低排序,在分数相同的情况下,按照记录创建的时间由早到新的顺序排序。
孟君
2019/09/09
5.5K0
对大文件字符进行计数
但是当文件过大时,会报错,显示空间不足: sort: write failed: /tmp/sortbDyE0W: No space left on device
生信编程日常
2020/10/10
6300
Java 使用Collections.reverse对list集合进行降序排序
今天无意中搜了一下Collections.reverse这个方法,结果发现有些人对它的误解蛮深的。下面是一个有百万访问量博主写的,reverse可以对指定列表进行降序排序,可是自己输出的结果都不是降序。
我是李超人
2020/08/21
2.4K0
Java 使用Collections.reverse对list集合进行降序排序
用JavaScript对GridView进行上移下移,保存排序
<script type=”text/javascript” language=”javascript”>         //移动排序         function Move(part)         {           var count= document.getElementById(‘HidCount’).value;           var temp=0;           var index=0;           var indexid=0      
全栈程序员站长
2021/07/28
4810
python对字典进行排序
标准的python字典是无序的。即使对(键、值)对进行了排序,也无法以保留排序的方式将它们存储在dict中。
IT工作者
2022/07/07
2K0
Delphi对TStrings进行排序
最近在做一个Delphi的对接第三方支付的接口,接口签名机制模仿微信的签名方式,把参数按ascii码进行排序后再加上key进行md5的加密,因为调用接口的的Post里面的参数是TStrings类型的,但是在TStrings类型里面没有Sorted排序这个方法。所以我自己重写了一个函数用于处理这个。
Vaccae
2019/07/25
1.5K0
Laravel多对多关系详解【文章 - 标签】
今天弄了一天的关于文章的功能,其中主要卡在文章与标签的多对多的关系纠结中。卡了半天,终于算是解决了,不是很完美,但可以。
无道
2019/11/13
1.9K0
Laravel多对多关系详解【文章 - 标签】
LUA对Map进行排序
Lua中最常见的数据结构就是Table, 用Table表示Map很容易, 但早期Lua没有提供一个针对Map数据结构的排序方法,下面用Moonscript实现了一个Map型数据结构排序函数方法。
糖果
2019/11/20
3.5K0
计数排序
算法思想 编辑 计数排序对输入的数据有附加的限制条件: 1、输入的线性表的元素属于有限偏序集S; 2、设输入的线性表的长度为n,|S|=k(表示集合S中元素的总数目为k),则k=O(n)。 在这两个条件下,计数排序的复杂性为O(n)。 计数排序的基本思想是对于给定的输入序列中的每一个元素x,确定该序列中值小于x的元素的个数(此处并非比较各元素的大小,而是通过对元素值的计数和计数值的累加来确定)。一旦有了这个信息,就可以将x直接存放到最终的输出序列的正确位置上。例如,如果输入序列中只有17个元素的值小于x
attack
2018/04/12
1.1K0
计数排序
计数排序属于非比较排序算法类,故其时间复杂度不受比较排序算法时间复杂度下界的限制,可以达到 。其中, 为待排序序列的排序关键字的最大范围。 计数排序是稳定的、非原址的。
hotarugali
2022/03/01
2K0
排序算法 --- 计数排序
前面说的那些排序算法,都是要通过比较来实现的。排序还能不通过比较来实现?是的,计数排序就是这么神奇。
贪挽懒月
2020/10/10
5710
计数排序
计数排序和原来说过的几个排序算法有一个特别大的不同之处:它是一个不基于比较的排序算法。不管是快排,归并,还是堆排,它们都难以突破NlogN的运行时间下限,而计数排序是一个线性时间级别的排序算法。对NlogN的突破凭借的就是不基于比较对元素进行排序,当然了,它也有很大的局限性,比如它只能对整数进行排序。总之,计数排序是一种对整数进行排序非常有效的排序算法。 计数排序的思想就是记录每个元素出现的次数,通过数组下标确定每个元素的先后关系。比如对数组A{2,5,6,8,4,2,5,4,8,6}进行排序
naget
2019/07/03
7910
计数排序
计数排序
通常的排序算法是要进行元素之间的比较,而计数排序是记录下每个元素出现的个数,是一种空间换时间的排序方法。适合整数数组排序,并且不同元素个数不宜过多。
三猫
2020/11/03
7950
C++ 对vector进行排序
title: C++ vector排序 tags: c++,vector,排序 grammar_cjkRuby: true --- 每次都要重复造轮子真的很累,所以用别人的吧。 目的:对vector进行排序 示例: 记得将 algorithm 这个头文件包括进去 #include <iostream> #include <vector> #include <algorithm> using namespace std; int main(void) { vector <int> a
marsggbo
2018/01/23
8.5K0
排序8: 计数排序
这种数字对应下标的叫做绝对映射。那么如果是100 ~ 110范围的数字我们总不可能开110个空间吧,所以我们下面介绍一种相对映射的办法。
青衫哥
2023/03/31
2180
排序8: 计数排序
使用QCollat​​or对QStringList进行排序
  QCollator类根据排序规则算法比较字符串。以下例子是对QStringList进行数字优先且区分大小写比较排序。
Qt君
2023/03/17
2.9K0
使用QCollat​​or对QStringList进行排序
Python中对list进行排序
很多时候,我们需要对List进行排序,Python提供了两个方法 对给定的List L进行排序, 方法1.用List的成员函数sort进行排序 方法2.用built-in函数sorted进行排序(从2.4开始) 这两种方法使用起来差不多,以第一种为例进行讲解: 从Python2.4开始,sort方法有了三个可选的参数,Python Library Reference里是这样描述的 cmp:cmp specifies a custom comparison function of two arguments (iterable elements) which should return a negative, zero or positive number depending on whether the first argument is considered smaller than, equal to, or larger than the second argument: "cmp=lambda x,y: cmp(x.lower(), y.lower())" key:key specifies a function of one argument that is used to extract a comparison key from each list element: "key=str.lower" reverse:reverse is a boolean value. If set to True, then the list elements are sorted as if each comparison were reversed.In general, the key and reverse conversion processes are much faster than specifying an equivalent cmp function. This is because cmp is called multiple times for each list element while key and reverse touch each element only once. 以下是sort的具体实例。 实例1: >>>L = [2,3,1,4] >>>L.sort() >>>L >>>[1,2,3,4] 实例2: >>>L = [2,3,1,4] >>>L.sort(reverse=True) >>>L >>>[4,3,2,1] 实例3: >>>L = [('b',2),('a',1),('c',3),('d',4)] >>>L.sort(cmp=lambda x,y:cmp(x[1],y[1])) >>>L >>>[('a', 1), ('b', 2), ('c', 3), ('d', 4)] 实例4: >>>L = [('b',2),('a',1),('c',3),('d',4)] >>>L.sort(key=lambda x:x[1]) >>>L >>>[('a', 1), ('b', 2), ('c', 3), ('d', 4)] 实例5: >>>L = [('b',2),('a',1),('c',3),('d',4)] >>>import operator >>>L.sort(key=operator.itemgetter(1)) >>>L >>>[('a', 1), ('b', 2), ('c', 3), ('d', 4)] 实例6:(DSU方法:Decorate-Sort-Undercorate) >>>L = [('b',2),('a',1),('c',3),('d',4)] >>>A = [(x[1],i,x) for i,x in enumerate(L)] #i can confirm the stable sort >>>A.sort() >>>L = [s[2] for s in A] >>>L >>>[('a', 1), ('b', 2), ('c', 3), ('d', 4)] 以上给出了6中对List排序的方法,其中实例3.4.5.6能起到对以List item中的某一项 为比较关键字进行排序. 效率比较: cmp < DSU < key 通过实验比较,方法3比方法6要慢,方法6比方法4要慢,方法4和方法5基本相当 多关键字比较排序: 实例7: >>>L = [('d',2),('a',4),('b',3),('c',2)] >>> L.sort(key=lambda x:x[1]) >>> L >>>[('d', 2), ('c', 2), ('b', 3), ('a', 4)] 我们看到,此时排序过的L是仅仅按照第二个关键字来排的,如果我们想用
py3study
2020/01/09
2.5K0
【集合论】集合概念与关系 ( 集合表示 | 数集合 | 集合关系 | 包含 | 相等 | 集合关系性质 )
列举法 : 列举出集合中的所有元素 , 元素之间使用逗号分开 , 使用花括号 “{}” 括起来 ; 如 :
韩曙亮
2023/03/28
2.2K0

相似问题

对laravel关系集合进行自定义排序

20

Laravel:对具有多对多关系的集合进行排序

211

对集合和关系集合进行排序

18

Laravel对关系进行排序

14

Laravel按关系值对父级集合进行排序

210
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文