我试图向Kubernetes集群上运行的Flink作业管理器发送post请求。在为不需要任何命令行参数的类发送/jar/run的post请求时,它工作得很好。但是,当尝试在同一个jar中提交一个不同的类(需要命令行参数)时,会出现以下错误。-:{"errors":["Request did not match expected format JarRunRequestBody."]}'
然而,在传递命令行参数和直接提交作业时,就像下面的works -:
./flink run -m localhost:30287 -c com.class.name ~/path/to/jar/1.0-1.0-SNAPSHOT.jar --bootstrap.servers izac-cp-kafka:9092 --group.id test --topic bank_transaction --schema.registry http://mysr-schema-registry:8081 --CepJson """{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}"""
为了将上述命令转换为基于flink REST的post请求,我执行了以下-:
curl -k -v -X POST -H "Content-Type: application/json" --data '{ "entryClass":"com.class.name", "programArgsList": [ "--bootstrap.servers izac-cp-kafka:9092", "--group.id test", "--topic bank_transaction", "--schema.registry http://mysr-schema-registry:8081", "--CepJson """{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}""""]}' http://localhost:30287/jars/2a788e33-c92d-47c4-84af-31e3dff28666_1.0-1.0-SNAPSHOT.jar/run
然而,这给出了前面提到的错误。我只想将上面的命令行作业提交转换为基于rest api的flink集群提交。
注意-:post请求是针对已经包含所需Jar的flink集群的。我只想提交一个使用特定类的作业。
发布于 2020-07-06 07:59:27
我认为curl不会像scala那样接受"""
作为字符串插值,因此它不会确切地发送正确的CepJson
参数,所以我将从改变这一点开始。
发布于 2020-07-06 11:28:00
我将上面的curl请求更改为以下内容,并在-中运行:
curl -k -v -X POST -H "Content-Type: application/json" --data '{ "entryClass":"com.class.name", "programArgsList": [ "--bootstrap.servers", "izac-cp-kafka:9092", "--group.id"," test", "--topic","bank_transaction", "--schema.registry", "http://mysr-schema-registry:8081", "--CepJson", "{\"keyId\": \"customer_id\",\"pattern\": [{\"patternName\": \"p1\",\"simpleCondition\":{\"columnName\": \"amount\",\"operator\": \">\",\"value\": \"50\",\"dataType\": \"Int\"}},{\"patternName\":\"p2\",\"simpleCondition\":{\"columnName\":\"amount\",\"operator\":\">\",\"value\":\"30\",\"dataType\":\"Int\"}}],\"connector\":[{\"name\":\"begin\",\"connectorType\":\"next\",\"start\":\"p1\",\"end\":\"p2\"}]}"]}' http://localhost:30287/jars/2a788e33-c92d-47c4-84af-31e3dff28666_1.0-1.0-SNAPSHOT.jar/run
https://stackoverflow.com/questions/62750276
复制