diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterMode.java b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java similarity index 79% rename from launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterMode.java rename to core/src/main/java/com/dtstack/flink/sql/ClusterMode.java index 590aba48e..a73730840 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterMode.java +++ b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.dtstack.flink.sql.launcher; +package com.dtstack.flink.sql; /** * This class defines three running mode of FlinkX @@ -24,12 +24,14 @@ * Company: www.dtstack.com * @author huyifan.zju@163.com */ -public class ClusterMode { +public enum ClusterMode { - public static final String MODE_LOCAL = "local"; + local(0),standalone(1),yarn(2),yarnPer(3); - public static final String MODE_STANDALONE = "standalone"; + private int type; - public static final String MODE_YARN = "yarn"; + ClusterMode(int type){ + this.type = type; + } } diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java index e31d6125b..cc06f2b60 100644 --- a/core/src/main/java/com/dtstack/flink/sql/Main.java +++ b/core/src/main/java/com/dtstack/flink/sql/Main.java @@ -88,8 +88,6 @@ public class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); - private static final String LOCAL_MODE = "local"; - private static final int failureRate = 3; private static final int failureInterval = 6; //min @@ -135,7 +133,7 @@ public static void main(String[] args) throws Exception { Thread.currentThread().setContextClassLoader(dtClassLoader); URLClassLoader parentClassloader; - if(!LOCAL_MODE.equals(deployMode)){ + if(!ClusterMode.local.name().equals(deployMode)){ parentClassloader = (URLClassLoader) threadClassLoader.getParent(); }else{ parentClassloader = dtClassLoader; @@ -286,7 +284,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en } private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException { - StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ? + StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ? StreamExecutionEnvironment.getExecutionEnvironment() : new MyLocalStreamEnvironment(); diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java index 960398906..06c759997 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; - +import com.dtstack.flink.sql.ClusterMode; import java.io.File; import java.io.FilenameFilter; import java.lang.reflect.Field; @@ -42,10 +42,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; -import static com.dtstack.flink.sql.launcher.LauncherOptions.*; /** * The Factory of ClusterClient @@ -55,18 +53,18 @@ */ public class ClusterClientFactory { - public static ClusterClient createClusterClient(Properties props) { - String clientType = props.getProperty(OPTION_MODE); - if(clientType.equals(ClusterMode.MODE_STANDALONE)) { - return createStandaloneClient(props); - } else if(clientType.equals(ClusterMode.MODE_YARN)) { - return createYarnClient(props); + public static ClusterClient createClusterClient(LauncherOptions launcherOptions) { + String mode = launcherOptions.getMode(); + if(mode.equals(ClusterMode.standalone.name())) { + return createStandaloneClient(launcherOptions); + } else if(mode.equals(ClusterMode.yarn.name())) { + return createYarnClient(launcherOptions); } throw new IllegalArgumentException("Unsupported cluster client type: "); } - public static StandaloneClusterClient createStandaloneClient(Properties props) { - String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR); + public static StandaloneClusterClient createStandaloneClient(LauncherOptions launcherOptions) { + String flinkConfDir = launcherOptions.getFlinkconf(); Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); StandaloneClusterClient clusterClient = descriptor.retrieve(null); @@ -74,10 +72,10 @@ public static StandaloneClusterClient createStandaloneClient(Properties props) { return clusterClient; } - public static YarnClusterClient createYarnClient(Properties props) { - String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR); + public static YarnClusterClient createYarnClient(LauncherOptions launcherOptions) { + String flinkConfDir = launcherOptions.getFlinkconf(); Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); - String yarnConfDir = props.getProperty(LauncherOptions.OPTION_YARN_CONF_DIR); + String yarnConfDir =launcherOptions.getYarnconf(); org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration(); if(StringUtils.isNotBlank(yarnConfDir)) { try { diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java index 3c4cdc57f..0bb25424c 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java @@ -22,15 +22,14 @@ import avro.shaded.com.google.common.collect.Lists; import com.dtstack.flink.sql.Main; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; - import java.io.File; import java.util.List; - -import static com.dtstack.flink.sql.launcher.ClusterMode.MODE_LOCAL; -import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_LOCAL_SQL_PLUGIN_PATH; -import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_MODE; +import com.dtstack.flink.sql.ClusterMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; /** * Date: 2017/2/20 @@ -51,18 +50,21 @@ private static String getLocalCoreJarPath(String localSqlRootJar){ public static void main(String[] args) throws Exception { LauncherOptionParser optionParser = new LauncherOptionParser(args); - String mode = (String) optionParser.getVal(OPTION_MODE); + LauncherOptions launcherOptions = optionParser.getLauncherOptions(); + String mode = launcherOptions.getMode(); List argList = optionParser.getProgramExeArgList(); - - if(mode.equals(MODE_LOCAL)) { + if(mode.equals(ClusterMode.local.name())) { String[] localArgs = argList.toArray(new String[argList.size()]); Main.main(localArgs); } else { - ClusterClient clusterClient = ClusterClientFactory.createClusterClient(optionParser.getProperties()); - String pluginRoot = (String) optionParser.getVal(OPTION_LOCAL_SQL_PLUGIN_PATH); + ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions); + String pluginRoot = launcherOptions.getLocalSqlPluginPath(); File jarFile = new File(getLocalCoreJarPath(pluginRoot)); String[] remoteArgs = argList.toArray(new String[argList.size()]); PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs); + if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){ + program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState()))); + } clusterClient.run(program, 1); clusterClient.shutdown(); } diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java index 50484dd02..ac83e55cd 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java @@ -19,23 +19,19 @@ package com.dtstack.flink.sql.launcher; import avro.shaded.com.google.common.collect.Lists; +import com.dtstack.flink.sql.util.PluginUtil; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.lang.StringUtils; import org.apache.flink.hadoop.shaded.com.google.common.base.Charsets; import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions; - import java.io.File; import java.io.FileInputStream; import java.net.URLEncoder; import java.util.List; import java.util.Map; -import java.util.Properties; - -import static com.dtstack.flink.sql.launcher.LauncherOptions.*; -import static com.dtstack.flink.sql.launcher.ClusterMode.*; - +import com.dtstack.flink.sql.ClusterMode; /** * The Parser of Launcher commandline options @@ -45,14 +41,36 @@ */ public class LauncherOptionParser { + public static final String OPTION_MODE = "mode"; + + public static final String OPTION_NAME = "name"; + + public static final String OPTION_SQL = "sql"; + + public static final String OPTION_FLINK_CONF_DIR = "flinkconf"; + + public static final String OPTION_YARN_CONF_DIR = "yarnconf"; + + public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath"; + + public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath"; + + public static final String OPTION_ADDJAR = "addjar"; + + public static final String OPTION_CONF_PROP = "confProp"; + + public static final String OPTION_SAVE_POINT_PATH = "savePointPath"; + + public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState"; + private Options options = new Options(); private BasicParser parser = new BasicParser(); - private Properties properties = new Properties(); + private LauncherOptions properties = new LauncherOptions(); public LauncherOptionParser(String[] args) { - options.addOption(LauncherOptions.OPTION_MODE, true, "Running mode"); + options.addOption(OPTION_MODE, true, "Running mode"); options.addOption(OPTION_SQL, true, "Job sql file"); options.addOption(OPTION_NAME, true, "Job name"); options.addOption(OPTION_FLINK_CONF_DIR, true, "Flink configuration directory"); @@ -62,11 +80,14 @@ public LauncherOptionParser(String[] args) { options.addOption(OPTION_CONF_PROP, true, "sql ref prop,eg specify event time"); options.addOption(OPTION_YARN_CONF_DIR, true, "Yarn and hadoop configuration directory"); + options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path"); + options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint"); + try { CommandLine cl = parser.parse(options, args); - String mode = cl.getOptionValue(OPTION_MODE, MODE_LOCAL); + String mode = cl.getOptionValue(OPTION_MODE, ClusterMode.local.name()); //check mode - properties.put(OPTION_MODE, mode); + properties.setMode(mode); String job = Preconditions.checkNotNull(cl.getOptionValue(OPTION_SQL), "Must specify job file using option '" + OPTION_SQL + "'"); @@ -76,78 +97,65 @@ public LauncherOptionParser(String[] args) { in.read(filecontent); String content = new String(filecontent, "UTF-8"); String sql = URLEncoder.encode(content, Charsets.UTF_8.name()); - properties.put(OPTION_SQL, sql); - + properties.setSql(sql); String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH)); - properties.put(OPTION_LOCAL_SQL_PLUGIN_PATH, localPlugin); - + properties.setLocalSqlPluginPath(localPlugin); String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH); - if(!mode.equalsIgnoreCase(ClusterMode.MODE_LOCAL)){ + if(!ClusterMode.local.name().equals(mode)){ Preconditions.checkNotNull(remotePlugin); - properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin); + properties.setRemoteSqlPluginPath(remotePlugin); } - String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME)); - properties.put(OPTION_NAME, name); - + properties.setName(name); String addJar = cl.getOptionValue(OPTION_ADDJAR); if(StringUtils.isNotBlank(addJar)){ - properties.put(OPTION_ADDJAR, addJar); + properties.setAddjar(addJar); } - String confProp = cl.getOptionValue(OPTION_CONF_PROP); if(StringUtils.isNotBlank(confProp)){ - properties.put(OPTION_CONF_PROP, confProp); + properties.setConfProp(confProp); } - String flinkConfDir = cl.getOptionValue(OPTION_FLINK_CONF_DIR); if(StringUtils.isNotBlank(flinkConfDir)) { - properties.put(OPTION_FLINK_CONF_DIR, flinkConfDir); + properties.setFlinkconf(flinkConfDir); } String yarnConfDir = cl.getOptionValue(OPTION_YARN_CONF_DIR); if(StringUtils.isNotBlank(yarnConfDir)) { - properties.put(OPTION_YARN_CONF_DIR, yarnConfDir); + properties.setYarnconf(yarnConfDir); + } + + String savePointPath = cl.getOptionValue(OPTION_SAVE_POINT_PATH); + if(StringUtils.isNotBlank(savePointPath)) { + properties.setSavePointPath(savePointPath); + } + + String allow_non = cl.getOptionValue(OPTION_ALLOW_NON_RESTORED_STATE); + if(StringUtils.isNotBlank(allow_non)) { + properties.setAllowNonRestoredState(allow_non); } } catch (Exception e) { throw new RuntimeException(e); } - } - public Properties getProperties(){ + public LauncherOptions getLauncherOptions(){ return properties; } - public Object getVal(String key){ - return properties.get(key); - } - - public List getAllArgList(){ + public List getProgramExeArgList() throws Exception { + Map mapConf = PluginUtil.ObjectToMap(properties); List args = Lists.newArrayList(); - for(Map.Entry one : properties.entrySet()){ - args.add("-" + one.getKey().toString()); - args.add(one.getValue().toString()); - } - - return args; - } - - public List getProgramExeArgList(){ - List args = Lists.newArrayList(); - for(Map.Entry one : properties.entrySet()){ - String key = one.getKey().toString(); + for(Map.Entry one : mapConf.entrySet()){ + String key = one.getKey(); if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key) || OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)){ continue; } - args.add("-" + key); args.add(one.getValue().toString()); } - return args; } - } diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java index 2169bb698..ab4a276bb 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java @@ -26,23 +26,113 @@ */ public class LauncherOptions { - public static final String OPTION_MODE = "mode"; + private String mode; - public static final String OPTION_NAME = "name"; + private String name; - public static final String OPTION_SQL = "sql"; + private String sql; - public static final String OPTION_FLINK_CONF_DIR = "flinkconf"; + private String flinkconf; - public static final String OPTION_YARN_CONF_DIR = "yarnconf"; + private String yarnconf; - public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath"; + private String localSqlPluginPath; - public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath"; + private String remoteSqlPluginPath ; - public static final String OPTION_ADDJAR = "addjar"; + private String addjar; - public static final String OPTION_CONF_PROP = "confProp"; + private String confProp; + private String savePointPath; + private String allowNonRestoredState = "false"; + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getFlinkconf() { + return flinkconf; + } + + public void setFlinkconf(String flinkconf) { + this.flinkconf = flinkconf; + } + + public String getYarnconf() { + return yarnconf; + } + + public void setYarnconf(String yarnconf) { + this.yarnconf = yarnconf; + } + + public String getLocalSqlPluginPath() { + return localSqlPluginPath; + } + + public void setLocalSqlPluginPath(String localSqlPluginPath) { + this.localSqlPluginPath = localSqlPluginPath; + } + + public String getRemoteSqlPluginPath() { + return remoteSqlPluginPath; + } + + public void setRemoteSqlPluginPath(String remoteSqlPluginPath) { + this.remoteSqlPluginPath = remoteSqlPluginPath; + } + + public String getAddjar() { + return addjar; + } + + public void setAddjar(String addjar) { + this.addjar = addjar; + } + + public String getConfProp() { + return confProp; + } + + public void setConfProp(String confProp) { + this.confProp = confProp; + } + + public String getSavePointPath() { + return savePointPath; + } + + public void setSavePointPath(String savePointPath) { + this.savePointPath = savePointPath; + } + + public String getAllowNonRestoredState() { + return allowNonRestoredState; + } + + public void setAllowNonRestoredState(String allowNonRestoredState) { + this.allowNonRestoredState = allowNonRestoredState; + } }