如何给Apache Pig自定义UDF函数?

近日由于工作所需,需要使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。 一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子: 你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。 本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试通过: 我们先来看下定义一个UDF扩展类,需要几个步骤:

序号

步骤

说明

1

在eclipse里新建一个java工程,并导入pig的核心包

java项目

2

新建一个包,继承特定的接口或类,重写自定义部分

核心业务

3

编写完成后,使用ant打包成jar

编译时需要pig依赖,但不用把pig的jar包打入UDF中

4

把打包完成后的jar上传到HDFS上

pig运行时候需要加载使用

5

在pig脚本里,注册我们自定义的udf的jar包

注入运行时环境

6

编写我们的核心业务pig脚本运行

测试是否运行成功

项目工程截图如下:

Java代码

package com.pigudf;  
 
import java.io.IOException;  
 
import org.apache.pig.EvalFunc;  
import org.apache.pig.data.Tuple;  
import org.apache.pig.impl.util.WrappedIOException;  
/** 
 * 自定义UDF类,对字符串转换大写 
 * @author qindongliang 
 * */ 
public class MyUDF extends EvalFunc<String> {  
 
 @Override 
 public String exec(Tuple input) throws IOException {  
 
 //判断是否为null或空,就跳过 
 if(input==null||input.size()==0){  
 return null;  
        }  
 try{  
 //获取第一个元素 
            String str=(String) input.get(0);  
 //转成大写返回 
 return str.toUpperCase();  
 
        }catch(Exception e){  
 throw WrappedIOException.wrap("Caught exception processing input row ",e);  
        }  
    }  
 
 
} 

关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式):

Java代码

grunt> cat s.txt  
zhang san,12 
Song,34 
long,34 
abC,12 
grunt>   

我们在看下,操作文件和jar包是放在一起的:

Java代码

grunt> ls  
hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295 
hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36 
grunt>   

最后,我们看下pig脚本的定义:

Pig代码

--注册自定义的jar包  
REGISTER pudf.jar;   
--加载测试文件的数据,逗号作为分隔符  
a = load 's.txt' using PigStorage(',');     
--遍历数据,对name列转成大写  
b =  foreach a generate com.pigudf.MyUDF((chararray)$0);   
--启动MapReduce的Job进行数据分析  
dump b  

最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功:

Java代码

Counters:  
Total records written : 4 
Total bytes written : 64 
Spillable Memory Manager spill count : 0 
Total bags proactively spilled: 0 
Total records proactively spilled: 0 
 
Job DAG:  
job_1419419533357_0147  
 
 
2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!  
2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS  
2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.  
2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1 
2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1 
(ZHANG SAN,12)  
(SONG,34)  
(LONG,34)  
(ABC,12)  

结果没问题,我们的UDF加载执行成功,如果我们还想将我们的输出结果直接写入到HDFS上,可以在pig脚本的末尾,去掉dump命令,加入 store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。

原文发布于微信公众号 - 我是攻城师(woshigcs)

原文发表时间:2014-12-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏分布式系统进阶

Kafka源码分析-启动流程

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

24400
来自专栏lgp20151222

rabbit的简单搭建,java使用rabbitmq queue的简单例子和一些坑

由于本人的码云太多太乱了,于是决定一个一个的整合到一个springboot项目里面。

44610
来自专栏老码农专栏

ActFramework R1.4.0 带来 WebSocket 的支持

12130
来自专栏实战docker

修改,编译,GDB调试openjdk8源码(docker环境下)

在上一章《在docker上编译openjdk8》里,我们在docker容器内成功编译了openjdk8的源码,有没有读者朋友产生过这个念头:“能不能修改open...

60790
来自专栏非典型程序猿

Golang任务队列machinery使用与源码剖析(二)

在Golang任务队列machinery使用与源码剖析(一)一文中,我们主要对golang中任务队列machinery的设计结构以及具体模块的功能与源码实现进行...

1.7K80
来自专栏木木玲

Netty 那些事儿 ——— 心跳机制

83790
来自专栏chenssy

【死磕Sharding-jdbc】---读写分离

先执行 sharding-jdbc-example-config-spring-masterslave模块中的的SQL脚本 all_schema.sql,这里有...

18640
来自专栏岑玉海

Hadoop源码系列(一)FairScheduler申请和分配container的过程

1、如何申请资源 1.1 如何启动AM并申请资源 1.1.1 如何启动AM val yarnClient = YarnClient.createYarnClie...

46840
来自专栏比原链

Derek解读Bytom源码-启动与停止

Gitee地址:https://gitee.com/BytomBlockchain/bytom

13630
来自专栏坚毅的PHP

jersey处理支付宝异步回调通知的问题:java.lang.IllegalArgumentException: Error parsing media type 'application/x-www

tcpflow以流为单位分析请求内容,非常适合服务器端接口类服务查问题 这次遇到的问题跟支付宝支付后的回调post结果有关 淘宝的代码例子: publi...

62950

扫码关注云+社区

领取腾讯云代金券