Integrating Spring Boot with PostgreSQL and HBase so that the two can interact

Author: gzq大数据
Published: 2021-01-13 11:16:29
This article is part of the column: 大数据那些事

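This project is a small test: it reads certain field values from PostgreSQL and turns them into column families of an HBase table. PostgreSQL and HBase are already running on a cloud server in Docker containers with the relevant ports opened, and the Windows development machine connects to them through a secure tunnel. The project uses JPA to map entities to the PostgreSQL database. First, add the required dependencies:
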
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Spring Boot test support (JUnit 5); the JUnit 4 vintage engine is excluded -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.1</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.5.0</version>
        </dependency>

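        <!-- Second junit declaration, this time without test scope; Maven warns about duplicate declarations, so only one of the two should be kept -->
        <dependency>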
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.3</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>2.3.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>
</project>

The corresponding YAML configuration file:

hbase:
  zookeeper:
    quorum: xxxx
    property:
      clientPort: 2181

zookeeper:
  znode:
    parent: /zkData

spring.datasource:
  url: jdbc:postgresql://localhost:5432/db1
  username: xxxx
  password: xxxx
spring.jpa:
  database: postgresql
  properties.hibernate.dialect: org.hibernate.dialect.PostgreSQL9Dialect
  hibernate.ddl-auto: update
  show-sql: false

logging.level:
  root: info
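
Spring Boot flattens nested YAML keys into dotted property names, which is why a placeholder such as `${hbase.zookeeper.property.clientPort}` can address the nested `clientPort` entry above. The following is a minimal sketch of that flattening over a plain nested map. The class `YamlKeyFlattener` and its methods are hypothetical and only illustrate the idea; this is not Spring's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates how nested YAML keys become dotted property names.
final class YamlKeyFlattener {

    // Walks the nested map and joins the keys on each path with '.'.
    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    private static void collect(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                collect(key, child, out);
            } else {
                // Leaf value: store it under the full dotted key.
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the hbase section of the YAML file (quorum left as the xxxx placeholder).
        Map<String, Object> property = new LinkedHashMap<>();
        property.put("clientPort", 2181);
        Map<String, Object> zookeeper = new LinkedHashMap<>();
        zookeeper.put("quorum", "xxxx");
        zookeeper.put("property", property);
        Map<String, Object> hbase = new LinkedHashMap<>();
        hbase.put("zookeeper", zookeeper);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("hbase", hbase);

        flatten(root).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same rule applies to the top-level `zookeeper.znode.parent` entry, which is a separate key tree from `hbase.zookeeper`.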

With these in place, development can begin. The entity maps to the device_type table in PostgreSQL:

[Image: the device_type table in PostgreSQL]

Entity code:

package com.nevt.db.repository.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.io.Serializable;

/**
 * Entity class for the device_type table (DeviceType)
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
@Data
@Entity
@Table(name = "device_type")
@JsonIgnoreProperties(ignoreUnknown = true)
@EntityListeners(AuditingEntityListener.class)
public class DeviceType implements Serializable {
    private static final long serialVersionUID = 106469502944492174L;

    @Id
    @Column(name = "id")
    private Integer id;

    @Column(name = "name")
    private String name;

    @Column(name = "column_family")
    private String columnFamily;

    @Column(name = "data_station_type_id")
    private Integer dataStationTypeId;

}

For the data access layer, it is enough to extend the interfaces that Spring Data JPA provides:

package com.nevt.db.repository;

import com.nevt.db.repository.entity.DeviceType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

/**
 * Data access layer for the device_type table (DeviceType)
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
public interface DeviceTypeRepository extends JpaRepository<DeviceType, Integer>,
        JpaSpecificationExecutor<DeviceType> {

}

The HBaseConfig class uses the values from the YAML file to create the HBase connection:

package com.nevt.configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class HBaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

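    @Bean
    public Connection hbaseConnection() throws IOException {
        System.out.println("creating HBase bean");
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        // Apply the other two injected values as well; the original read them but never used them.
        configuration.set("hbase.zookeeper.property.clientPort", clientPort);
        // Must match the znode parent configured on the cluster (HBase defaults to /hbase).
        configuration.set("zookeeper.znode.parent", znodeParent);
        return ConnectionFactory.createConnection(configuration);
    }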
}

The core classes for PostgreSQL and HBase are as follows. PostgreSQL:

package com.nevt.service;

import com.nevt.db.repository.DeviceTypeRepository;
import com.nevt.db.repository.entity.DeviceType;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@Component
public class DBService {

    @Resource
    private DeviceTypeRepository deviceTypeRepository;

    /**
     * Returns the column_family value of every device_type row whose
     * data_station_type_id equals dataStationType.
     */
    public List<String> getColumnFamily(int dataStationType) {
        List<String> result = new ArrayList<>();
        List<DeviceType> deviceTypeList = deviceTypeRepository.findAll();
        for (DeviceType deviceType : deviceTypeList) {
            System.out.println(deviceType);
            // Objects.equals avoids a NullPointerException when data_station_type_id is NULL
            if (Objects.equals(deviceType.getDataStationTypeId(), dataStationType)) {
                result.add(deviceType.getColumnFamily());
            }
        }
        return result;
    }
}
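
The filter in `getColumnFamily` can also be written as a stream pipeline. The sketch below runs over in-memory rows rather than the JPA repository so that it is self-contained. `ColumnFamilySelector` and its nested `Row` class are hypothetical stand-ins for the service and the `DeviceType` entity, and the sample values are made up. Skipping null and duplicate family names is an added assumption, not something the original method does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for DBService: selects column families for one station type.
final class ColumnFamilySelector {

    // Hypothetical stand-in for the DeviceType entity: only the two fields the filter needs.
    static final class Row {
        final Integer dataStationTypeId;
        final String columnFamily;

        Row(Integer dataStationTypeId, String columnFamily) {
            this.dataStationTypeId = dataStationTypeId;
            this.columnFamily = columnFamily;
        }
    }

    // Keeps the column families of rows whose station type matches, in encounter order.
    static List<String> select(List<Row> rows, int dataStationType) {
        return rows.stream()
                .filter(r -> Objects.equals(r.dataStationTypeId, dataStationType)) // null-safe compare
                .map(r -> r.columnFamily)
                .filter(Objects::nonNull)  // assumption: a NULL family name is skipped
                .distinct()                // assumption: each family is needed only once
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample rows, including a NULL station type and a duplicate family.
        List<Row> rows = Arrays.asList(
                new Row(10002, "compressor"),
                new Row(10003, "vehicle"),
                new Row(10002, "storage_tank"),
                new Row(null, "orphan"),
                new Row(10002, "compressor"));
        System.out.println(select(rows, 10002));
    }
}
```

Both versions load every row and filter in memory, which is fine for a small lookup table. For larger tables, Spring Data JPA can derive a query from a repository method name, so the filtering could be pushed down to PostgreSQL instead.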

HBase (this class implements the interaction with PostgreSQL):

package com.nevt.service;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/**
 * @Author: gzq
 * @Date: 2021/1/7 - 01 - 07 - 16:27
 * @Description: com.nevt.service
 */
@Component
@EnableScheduling
public class HBaseService {

    @Autowired
    private Connection hbaseConnection;

    @Autowired
    private DBService dbService;

    /**
     * Sets up the HBase table that hydrogen production plant data is written to.
     * Table RowKey = <data_source_id>:<timestamp>
     *
     * @param tableName       name of the table to which the column families are added
     * @param dataStationType the data_station_type_id value to look up in PostgreSQL
     */
    public void writeHydrogenFactory(String tableName, int dataStationType) throws IOException {
        List<String> columnFamily = dbService.getColumnFamily(dataStationType);
        System.out.println(columnFamily);

        // Admin must be closed after use; the Connection itself is shared and stays open.
        try (Admin admin = hbaseConnection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                ifTableExist(columnFamily, admin, tableName);
            } else {
                ifTableNotExist(columnFamily, admin, tableName);
            }
        }
    }

    private void ifTableExist(List<String> columnFamily, Admin admin, String tableName) {
        System.out.println("Table Exist!");
        for (String column : columnFamily) {
            HColumnDescriptor newFamily = new HColumnDescriptor(Bytes.toBytes(column));
            // A family may already have been added in an earlier run, in which case addColumn throws.
            // Catch the exception per family so that the remaining families are still added.
            try {
                admin.addColumn(TableName.valueOf(tableName), newFamily);
                System.out.println("ColumnFamily " + column + " has been added!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void ifTableNotExist(List<String> columnFamily, Admin admin, String tableName) throws IOException {
        System.out.println("Table Not Exist!");
        // Create the table with all column families in a single call.
        HTableDescriptor tableCreate = new HTableDescriptor(TableName.valueOf(tableName));
        for (String column : columnFamily) {
            System.out.println(column);
            tableCreate.addFamily(new HColumnDescriptor(Bytes.toBytes(column)));
        }
        admin.createTable(tableCreate);
        System.out.println("Table and column families have been created!");
    }
}
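
Rather than relying on the exception from `addColumn` to detect families that already exist, the set of missing families can be computed first. The sketch below shows only that set difference in plain Java. `ColumnFamilyDiff` is a hypothetical helper and the family names are made up; in a real service the `existing` set would have to be read from the table's descriptor, which is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: decides which column families still need to be added to a table.
final class ColumnFamilyDiff {

    // Returns the wanted families that are not yet present, without duplicates, in wanted order.
    static List<String> missing(Set<String> existing, List<String> wanted) {
        Set<String> result = new LinkedHashSet<>(wanted); // drops duplicates, keeps order
        result.removeAll(existing);                       // drops families the table already has
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // Made-up example: the table already has "compressor"; PostgreSQL lists three families.
        Set<String> existing = new HashSet<>(Arrays.asList("compressor"));
        List<String> wanted = Arrays.asList("compressor", "storage_tank", "dispenser");
        System.out.println(missing(existing, wanted));
    }
}
```

Only the returned names would then be passed to `addColumn`, so an exception from that call signals a real failure rather than an expected duplicate.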

Test code:

package com.nevt;


import com.nevt.service.HBaseService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;


import javax.annotation.Resource;
import java.io.IOException;

@SpringBootTest()
class HBaseTest {

    @Resource
    HBaseService hbaseService;

    @Test
    void testWrite() throws IOException {
        hbaseService.writeHydrogenFactory("data:hydrogen_station_data",10002);
//        hbaseService.writeHydrogenFactory("data:hydrogen_vehicle_data",10003);
//        hbaseService.writeHydrogenFactory("data:test2", 10003);


    }
}
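
The table name passed in the test, `data:hydrogen_station_data`, has the form `namespace:qualifier`, so the table lives in the `data` namespace. HBase expects that namespace to exist before the table is created (it can be created with `create_namespace 'data'` in the HBase shell). The sketch below only illustrates how such a name splits into its two parts; `TableNameParts` is a hypothetical helper, not part of the HBase client.

```java
// Hypothetical helper: splits an HBase table name of the form "namespace:qualifier".
final class TableNameParts {
    final String namespace;
    final String qualifier;

    private TableNameParts(String namespace, String qualifier) {
        this.namespace = namespace;
        this.qualifier = qualifier;
    }

    static TableNameParts parse(String fullName) {
        int colon = fullName.indexOf(':');
        if (colon < 0) {
            // Without a namespace prefix, HBase places the table in the "default" namespace.
            return new TableNameParts("default", fullName);
        }
        return new TableNameParts(fullName.substring(0, colon), fullName.substring(colon + 1));
    }

    public static void main(String[] args) {
        TableNameParts parts = parse("data:hydrogen_station_data");
        System.out.println("namespace=" + parts.namespace + ", qualifier=" + parts.qualifier);
    }
}
```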

Check the data in HBase:

[Image: the table data in HBase]

Then check whether the column families were added successfully:

[Image: the column families of the HBase table]

Check the data in PostgreSQL:

[Image: the device_type data in PostgreSQL]

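The two match, so the interaction between PostgreSQL and HBase works as intended.

For reference, here are some HBase shell commands:

(1) Drop a table (disable it first, then drop it)
disable 'table name'
drop 'table name'

(2) Delete a column family
alter 'table name', 'delete' => 'column family'

(3) Show the details of a table
desc 'table name'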

This article was shared from the author's personal site/blog. Originally published: 2021/01/08
