首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

重要|Spark driver端得到executor返回值的方法

有人说spark的代码不优雅,这个浪尖就忍不了了。实际上,说spark代码不优雅的主要是对scala不熟悉,spark代码我觉得还是很赞的,最值得阅读的大数据框架之一。...spark 使用的时候,总有些需求比较另类吧,比如有球友问过这样一个需求: 浪尖,我想要在driver端获取executor执行task返回的结果,比如task是个规则引擎,我想知道每条规则命中了几条数据...大家也可以自己琢磨一下下~ 那么,浪尖就给大家介绍一个比较常用也比较骚的操作吧。 其实,这种操作我们最先想到的应该是count函数,因为他就是将task的返回值返回到driver端,然后进行聚合的。...Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数: /** * Counts the number of...,每个数组的元素就是我们task执行函数的返回值,然后调用sum就得到我们的统计值了。

2K40
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Spark SQL读数据库时不支持某些数据类型的问题

    在大数据平台中,经常需要做数据的ETL,从传统关系型数据库RDBMS中抽取数据到HDFS中。...之前开发数据湖新版本时使用Spark SQL来完成ETL的工作,但是遇到了 Spark SQL 不支持某些数据类型(比如ORACLE中的Timestamp with local Timezone)的问题...driver 版本:ojdbc7.jar Scala 版本:2.11.8 二、Spark SQL读数据库表遇到的不支持某些数据类型 Spark SQL 读取传统的关系型数据库同样需要用到 JDBC,毕竟这是提供的访问数据库官方...关系; getJDBCType(dt: DataType):输入Spark 的DataType,得到对应的数据库的SQLType; quoteIdentifier(colName: String):引用标识符...对象,并重写方法(主要是getCatalystType()方法,因为其定义了数据库 SQLType 到 Spark DataType 的映射关系),修改映射关系,将不支持的 SQLType 以其他的支持的数据类型返回比如

    2.3K10

    03-SparkSQL入门

    0.1 设计 灵感来自 Google 的 Dremel 系统: 将数据存储在列式存储引擎 使用分布式计算引擎进行查询 Shark 采用类似架构并使用 Spark 作为计算引擎,使 Shark 具有很高查询性能和可扩展性...这种统一意味着开发人员可以根据提供最自然的方式表达给定转换的API轻松切换。 2 用途 执行SQL查询。 Spark SQL也可用于从Hive读取数据。...当从另一种编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。还可使用命令行或通过JDBC/ODBC与SQL接口交互。...对于包含空格的值,将“key=value”括在引号中(如图所示)。多个配置应作为单独的参数传递。...指定启动类为HiveThriftServer2,该类负责启动Spark SQL的Thrift Server。 指定服务名称为"Spark SQL Thrift Server"。

    13700

    PySpark UD(A)F 的高效使用

    需要注意的一件重要的事情是,除了基于编程数据的处理功能之外,Spark还有两个显著的特性。一种是,Spark附带了SQL作为定义查询的替代方式,另一种是用于机器学习的Spark MLlib。...举个例子,假设有一个DataFrame df,它包含10亿行,带有一个布尔值is_sold列,想要过滤带有sold产品的行。...如果工作流从 Hive 加载 DataFrame 并将生成的 DataFrame 保存为 Hive 表,在整个查询执行过程中,所有数据操作都在 Java Spark 工作线程中以分布式方式执行,这使得...这意味着在UDF中将这些列转换为JSON,返回Pandas数据帧,并最终将Spark数据帧中的相应列从JSON转换为复杂类型 [2enpwvagkq.png] 5.实现 将实现分为三种不同的功能: 1)...结语 本文展示了一个实用的解决方法来处理 Spark 2.3/4 的 UDF 和复杂数据类型。与每个解决方法一样,它远非完美。话虽如此,所提出的解决方法已经在生产环境中顺利运行了一段时间。

    19.7K31

    Storm与Spark、Hadoop三种框架对比

    Spark采用了内存计算。从多迭代批处理出发,允许将数据载入内存作反复查询,此外还融合数据仓库,流处理和图形计算等多种计算范式。Spark构建在HDFS上,能与Hadoop很好的结合。...Hadoop是专为从单一服务器到上千台机器扩展,每个机器都可以提供本地计算和存储。...化简(reduce)则是把列表中的值化简成一个单值,这个值被返回,然后再次进行键分组,直到每个键的列表只有一个值为止。...映射器处理该数据,并创建数据的若干小块。 减少阶段:这个阶段是:Shuffle阶段和Reduce阶段的组合。减速器的工作是处理该来自映射器中的数据。...图四 MapReduce 2.3 HIVE hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行

    2.4K20

    HBase Bulkload 实践探讨

    3.2 Hive SQL 当我们需要从 Hive 数据导出到 HBase,可以通过写 Hive SQL 的方式生成 HFile,有赞在最开始便采用这一个版本,相比与 MR 任务,该方式有以下优点: 可以直接从...MR任务来实现 Transform 的逻辑,转而用 Spark,同时还可以借住 Spark SQL 的能力直接实现 Hive 数据通过 SQL 生成 HFile。...该方式相比 Hive SQL 方式并不需要做很多前置工作,同时更快更灵活。 优点: 比 MR 执行的快。 可以借助 Spark SQL 完成从 Hive 的数据抽取与过滤。...四、 有赞 Bulkload 方式演进 有赞 Bulkload 主要经过两个比较大版本迭代,从 MR 到 Hive SQL, 再到 Spark 方案。...从 SQL 中一条条读取数据并根据逻辑过滤,返回一个 List,KeyValue>> 列表。

    1.7K30

    10万字的Spark全文!

    /算子分类 2.2.1 分类 RDD 的算子分为两类: 1)Transformation转换操作:返回一个新的RDD 2)Action动作操作:返回值不是RDD(无返回值或返回其他的) 注意: RDD...的第一个元素(类似于 take(1)) take(n) 返回一个由数据集的前 n 个元素组成的数组 takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的...最小值 variance 方差 sampleVariance 从采样中计算方差 stdev 标准差:衡量数据的离散程度 sampleStdev 采样的标准差 stats 查看统计结果 2.3...值,如:1,1,1 (以测试数据中的hadoop为例) //historyValue:之前累计的历史值,第一次没有值是0,第二次是3 //目标是把当前数据+历史数据返回作为新的结果(下次的历史数据...3.query name:指定查询的标识。

    1.5K10

    Apache Spark 3.0.0重磅发布 —— 重要特性全面解析

    改进的Spark SQL引擎 Spark SQL是支持大多数Spark应用的引擎。...为了提升兼容性,该版本采用Proleptic Gregorian日历,用户可以禁止使用ANSI SQL的保留关键字作为标识符。...6.jpg Spark 3.0为PySpark API做了多个增强功能: 带有类型提示的新pandas API pandas UDF最初是在Spark 2.3中引入的,用于扩展PySpark中的用户定义函数...对于同时实现了目录插件API和数据源V2 API的外部数据源,用户可以通过标识符直接操作外部表的数据和元数据(在相应的外部目录注册了之后)。...Spark 3.0的其他更新 Spark 3.0是社区的一个重要版本,解决了超过3400个Jira问题,这是440多个contributors共同努力的结果,这些contributors包括个人以及来自

    4.1K00

    Apache Spark 3.0.0重磅发布 —— 重要特性全面解析

    此外,采用Spark3.0版本,主要代码并没有发生改变。 改进的Spark SQL引擎 Spark SQL是支持大多数Spark应用的引擎。...ANSI SQL兼容性 对于将工作负载从其他SQL引擎迁移到Spark SQL来说至关重要。...为了提升兼容性,该版本采用Proleptic Gregorian日历,用户可以禁止使用ANSI SQL的保留关键字作为标识符。...Spark 3.0为PySpark API做了多个增强功能: 带有类型提示的新pandas API pandas UDF最初是在Spark 2.3中引入的,用于扩展PySpark中的用户定义函数,并将pandas...Spark 3.0的其他更新 Spark 3.0是社区的一个重要版本,解决了超过3400个Jira问题,这是440多个contributors共同努力的结果,这些contributors包括个人以及来自

    2.3K20

    浅析Hadoop大数据分析与应用

    Spark采用了内存计算。从多迭代批处理出发,允许将数据载入内存作反复查询,此外还融合数据仓库,流处理和图形计算等多种计算范式。Spark构建在HDFS上,能与Hadoop很好的结合。...Hadoop框架应用工程提供跨计算机集群的分布式存储和计算的环境。 Hadoop是专为从单一服务器到上千台机器扩展,每个机器都可以提供本地计算和存储。...化简(reduce)则是把列表中的值化简成一个单值,这个值被返回,然后再次进行键分组,直到每个键的列表只有一个值为止。...映射器处理该数据,并创建数据的若干小块。 减少阶段:这个阶段是:Shuffle阶段和Reduce阶段的组合。减速器的工作是处理该来自映射器中的数据。...(图四)MapReduce 2.3 HIVE hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce

    1.2K100

    Structured Streaming

    Spark一直处于不停的更新中,从Spark 2.3.0版本开始引入持续流式处理模型后,可以将原先流处理的延迟降低到毫秒级别。...数据到达和得到处理并输出结果之间的延时超过100毫秒。 2、持续处理模型 Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟。...虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。...import split from pyspark.sql.functions import explode 由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自...(四)Rate源 Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数,从0开始。

    4000
    领券