首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >猪过滤器与下一行的关系相同

猪过滤器与下一行的关系相同
EN

Stack Overflow用户
提问于 2014-02-19 14:36:14
回答 1查看 1.1K关注 0票数 2

我寻找了很长一段时间来解决我的问题,但几乎没有发现任何有用的东西。希望你们中的一些人能给我个提示。

我有一个关系A与以下格式:用户名,时间戳,ip

例如:

代码语言:javascript
运行
复制
Harald 2014-02-18T16:14:49.503Z 123.123.123.123
Harald 2014-02-18T16:14:51.503Z 123.123.123.123
Harald 2014-02-18T16:14:55.503Z 321.321.321.321

我想知道是谁在不到5秒内改变了他的ip地址。所以第二行和第三行应该很有趣。

我想要按用户名对关系进行分组,希望将实际所有行的时间戳与下一行进行比较。如果ip入口不一样,时间戳小于5秒,则应该在输出端。

有人能帮我解决这个问题吗?

打招呼。

首先,我要感谢你抽出时间。

但实际上我还是坚持在Sessionize那部分。

这是我的数据开始:

代码语言:javascript
运行
复制
aoebcu  2014-02-19T14:23:17.503Z    220.61.65.25
aoebcu  2014-02-19T14:23:14.503Z    222.117.144.19
aoebcu  2014-02-19T14:23:14.503Z    222.117.144.19
jekgru  2014-02-19T14:23:14.503Z    213.56.157.109
zmembx  2014-02-19T14:23:12.503Z    199.188.198.91
qhixcg  2014-02-19T14:23:11.503Z    203.40.104.119

到现在为止我的代码是这样的:

代码语言:javascript
运行
复制
hijack_Reduced = FOREACH finalLogs GENERATE ClientUserName, timestamp, OriginalClientIP;
hijack_Filtered = FILTER hijack_Reduced BY OriginalClientIP != '-';

hijack_Sessionized = FOREACH (GROUP hijack_Filtered BY ClientUserName) {
  views = ORDER hijack_Filtered BY timestamp;
  GENERATE FLATTEN(Sessionize(views)) AS (ClientUserName,timestamp,OriginalClientIP,session_id);
}

但是,当我运行这个脚本时,我得到了以下错误消息:

15:36:22错误- org.apache.pig.tools.pigstats.SimplePigStats.setBackendException(542) \x{e76f}错误0:执行时异常[POUserFunc (名称:POUserFunc(datafu.pig.sessions.Sessionize)包-范围-199运算符键: scope-199)子级: null at []:java.lang.IllegalArgumentException:无效格式:"aoebcu“

我已经试过很多次了,但没什么效果。你有什么主意吗?

问候

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-02-20 04:43:42

虽然您可以为此编写一个UDF,但实际上可以使用Apache DataFu中已经可用的UDF来解决这个问题。

我的解决方案包括对数据应用会话化。基本上,您可以查看连续事件,并为每个事件分配一个会话ID。如果两个事件之间的时间超过指定的时间,在您的示例中为5秒,则下一个事件将获得一个新的会话ID。否则,连续事件将得到相同的会话ID。一旦每个事件被分配其会话ID,其余的就很容易了。我们按会话ID分组,并查找具有多个不同IP地址的会话。

我会处理我的解决方案。

假设您有以下输入数据。哈罗德和库马尔都改变了他们的IP地址。但是哈罗德在5秒内就做了,而库马尔没有。因此,我们的脚本输出应该只是简单的“哈罗德”。

代码语言:javascript
运行
复制
  Harold,2014-02-18T16:14:49.503Z,123.123.123.123
  Harold,2014-02-18T16:14:51.503Z,123.123.123.123
  Harold,2014-02-18T16:14:55.503Z,321.321.321.321
  Kumar,2014-02-18T16:14:49.503Z,123.123.123.123
  Kumar,2014-02-18T16:14:55.503Z,123.123.123.123
  Kumar,2014-02-18T16:15:05.503Z,321.321.321.321

加载数据

代码语言:javascript
运行
复制
data = LOAD 'input' using PigStorage(',') 
       AS (user:chararray,time:chararray,ip:chararray);

现在从DataFu定义几个UDF。正如我前面所描述的,分治 UDF执行会话化。DistinctBy UDF将用于在每个会话中查找不同的IP地址。

代码语言:javascript
运行
复制
define Sessionize datafu.pig.sessions.Sessionize('5s');

define DistinctBy datafu.pig.bags.DistinctBy('1');

按用户分组数据,按时间排序,并应用Sessonize。注意,时间戳必须是第一个字段,因为这是Sessionize所期望的。这个UDF在每个元组中附加一个会话ID。

代码语言:javascript
运行
复制
data = FOREACH data GENERATE time,user,ip;

data_sessionized = FOREACH (GROUP data BY user) {
  views = ORDER data BY time;
  GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);
}

