前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Presto Event Listener开发

Presto Event Listener开发

作者头像
叁金
发布2019-08-01 11:47:35
1.1K0
发布2019-08-01 11:47:35
举报
文章被收录于专栏:叁金大数据叁金大数据

简介

同Hive Hook一样,Presto也支持自定义实现Event Listener,用于侦听Presto引擎执行查询时发生的事件,并作出相应的处理。我们可以利用该功能实现诸如自定义日志记录、调试和性能分析插件,帮助我们更好的运维Presto集群。但是不同于Hive Hook的是,在Presto集群中,一次只能有一个Event Listener处于活动状态。

Event Listener作为Plugin监听以下事件:

  • Query Creation(查询建立相关信息)
  • Query completion (success or failure)(查询执行相关信息,包含成功查询的细节信息,失败查询的错误码等信息)
  • Split completion (success or failure)(split执行信息,同理包含成功和失败的细节信息)

了解Hook及Listener模式的朋友对于其步骤应该很清楚了,我们只需要:

  1. 实现Presto Event Listener和EventListenerFactory接口。
  2. 正确的打包我们的jar。
  3. 部署,放到Presto指定目录,修改配置文件。

接口

  1. 实现EventListener,该类是我们的核心逻辑所在,供包含上面所说的三个事件:
代码语言:javascript
复制
public interface EventListener
{
    //query创建的详细信息
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }
    //query执行的详细信息
    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }
    //split执行的详细信息
    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}
  1. 实现EventListenerFactory创建我们自己实现的EventListener
  2. 实现Plugin接口,实现getEventListenerFactories()方法,获取我们自己实现的EventListenerFactory
  3. 添加配置信息,为etc/event-listener.properties。其中event-listener.name为必备属性,其他属性为我们plugin所需要的信息。

示例

由于集群运维的需要,先需要将用户的查询历史、查询花费的时间等信息进行统计,以便于后续对各个业务的查询进行优先级分级和评分,方便后续Presto集群稳定性易用性的维护。这里给出一个简单的将这些信息存储到Mysql数据库的样例。

Maven Pom
代码语言:javascript
复制
<dependency>
      <groupId>com.facebook.presto</groupId>
      <artifactId>presto-spi</artifactId>
      <version>0.220</version>
      <scope>compile</scope>
    </dependency>
QueryEventListenerFactory
代码语言:javascript
复制
public class QueryEventListenerFactory implements EventListenerFactory {

  @Override
  public String getName() {
    return "query-event-listener";
  }

  @Override
  public EventListener create(Map<String, String> config) {
    if (!config.containsKey("jdbc.uri")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.uri");
    }
    if (!config.containsKey("jdbc.user")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.user");
    }
    if (!config.containsKey("jdbc.pwd")) {
      throw new RuntimeException("/etc/event-listener.properties file missing jdbc.pwd");
    }

    return new QueryEventListener(config);
  }
}
QueryEventPlugin
代码语言:javascript
复制
public class QueryEventPlugin implements Plugin {

  @Override
  public Iterable<EventListenerFactory> getEventListenerFactories() {
    EventListenerFactory listenerFactory = new QueryEventListenerFactory();
    return Arrays.asList(listenerFactory);
  }
}
QueryEventListener
代码语言:javascript
复制
public class QueryEventListener implements EventListener {

  private Map<String, String> config;
  private Connection connection;

  public QueryEventListener(Map<String, String> config) {
    this.config = new HashMap<>();
    this.config.putAll(config);
    init();
  }

  private void init() {
    try {
      if (connection == null || !connection.isValid(10)) {
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager
            .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd"));
      }
    } catch (SQLException | ClassNotFoundException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
  }

  @Override
  public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
    String queryId = queryCompletedEvent.getMetadata().getQueryId();
    String querySql = queryCompletedEvent.getMetadata().getQuery();
    String queryState = queryCompletedEvent.getMetadata().getQueryState();
    String queryUser = queryCompletedEvent.getContext().getUser();
    long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
    long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
    long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
    //insert into query execution table

    long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)
        .toMillis();
    long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
    long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
    long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
    int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
    double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
    long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
    long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
    long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
    long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
    long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes();
    long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows();
    //insert into query info table
    
    queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
      int code = queryFailureInfo.getErrorCode().getCode();
      String name = queryFailureInfo.getErrorCode().getName();
      String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
      String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
      String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
      String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
      String failuresJson = queryFailureInfo.getFailuresJson();
      // insert into failed query table
    });
  }


  @Override
  public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
    long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
    long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();
    String payload = splitCompletedEvent.getPayload();
    String queryId = splitCompletedEvent.getQueryId();
    String stageId = splitCompletedEvent.getStageId();
    long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();
    String taskId = splitCompletedEvent.getTaskId();
    long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
    long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
    long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
    long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
    long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
    long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
    //insert into stage info table
  }

}
打包
  1. Presto使用服务提供者接口(SPI)来扩展Presto。Presto使用SPI加载连接器功能类型系统访问控制。SPI通过元数据文件加载。我们还需要创建src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin元数据文件。该文件应包含我们插件的类名如: com.ji3jin.presto.listener.QueryEventListener
  2. 执行mvn clean install打包
部署
  1. 创建配置文件etc/event-listener.properties
代码语言:javascript
复制
event-listener.name=query-event-listener

jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor
jdbc.user=presto
jdbc.pwd=presto123
  1. 在presto根目录下创建query-event-listener目录,名称与我们上面event listener的name一致
  2. 将我们的jar包和mysql connector的jar包拷贝到上面创建的目录
  3. 重新启动Presto服务即可

好了,现在你可以执行查询,然后就可以在Mysql中看到你的查询历史和相关时间的统计信息了。如果你目前的工作对此也有需要,还等什么,快动手实现一个吧。

欢迎关注我的公众号:叁金大数据

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-07-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 接口
  • 示例
    • Maven Pom
      • QueryEventListenerFactory
        • QueryEventPlugin
          • QueryEventListener
            • 打包
              • 部署
              相关产品与服务
              云数据库 SQL Server
              腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档