entry : pluginClassLoader.entrySet()) {
+ classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
+ }
+ return classPaths;
+ }
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java
new file mode 100644
index 000000000..859aa75f4
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.classloader;
+
+/**
+ * Represents a supplier of results.
+ *
+ * There is no requirement that a new or distinct result be returned each
+ * time the supplier is invoked.
+ *
+ *
This is a functional interface
+ * whose functional method is {@link #get()}.
+ *
+ * @param the type of results supplied by this supplier
+ *
+ * @since 1.8
+ */
+@FunctionalInterface
+public interface ClassLoaderSupplier {
+
+ /**
+ * Gets a result.
+ *
+ * @return a result
+ */
+ T get(ClassLoader cl) throws Exception;
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java
new file mode 100644
index 000000000..51d37ef5e
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.classloader;
+
+/**
+ * company: www.dtstack.com
+ * author: toutian
+ * create: 2019/10/14
+ */
+public class ClassLoaderSupplierCallBack {
+
+ public static R callbackAndReset(ClassLoaderSupplier supplier, ClassLoader toSetClassLoader) throws Exception {
+ ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(toSetClassLoader);
+ try {
+ return supplier.get(toSetClassLoader);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldClassLoader);
+ }
+ }
+
+
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java b/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java
new file mode 100644
index 000000000..54ae66bbc
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.config;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParser.Config;
+
+public class CalciteConfig {
+
+ public static Config MYSQL_LEX_CONFIG = SqlParser
+ .configBuilder()
+ .setLex(Lex.MYSQL)
+ .build();
+
+
+
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
index 6b6551f9a..76f5996c3 100644
--- a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
+++ b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
@@ -29,7 +29,9 @@
*/
public class ConfigConstrant {
- public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
+ public static final String SQL_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
+ // 兼容上层
+ public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "flink.checkpoint.interval";
public static final String FLINK_CHECKPOINT_MODE_KEY = "sql.checkpoint.mode";
@@ -37,7 +39,11 @@ public class ConfigConstrant {
public static final String FLINK_MAXCONCURRENTCHECKPOINTS_KEY = "sql.max.concurrent.checkpoints";
- public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
+ public static final String SQL_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
+
+ public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";
+
+
public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flinkCheckpointDataURI";
diff --git a/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java b/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java
new file mode 100644
index 000000000..6cb027ac3
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.enums;
+
+/**
+ *
+ * CLASSPATH: plugin jar depends on each machine node.
+ * SHIPFILE: plugin jar only depends on the client submitted by the task.
+ *
+ */
+public enum EPluginLoadMode {
+
+ CLASSPATH(0),
+ SHIPFILE(1);
+
+ private int type;
+
+ EPluginLoadMode(int type){
+ this.type = type;
+ }
+
+ public int getType(){
+ return this.type;
+ }
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java b/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
index 10e34a5e6..07860b608 100644
--- a/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
@@ -18,7 +18,7 @@
package com.dtstack.flink.sql.option;
-import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
diff --git a/core/src/main/java/com/dtstack/flink/sql/option/Options.java b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
index eef54a617..a653aa42e 100644
--- a/core/src/main/java/com/dtstack/flink/sql/option/Options.java
+++ b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
@@ -19,6 +19,7 @@
package com.dtstack.flink.sql.option;
import com.dtstack.flink.sql.enums.ClusterMode;
+import com.dtstack.flink.sql.enums.EPluginLoadMode;
/**
@@ -71,6 +72,9 @@ public class Options {
@OptionRequired(description = "yarn session configuration,such as yid")
private String yarnSessionConf = "{}";
+ @OptionRequired(description = "plugin load mode, by classpath or shipfile")
+ private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
+
public String getMode() {
return mode;
}
@@ -182,4 +186,12 @@ public String getYarnSessionConf() {
public void setYarnSessionConf(String yarnSessionConf) {
this.yarnSessionConf = yarnSessionConf;
}
+
+ public String getPluginLoadMode() {
+ return pluginLoadMode;
+ }
+
+ public void setPluginLoadMode(String pluginLoadMode) {
+ this.pluginLoadMode = pluginLoadMode;
+ }
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
index 793dd6baa..670d98a7e 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
@@ -32,7 +32,7 @@
public class CreateFuncParser implements IParser {
- private static final String funcPatternStr = "(?i)\\s*create\\s+(scala|table)\\s+function\\s+(\\S+)\\s+WITH\\s+(\\S+)";
+ private static final String funcPatternStr = "(?i)\\s*create\\s+(scala|table|aggregate)\\s+function\\s+(\\S+)\\s+WITH\\s+(\\S+)";
private static final Pattern funcPattern = Pattern.compile(funcPatternStr);
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
index 5e126e786..ae6e1f708 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
@@ -21,7 +21,7 @@
package com.dtstack.flink.sql.parser;
import com.dtstack.flink.sql.util.DtStringUtil;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
index db18986b7..de7141eb5 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
@@ -25,8 +25,7 @@
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
index ff2bb9e4b..a7c6db9eb 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
@@ -21,17 +21,11 @@
package com.dtstack.flink.sql.parser;
import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlJoin;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOrderBy;
-import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
import java.util.List;
@@ -119,6 +113,10 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
sqlParseResult.addSourceTable(identifierNode.toString());
}
break;
+ case MATCH_RECOGNIZE:
+ SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
+ sqlParseResult.addSourceTable(node.getTableRef().toString());
+ break;
case UNION:
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
index e9fb68cfe..a76c1b31a 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
@@ -25,8 +25,8 @@
import com.dtstack.flink.sql.table.TableInfoParser;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
-import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.base.Strings;
import java.util.List;
import java.util.Set;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
index 754de0819..1b64b7c68 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
@@ -22,8 +22,8 @@
import com.dtstack.flink.sql.table.TableInfo;
-import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Maps;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
index bc716ddaa..37b23d046 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
@@ -20,7 +20,7 @@
package com.dtstack.flink.sql.side;
-import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashBasedTable;
/**
* Reason:
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
index 03dbde5a6..6fde02493 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
@@ -22,9 +22,10 @@
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlNode;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
+import com.google.common.base.Strings;
import java.io.Serializable;
+import java.util.Map;
/**
* Join信息
@@ -40,6 +41,8 @@ public class JoinInfo implements Serializable {
//左表是否是维表
private boolean leftIsSideTable;
+ //左表是 转换后的中间表
+ private boolean leftIsMidTable;
//右表是否是维表
private boolean rightIsSideTable;
@@ -63,6 +66,8 @@ public class JoinInfo implements Serializable {
private SqlNode selectNode;
private JoinType joinType;
+ // 左边是中间转换表,做表映射关系,给替换属性名称使用
+ private Map leftTabMapping;
public String getSideTableName(){
if(leftIsSideTable){
@@ -87,6 +92,22 @@ public String getNewTableName(){
return leftStr + "_" + rightTableName;
}
+ public boolean isLeftIsMidTable() {
+ return leftIsMidTable;
+ }
+
+ public void setLeftIsMidTable(boolean leftIsMidTable) {
+ this.leftIsMidTable = leftIsMidTable;
+ }
+
+ public Map getLeftTabMapping() {
+ return leftTabMapping;
+ }
+
+ public void setLeftTabMapping(Map leftTabMapping) {
+ this.leftTabMapping = leftTabMapping;
+ }
+
public String getNewTableAlias(){
return leftTableAlias + "_" + rightTableAlias;
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
index ba07e714a..c7a73e0d7 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
@@ -23,8 +23,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java b/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
index df242a390..74d303c24 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
@@ -27,7 +27,7 @@
import org.apache.calcite.sql.SqlSelect;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
@@ -41,8 +41,12 @@
public class ParserJoinField {
+
/**
- * Need to parse the fields of information and where selectlist
+ * build row by field
+ * @param sqlNode select node
+ * @param scope join left and right table all info
+ * @param getAll true,get all fields from two tables; false, extract useful field from select node
* @return
*/
public static List getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
index 97e5e555f..df41e1663 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
@@ -27,8 +27,8 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.List;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
index b0ccc5feb..c881d6344 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
@@ -20,7 +20,8 @@
package com.dtstack.flink.sql.side;
-import org.apache.calcite.config.Lex;
+import com.dtstack.flink.sql.config.CalciteConfig;
+import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
@@ -38,9 +39,17 @@
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Queues;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -54,16 +63,20 @@
*/
public class SideSQLParser {
+ private static final Logger LOG = LoggerFactory.getLogger(SideSQLParser.class);
+
+ private final char SPLIT = '_';
+
+ private String tempSQL = "SELECT * FROM TMP";
public Queue