现在数据已经会话化了,我们可以按用户和会话进行分组。我也按用户分组,因为我想把这个值放回原处。我们将事件包传递给DistinctBy UDF。有关更详细的描述,请查看此UDF的文档。但从本质上说,我们将得到与每个会话有不同IP地址一样多的元组。请注意,我已经从下面的关系中删除了时间。这是因为1)不需要它,2) 1.2.0中的DistinctBy在处理包含破折号的字段时有错误,就像时间字段所做的那样。

代码语言:javascript
运行
复制
data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;

data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
  group.user as user,
  SIZE(DistinctBy(data_sessionized)) as distinctIpCount;

现在,选择具有多个不同IP地址的所有会话,并返回这些会话的不同用户。

代码语言:javascript
运行
复制
data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;

data_sessionized = FOREACH data_sessionized GENERATE user;

data_sessionized = DISTINCT data_sessionized;

这只会产生以下情况:

代码语言:javascript
运行
复制
Harold

下面是完整的源代码,您应该能够直接粘贴到DataFu单元测试中并运行:

代码语言:javascript
运行
复制
  /**
  define Sessionize datafu.pig.sessions.Sessionize('5s');

  define DistinctBy datafu.pig.bags.DistinctBy('1'); -- distinct by ip

  data = LOAD 'input' using PigStorage(',') AS (user:chararray,time:chararray,ip:chararray);

  data = FOREACH data GENERATE time,user,ip;

  data_sessionized = FOREACH (GROUP data BY user) {
    views = ORDER data BY time;
    GENERATE flatten(Sessionize(views)) as (time,user,ip,session_id);
  }

  data_sessionized = FOREACH data_sessionized GENERATE user,ip,session_id;

  data_sessionized = FOREACH (GROUP data_sessionized BY (user, session_id)) GENERATE
    group.user as user,
    SIZE(DistinctBy(data_sessionized)) as distinctIpCount;

  data_sessionized = FILTER data_sessionized BY distinctIpCount > 1;

  data_sessionized = FOREACH data_sessionized GENERATE user;

  data_sessionized = DISTINCT data_sessionized;

  STORE data_sessionized INTO 'output';
   */
  @Multiline private String sessionizeUserIpTest;

  private String[] sessionizeUserIpTestData = new String[] {
      "Harold,2014-02-18T16:14:49.503Z,123.123.123.123",
      "Harold,2014-02-18T16:14:51.503Z,123.123.123.123",
      "Harold,2014-02-18T16:14:55.503Z,321.321.321.321",
      "Kumar,2014-02-18T16:14:49.503Z,123.123.123.123",
      "Kumar,2014-02-18T16:14:55.503Z,123.123.123.123",
      "Kumar,2014-02-18T16:15:05.503Z,321.321.321.321"
  };

  @Test
  public void sessionizeUserIpTest() throws Exception
  {
    PigTest test = createPigTestFromString(sessionizeUserIpTest);

    this.writeLinesToFile("input", 
        sessionizeUserIpTestData);

    List<Tuple> result = this.getLinesForAlias(test, "data_sessionized");

    assertEquals(result.size(),1);
    assertEquals(result.get(0).get(0),"Harold");
  }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21883618

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档