Big Data Development: Flink Table Operations

Author: 码客说
Published: 2022-11-22 16:23:59

Preface

Environment versions used in this article:

  • Hive: 2.3.9
  • Flink: flink-1.12.7-bin-scala_2.12

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.psvmc</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.7</flink.version>
    <scala.version>2.12.15</scala.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Flink core API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>


    <!-- RocksDB state backend -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>

    <!-- Hive connector support (default compile scope, so it is bundled into the assembled jar) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>calcite-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>calcite-avatica</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>hadoop-hdfs</artifactId>
          <groupId>org.apache.hadoop</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Read files from Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Let's start with a simple example.

import org.apache.flink.table.api.{$, EnvironmentSettings, FieldExpression, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
  // Not used by the queries below.
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    // Create a TableEnvironment backed by the Blink planner.
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    // Directory that contains hive-site.xml.
    val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive) // register the catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive") // switch to the registered catalog; otherwise the Hive tables are not found
    tableEnv.useDatabase("default") // select the database to query
    tableEnv.executeSql("show tables").print()

    // Query with SQL: sqlQuery returns a Table.
    val selectTables_sql = "select id,name,password from t_user"
    val result = tableEnv.sqlQuery(selectTables_sql)
    result.execute().print()

    // The same query expressed with the Table API.
    val mTable = tableEnv.from("t_user").select($"id",$"name",$"password")
    mTable.execute().print()
  }
}
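
The `useCatalog` call in the WordCount example matters because unqualified table names are resolved against the current catalog and database, and registering a catalog does not make it current. The following is a minimal sketch of that behavior, not the author's code: it assumes Flink 1.12 with the Blink planner on the classpath, uses `GenericInMemoryCatalog` as a stand-in for `HiveCatalog` so that no Hive installation is needed, and the names `CatalogSwitchSketch`, `currentCatalogs` and `demo_catalog` are hypothetical.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.GenericInMemoryCatalog

object CatalogSwitchSketch {

  /** Returns the current catalog name before and after calling useCatalog. */
  def currentCatalogs(): (String, String) = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv = TableEnvironment.create(settings)

    // A fresh TableEnvironment starts in Flink's built-in in-memory catalog
    // (named default_catalog unless configured otherwise).
    val before = tableEnv.getCurrentCatalog

    // "demo_catalog" is a hypothetical name; GenericInMemoryCatalog stands in
    // for the HiveCatalog used in the article.
    tableEnv.registerCatalog("demo_catalog", new GenericInMemoryCatalog("demo_catalog"))

    // Registering alone does not change the current catalog; useCatalog does.
    tableEnv.useCatalog("demo_catalog")
    val after = tableEnv.getCurrentCatalog

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = currentCatalogs()
    println(s"current catalog before useCatalog: $before")
    println(s"current catalog after useCatalog:  $after")
  }
}
```

With a `HiveCatalog` the same sequence applies: until `useCatalog("myHive")` is called, a query on `t_user` is looked up in the built-in catalog, where that table does not exist.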

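From the WordCount example we can see that:

  • A Table exposes the relational operations (such as select); calling execute() on a Table returns a TableResult.
  • A TableResult can be printed with print().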

// Returns a Table
tableEnv.sqlQuery(sqlstr)
// Returns a TableResult
tableEnv.executeSql(sqlstr)
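
The difference between the two return types can be tried without a Hive cluster. The sketch below is an illustration under stated assumptions rather than the article's own code: it assumes Flink 1.12 with the Blink planner, replaces the Hive table `t_user` with two made-up in-memory rows built by `fromValues`, and the names `TableVsTableResultSketch` and `namesWithIdAbove` are hypothetical. It shows that `sqlQuery` only describes a query as a `Table`, that Table API calls can be chained onto it, and that rows become available only through the `TableResult` returned by `execute()`.

```scala
import org.apache.flink.table.api._
import scala.collection.JavaConverters._

object TableVsTableResultSketch {

  /** Returns the names of all users whose id is greater than minId. */
  def namesWithIdAbove(minId: Int): List[String] = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv = TableEnvironment.create(settings)

    // Hypothetical in-memory stand-in for the Hive table t_user.
    val users: Table = tableEnv
      .fromValues(row(1, "alice", "pw1"), row(2, "bob", "pw2"))
      .as("id", "name", "password")
    tableEnv.createTemporaryView("t_user", users)

    // sqlQuery returns a Table: a description of the query, not its rows.
    val queried: Table = tableEnv.sqlQuery("SELECT id, name FROM t_user")

    // Table API operations can be chained onto the Table returned by sqlQuery.
    val filtered: Table = queried.filter($"id" > minId)

    // execute() runs the query and returns a TableResult that holds the rows.
    val result: TableResult = filtered.execute()
    val rows = result.collect()
    try {
      // Field 1 is the name column of the SELECT above.
      rows.asScala.map(r => r.getField(1).toString).toList
    } finally {
      rows.close()
    }
  }

  def main(args: Array[String]): Unit = {
    namesWithIdAbove(1).foreach(println)
  }
}
```

`collect()` is used here instead of `print()` so that the rows can be processed in the program; the iterator is closed in a `finally` block to release the underlying job resources.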

Originally published on 2022-11-18 on the author's personal site/blog.
