我从官方的flink存储库使用以下docker-compose.yml启动了flink。我只添加了到外部hadoop网络的连接。
version: "2.1"
networks:
hadoop:
external:
name: flink_hadoop
services:
jobmanager:
image: flink:1.7.1-hadoop27-scala_2.11
container_name: flink-jobmanager
domainname: hadoop
networks:
- hadoop
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.7.1-hadoop27-scala_2.11
container_name: flink-taskmanager
domainname: hadoop
networks:
- hadoop
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager在此之后,一切都会运行,我可以访问WebUI。
然后我打包了下面的工作。
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie
object HdfsJob {
private lazy val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]) {
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val actorMovies = env
.readCsvFile[ActorMovie](
"hdfs://namenode:8020/source-data/test.tsv",
"\r\n",
"\t",
includedFields = Array(2,3,5),
pojoFields = Array("actor",
"film",
"character"))
actorMovies.print
// execute program
env.execute("Flink Batch Scala API Skeleton")
}
}它只是将一个tsv文件从hdfs读取到pojos的DataSet中并打印出来。当我让它在本地运行时,一切都运行正常。但是当我上传.jar并让它在集群上运行时,作业管理器记录了以下异常:
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT is not available on the TaskExecutor.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG is not available on the TaskExecutor.
显然,任务管理器不包含日志,这就是当前的问题。
发布于 2019-01-04 19:06:02
在Docker上运行Flink时,docker-entrypoint.sh脚本将启动Flink进程(TaskExecutor和JobMaster) in the foreground。这样做的效果是,Flink既不会将其STDOUT重定向到文件中,也不会登录到文件中。取而代之的是,Flink还将记录到STDOUT。这样,您就可以通过docker logs查看docker容器的日志和标准输出。
如果要更改此行为,只需更改docker-entrypoint.sh并传递start而不是start-foreground就足够了
if [ "${CMD}" == "${TASK_MANAGER}" ]; then
$FLINK_HOME/bin/taskmanager.sh start "$@"
else
$FLINK_HOME/bin/standalone-job.sh start "$@"
fi
sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"更新
在使用 Flink 的 DataSet API 时,调用 DataSet::print 方法实际上会从集群中检索相应的 DataSet 返回到客户端,然后将其打印到 STDOUT。由于需要检索,此方法仅在 Flink 的 CLI 客户端通过bin/flink run <job.jar> 提交作业时才有效。此行为与 DataStream::print方法不同,后者在执行程序的 TaskManagers 上打印 DataStream。
如果您想在TaskManager上打印DataSet结果,则需要调用DataSet::printOnTaskManager而不是print。
https://stackoverflow.com/questions/54036010
复制相似问题