配置文件如下:
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
MySQL-CDC {
base-url = "jdbc:mysql://192.168.0.80:3306/xgeo?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8"
username = "root"
password = "root"
#需要监控的数据库名称
database-names=["xgeo"]
#需要监控的数据库表 数据库.表名称
table-names = ["xgeo.xgeo_tle"]
#初始化模式 先同步历史后增量同步
startup.mode="initial"
#作业停止模式 never:从不停止
stop.mode="never"
snapshot.split.size=10000000
#结果表
#result_table_name= "xgeo.xgeo_tle"
server-id = 5401
format = compatible_debezium_json
#只执行一次 默认是true
exactly_once=false
catalog {
factory=MySQL
}
debezium = {
snapshot.mode = "never"
# include schema into kafka message
key.converter.schemas.enable = true
value.converter.schemas.enable = true
# include dd1
#include.schema.changes = true
# topic.prefix
#database.server.name = "mysql80"
"name":"xgro-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.0.80",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "5401",
"topic.prefix": "fullfillment",
#捕获那些数据库的更改
"database.include.list": "xgeo",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.fullfillment",
"include.schema.changes": "true"
}
}
schema ={
table = "xgeo.xgeo_tle"
fields {
ORBIT_ID = INT
CTIME = TIMESTAMP
OPERATOR = STRING
NORAD_ID = INT
EPOCH = TIMESTAMP
DSEC = float
DA = DOUBLE
DE = DOUBLE
DI = DOUBLE
DOMG = DOUBLE
XOMG = DOUBLE
DM = DOUBLE
N = DOUBLE
N1 = DOUBLE
SMR = DOUBLE
P = DOUBLE
P1 = DOUBLE
DOMG1 = DOUBLE
SMD = DOUBLE
LOCATION = double
DRIFT = double
Line1 = STRING
Line2 = STRING
Line3 = STRING
ESOURCE = INT
PSOURCE = INT
SPOD_ORBIT_ID = INT
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
sink {
Jdbc {
url = "jdbc:mysql://192.168.0.89:3306/xgeo?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "root"
database = "xgeo"
table = "xgeo_tle"
generate_sink_sql = true
primary_keys = ["ORBIT_ID"]
}
# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
提交任务之后报错
2024-01-11 21:46:55,702 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,
2024-01-11 21:46:55,702 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-01-11 21:46:55,702 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
2024-01-11 21:46:55,703 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:223)
at org.apache.seatunnel.engine.server.task.context.SourceReaderContext.sendSplitRequest(SourceReaderContext.java:64)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:94)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:220)
... 12 more
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
at org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:82)
at org.apache.seatunnel.api.source.SourceSplitEnumerator$Context.assignSplit(SourceSplitEnumerator.java:101)
at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:164)
at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:81)
at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.requestSplit(SourceSplitEnumeratorTask.java:226)
at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.lambda$run$0(RequestSplitOperation.java:62)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.run(RequestSplitOperation.java:52)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at ------ submitted from ------.()
at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112)
... 15 more
Caused by: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2100)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2064)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1351)
at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2690)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2497)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2268)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
at org.apache.seatunnel.common.utils.SerializationUtils.deserialize(SerializationUtils.java:74)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:41)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:25)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.lambda$run$0(AssignSplitOperation.java:67)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.run(AssignSplitOperation.java:54)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
... 2 more
2024-01-11 21:46:55,703 ERROR org.apache.seatunnel.core.starter.SeaTunnel -
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:223)
at org.apache.seatunnel.engine.server.task.context.SourceReaderContext.sendSplitRequest(SourceReaderContext.java:64)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:94)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:220)
... 12 more
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
at org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:82)
at org.apache.seatunnel.api.source.SourceSplitEnumerator$Context.assignSplit(SourceSplitEnumerator.java:101)
at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:164)
at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:81)
at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.requestSplit(SourceSplitEnumeratorTask.java:226)
at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.lambda$run$0(RequestSplitOperation.java:62)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.run(RequestSplitOperation.java:52)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at ------ submitted from ------.()
at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112)
... 15 more
Caused by: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2100)
at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2064)
at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1351)
at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2690)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2497)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2268)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
at org.apache.seatunnel.common.utils.SerializationUtils.deserialize(SerializationUtils.java:74)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:41)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:25)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.lambda$run$0(AssignSplitOperation.java:67)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.run(AssignSplitOperation.java:54)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
... 2 more
2024-01-11 21:46:55,705 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
微信小程序自带的数据库叫做云开发数据库。
解释:
微信小程序提供了一个云开发的框架,其中包含了云数据库这一核心组件。这个云数据库允许开发者在小程序端进行数据的增删改查操作,而无需自己搭建和维护数据库服务器。云开发数据库提供了结构化的数据存储功能,并且与微信小程序的后台管理紧密集成,便于进行数据的权限控制和备份。
举例:
假设你正在开发一个微信小程序,用于记录用户的运动步数。你可以使用云开发数据库来存储每个用户的步数数据。在小程序端,你可以通过云开发的API来添加新的步数记录、查询用户的步数历史等。这样,你就不需要自己购买和管理数据库服务器,大大简化了开发流程。
推荐产品:
对于微信小程序的云开发需求,推荐使用腾讯云的云开发服务。它不仅提供了云数据库,还包括了云函数、存储等一系列的服务,能够满足小程序的全栈开发需求。通过腾讯云云开发,你可以更高效地构建稳定、安全的小程序应用。