diff --git a/core/pom.xml b/core/pom.xml
index 1040fcea6..851566fe0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -56,6 +56,18 @@
flink-streaming-scala_2.11
${flink.version}
+
+
+ org.apache.flink
+ flink-shaded-hadoop2
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-yarn_2.11
+ ${flink.version}
+
diff --git a/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java b/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java
index 54ddaa647..f41ecf00b 100644
--- a/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java
+++ b/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java
@@ -100,7 +100,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
- configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
+ configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "-1L");
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
// add (and override) the settings with what the user defined
diff --git a/core/src/main/java/com/dtstack/flink/yarn/JobParameter.java b/core/src/main/java/com/dtstack/flink/yarn/JobParameter.java
new file mode 100644
index 000000000..1879c6ea1
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/yarn/JobParameter.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dtstack.flink.yarn;
+
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * Resource and scheduling parameters of a Flink job that is deployed on YARN.
+ *
+ * <p>Every field starts with a default (parallelism 1, queue {@code "default"},
+ * 1024 MB for the JobManager and for each TaskManager, one TaskManager with one
+ * slot) which stays in effect unless a constructor or setter overrides it.
+ */
+public class JobParameter
+{
+    private int parallelism = 1;
+    private String queue = "default";
+    private int taskManagerMemoryMb = 1024;
+    private int taskManagerCount = 1;
+    private int taskManagerSlots = 1;
+    private int jobManagerMemoryMb = 1024;
+
+    /** Creates a parameter set that holds only the default values. */
+    public JobParameter() {}
+
+    /**
+     * Creates a parameter set from properties. Recognised keys are
+     * {@code parallelism}, {@code queue}, {@code taskManagerMemoryMb},
+     * {@code taskManagerCount}, {@code taskManagerSlots} and
+     * {@code jobManagerMemoryMb}; a missing key keeps the default.
+     *
+     * @param confProperties the overrides; {@code null} means "no overrides"
+     * @throws NumberFormatException if a numeric property is not a valid integer
+     */
+    public JobParameter(Properties confProperties) {
+        if (confProperties == null) {
+            return;
+        }
+        this.parallelism = intProperty(confProperties, "parallelism", parallelism);
+        this.queue = confProperties.getProperty("queue", queue);
+        this.taskManagerMemoryMb = intProperty(confProperties, "taskManagerMemoryMb", taskManagerMemoryMb);
+        this.taskManagerCount = intProperty(confProperties, "taskManagerCount", taskManagerCount);
+        this.taskManagerSlots = intProperty(confProperties, "taskManagerSlots", taskManagerSlots);
+        this.jobManagerMemoryMb = intProperty(confProperties, "jobManagerMemoryMb", jobManagerMemoryMb);
+    }
+
+    /** Creates a parameter set with every value given explicitly. */
+    public JobParameter(int parallelism, String queue, int taskManagerMemoryMb, int taskManagerCount, int taskManagerSlots, int jobManagerMemoryMb) {
+        this.parallelism = parallelism;
+        this.queue = queue;
+        this.taskManagerMemoryMb = taskManagerMemoryMb;
+        this.taskManagerCount = taskManagerCount;
+        this.taskManagerSlots = taskManagerSlots;
+        this.jobManagerMemoryMb = jobManagerMemoryMb;
+    }
+
+    /**
+     * Reads an integer property, tolerating surrounding whitespace.
+     *
+     * @return the parsed value, or {@code defaultValue} when the key is absent
+     * @throws NumberFormatException naming the offending key and value
+     */
+    private static int intProperty(Properties props, String key, int defaultValue)
+    {
+        String raw = props.getProperty(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        try {
+            return Integer.parseInt(raw.trim());
+        }
+        catch (NumberFormatException e) {
+            // Same exception type as before, but the message now tells which key is wrong.
+            NumberFormatException detailed = new NumberFormatException(
+                    "Invalid integer value '" + raw + "' for property '" + key + "'");
+            detailed.initCause(e);
+            throw detailed;
+        }
+    }
+
+    public void setQueue(String queue)
+    {
+        this.queue = queue;
+    }
+
+    public void setTaskManagerCount(int taskManagerCount)
+    {
+        this.taskManagerCount = taskManagerCount;
+    }
+
+    public void setTaskManagerMemoryMb(int taskManagerMemoryMb)
+    {
+        this.taskManagerMemoryMb = taskManagerMemoryMb;
+    }
+
+    public void setTaskManagerSlots(int taskManagerSlots)
+    {
+        this.taskManagerSlots = taskManagerSlots;
+    }
+
+    public void setJobManagerMemoryMb(int jobManagerMemoryMb)
+    {
+        this.jobManagerMemoryMb = jobManagerMemoryMb;
+    }
+
+    public void setParallelism(int parallelism)
+    {
+        this.parallelism = parallelism;
+    }
+
+    public int getParallelism()
+    {
+        return parallelism;
+    }
+
+    public String getQueue()
+    {
+        return queue;
+    }
+
+    public int getJobManagerMemoryMb()
+    {
+        return jobManagerMemoryMb;
+    }
+
+    public int getTaskManagerSlots()
+    {
+        return taskManagerSlots;
+    }
+
+    public int getTaskManagerCount()
+    {
+        return taskManagerCount;
+    }
+
+    public int getTaskManagerMemoryMb()
+    {
+        return taskManagerMemoryMb;
+    }
+
+    /**
+     * Two parameter sets are equal when all six settings match. Comparing only a
+     * subset would treat jobs with different JobManager memory, slot count or
+     * parallelism as interchangeable.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobParameter that = (JobParameter) o;
+        return parallelism == that.parallelism
+                && taskManagerMemoryMb == that.taskManagerMemoryMb
+                && taskManagerCount == that.taskManagerCount
+                && taskManagerSlots == that.taskManagerSlots
+                && jobManagerMemoryMb == that.jobManagerMemoryMb
+                && Objects.equals(queue, that.queue);
+    }
+
+    /** Hashes the same six settings that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(parallelism, queue, taskManagerMemoryMb, taskManagerCount,
+                taskManagerSlots, jobManagerMemoryMb);
+    }
+}
diff --git a/core/src/main/java/com/dtstack/flink/yarn/YarnClusterConfiguration.java b/core/src/main/java/com/dtstack/flink/yarn/YarnClusterConfiguration.java
new file mode 100644
index 000000000..f2ccea2c4
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/yarn/YarnClusterConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dtstack.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.util.Set;
+
+public class YarnClusterConfiguration {
+ /**
+ * The configuration used by YARN (i.e., yarn-site.xml
).
+ */
+ private final YarnConfiguration yarnConf;
+
+ /**
+ * The home directory of all job where all the temporary files for each jobs are stored.
+ */
+ private final String appRootDir;
+
+ /**
+ * The location of the Flink jar.
+ */
+ private final Path flinkJar;
+
+ /**
+ * Additional resources to be localized for both JobManager and TaskManager.
+ * They will NOT be added into the classpaths.
+ */
+ private final Set resourcesToLocalize;
+
+ /**
+ * flink conf
+ */
+ private final Configuration flinkConfiguration;
+
+ public YarnClusterConfiguration(
+ Configuration flinkConf,
+ YarnConfiguration conf,
+ String appRootDir,
+ Path flinkJar,
+ Set resourcesToLocalize) {
+ this.flinkConfiguration = flinkConf;
+ this.yarnConf = conf;
+ this.appRootDir = appRootDir;
+ this.flinkJar = flinkJar;
+ this.resourcesToLocalize = resourcesToLocalize;
+ }
+
+ YarnConfiguration yarnConf() {
+ return yarnConf;
+ }
+
+ public String appRootDir() {
+ return appRootDir;
+ }
+
+ public Configuration flinkConfiguration() {
+ return flinkConfiguration;
+ }
+
+ public Path flinkJar() {
+ return flinkJar;
+ }
+
+ public Set resourcesToLocalize() {
+ return resourcesToLocalize;
+ }
+
+}
diff --git a/core/src/main/java/com/dtstack/flink/yarn/YarnClusterDescriptor.java b/core/src/main/java/com/dtstack/flink/yarn/YarnClusterDescriptor.java
new file mode 100644
index 000000000..d442d16f2
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/yarn/YarnClusterDescriptor.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dtstack.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.*;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.yarn.api.records.YarnApplicationState.NEW;
+
+/**
+ * Cluster descriptor that uploads the Flink jar, configuration files and user
+ * jars to the application's home directory, submits a Flink application master
+ * to YARN under a pre-allocated application id and waits until YARN reports the
+ * application as running.
+ *
+ * <p>Deployment goes through {@link #deploy()}; {@code deployJobCluster} is not
+ * supported by this descriptor.
+ */
+public class YarnClusterDescriptor
+        extends AbstractYarnClusterDescriptor
+{
+    private static final String APPLICATION_TYPE = "58_FLINK";
+    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
+    private static final int MAX_ATTEMPT = 1;
+    private static final long DEPLOY_TIMEOUT_MS = 600 * 1000;
+    private static final long RETRY_DELAY_MS = 250;
+    // Daemon thread: the poller must never be the reason the launcher JVM stays alive.
+    private static final ScheduledExecutorService YARN_POLL_EXECUTOR =
+            Executors.newSingleThreadScheduledExecutor(runnable -> {
+                Thread thread = new Thread(runnable, "yarn-deploy-poll");
+                thread.setDaemon(true);
+                return thread;
+            });
+
+    private final YarnClusterConfiguration clusterConf;
+    private final YarnClient yarnClient;
+    private final JobParameter appConf;
+    private final Path homedir;
+    private final ApplicationId yarnAppId;
+    private final String jobName;
+    private final Iterable<Path> userProvidedJars;
+    /** Remote location of the uploaded Flink jar; set by {@link #collectLocalResources}. */
+    private Path flinkJar;
+
+    /**
+     * @param clusterConf Flink/YARN configuration and the files to ship
+     * @param yarnClient client used to submit and poll the application
+     * @param appConf resource settings (memory, TaskManager count, slots, queue)
+     * @param yarnAppId application id the application master is submitted under
+     * @param jobName name shown for the YARN application
+     * @param userProvidedJars extra jars to upload into the {@code jars} sub-directory; may be {@code null}
+     */
+    public YarnClusterDescriptor(
+            final YarnClusterConfiguration clusterConf,
+            final YarnClient yarnClient,
+            final JobParameter appConf,
+            ApplicationId yarnAppId,
+            String jobName,
+            Iterable<Path> userProvidedJars)
+    {
+        super(clusterConf.flinkConfiguration(), clusterConf.yarnConf(), clusterConf.appRootDir(), yarnClient, false);
+        this.jobName = jobName;
+        this.clusterConf = clusterConf;
+        this.yarnClient = yarnClient;
+        this.appConf = appConf;
+        this.yarnAppId = yarnAppId;
+        this.userProvidedJars = userProvidedJars;
+        // Each application gets its own directory: <appRootDir>/<applicationId>.
+        this.homedir = new Path(clusterConf.appRootDir(), yarnAppId.toString());
+    }
+
+    @Override
+    protected String getYarnSessionClusterEntrypoint()
+    {
+        return YarnApplicationMasterRunner.class.getName();
+    }
+
+    /**
+     * Entry-point class that is started when the job is submitted to YARN.
+     */
+    @Override
+    protected String getYarnJobClusterEntrypoint()
+    {
+        return YarnApplicationMasterRunner.class.getName();
+    }
+
+    /** Creates a REST cluster client bound to the application id of {@code report}. */
+    @Override
+    protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster)
+            throws Exception
+    {
+        return new RestClusterClient<>(
+                flinkConfiguration,
+                report.getApplicationId());
+    }
+
+    @Override
+    public YarnClient getYarnClient()
+    {
+        return this.yarnClient;
+    }
+
+    /**
+     * Starts the application master and returns a client for the new cluster.
+     * The JobManager address and port reported by YARN are written into the
+     * Flink configuration before the client is created.
+     *
+     * @throws RuntimeException wrapping any failure during upload, submission or polling
+     */
+    public YarnClusterClient deploy()
+    {
+        ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
+        context.setApplicationId(yarnAppId);
+        try {
+            ApplicationReport report = startAppMaster(context);
+
+            Configuration conf = getFlinkConfiguration();
+            conf.setString(JobManagerOptions.ADDRESS.key(), report.getHost());
+            conf.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort());
+
+            return new YarnClusterClient(this,
+                    appConf.getTaskManagerCount(),
+                    appConf.getTaskManagerSlots(),
+                    report, conf, false);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Uploads all resources, fills in the submission context, submits the
+     * application and blocks until it is running, has failed, or the deployment
+     * timeout expires. On failure the application is killed before the cause is
+     * rethrown.
+     */
+    private ApplicationReport startAppMaster(ApplicationSubmissionContext appContext)
+            throws Exception
+    {
+        ApplicationId appId = appContext.getApplicationId();
+        appContext.setMaxAppAttempts(MAX_ATTEMPT);
+
+        Map<String, LocalResource> localResources = new HashMap<>();
+        Set<Path> shippedPaths = new HashSet<>();
+        collectLocalResources(localResources, shippedPaths);
+
+        final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
+                getYarnJobClusterEntrypoint(),
+                false,
+                true,
+                false,
+                appConf.getJobManagerMemoryMb()
+        );
+
+        amContainer.setLocalResources(localResources);
+
+        final String classPath = String.join(File.pathSeparator, localResources.keySet());
+
+        final String shippedFiles = shippedPaths.stream()
+                .map(path -> path.getName() + "=" + path)
+                .collect(Collectors.joining(","));
+
+        // Setup CLASSPATH and environment variables for ApplicationMaster
+        final Map<String, String> appMasterEnv = setUpAmEnvironment(
+                appId,
+                classPath, shippedFiles,
+                getDynamicPropertiesEncoded()
+        );
+
+        amContainer.setEnvironment(appMasterEnv);
+
+        // Set up resource type requirements for ApplicationMaster
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(appConf.getJobManagerMemoryMb()); // memory of the JobManager container
+        capability.setVirtualCores(1); // one virtual core
+
+        appContext.setApplicationName(jobName);
+        appContext.setApplicationType(APPLICATION_TYPE);
+        appContext.setAMContainerSpec(amContainer);
+        appContext.setResource(capability);
+        if (appConf.getQueue() != null) {
+            appContext.setQueue(appConf.getQueue());
+        }
+
+        LOG.info("Submitting application master {}", appId);
+        yarnClient.submitApplication(appContext);
+
+        PollDeploymentStatus poll = new PollDeploymentStatus(appId);
+        YARN_POLL_EXECUTOR.submit(poll);
+        try {
+            return poll.result.get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            LOG.warn("Failed to deploy {}, cause: {}", appId.toString(), cause);
+            try {
+                yarnClient.killApplication(appId);
+            }
+            catch (YarnException | IOException killFailure) {
+                // Do not let a failed clean-up hide why the deployment failed.
+                e.addSuppressed(killFailure);
+            }
+            // The cause may be an Error, which cannot be cast to Exception.
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Uploads the Flink jar, the configured extra resources and the user jars,
+     * registering each one in {@code resources} under its file name.
+     *
+     * @param resources receives file name to local resource mappings
+     * @param shippedPaths receives the remote paths of {@code log4j.properties} and of the user jars
+     */
+    private void collectLocalResources(
+            Map<String, LocalResource> resources,
+            Set<Path> shippedPaths
+    )
+            throws IOException, URISyntaxException
+    {
+        if (clusterConf.flinkJar() != null) {
+            Path localFlinkJar = clusterConf.flinkJar();
+            LocalResource flinkJarResource = setupLocalResource(localFlinkJar, homedir, ""); // goes into the application's root directory
+            this.flinkJar = ConverterUtils.getPathFromYarnURL(flinkJarResource.getResource());
+            resources.put("flink.jar", flinkJarResource);
+        }
+        if (clusterConf.resourcesToLocalize() != null) {
+            for (Path p : clusterConf.resourcesToLocalize()) { // typically the Flink configuration file and log4j.properties
+                LocalResource resource = setupLocalResource(p, homedir, ""); // these must live in the root directory
+                resources.put(p.getName(), resource);
+                if ("log4j.properties".equals(p.getName())) {
+                    shippedPaths.add(ConverterUtils.getPathFromYarnURL(resource.getResource()));
+                }
+            }
+        }
+        if (userProvidedJars != null) {
+            for (Path p : userProvidedJars) {
+                String name = p.getName();
+                if (resources.containsKey(name)) { // a jar whose name is already registered is skipped with a warning
+                    LOG.warn("Duplicated name in the shipped files {}", p);
+                } else {
+                    LocalResource resource = setupLocalResource(p, homedir, "jars"); // user jars go into the "jars" sub-directory
+                    resources.put(name, resource);
+                    shippedPaths.add(ConverterUtils.getPathFromYarnURL(resource.getResource()));
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the YARN local-resource record (URL, size, timestamp, FILE type,
+     * APPLICATION visibility) for a file that already exists on {@code fs}.
+     */
+    private LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath)
+            throws IOException
+    {
+        LocalResource localResource = Records.newRecord(LocalResource.class);
+        FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
+        localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+        localResource.setSize(jarStat.getLen());
+        localResource.setTimestamp(jarStat.getModificationTime());
+        localResource.setType(LocalResourceType.FILE);
+        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+        return localResource;
+    }
+
+    /**
+     * Copies a local file below {@code homedir} (overwriting an existing copy)
+     * and returns the matching local-resource record.
+     *
+     * @param localSrcPath the file to upload; must not be a directory
+     * @param homedir the application's home directory
+     * @param relativeTargetPath sub-directory below {@code homedir}, or {@code ""} for the root
+     * @throws IllegalArgumentException if {@code localSrcPath} is a directory
+     */
+    private LocalResource setupLocalResource(
+            Path localSrcPath,
+            Path homedir,
+            String relativeTargetPath)
+            throws IOException
+    {
+        if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+            throw new IllegalArgumentException("File to copy must not be a directory: " +
+                    localSrcPath);
+        }
+
+        // copy resource to HDFS
+        String suffix = "." + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath)
+                + "/" + localSrcPath.getName();
+
+        Path dst = new Path(homedir, suffix);
+
+        LOG.info("Uploading {}", dst);
+
+        FileSystem hdfs = FileSystem.get(clusterConf.yarnConf());
+        hdfs.copyFromLocalFile(false, true, localSrcPath, dst);
+
+        // now create the resource instance
+        return registerLocalResource(hdfs, dst);
+    }
+
+    /**
+     * Builds the environment of the application master container.
+     *
+     * @throws NullPointerException if no Flink jar has been uploaded, i.e. the
+     *         cluster configuration does not name one
+     */
+    private Map<String, String> setUpAmEnvironment(
+            ApplicationId appId,
+            String amClassPath,
+            String shipFiles,
+            String dynamicProperties)
+            throws IOException, URISyntaxException
+    {
+        // Fail with an explanation instead of a bare NPE on flinkJar.toString().
+        requireNonNull(flinkJar, "flink jar is not configured, cannot start the application master");
+
+        final Map<String, String> appMasterEnv = new HashMap<>();
+
+        // set Flink app class path
+        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, amClassPath);
+
+        // set Flink on YARN internal configuration values
+        appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(appConf.getTaskManagerCount()));
+        appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(appConf.getTaskManagerMemoryMb()));
+        appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(appConf.getTaskManagerSlots()));
+        appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, flinkJar.toString());
+        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
+        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homedir.toString()); // per-application directory holding temporary data
+        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, shipFiles);
+
+        appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(true)); // always started detached
+
+        appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,
+                UserGroupInformation.getCurrentUser().getUserName());
+
+        if (dynamicProperties != null) {
+            appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicProperties);
+        }
+
+        // set classpath from YARN configuration
+        Utils.setupYarnClassPath(clusterConf.yarnConf(), appMasterEnv);
+
+        return appMasterEnv;
+    }
+
+    /**
+     * Not supported by this descriptor (hook added in Flink 1.5).
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ClusterClient<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
+            throws ClusterDeploymentException
+    {
+        throw new UnsupportedOperationException("this method have't support!");
+    }
+
+    /**
+     * Polls YARN for the state of one application, rescheduling itself every
+     * {@link #RETRY_DELAY_MS} milliseconds until the application is running,
+     * has terminated, or {@link #DEPLOY_TIMEOUT_MS} has elapsed. The outcome is
+     * published through {@link #result}.
+     */
+    private final class PollDeploymentStatus
+            implements Runnable
+    {
+        private final CompletableFuture<ApplicationReport> result = new CompletableFuture<>();
+        private final ApplicationId appId;
+        private YarnApplicationState lastAppState = NEW;
+        private long startTime;
+
+        private PollDeploymentStatus(ApplicationId appId)
+        {
+            this.appId = appId;
+        }
+
+        @Override
+        public void run()
+        {
+            if (startTime == 0) {
+                startTime = System.currentTimeMillis();
+            }
+
+            try {
+                ApplicationReport report = poll();
+                if (report == null) {
+                    YARN_POLL_EXECUTOR.schedule(this, RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
+                }
+                else {
+                    result.complete(report);
+                }
+            }
+            catch (Throwable t) {
+                // Every failure, including the timeout's RuntimeException, must reach the
+                // future; otherwise the thread waiting on result.get() blocks forever.
+                result.completeExceptionally(t);
+            }
+        }
+
+        /**
+         * Queries YARN once.
+         *
+         * @return the report when the application is running, {@code null} when it is still starting
+         * @throws IOException if the application failed, finished or was killed
+         * @throws RuntimeException if the deployment timeout has expired
+         */
+        private ApplicationReport poll()
+                throws IOException, YarnException
+        {
+            ApplicationReport report;
+            report = yarnClient.getApplicationReport(appId);
+            YarnApplicationState appState = report.getYarnApplicationState();
+            LOG.debug("Application State: {}", appState);
+
+            switch (appState) {
+                case FAILED:
+                case FINISHED:
+                    //TODO: the finished state may be valid in flip-6
+                case KILLED:
+                    throw new IOException("The YARN application unexpectedly switched to state "
+                            + appState + " during deployment. \n"
+                            + "Diagnostics from YARN: " + report.getDiagnostics() + "\n"
+                            + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+                            + "yarn logs -applicationId " + appId);
+                case RUNNING:
+                    LOG.info("YARN application has been deployed successfully.");
+                    break;
+                default:
+                    if (appState != lastAppState) {
+                        LOG.info("Deploying cluster, current state " + appState);
+                    }
+                    lastAppState = appState;
+                    if (System.currentTimeMillis() - startTime > DEPLOY_TIMEOUT_MS) {
+                        // The message speaks of seconds, so convert from milliseconds.
+                        throw new RuntimeException(String.format("Deployment took more than %d seconds. "
+                                + "Please check if the requested resources are available in the YARN cluster",
+                                TimeUnit.MILLISECONDS.toSeconds(DEPLOY_TIMEOUT_MS)));
+                    }
+                    return null;
+            }
+            return report;
+        }
+    }
+}
diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
index aa8b5db5f..36ab25d2a 100644
--- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
+++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
@@ -18,12 +18,13 @@
package com.dtstack.flink.sql.launcher;
+import com.dtstack.flink.sql.util.PluginUtil;
+import com.dtstack.flink.yarn.JobParameter;
+import com.dtstack.flink.yarn.YarnClusterConfiguration;
+import org.apache.commons.io.Charsets;
import org.apache.commons.lang.StringUtils;
-import org.apache.flink.client.deployment.ClusterRetrieveException;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -33,21 +34,26 @@
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.File;
import java.io.FilenameFilter;
import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.net.URLDecoder;
+import java.util.*;
+
import com.dtstack.flink.sql.ClusterMode;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
/**
* The Factory of ClusterClient
@@ -61,8 +67,8 @@ public static ClusterClient createClusterClient(LauncherOptions launcherOptions)
String mode = launcherOptions.getMode();
if(mode.equals(ClusterMode.standalone.name())) {
return createStandaloneClient(launcherOptions);
- } else if(mode.equals(ClusterMode.yarn.name())) {
- return createYarnClient(launcherOptions);
+ } else if(mode.equals(ClusterMode.yarn.name()) || mode.equals(ClusterMode.yarnPer.name())) {
+ return createYarnClient(launcherOptions,mode);
}
throw new IllegalArgumentException("Unsupported cluster client type: ");
}
@@ -79,86 +85,153 @@ public static ClusterClient createStandaloneClient(LauncherOptions launcherOptio
return clusterClient;
}
- public static ClusterClient createYarnClient(LauncherOptions launcherOptions) {
+    /**
+     * Creates a cluster client for one of the two YARN modes.
+     *
+     * <p>The Flink configuration is loaded from {@code launcherOptions.getFlinkconf()}
+     * and the YARN configuration from {@code launcherOptions.getYarnconf()}. In
+     * {@code yarn} mode the client is retrieved for the application id returned by
+     * {@code getYarnClusterApplicationId}; in any other mode a new application id is
+     * created and a cluster is deployed for it with the job parameters taken from the
+     * URL-decoded JSON in {@code launcherOptions.getConfProp()}.
+     *
+     * @param launcherOptions launcher settings (configuration directories, job name, job properties)
+     * @param mode the cluster mode name, compared against {@code ClusterMode.yarn.name()}
+     * @return the client, switched to detached mode
+     * @throws RuntimeException wrapping any exception raised while connecting or deploying
+     * @throws UnsupportedOperationException if the YARN configuration directory is blank
+     *         or is not an existing directory
+     */
+    // NOTE(review): getConfProp() is decoded unconditionally, so it presumably must be non-null - confirm.
+    // NOTE(review): yarnClient.stop() is only reached on the success path; confirm whether the
+    // client should also be stopped when an exception is thrown.
+ public static ClusterClient createYarnClient(LauncherOptions launcherOptions,String mode) {
String flinkConfDir = launcherOptions.getFlinkconf();
- Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
String yarnConfDir = launcherOptions.getYarnconf();
- YarnConfiguration yarnConf = new YarnConfiguration();
+ YarnConfiguration yarnConf;
if(StringUtils.isNotBlank(yarnConfDir)) {
try {
-
- config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
- FileSystem.initialize(config);
+ flinkConf.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
+ FileSystem.initialize(flinkConf);
File dir = new File(yarnConfDir);
if(dir.exists() && dir.isDirectory()) {
- File[] xmlFileList = new File(yarnConfDir).listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- if(name.endsWith(".xml")){
- return true;
- }
- return false;
- }
- });
-
- if(xmlFileList != null) {
- for(File xmlFile : xmlFileList) {
- yarnConf.addResource(xmlFile.toURI().toURL());
- }
- }
+ yarnConf = loadYarnConfiguration(yarnConfDir);
YarnClient yarnClient = YarnClient.createYarnClient();
haYarnConf(yarnConf);
yarnClient.init(yarnConf);
yarnClient.start();
+
+ String confProp = launcherOptions.getConfProp();
+ confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
+ System.out.println("confProp="+confProp);
+ Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
+
ApplicationId applicationId = null;
+ ClusterClient clusterClient = null;
+ if(mode.equals(ClusterMode.yarn.name())) {//on yarn cluster mode
+ applicationId = getYarnClusterApplicationId(yarnClient);
+ System.out.println("applicationId="+applicationId.toString());
- Set set = new HashSet<>();
- set.add("Apache Flink");
- EnumSet enumSet = EnumSet.noneOf(YarnApplicationState.class);
- enumSet.add(YarnApplicationState.RUNNING);
- List reportList = yarnClient.getApplications(set, enumSet);
-
- int maxMemory = -1;
- int maxCores = -1;
- for(ApplicationReport report : reportList) {
- if(!report.getName().startsWith("Flink session")){
- continue;
- }
-
- if(!report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
- continue;
- }
-
- int thisMemory = report.getApplicationResourceUsageReport().getNeededResources().getMemory();
- int thisCores = report.getApplicationResourceUsageReport().getNeededResources().getVirtualCores();
- if(thisMemory > maxMemory || thisMemory == maxMemory && thisCores > maxCores) {
- maxMemory = thisMemory;
- maxCores = thisCores;
- applicationId = report.getApplicationId();
- }
+ AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+ flinkConf, yarnConf, ".", yarnClient, false);
+ clusterClient = clusterDescriptor.retrieve(applicationId);
- }
+ System.out.println("applicationId="+applicationId.toString()+" has retrieve!");
+ } else {//on yarn per-job mode
+ applicationId = createApplication(yarnClient);
+ System.out.println("applicationId="+applicationId.toString());
- if(StringUtils.isEmpty(applicationId.toString())) {
- throw new RuntimeException("No flink session found on yarn cluster.");
- }
+ YarnClusterConfiguration clusterConf = getYarnClusterConfiguration(flinkConf,yarnConf,flinkConfDir);
+ //jobmanager+taskmanager param
+ JobParameter appConf = new JobParameter(confProperties);
+ com.dtstack.flink.yarn.YarnClusterDescriptor clusterDescriptor = new com.dtstack.flink.yarn.YarnClusterDescriptor(
+ clusterConf, yarnClient, appConf,applicationId, launcherOptions.getName(),null );
+ clusterClient = clusterDescriptor.deploy();
- AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
- ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
+ System.out.println("applicationId="+applicationId.toString()+" has deploy!");
+ }
clusterClient.setDetached(true);
+ yarnClient.stop();
return clusterClient;
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}
+ throw new UnsupportedOperationException("Haven't been developed yet!");
+ }
+    /**
+     * Builds the YARN configuration from {@code yarn-site.xml}, {@code core-site.xml}
+     * and {@code hdfs-site.xml} inside {@code yarnConfDir}, with {@code fs.hdfs.impl}
+     * pre-set to the HDFS distributed file system.
+     *
+     * @throws NullPointerException if {@code yarnConfDir} is {@code null}
+     * @throws RuntimeException if one of the three files does not exist as a regular file
+     */
+    private static YarnConfiguration loadYarnConfiguration(String yarnConfDir)
+    {
+        org.apache.hadoop.conf.Configuration baseConf = new org.apache.hadoop.conf.Configuration();
+        baseConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+
+        String[] siteFiles = {"yarn-site.xml", "core-site.xml", "hdfs-site.xml"};
+        for (String siteFile : siteFiles) {
+            File candidate = new File(requireNonNull(yarnConfDir, "ENV HADOOP_CONF_DIR is not setting"), siteFile);
+            // All three files are mandatory; stop at the first one that is missing.
+            if (!candidate.exists() || !candidate.isFile()) {
+                throw new RuntimeException(candidate + " not exists");
+            }
+            baseConf.addResource(new org.apache.hadoop.fs.Path(candidate.toURI()));
+        }
- throw new UnsupportedOperationException("Haven't been developed yet!");
+        return new YarnConfiguration(baseConf);
+    }
+
+    /**
+     * Assembles the cluster configuration used for a per-job deployment.
+     *
+     * <p>The Flink jar is looked up through {@code getFlinkJarFile(flinkConfDir)};
+     * {@code flink-conf.yaml} and {@code log4j.properties} from {@code flinkConfDir}
+     * are registered as files to localize; the application root directory is fixed
+     * to {@code hdfs:///tmp/flink/apps}.
+     *
+     * @param flinkConf the loaded Flink configuration
+     * @param yarnConf the loaded YARN configuration
+     * @param flinkConfDir the Flink configuration directory
+     */
+    public static YarnClusterConfiguration getYarnClusterConfiguration(Configuration flinkConf,YarnConfiguration yarnConf,String flinkConfDir)
+    {
+        Path flinkJar = new Path(getFlinkJarFile(flinkConfDir).toURI());
+        // Typed set instead of a raw one, so consumers can iterate it as Path.
+        final Set<Path> resourcesToLocalize = Stream
+                .of("flink-conf.yaml", "log4j.properties")
+                .map(x -> new Path(new File(flinkConfDir, x).toURI()))
+                .collect(Collectors.toSet());
+
+        String home = "hdfs:///tmp/flink/apps";
+        return new YarnClusterConfiguration(
+                flinkConf,
+                yarnConf,
+                home,
+                flinkJar,
+                resourcesToLocalize);
+    }
+
+    /** File-name prefix of the Flink distribution jar. */
+    public static final String FLINK_DIST = "flink-dist";
+
+    /**
+     * Finds the Flink distribution jar in the {@code lib} directory that sits next
+     * to the Flink configuration directory.
+     *
+     * @param flinkConfDir the Flink configuration directory
+     * @return the first regular file in {@code <flinkConfDir>/../lib} whose name
+     *         starts with {@link #FLINK_DIST} and ends with {@code .jar}
+     * @throws NullPointerException if the {@code lib} directory cannot be listed
+     * @throws IllegalArgumentException if no matching jar is found
+     */
+    private static File getFlinkJarFile(String flinkConfDir)
+    {
+        String errorMessage = "error not search " + FLINK_DIST + "*.jar";
+        File[] files = requireNonNull(new File(flinkConfDir, "/../lib").listFiles(), errorMessage);
+        // Only a real jar file qualifies; a directory or a stray "flink-dist*" file does not.
+        Optional<File> file = Arrays.stream(files)
+                .filter(File::isFile)
+                .filter(f -> f.getName().startsWith(FLINK_DIST) && f.getName().endsWith(".jar"))
+                .findFirst();
+        return file.orElseThrow(() -> new IllegalArgumentException(errorMessage));
+    }
+
+    /**
+     * Asks YARN for a new application and returns the id assigned to it.
+     *
+     * @param yarnClient a started YARN client
+     */
+    private static ApplicationId createApplication(YarnClient yarnClient)throws IOException, YarnException {
+        return yarnClient.createApplication()
+                .getApplicationSubmissionContext()
+                .getApplicationId();
+    }
+    /**
+     * Picks the running Flink session to submit to.
+     *
+     * <p>Among the running YARN applications of type {@code "Apache Flink"} whose
+     * name starts with {@code "Flink session"}, the one reporting the largest needed
+     * memory is chosen; ties are broken by the larger number of needed virtual cores.
+     *
+     * @param yarnClient a started YARN client
+     * @return the id of the selected session
+     * @throws RuntimeException if no matching session is running
+     */
+    private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient) throws Exception{
+        ApplicationId applicationId = null;
+
+        Set<String> applicationTypes = Collections.singleton("Apache Flink");
+        EnumSet<YarnApplicationState> runningOnly = EnumSet.of(YarnApplicationState.RUNNING);
+        List<ApplicationReport> reportList = yarnClient.getApplications(applicationTypes, runningOnly);
+
+        int maxMemory = -1;
+        int maxCores = -1;
+        for (ApplicationReport report : reportList) {
+            if (!report.getName().startsWith("Flink session")) {
+                continue;
+            }
+
+            if (!report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
+                continue;
+            }
+
+            int thisMemory = report.getApplicationResourceUsageReport().getNeededResources().getMemory();
+            int thisCores = report.getApplicationResourceUsageReport().getNeededResources().getVirtualCores();
+            if (thisMemory > maxMemory || (thisMemory == maxMemory && thisCores > maxCores)) {
+                maxMemory = thisMemory;
+                maxCores = thisCores;
+                applicationId = report.getApplicationId();
+            }
+
+        }
+
+        // applicationId is still null when nothing matched; test that directly, because
+        // calling toString() on it would throw a NullPointerException instead of this error.
+        if (applicationId == null) {
+            throw new RuntimeException("No flink session found on yarn cluster.");
+        }
+        return applicationId;
}
/**
@@ -181,31 +254,4 @@ private static org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop
return yarnConf;
}
- private static org.apache.hadoop.conf.Configuration getYarnConf(String yarnConfDir) {
- org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
- try {
-
- File dir = new File(yarnConfDir);
- if(dir.exists() && dir.isDirectory()) {
- File[] xmlFileList = new File(yarnConfDir).listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- if(name.endsWith(".xml")){
- return true;
- }
- return false;
- }
- });
- if(xmlFileList != null) {
- for(File xmlFile : xmlFileList) {
- yarnConf.addResource(xmlFile.toURI().toURL());
- }
- }
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- return yarnConf;
- }
-
}
diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
index 55f085c98..957d8cb18 100644
--- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
+++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
@@ -27,6 +27,9 @@
import java.io.File;
import java.util.List;
import com.dtstack.flink.sql.ClusterMode;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.shaded.org.apache.commons.lang.StringUtils;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.table.shaded.org.apache.commons.lang.BooleanUtils;
@@ -64,8 +67,13 @@ public static void main(String[] args) throws Exception {
if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState())));
}
+ //final JobGraph jobGraph;
+ //jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1);
+ //clusterClient.runDetached(jobGraph,null);
clusterClient.run(program, 1);
clusterClient.shutdown();
+
+ System.exit(0);
}
}
}
diff --git a/pom.xml b/pom.xml
index 0922f739b..9b8f3ea45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
UTF-8
- 1.5.3
+ 1.6.0