Kafka Connect in practice: configuring an ETL pipeline from SQLite to Hive

Today I finished setting up an ETL pipeline from SQLite to Hive on Kafka Connect. The details are as follows.

Default worker configuration:

cat /mnt/etc/connect-avro-standalone.properties

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.

# Local storage file for offset data
offset.storage.file.filename=/mnt/connect.offsets
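The converter settings these comments refer to are not shown in the dump above. In the stock connect-avro-standalone.properties shipped with Confluent Platform they typically look like the following (Schema Registry assumed at its default http://localhost:8081; adjust to your installation):

# Typical Avro converter settings from the stock sample (assumed, not shown in the dump above)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Built-in defaults for internal offset/config data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false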

Source connector configuration: /mnt/etc/sqlite.properties:

cat /mnt/etc/sqlite.properties

##
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement
tasks.max=2

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
topic.prefix=test_sqlite_jdbc_
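Two properties from Confluent's stock SQLite example are not visible in the dump above but are required for the source connector to start: the connector class and the name of the incrementing column. Assuming the standard JDBC source connector and the 'id' column mentioned in the comments, they would be:

# Assumed from the stock example; not visible in the dump above
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
incrementing.column.name=id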

Sink connector configuration: /mnt/etc/hdfs_sqlite.properties:

cat /mnt/etc/hdfs_sqlite.properties

# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

name=hdfs-sink
tasks.max=2
topics=test_sqlite_jdbc_locations
hdfs.url=hdfs://localhost:9000
flush.size=2
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
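As on the source side, the connector class line is not visible in the dump; for the Confluent HDFS sink it would normally be:

# Assumed from the stock HDFS sink example; not visible in the dump above
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector

With hive.integration=true the sink registers an external Hive table for each topic against the metastore at hive.metastore.uris, which is why the query below works without any manual CREATE TABLE in Hive.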

Load the configuration files and start the standalone worker:

vagrant@vagrant-ubuntu-trusty-64:~$ connect-standalone /mnt/etc/connect-avro-standalone.properties /mnt/etc/sqlite.properties /mnt/etc/hdfs_sqlite.properties
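For reference, the locations table in retail.db needs to exist (and contain data) before the worker starts. The original DDL is not shown in this post, but based on the Avro schema listed at the end it could have been created along these lines (illustrative sketch only):

sqlite3 /usr/local/lib/retail.db
-- illustrative sketch; the actual DDL used for retail.db is not shown in this post
CREATE TABLE locations (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255), sale INTEGER);
INSERT INTO locations (name, sale) VALUES ('alice', 100), ('bob', 200);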

Verify the result in Hive:

hive> select * from test_sqlite_jdbc_locations order by 1;
Query ID = vagrant_20180530035036_dc0cbd5d-f472-4a7f-afb2-c137630bee95
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
In order to limit the maximum number of reducers:
In order to set a constant number of reducers:
Job running in-process (local Hadoop)
2018-05-30 03:50:39,448 Stage-1 map = 0%, reduce = 0%
2018-05-30 03:50:41,476 Stage-1 map = 100%, reduce = 100%
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 16888 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
4	bobo	400
3	bobby	300
2	bob	200
8	bbbbb	900
7	bbbb	800
6	bbb	700
5	bb	600
1	alice	100
Time taken: 5.303 seconds, Fetched: 8 row(s)
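Besides querying Hive, the Avro files committed by the sink can be inspected directly in HDFS. Assuming the connector's defaults (topics.dir=/topics, logs.dir=/logs, default partitioner), they would land in paths like the following; the exact layout depends on your configuration:

# assumes default topics.dir/logs.dir and the default partitioner
hdfs dfs -ls /topics/test_sqlite_jdbc_locations/partition=0
hdfs dfs -ls /logs/test_sqlite_jdbc_locations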

Configuration notes:

1. Topic names may contain a hyphen ("-"), but Hive table names cannot. Since the HDFS sink derives the Hive table name from the topic name, avoid hyphens (hence the underscores in topic.prefix above).

2. connection.url must use the absolute path to retail.db:

connection.url=jdbc:sqlite:/usr/local/lib/retail.db
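A relative path such as the following would be resolved against whatever directory the Connect worker was started from, which easily ends up pointing at an empty or non-existent database, hence the absolute path above:

# risky: resolved relative to the worker's working directory (illustrative, do not use)
connection.url=jdbc:sqlite:retail.db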

Inspect the connectors through the REST APIs:

vagrant@vagrant-ubuntu-trusty-64:~$ curl -s localhost:8083/connectors

["hdfs-sink","test-source-sqlite-jdbc-autoincrement"]

curl -X GET http://localhost:8081/subjects

["test_sqlite_jdbc_locations-value"]

vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions

[1]

vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions/1

{"subject":"test_sqlite_jdbc_locations-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"locations\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"sale\",\"type\":[\"null\",\"int\"]}],\"connect.name\":\"locations\"}"}

