
Delta Lake sink connector for Apache Pulsar with MinIO (java.lang.IllegalArgumentException)

Stack Overflow user
Asked 2022-09-20 19:39:32
Answers: 2 · Views: 126 · Followers: 0 · Votes: 1

I am trying to run the new Apache Pulsar Lakehouse Sink Connector and I get a java.lang.IllegalArgumentException.

Here is my setup. The docker-compose.yaml file:

version: '3.7'
volumes:
  mssql-data:
  minio-data:
networks:
  oentity:
    driver: bridge
services:
  pulsar:
    image: apachepulsar/pulsar:latest
    command: bin/pulsar standalone
    hostname: pulsar
    ports:
      - "8080:8080"
      - "6650:6650"
    restart: unless-stopped
    networks:
      oentity:
    volumes:
      - "./data/:/pulsar/data"
      - "./connectors/:/pulsar/connectors"
  dashboard:
    image: apachepulsar/pulsar-manager:latest
    ports:
      - "9528:9527"
      - "7750:7750"
    networks:
      oentity:
    depends_on:
      - pulsar
    links:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
  minio:
    image: 'minio/minio:latest'
    hostname: minio
    container_name: minio
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio-data:/data
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
    command: server --console-address ":9001" /data
    networks:
      oentity:
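  • I downloaded the connector from here and copied the NAR package into the Pulsar connectors directory in the container, $PULSAR_HOME/connectors.
  • I logged in to MinIO at http://localhost:9001/login and created a bucket called lakehouse.
  • I used a configuration similar to the one described here and replaced the tablePath value with my MinIO path. I named the file sink-connector-config.json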
{
  "tenant":"public",
  "namespace":"default",
  "name":"delta_sink",
  "parallelism":1,
  "inputs": [
    "test-delta-pulsar"
  ],
  "archive": "connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar",
  "processingGuarantees":"EFFECTIVELY_ONCE",
  "configs":{
      "type":"delta",
      "maxCommitInterval":120,
      "maxRecordsPerCommit":10000000,
      "tablePath": "s3a://lakehouse/delta_sink",
      "hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
  }
}
  • I ran the lakehouse sink connector from inside the container: docker exec -it <container name> bash

Then I executed

$PULSAR_HOME/bin/pulsar-admin sink localrun \
--sink-config-file sink-connector-config.json

I got the error below:

2022-09-06T16:53:08,396+0000 [main] INFO  org.apache.pulsar.functions.utils.io.ConnectorUtils - Found connector ConnectorDefinition(name=lakehouse, description=Lakehouse connectors, sourceClass=org.apache.pulsar.ecosystem.io.lakehouse.SourceConnector, sinkClass=org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector, sourceConfigClass=org.apache.pulsar.ecosystem.io.lakehouse.SourceConnectorConfig, sinkConfigClass=org.apache.pulsar.ecosystem.io.lakehouse.SinkConnectorConfig) from /pulsar/connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar
2022-09-06T16:53:44,562+0000 [main] ERROR org.apache.pulsar.functions.LocalRunner - Encountered error starting localrunner
java.lang.IllegalArgumentException: Could not validate sink config: Cannot construct instance of `org.apache.pulsar.ecosystem.io.lakehouse.SinkConnectorConfig` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: UNKNOWN; byte offset: #UNKNOWN]
        at org.apache.pulsar.functions.utils.SinkConfigUtils.validateSinkConfig(SinkConfigUtils.java:594) ~[org.apache.pulsar-pulsar-functions-utils-2.9.3.jar:2.9.3]
        at org.apache.pulsar.functions.utils.SinkConfigUtils.validateAndExtractDetails(SinkConfigUtils.java:441) ~[org.apache.pulsar-pulsar-functions-utils-2.9.3.jar:2.9.3]
        at org.apache.pulsar.functions.LocalRunner.start(LocalRunner.java:439) ~[org.apache.pulsar-pulsar-functions-local-runner-original-2.9.3.jar:2.9.3]
        at org.apache.pulsar.functions.LocalRunner.main(LocalRunner.java:198) [org.apache.pulsar-pulsar-functions-local-runner-original-2.9.3.jar:2.9.3]
root@pulsar:/pulsar#

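Thanks to @Tim, who pointed me to his repo: https://github.com/tspannhw/FLiP-Pi-DeltaLake-Thermal

What I was doing wrong:

  • Sending messages to the queue from the CLI
  • No schema definition

What worked for me:

  • Creating a schema for the queue
  • Using YAML for the connector properties instead of JSON, e.g.

MinIO config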

tenant: public
namespace: default
name: delta_sink
parallelism: 1
inputs:
- raw.ro
archive: connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar
processingGuarantees: EFFECTIVELY_ONCE
configs:
  type: delta
  maxCommitInterval: 120
  maxRecordsPerCommit: 10000000
  tablePath: s3a://lakehouse/delta_sink
  processingGuarantees: "EXACTLY_ONCE"
  deltaFileType: "parquet"
  subscriptionType: "Failover"
  hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
  hadoop.fs.s3a.endpoint: http://minio:9000
  hadoop.fs.s3a.access.key: minio
  hadoop.fs.s3a.secret.key: minio123
  hadoop.fs.s3a.path.style.access: true
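
Compared with the JSON in the question, this YAML adds the MinIO endpoint, static credentials, and path-style access. A minimal sketch of a pre-flight check for those keys, assuming the connector config has already been loaded into a plain dict. The helper name `missing_s3a_keys` and the list of keys are illustrative and taken from the config above; they are not part of the connector.

```python
# Hypothetical helper: the key list mirrors the MinIO config above.
# It is not an official requirement list from the connector.
REQUIRED_S3A_KEYS = (
    "hadoop.fs.s3a.endpoint",
    "hadoop.fs.s3a.access.key",
    "hadoop.fs.s3a.secret.key",
    "hadoop.fs.s3a.path.style.access",
)


def missing_s3a_keys(configs):
    """Return the s3a keys absent from a sink's `configs` mapping.

    Only applies when tablePath points at an s3a:// location; for any
    other scheme (for example file://) nothing is reported.
    """
    table_path = str(configs.get("tablePath", ""))
    if not table_path.startswith("s3a://"):
        return []
    return [key for key in REQUIRED_S3A_KEYS if key not in configs]


if __name__ == "__main__":
    # The `configs` section from the JSON file in the question.
    question_configs = {
        "type": "delta",
        "tablePath": "s3a://lakehouse/delta_sink",
        "hadoop.fs.s3a.aws.credentials.provider":
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    }
    for key in missing_s3a_keys(question_configs):
        print("missing:", key)
```

Whether the missing keys contributed to the original exception is not established here; the check only flags the difference between the two configs.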

Local file system

tenant: public
namespace: default
name: delta_sink
parallelism: 1
inputs:
- raw.ro
archive: connectors/pulsar-io-lakehouse-2.9.3.7.nar
processingGuarantees: EFFECTIVELY_ONCE
configs:
 type: delta
 maxCommitInterval: 120
 maxRecordsPerCommit: 10000000
 tablePath: file:///opt/demo/lakehouse
 processingGuarantees: "EXACTLY_ONCE"
 deltaFileType: "parquet"
 subscriptionType: "Failover"

Creating the schema in a Python script to send messages to Pulsar

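import pulsar
from pulsar.schema import Integer, JsonSchema, Record, String


# Record class that defines the schema for messages on the topic.
class CciMessage(Record):
    message = String()
    id = Integer()


client = pulsar.Client('pulsar://pulsar:6650')
# Attaching a schema to the producer is what gives the topic a schema.
producer = client.create_producer(topic='raw.ro',
                                  schema=JsonSchema(CciMessage))

producer.send(CciMessage(message="Sandbox", id=1))

# Flush pending messages and release the connection.
client.close()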
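
The producer above attaches the schema when it connects. As an alternative sketch, the schema could be uploaded ahead of time through Pulsar's admin REST API (`POST /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema`). The Avro-style field definitions below are my assumption of a reasonable mapping for `CciMessage`, the admin URL `http://pulsar:8080` is inferred from the compose file, and `build_schema_payload` and `upload_schema` are hypothetical helper names.

```python
import json
import urllib.request


def build_schema_payload():
    """Build the JSON body of a schema upload for the CciMessage record."""
    # Assumed Avro-style definition; the nullable unions are a guess at
    # how optional record fields would be represented.
    definition = {
        "type": "record",
        "name": "CciMessage",
        "fields": [
            {"name": "message", "type": ["null", "string"]},
            {"name": "id", "type": ["null", "int"]},
        ],
    }
    return {
        "type": "JSON",
        # The definition is sent as a JSON-encoded string, not a nested object.
        "schema": json.dumps(definition),
        "properties": {},
    }


def upload_schema(admin_url, tenant, namespace, topic):
    """POST the schema to the admin API and return the HTTP status code."""
    url = f"{admin_url}/admin/v2/schemas/{tenant}/{namespace}/{topic}/schema"
    request = urllib.request.Request(
        url,
        data=json.dumps(build_schema_payload()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Assumes this runs somewhere the `pulsar` hostname resolves.
    print(upload_schema("http://pulsar:8080", "public", "default", "raw.ro"))
```

Uploading the schema first means the topic already has one before any producer or sink attaches, which matches the "create a schema for the queue" step.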

I can summarize the remaining steps in the bash script below.

SLEEP_TIME=2
SINK_NAME=delta_sink
NAME_SPACE=default
TENANT=public
TOPIC=raw.ro

sleep $SLEEP_TIME && echo "stopping existing sink"
docker  exec -it solution_pulsar_1 bin/pulsar-admin sink stop --name $SINK_NAME --namespace $NAME_SPACE --tenant $TENANT

sleep $SLEEP_TIME && echo "removing existing sink"
docker  exec -it solution_pulsar_1 bin/pulsar-admin sinks delete --tenant $TENANT --namespace $NAME_SPACE --name $SINK_NAME

sleep $SLEEP_TIME && echo "copying connectors"
docker cp connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar solution_pulsar_1:/pulsar/connectors
docker cp connectors/pulsar-io-lakehouse-2.9.3.7.nar solution_pulsar_1:/pulsar/connectors
docker cp l_conf/deltalake_sink.yaml solution_pulsar_1:/pulsar/connectors
docker cp l_conf/source-cloud-config.json solution_pulsar_1:/pulsar/connectors

sleep $SLEEP_TIME && echo "creating sink"
# (leftover fragment, not runnable on its own) sinks create --sink-config-file connectors/sink-cloud-config.json
docker  exec -it solution_pulsar_1 bin/pulsar-admin sinks create --sink-config-file connectors/deltalake_sink.yaml

sleep $SLEEP_TIME && echo "get sink properties"
docker  exec -it solution_pulsar_1 bin/pulsar-admin sinks get --tenant $TENANT --namespace $NAME_SPACE --name $SINK_NAME

sleep $SLEEP_TIME && echo "listing sink"
docker  exec -it solution_pulsar_1 bin/pulsar-admin sinks list

sleep $SLEEP_TIME && echo "get status of sink"
docker  exec -it solution_pulsar_1 bin/pulsar-admin sinks status --tenant $TENANT --namespace $NAME_SPACE --name $SINK_NAME

sleep $SLEEP_TIME && echo "sending test message to sink"
docker  exec -it solution_pulsar_1 python connectors/prducer_simple_schema.py
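
The `sinks status` call in the script returns JSON, so a readiness check could stand in for the fixed sleeps. A rough sketch, assuming the status document carries top-level `numInstances` and `numRunning` fields and that the admin API is reachable at `http://localhost:8080` (the port published in the compose file). `sink_is_running`, `fetch_status`, and `wait_for_sink` are hypothetical helpers.

```python
import json
import time
import urllib.request


def sink_is_running(status):
    """True when every instance in a sink status document is running."""
    instances = status.get("numInstances", 0)
    return instances > 0 and status.get("numRunning", 0) == instances


def fetch_status(admin_url, tenant, namespace, name):
    """GET the sink status from the admin REST API as a dict."""
    url = f"{admin_url}/admin/v3/sinks/{tenant}/{namespace}/{name}/status"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def wait_for_sink(fetch, attempts=10, delay=2.0):
    """Poll `fetch()` until the sink reports running or attempts run out."""
    for _ in range(attempts):
        if sink_is_running(fetch()):
            return True
        time.sleep(delay)
    return False


if __name__ == "__main__":
    ok = wait_for_sink(
        lambda: fetch_status("http://localhost:8080",
                             "public", "default", "delta_sink")
    )
    print("sink running" if ok else "sink not running")
```

Passing the fetch function in as an argument keeps the polling logic independent of the HTTP call, so it can be exercised without a broker.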

I hope this helps. And yes, I can see entries in my Delta Lake, with the file system as the target.

_delta_log part-0000-a7539e24-7eaa-4c43-9f4d-a8ebe24c8c7f-c000.snappy.parquet
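
The listing above shows the two things a Delta table directory is expected to contain: a `_delta_log` directory and Parquet data files. A small sketch for checking that from Python, assuming the file-system target from the local config (`/opt/demo/lakehouse`) is readable from where the script runs. `summarize_delta_dir` is a hypothetical helper.

```python
from pathlib import Path


def summarize_delta_dir(table_path):
    """Count commit files in _delta_log and Parquet files in a table dir.

    Only the top level of the table directory is inspected, so data files
    of a partitioned table (stored in subdirectories) are not counted.
    """
    root = Path(table_path)
    log_dir = root / "_delta_log"
    has_log = log_dir.is_dir()
    commits = list(log_dir.glob("*.json")) if has_log else []
    data_files = list(root.glob("*.parquet"))
    return {
        "has_delta_log": has_log,
        "commits": len(commits),
        "data_files": len(data_files),
    }


if __name__ == "__main__":
    print(summarize_delta_dir("/opt/demo/lakehouse"))
```

A result with `has_delta_log` true and at least one commit and one data file corresponds to the state shown in the listing.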

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73791829
