From 1615445247fa11ae6aaa13ca5be145d67b4b4d85 Mon Sep 17 00:00:00 2001 From: davidhua Date: Tue, 20 Dec 2022 16:56:27 +0800 Subject: [PATCH] Use the "flink.yarn.ship-directories" instead of "flink.app.user.class.path" fixed #56 --- .../FlinkJarStreamisJobContentTransform.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkJarStreamisJobContentTransform.scala b/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkJarStreamisJobContentTransform.scala index 551ae0f98..72206eee0 100644 --- a/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkJarStreamisJobContentTransform.scala +++ b/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkJarStreamisJobContentTransform.scala @@ -27,6 +27,7 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.utils.JobUtils import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer /** * Created by enjoyyin on 2021/9/23. @@ -50,20 +51,24 @@ class FlinkJarStreamisStartupParamsTransform extends Transform { startupMap.put("flink.app.main.class.jar", transformJobContent.getMainClassJar.getFileName) startupMap.put("flink.app.main.class.jar.bml.json", JsonUtils.jackson.writeValueAsString(getStreamisFileContent(transformJobContent.getMainClassJar))) - val classpathFiles = if(transformJobContent.getDependencyJars != null && transformJobContent.getResources != null) { - startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(",")) - transformJobContent.getDependencyJars.asScala ++ transformJobContent.getResources.asScala - } else if(transformJobContent.getDependencyJars != null) { - startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(",")) - transformJobContent.getDependencyJars.asScala - } else if(transformJobContent.getResources != null) { - startupMap.put("flink.yarn.ship-directories", transformJobContent.getResources.asScala.map(_.getFileName).mkString(",")) - transformJobContent.getResources.asScala + + /** + * Notice : "flink.app.user.class.path" equals to PipelineOptions.CLASSPATHS in Flink + * paths must specify a protocol (e.g. file://) and be accessible on all nodes + * so we use "flink.yarn.ship-directories" instead + */ + var classPathFiles = Option(transformJobContent.getDependencyJars) match { + case Some(list) => list.asScala + case _ => mutable.Buffer[StreamisFile]() + } + Option(transformJobContent.getResources) match { + case Some(list) => classPathFiles = classPathFiles ++ list.asScala + case _ => // Do nothing } - else mutable.Buffer[StreamisFile]() - if(classpathFiles.nonEmpty) + startupMap.put("flink.yarn.ship-directories", classPathFiles.map(_.getFileName).mkString(",")) + if(classPathFiles.nonEmpty) startupMap.put("flink.app.user.class.path.bml.json", - JsonUtils.jackson.writeValueAsString(classpathFiles.map(getStreamisFileContent).asJava)) + JsonUtils.jackson.writeValueAsString(classPathFiles.map(getStreamisFileContent).asJava)) if(transformJobContent.getHdfsJars != null) startupMap.put("flink.user.lib.path", transformJobContent.getHdfsJars.asScala.mkString(",")) val params = if(job.getParams == null) new util.HashMap[String, Any] else job.getParams