Today I completed a SQLite-to-Hive ETL configuration on Kafka Connect. Details below:
Default worker configuration:
cat /mnt/etc/connect-avro-standalone.properties
# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/mnt/connect.offsets
Source connector configuration: /mnt/etc/sqlite.properties
cat /mnt/etc/sqlite.properties
##
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=2
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
topic.prefix=test_sqlite_jdbc_
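The source connector above expects an existing SQLite database at /usr/local/lib/retail.db. As a minimal sketch of what that database might contain, the snippet below (Python, standard sqlite3 module) builds a locations table with an auto-increment id column; the table name and the id/name/sale columns are taken from the Avro schema shown later in this post, but the original DDL is my assumption, and an in-memory database stands in for the real file path.

```python
import sqlite3

# Sketch of the source data the JDBC connector reads. The post uses
# /usr/local/lib/retail.db; ":memory:" is used here so the example is
# self-contained and repeatable.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE locations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,  -- incrementing column the connector tracks
        name TEXT,
        sale INTEGER
    )"""
)
conn.executemany(
    "INSERT INTO locations (name, sale) VALUES (?, ?)",
    [("alice", 100), ("bob", 200), ("bobby", 300), ("bobo", 400)],
)
conn.commit()
rows = conn.execute("SELECT id, name, sale FROM locations ORDER BY id").fetchall()
print(rows)  # ids 1..4 assigned automatically
conn.close()
```

With mode=incrementing, the connector remembers the largest id it has seen and only emits rows whose id is greater, which is why an auto-increment primary key works as the change-detection column.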
Sink connector configuration: /mnt/etc/hdfs_sqlite.properties
cat /mnt/etc/hdfs_sqlite.properties
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_sqlite_jdbc_locations
hdfs.url=hdfs://localhost:9000
flush.size=2
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
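schema.compatibility=BACKWARD tells the HDFS sink how schema evolution is handled: a reader using the new schema must still be able to read records written with the old one, so the new schema may drop fields but may only add fields that carry a default value. The toy checker below is my own simplified illustration of that rule, not the real Avro schema-resolution algorithm.

```python
# Simplified BACKWARD-compatibility check: fields are dicts of
# name -> spec, where spec may contain a "default" entry.
def is_backward_compatible(old_fields, new_fields):
    """Return True if a reader with new_fields can read data
    written with old_fields (simplified rule)."""
    for name, spec in new_fields.items():
        if name not in old_fields and "default" not in spec:
            # A brand-new field with no default cannot be filled in
            # when decoding old records.
            return False
    return True

old = {"id": {}, "name": {}, "sale": {}}
ok_new = {"id": {}, "name": {}, "sale": {}, "region": {"default": None}}
bad_new = {"id": {}, "name": {}, "sale": {}, "region": {}}

print(is_backward_compatible(old, ok_new))   # adding a defaulted field is fine
print(is_backward_compatible(old, bad_new))  # adding one without a default is not
```

In this pipeline, the Schema Registry enforces the same rule when a new schema version is registered for the topic's subject.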
Load the configuration files:
vagrant@vagrant-ubuntu-trusty-64:~$ connect-standalone /mnt/etc/connect-avro-standalone.properties /mnt/etc/sqlite.properties /mnt/etc/hdfs_sqlite.properties
Verify the result in Hive:
hive> select * from test_sqlite_jdbc_locations order by 1;
Query ID = vagrant_20180530035036_dc0cbd5d-f472-4a7f-afb2-c137630bee95
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
In order to limit the maximum number of reducers:
In order to set a constant number of reducers:
Job running in-process (local Hadoop)
2018-05-30 03:50:39,448 Stage-1 map = 0%, reduce = 0%
2018-05-30 03:50:41,476 Stage-1 map = 100%, reduce = 100%
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 16888 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
4	bobo	400
3	bobby	300
2	bob	200
8	bbbbb	900
7	bbbb	800
6	bbb	700
5	bb	600
1	alice	100
Time taken: 5.303 seconds, Fetched: 8 row(s)
Configuration notes:
1. Topic names may contain a hyphen ("-"), but Hive table names may not, so avoid hyphens in topic names.
2. connection.url must use the absolute path of retail.db:
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
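The first note is why this post uses topic.prefix=test_sqlite_jdbc_ with underscores instead of the hyphenated prefix from the Confluent sample. A small helper (my own sketch, not part of any connector) shows one way to map a Kafka topic name to a Hive-safe identifier:

```python
import re

def hive_safe_topic(name):
    """Replace characters that are legal in Kafka topic names but not in
    Hive table names (notably '-' and '.') with underscores."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)

print(hive_safe_topic("test-sqlite-jdbc-locations"))  # -> test_sqlite_jdbc_locations
```

Naming the topics Hive-safe up front, as done here, avoids needing any such mapping at the sink.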
Inspect the connectors:
vagrant@vagrant-ubuntu-trusty-64:~$ curl -s localhost:8083/connectors
["hdfs-sink","test-source-sqlite-jdbc-autoincrement"]
curl -X GET http://localhost:8081/subjects
["test_sqlite_jdbc_locations-value"]
vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions
[1]
vagrant@vagrant-ubuntu-trusty-64:~$ curl -X GET http://localhost:8081/subjects/test_sqlite_jdbc_locations-value/versions/1
{"subject":"test_sqlite_jdbc_locations-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"locations\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"sale\",\"type\":[\"null\",\"int\"]}],\"connect.name\":\"locations\"}"}
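Note that the Schema Registry returns the Avro schema as an escaped JSON string inside the "schema" field, so it has to be decoded twice. The snippet below parses the response shown above and pulls out the field names:

```python
import json

# Response body copied from the curl output above.
response = {
    "subject": "test_sqlite_jdbc_locations-value",
    "version": 1,
    "id": 1,
    "schema": "{\"type\":\"record\",\"name\":\"locations\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]},{\"name\":\"sale\",\"type\":[\"null\",\"int\"]}],\"connect.name\":\"locations\"}",
}
# Second decode: the "schema" value is itself a JSON document.
schema = json.loads(response["schema"])
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # -> ['id', 'name', 'sale']
```

These three fields match the columns seen in the Hive query result, confirming the schema was derived from the SQLite locations table.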
References:
【1】https://gerardnico.com/dit/kafka/connector/sqlite_standalone
【2】https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/