{
+ private static final long serialVersionUID = -7994311331389155692L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
+
+ private String address;
+ private String tableName;
+ private String userName;
+ private String password;
+ private String database;
+ private Integer maxRequestsPerConnection;
+ private Integer coreConnectionsPerHost;
+ private Integer maxConnectionsPerHost;
+ private Integer maxQueueSize;
+ private Integer readTimeoutMillis;
+ private Integer connectTimeoutMillis;
+ private Integer poolTimeoutMillis;
+
+ protected String[] fieldNames;
+ TypeInformation>[] fieldTypes;
+
+ private int batchInterval = 5000;
+
+ private Cluster cluster;
+ private Session session = null;
+
+ private int batchCount = 0;
+
+ private transient Counter outRecords;
+
+ private transient Meter outRecordsRate;
+
+ public CassandraOutputFormat() {
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ * @throws IOException Thrown, if the output could not be opened due to an
+ * I/O problem.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ if (session == null) {
+ QueryOptions queryOptions = new QueryOptions();
+ //The default consistency level for queries: ConsistencyLevel.TWO.
+ queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ Integer maxRequestsPerConnection = this.maxRequestsPerConnection == null ? 1 : this.maxRequestsPerConnection;
+ Integer coreConnectionsPerHost = this.coreConnectionsPerHost == null ? 8 : this.coreConnectionsPerHost;
+ Integer maxConnectionsPerHost = this.maxConnectionsPerHost == null ? 32768 : this.maxConnectionsPerHost;
+ Integer maxQueueSize = this.maxQueueSize == null ? 100000 : this.maxQueueSize;
+ Integer readTimeoutMillis = this.readTimeoutMillis == null ? 60000 : this.readTimeoutMillis;
+ Integer connectTimeoutMillis = this.connectTimeoutMillis == null ? 60000 : this.connectTimeoutMillis;
+ Integer poolTimeoutMillis = this.poolTimeoutMillis == null ? 60000 : this.poolTimeoutMillis;
+ Integer cassandraPort = 0;
+
+ ArrayList serversList = new ArrayList();
+ //Read timeout or connection timeout Settings
+ SocketOptions so = new SocketOptions()
+ .setReadTimeoutMillis(readTimeoutMillis)
+ .setConnectTimeoutMillis(connectTimeoutMillis);
+
+ //The cluster USES hostdistance.local in the same machine room
+ //Hostdistance. REMOTE is used for different machine rooms
+ //Ignore use HostDistance. IGNORED
+ PoolingOptions poolingOptions = new PoolingOptions()
+ //Each connection allows a maximum of 64 concurrent requests
+ .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
+ //Have at least two connections to each machine in the cluster
+ .setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost)
+ //There are up to eight connections to each machine in the cluster
+ .setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost)
+ .setMaxQueueSize(maxQueueSize)
+ .setPoolTimeoutMillis(poolTimeoutMillis);
+ //重试策略
+ RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
+
+ for (String server : address.split(",")) {
+ cassandraPort = Integer.parseInt(server.split(":")[1]);
+ serversList.add(InetAddress.getByName(server.split(":")[0]));
+ }
+
+ if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
+ cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
+ .withPort(cassandraPort)
+ .withPoolingOptions(poolingOptions).withSocketOptions(so)
+ .withQueryOptions(queryOptions).build();
+ } else {
+ cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
+ .withPort(cassandraPort)
+ .withPoolingOptions(poolingOptions).withSocketOptions(so)
+ .withCredentials(userName, password)
+ .withQueryOptions(queryOptions).build();
+ }
+ // 建立连接 连接已存在的键空间
+ session = cluster.connect(database);
+ LOG.info("connect cassandra is successed!");
+ initMetric();
+ }
+ } catch (Exception e) {
+ LOG.error("connect cassandra is error:" + e.getMessage());
+ }
+ }
+
+ private void initMetric() {
+ outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
+ outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
+ }
+
+ /**
+ * Adds a record to the prepared statement.
+ *
+ * When this method is called, the output format is guaranteed to be opened.
+ *
+ *
+ * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
+ * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
+ *
+ * @param tuple2 The records to add to the output.
+ * @throws IOException Thrown, if the records could not be added due to an I/O problem.
+ * @see PreparedStatement
+ */
+ @Override
+ public void writeRecord(Tuple2 tuple2) throws IOException {
+ Tuple2 tupleTrans = tuple2;
+ Boolean retract = tupleTrans.getField(0);
+ Row row = tupleTrans.getField(1);
+ try {
+ if (retract) {
+ insertWrite(row);
+ outRecords.inc();
+ } else {
+ //do nothing
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("writeRecord() failed", e);
+ }
+ }
+
+ private void insertWrite(Row row) {
+ try {
+ String cql = buildSql(row);
+ if (cql != null) {
+ ResultSet resultSet = session.execute(cql);
+ resultSet.wasApplied();
+ }
+ } catch (Exception e) {
+ LOG.error("[upsert] is error:" + e.getMessage());
+ }
+ }
+
+ private String buildSql(Row row) {
+ StringBuffer fields = new StringBuffer();
+ StringBuffer values = new StringBuffer();
+ for (int index = 0; index < row.getArity(); index++) {
+ if (row.getField(index) == null) {
+ } else {
+ fields.append(fieldNames[index] + ",");
+ values.append("'" + row.getField(index) + "'" + ",");
+ }
+ }
+ fields.deleteCharAt(fields.length() - 1);
+ values.deleteCharAt(values.length() - 1);
+ String cql = "INSERT INTO " + database + "." + tableName + " (" + fields.toString() + ") "
+ + " VALUES (" + values.toString() + ")";
+ return cql;
+ }
+
+ /**
+ * Executes prepared statement and closes all resources of this instance.
+ *
+ * @throws IOException Thrown, if the input could not be closed properly.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
+ }
+ LOG.info("close cassandra is successed!");
+ }
+
+ public static CassandraFormatBuilder buildOutputFormat() {
+ return new CassandraFormatBuilder();
+ }
+
+ public static class CassandraFormatBuilder {
+ private final CassandraOutputFormat format;
+
+ protected CassandraFormatBuilder() {
+ this.format = new CassandraOutputFormat();
+ }
+
+ public CassandraFormatBuilder setUsername(String username) {
+ format.userName = username;
+ return this;
+ }
+
+ public CassandraFormatBuilder setPassword(String password) {
+ format.password = password;
+ return this;
+ }
+
+ public CassandraFormatBuilder setAddress(String address) {
+ format.address = address;
+ return this;
+ }
+
+ public CassandraFormatBuilder setTableName(String tableName) {
+ format.tableName = tableName;
+ return this;
+ }
+
+ public CassandraFormatBuilder setDatabase(String database) {
+ format.database = database;
+ return this;
+ }
+
+ public CassandraFormatBuilder setFieldNames(String[] fieldNames) {
+ format.fieldNames = fieldNames;
+ return this;
+ }
+
+ public CassandraFormatBuilder setFieldTypes(TypeInformation>[] fieldTypes) {
+ format.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ public CassandraFormatBuilder setMaxRequestsPerConnection(Integer maxRequestsPerConnection) {
+ format.maxRequestsPerConnection = maxRequestsPerConnection;
+ return this;
+ }
+
+ public CassandraFormatBuilder setCoreConnectionsPerHost(Integer coreConnectionsPerHost) {
+ format.coreConnectionsPerHost = coreConnectionsPerHost;
+ return this;
+ }
+
+ public CassandraFormatBuilder setMaxConnectionsPerHost(Integer maxConnectionsPerHost) {
+ format.maxConnectionsPerHost = maxConnectionsPerHost;
+ return this;
+ }
+
+ public CassandraFormatBuilder setMaxQueueSize(Integer maxQueueSize) {
+ format.maxQueueSize = maxQueueSize;
+ return this;
+ }
+
+ public CassandraFormatBuilder setReadTimeoutMillis(Integer readTimeoutMillis) {
+ format.readTimeoutMillis = readTimeoutMillis;
+ return this;
+ }
+
+ public CassandraFormatBuilder setConnectTimeoutMillis(Integer connectTimeoutMillis) {
+ format.connectTimeoutMillis = connectTimeoutMillis;
+ return this;
+ }
+
+ public CassandraFormatBuilder setPoolTimeoutMillis(Integer poolTimeoutMillis) {
+ format.poolTimeoutMillis = poolTimeoutMillis;
+ return this;
+ }
+
+ /**
+ * Finalizes the configuration and checks validity.
+ *
+ * @return Configured RetractJDBCOutputFormat
+ */
+ public CassandraOutputFormat finish() {
+ if (format.userName == null) {
+ LOG.info("Username was not supplied separately.");
+ }
+ if (format.password == null) {
+ LOG.info("Password was not supplied separately.");
+ }
+ if (format.address == null) {
+ throw new IllegalArgumentException("No address URL supplied.");
+ }
+ if (format.database == null) {
+ throw new IllegalArgumentException("No dababase suplied");
+ }
+ if (format.tableName == null) {
+ throw new IllegalArgumentException("No tableName supplied");
+ }
+ return format;
+ }
+ }
+}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
new file mode 100644
index 000000000..eb7b23b53
--- /dev/null
+++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.sink.cassandra;
+
+
+import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
+import com.dtstack.flink.sql.table.TargetTableInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Reason:
+ * Date: 2018/11/22
+ *
+ * @author xuqianjin
+ */
+public class CassandraSink implements RetractStreamTableSink, IStreamSinkGener {
+
+
+ protected String[] fieldNames;
+ TypeInformation>[] fieldTypes;
+ protected String address;
+ protected String tableName;
+ protected String userName;
+ protected String password;
+ protected String database;
+ protected Integer maxRequestsPerConnection;
+ protected Integer coreConnectionsPerHost;
+ protected Integer maxConnectionsPerHost;
+ protected Integer maxQueueSize;
+ protected Integer readTimeoutMillis;
+ protected Integer connectTimeoutMillis;
+ protected Integer poolTimeoutMillis;
+
+ public CassandraSink() {
+ // TO DO NOTHING
+ }
+
+ @Override
+ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
+ CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
+ this.address = cassandraTableInfo.getAddress();
+ this.tableName = cassandraTableInfo.getTableName();
+ this.userName = cassandraTableInfo.getUserName();
+ this.password = cassandraTableInfo.getPassword();
+ this.database = cassandraTableInfo.getDatabase();
+ this.maxRequestsPerConnection = cassandraTableInfo.getMaxRequestsPerConnection();
+ this.coreConnectionsPerHost = cassandraTableInfo.getCoreConnectionsPerHost();
+ this.maxConnectionsPerHost = cassandraTableInfo.getMaxConnectionsPerHost();
+ this.maxQueueSize = cassandraTableInfo.getMaxQueueSize();
+ this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
+ this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
+ this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
+ return this;
+ }
+
+ @Override
+ public void emitDataStream(DataStream> dataStream) {
+ CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat();
+ builder.setAddress(this.address)
+ .setDatabase(this.database)
+ .setTableName(this.tableName)
+ .setPassword(this.password)
+ .setUsername(this.userName)
+ .setMaxRequestsPerConnection(this.maxRequestsPerConnection)
+ .setCoreConnectionsPerHost(this.coreConnectionsPerHost)
+ .setMaxConnectionsPerHost(this.maxConnectionsPerHost)
+ .setMaxQueueSize(this.maxQueueSize)
+ .setReadTimeoutMillis(this.readTimeoutMillis)
+ .setConnectTimeoutMillis(this.connectTimeoutMillis)
+ .setPoolTimeoutMillis(this.poolTimeoutMillis)
+ .setFieldNames(this.fieldNames)
+ .setFieldTypes(this.fieldTypes);
+
+ CassandraOutputFormat outputFormat = builder.finish();
+ RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
+ dataStream.addSink(richSinkFunction);
+ }
+
+ @Override
+ public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ @Override
+ public TupleTypeInfo> getOutputType() {
+ return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());
+ }
+
+ @Override
+ public TypeInformation getRecordType() {
+ return new RowTypeInfo(fieldTypes, fieldNames);
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java
new file mode 100644
index 000000000..4c68e71ae
--- /dev/null
+++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.sink.cassandra.table;
+
+import com.dtstack.flink.sql.table.AbsTableParser;
+import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.MathUtil;
+
+import java.util.Map;
+
+import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
+
+/**
+ * Reason:
+ * Date: 2018/11/22
+ *
+ * @author xuqianjin
+ */
+public class CassandraSinkParser extends AbsTableParser {
+
+ public static final String ADDRESS_KEY = "address";
+
+ public static final String TABLE_NAME_KEY = "tableName";
+
+ public static final String USER_NAME_KEY = "userName";
+
+ public static final String PASSWORD_KEY = "password";
+
+ public static final String DATABASE_KEY = "database";
+
+ public static final String MAX_REQUEST_PER_CONNECTION_KEY = "maxRequestsPerConnection";
+
+ public static final String CORE_CONNECTIONS_PER_HOST_KEY = "coreConnectionsPerHost";
+
+ public static final String MAX_CONNECTIONS_PER_HOST_KEY = "maxConnectionsPerHost";
+
+ public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize";
+
+ public static final String READ_TIMEOUT_MILLIS_KEY = "readTimeoutMillis";
+
+ public static final String CONNECT_TIMEOUT_MILLIS_KEY = "connectTimeoutMillis";
+
+ public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
+
+ @Override
+ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
+ CassandraTableInfo cassandraTableInfo = new CassandraTableInfo();
+ cassandraTableInfo.setName(tableName);
+ parseFieldsInfo(fieldsInfo, cassandraTableInfo);
+
+ cassandraTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
+ cassandraTableInfo.setAddress(MathUtil.getString(props.get(ADDRESS_KEY.toLowerCase())));
+ cassandraTableInfo.setTableName(MathUtil.getString(props.get(TABLE_NAME_KEY.toLowerCase())));
+ cassandraTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
+ cassandraTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
+ cassandraTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
+ cassandraTableInfo.setMaxRequestsPerConnection(MathUtil.getIntegerVal(props.get(MAX_REQUEST_PER_CONNECTION_KEY.toLowerCase())));
+ cassandraTableInfo.setCoreConnectionsPerHost(MathUtil.getIntegerVal(props.get(CORE_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
+ cassandraTableInfo.setMaxConnectionsPerHost(MathUtil.getIntegerVal(props.get(MAX_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
+ cassandraTableInfo.setMaxQueueSize(MathUtil.getIntegerVal(props.get(MAX_QUEUE_SIZE_KEY.toLowerCase())));
+ cassandraTableInfo.setReadTimeoutMillis(MathUtil.getIntegerVal(props.get(READ_TIMEOUT_MILLIS_KEY.toLowerCase())));
+ cassandraTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase())));
+ cassandraTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase())));
+
+ return cassandraTableInfo;
+ }
+}
diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java
new file mode 100644
index 000000000..7d52b23bb
--- /dev/null
+++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.dtstack.flink.sql.sink.cassandra.table;
+
+import com.dtstack.flink.sql.table.TargetTableInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
+
+/**
+ * Reason:
+ * Date: 2018/11/22
+ *
+ * @author xuqianjin
+ */
+public class CassandraTableInfo extends TargetTableInfo {
+
+ private static final String CURR_TYPE = "cassandra";
+
+ private String address;
+ private String tableName;
+ private String userName;
+ private String password;
+ private String database;
+ private Integer maxRequestsPerConnection;
+ private Integer coreConnectionsPerHost;
+ private Integer maxConnectionsPerHost;
+ private Integer maxQueueSize;
+ private Integer readTimeoutMillis;
+ private Integer connectTimeoutMillis;
+ private Integer poolTimeoutMillis;
+
+ public CassandraTableInfo() {
+ setType(CURR_TYPE);
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public Integer getMaxRequestsPerConnection() {
+ return maxRequestsPerConnection;
+ }
+
+ public void setMaxRequestsPerConnection(Integer maxRequestsPerConnection) {
+ this.maxRequestsPerConnection = maxRequestsPerConnection;
+ }
+
+ public Integer getCoreConnectionsPerHost() {
+ return coreConnectionsPerHost;
+ }
+
+ public void setCoreConnectionsPerHost(Integer coreConnectionsPerHost) {
+ this.coreConnectionsPerHost = coreConnectionsPerHost;
+ }
+
+ public Integer getMaxConnectionsPerHost() {
+ return maxConnectionsPerHost;
+ }
+
+ public void setMaxConnectionsPerHost(Integer maxConnectionsPerHost) {
+ this.maxConnectionsPerHost = maxConnectionsPerHost;
+ }
+
+ public Integer getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public void setMaxQueueSize(Integer maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public Integer getReadTimeoutMillis() {
+ return readTimeoutMillis;
+ }
+
+ public void setReadTimeoutMillis(Integer readTimeoutMillis) {
+ this.readTimeoutMillis = readTimeoutMillis;
+ }
+
+ public Integer getConnectTimeoutMillis() {
+ return connectTimeoutMillis;
+ }
+
+ public void setConnectTimeoutMillis(Integer connectTimeoutMillis) {
+ this.connectTimeoutMillis = connectTimeoutMillis;
+ }
+
+ public Integer getPoolTimeoutMillis() {
+ return poolTimeoutMillis;
+ }
+
+ public void setPoolTimeoutMillis(Integer poolTimeoutMillis) {
+ this.poolTimeoutMillis = poolTimeoutMillis;
+ }
+
+ @Override
+ public boolean check() {
+ Preconditions.checkNotNull(address, "Cassandra field of ADDRESS is required");
+ Preconditions.checkNotNull(database, "Cassandra field of database is required");
+ Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required");
+ return true;
+ }
+
+ @Override
+ public String getType() {
+ // return super.getType().toLowerCase() + TARGET_SUFFIX;
+ return super.getType().toLowerCase();
+ }
+}
diff --git a/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java b/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java
new file mode 100644
index 000000000..33a0233ac
--- /dev/null
+++ b/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package com.dtstack.flinkx;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
new file mode 100644
index 000000000..f49de388b
--- /dev/null
+++ b/cassandra/pom.xml
@@ -0,0 +1,39 @@
+
+
+
+ flink.sql
+ com.dtstack.flink
+ 1.0-SNAPSHOT
+
+ 4.0.0
+ sql.cassandra
+ pom
+
+
+ cassandra-sink
+ cassandra-side
+
+
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+ com.dtstack.flink
+ sql.core
+ 1.0-SNAPSHOT
+ provided
+
+
+ com.datastax.cassandra
+ cassandra-driver-core
+ 3.6.0
+
+
+
+
\ No newline at end of file
diff --git a/console/console-sink/pom.xml b/console/console-sink/pom.xml
new file mode 100644
index 000000000..a5ca2629f
--- /dev/null
+++ b/console/console-sink/pom.xml
@@ -0,0 +1,79 @@
+
+
+
+ sql.console
+ com.dtstack.flink
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ console-sink
+ jar
+
+ console-sink
+ http://maven.apache.org
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 1.4
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+ maven-antrun-plugin
+ 1.2
+
+
+ copy-resources
+
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java
new file mode 100644
index 000000000..7658e9979
--- /dev/null
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.console;
+
+import com.dtstack.flink.sql.sink.MetricOutputFormat;
+import com.dtstack.flink.sql.sink.console.table.TablePrintUtil;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reason:
+ * Date: 2018/12/19
+ *
+ * @author xuqianjin
+ */
+public class ConsoleOutputFormat extends MetricOutputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
+
+ protected String[] fieldNames;
+ TypeInformation>[] fieldTypes;
+
+ @Override
+ public void configure(Configuration parameters) {
+
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ initMetric();
+ }
+
+ @Override
+ public void writeRecord(Tuple2 tuple2) throws IOException {
+ Tuple2 tupleTrans = tuple2;
+ Boolean retract = tupleTrans.getField(0);
+ if (!retract) {
+ return;
+ }
+
+ Row record = tupleTrans.getField(1);
+ if (record.getArity() != fieldNames.length) {
+ return;
+ }
+
+ List data = new ArrayList<>();
+ data.add(fieldNames);
+ data.add(record.toString().split(","));
+ TablePrintUtil.build(data).print();
+
+ outRecords.inc();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ private ConsoleOutputFormat() {
+ }
+
+ public static ConsoleOutputFormatBuilder buildOutputFormat() {
+ return new ConsoleOutputFormatBuilder();
+ }
+
+ public static class ConsoleOutputFormatBuilder {
+
+ private final ConsoleOutputFormat format;
+
+ protected ConsoleOutputFormatBuilder() {
+ this.format = new ConsoleOutputFormat();
+ }
+
+ public ConsoleOutputFormatBuilder setFieldNames(String[] fieldNames) {
+ format.fieldNames = fieldNames;
+ return this;
+ }
+
+ public ConsoleOutputFormatBuilder setFieldTypes(TypeInformation>[] fieldTypes) {
+ format.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ /**
+ * Finalizes the configuration and checks validity.
+ *
+ * @return Configured RetractConsoleCOutputFormat
+ */
+ public ConsoleOutputFormat finish() {
+ return format;
+ }
+ }
+}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
new file mode 100644
index 000000000..77a3efea2
--- /dev/null
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.console;
+
+import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.table.TargetTableInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Reason:
+ * Date: 2018/12/19
+ *
+ * @author xuqianjin
+ */
+public class ConsoleSink implements RetractStreamTableSink, IStreamSinkGener {
+
+ protected String[] fieldNames;
+ TypeInformation>[] fieldTypes;
+
+ @Override
+ public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ @Override
+ public TupleTypeInfo> getOutputType() {
+ return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());
+ }
+
+ @Override
+ public TypeInformation getRecordType() {
+ return new RowTypeInfo(fieldTypes, fieldNames);
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation>[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ @Override
+ public void emitDataStream(DataStream> dataStream) {
+ ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
+ builder.setFieldNames(this.fieldNames)
+ .setFieldTypes(this.fieldTypes);
+ ConsoleOutputFormat outputFormat = builder.finish();
+ RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
+ dataStream.addSink(richSinkFunction);
+ }
+
+ @Override
+ public ConsoleSink genStreamSink(TargetTableInfo targetTableInfo) {
+ return this;
+ }
+}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java
new file mode 100644
index 000000000..e77444bfd
--- /dev/null
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleSinkParser.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.console.table;
+
+import com.dtstack.flink.sql.table.AbsTableParser;
+import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.MathUtil;
+
+import java.util.Map;
+
+import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
+
+/**
+ * Reason:
+ * Date: 2018/12/19
+ *
+ * @author xuqianjin
+ */
+public class ConsoleSinkParser extends AbsTableParser {
+ @Override
+ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) {
+ ConsoleTableInfo consoleTableInfo = new ConsoleTableInfo();
+ consoleTableInfo.setName(tableName);
+ parseFieldsInfo(fieldsInfo, consoleTableInfo);
+
+ consoleTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
+ return consoleTableInfo;
+ }
+}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java
new file mode 100644
index 000000000..4b286c667
--- /dev/null
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/ConsoleTableInfo.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.console.table;
+
+import com.dtstack.flink.sql.table.TargetTableInfo;
+
+/**
+ * Reason:
+ * Date: 2018/12/19
+ *
+ * @author xuqianjin
+ */
+public class ConsoleTableInfo extends TargetTableInfo {
+
+ private static final String CURR_TYPE = "console";
+
+ public ConsoleTableInfo() {
+ setType(CURR_TYPE);
+ }
+
+ @Override
+ public boolean check() {
+ return true;
+ }
+
+ @Override
+ public String getType() {
+ return super.getType().toLowerCase();
+ }
+}
diff --git a/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java
new file mode 100644
index 000000000..8813da619
--- /dev/null
+++ b/console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java
@@ -0,0 +1,341 @@
+package com.dtstack.flink.sql.sink.console.table;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Reason:
+ * Date: 2018/12/19
+ *
+ * @author xuqianjin
+ */
+public class TablePrintUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(TablePrintUtil.class);
+ public static final int ALIGN_LEFT = 1;//左对齐
+ public static final int ALIGN_RIGHT = 2;//右对齐
+ public static final int ALIGN_CENTER = 3;//居中对齐
+
+ private int align = ALIGN_CENTER;//默认居中对齐
+ private boolean equilong = false;//默认不等宽
+ private int padding = 1;//左右边距默认为1
+ private char h = '-';//默认水平分隔符
+ private char v = '|';//默认竖直分隔符
+ private char o = '+';//默认交叉分隔符
+ private char s = ' ';//默认空白填充符
+ private List data;//数据
+
+ private TablePrintUtil() {
+ }
+
+ /**
+ * 链式调用入口方法
+ *
+ * @param data
+ * @return
+ */
+ public static TablePrintUtil build(String[][] data) {
+ TablePrintUtil self = new TablePrintUtil();
+ self.data = new ArrayList<>(Arrays.asList(data));
+ return self;
+ }
+
+ /**
+ * 链式调用入口方法,T可以是String[]、List、任意实体类
+ * 由于java泛型不同无法重载,所以这里要写if instanceof进行类型判断
+ *
+ * @param data
+ * @param
+ * @return
+ */
+ public static TablePrintUtil build(List data) {
+ TablePrintUtil self = new TablePrintUtil();
+ self.data = new ArrayList<>();
+ if (data.size() <= 0) throw new RuntimeException("数据源至少得有一行吧");
+ Object obj = data.get(0);
+
+
+ if (obj instanceof String[]) {
+ //如果泛型为String数组,则直接设置
+ self.data = (List) data;
+ } else if (obj instanceof List) {
+ //如果泛型为List,则把list中的item依次转为String[],再设置
+ int length = ((List) obj).size();
+ for (Object item : data) {
+ List col = (List) item;
+ if (col.size() != length) throw new RuntimeException("数据源每列长度必须一致");
+ self.data.add(col.toArray(new String[length]));
+ }
+ } else {
+ //如果泛型为实体类,则利用反射获取get方法列表,从而推算出属性列表。
+ //根据反射得来的属性列表设置表格第一行thead
+ List colList = getColList(obj);
+ String[] header = new String[colList.size()];
+ for (int i = 0; i < colList.size(); i++) {
+ header[i] = colList.get(i).colName;
+ }
+ self.data.add(header);
+ //利用反射调用相应get方法获取属性值来设置表格tbody
+ for (int i = 0; i < data.size(); i++) {
+ String[] item = new String[colList.size()];
+ for (int j = 0; j < colList.size(); j++) {
+ String value = null;
+ try {
+ value = obj.getClass().getMethod(colList.get(j).getMethodName).invoke(data.get(i)).toString();
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ e.printStackTrace();
+ }
+ item[j] = value == null ? "null" : value;
+ }
+ self.data.add(item);
+ }
+ }
+ return self;
+ }
+
+ private static class Col {
+ private String colName;//列名
+ private String getMethodName;//get方法名
+ }
+
+ /**
+ * 利用反射获取get方法名和属性名
+ *
+ * @return
+ */
+ private static List getColList(Object obj) {
+ List colList = new ArrayList<>();
+ Method[] methods = obj.getClass().getMethods();
+ for (Method m : methods) {
+ StringBuilder getMethodName = new StringBuilder(m.getName());
+ if (getMethodName.substring(0, 3).equals("get") && !m.getName().equals("getClass")) {
+ Col col = new Col();
+ col.getMethodName = getMethodName.toString();
+ char first = Character.toLowerCase(getMethodName.delete(0, 3).charAt(0));
+ getMethodName.delete(0, 1).insert(0, first);
+ col.colName = getMethodName.toString();
+ colList.add(col);
+ }
+ }
+ return colList;
+ }
+
+ /**
+ * 获取字符串占的字符位数
+ *
+ * @param str
+ * @return
+ */
+ private int getStringCharLength(String str) {
+ Pattern p = Pattern.compile("[\u4e00-\u9fa5]");//利用正则找到中文
+ Matcher m = p.matcher(str);
+ int count = 0;
+ while (m.find()) {
+ count++;
+ }
+ return str.length() + count;
+ }
+
+ /**
+ * 纵向遍历获取数据每列的长度
+ *
+ * @return
+ */
+ private int[] getColLengths() {
+ int[] result = new int[data.get(0).length];
+ for (int x = 0; x < result.length; x++) {
+ int max = 0;
+ for (int y = 0; y < data.size(); y++) {
+ int len = getStringCharLength(data.get(y)[x]);
+ if (len > max) {
+ max = len;
+ }
+ }
+ result[x] = max;
+ }
+ if (equilong) {//如果等宽表格
+ int max = 0;
+ for (int len : result) {
+ if (len > max) max = len;
+ }
+ for (int i = 0; i < result.length; i++) {
+ result[i] = max;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * 取得表格字符串
+ *
+ * @return
+ */
+ public String getTableString() {
+ StringBuilder sb = new StringBuilder();
+ int[] colLengths = getColLengths();//获取每列文字宽度
+ StringBuilder line = new StringBuilder();//表格横向分隔线
+ line.append(o);
+ for (int len : colLengths) {
+ int allLen = len + padding * 2;//还需要加上边距和分隔符的长度
+ for (int i = 0; i < allLen; i++) {
+ line.append(h);
+ }
+ line.append(o);
+ }
+ sb.append(line).append("\r\n");
+ for (int y = 0; y < data.size(); y++) {
+ sb.append(v);
+ for (int x = 0; x < data.get(y).length; x++) {
+ String cell = data.get(y)[x];
+ switch (align) {
+ case ALIGN_LEFT:
+ for (int i = 0; i < padding; i++) {sb.append(s);}
+ sb.append(cell);
+ for (int i = 0; i < colLengths[x] - getStringCharLength(cell) + padding; i++) {sb.append(s);}
+ break;
+ case ALIGN_RIGHT:
+ for (int i = 0; i < colLengths[x] - getStringCharLength(cell) + padding; i++) {sb.append(s);}
+ sb.append(cell);
+ for (int i = 0; i < padding; i++) {sb.append(s);}
+ break;
+ case ALIGN_CENTER:
+ int space = colLengths[x] - getStringCharLength(cell);
+ int left = space / 2;
+ int right = space - left;
+ for (int i = 0; i < left + padding; i++) {sb.append(s);}
+ sb.append(cell);
+ for (int i = 0; i < right + padding; i++) {sb.append(s);}
+ break;
+ }
+ sb.append(v);
+ }
+ sb.append("\r\n");
+ sb.append(line).append("\r\n");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * 直接打印表格
+ */
+ public void print() {
+ LOG.info("\n"+getTableString());
+ System.out.println(getTableString());
+ }
+
+ //下面是链式调用的set方法
+ public TablePrintUtil setAlign(int align) {
+ this.align = align;
+ return this;
+ }
+
+ public TablePrintUtil setEquilong(boolean equilong) {
+ this.equilong = equilong;
+ return this;
+ }
+
+ public TablePrintUtil setPadding(int padding) {
+ this.padding = padding;
+ return this;
+ }
+
+ public TablePrintUtil setH(char h) {
+ this.h = h;
+ return this;
+ }
+
+ public TablePrintUtil setV(char v) {
+ this.v = v;
+ return this;
+ }
+
+ public TablePrintUtil setO(char o) {
+ this.o = o;
+ return this;
+ }
+
+ public TablePrintUtil setS(char s) {
+ this.s = s;
+ return this;
+ }
+
+ /**
+ * 使用示例
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ List data1 = new ArrayList<>();
+ data1.add(new String[]{"用户名", "密码", "姓名"});
+ data1.add(new String[]{"xiaoming", "xm123", "小明"});
+ data1.add(new String[]{"xiaohong", "xh123", "小红"});
+ TablePrintUtil.build(data1).print();
+
+ List> data2 = new ArrayList<>();
+ data2.add(new ArrayList<>());
+ data2.add(new ArrayList<>());
+ data2.add(new ArrayList<>());
+ data2.get(0).add("用户名");
+ data2.get(0).add("密码");
+ data2.get(0).add("姓名");
+ data2.get(1).add("xiaoming");
+ data2.get(1).add("xm123");
+ data2.get(1).add("小明");
+ data2.get(2).add("xiaohong");
+ data2.get(2).add("xh123");
+ data2.get(2).add("小红");
+ TablePrintUtil.build(data2)
+ .setAlign(TablePrintUtil.ALIGN_LEFT)
+ .setPadding(5)
+ .setEquilong(true)
+ .print();
+
+
+ class User {
+ String username;
+ String password;
+ String name;
+
+ User(String username, String password, String name) {
+ this.username = username;
+ this.password = password;
+ this.name = name;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+ }
+ List data3 = new ArrayList<>();
+ data3.add(new User("xiaoming", "xm123", "小明"));
+ data3.add(new User("xiaohong", "xh123", "小红"));
+ TablePrintUtil.build(data3).setH('=').setV('!').print();
+ }
+}
diff --git a/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java b/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java
new file mode 100644
index 000000000..e03e5451f
--- /dev/null
+++ b/console/console-sink/src/test/java/com/dtstack/flinkx/AppTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package com.dtstack.flinkx;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest(String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/console/pom.xml b/console/pom.xml
new file mode 100644
index 000000000..983e1c185
--- /dev/null
+++ b/console/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ flink.sql
+ com.dtstack.flink
+ 1.0-SNAPSHOT
+
+ 4.0.0
+ sql.console
+ pom
+
+
+ console-sink
+
+
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+ com.dtstack.flink
+ sql.core
+ 1.0-SNAPSHOT
+ provided
+
+
+
+
+
\ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index 1040fcea6..a38d818ae 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -28,9 +28,9 @@
- org.apache.flink
- flink-table_2.11
- ${flink.version}
+ joda-time
+ joda-time
+ 2.5
@@ -45,6 +45,25 @@
${flink.version}
+
+ org.apache.flink
+ flink-table_2.11
+ ${flink.version}
+
+
+
+ org.apache.calcite
+ calcite-server
+
+ 1.16.0
+
+
+
+ org.apache.flink
+ flink-cep-scala_2.11
+ ${flink.version}
+
+
org.apache.flink
flink-scala_2.11
@@ -56,6 +75,18 @@
flink-streaming-scala_2.11
${flink.version}
+
+
+ org.apache.flink
+ flink-shaded-hadoop2
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-yarn_2.11
+ ${flink.version}
+
diff --git a/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
index 024a31854..82105d9e7 100644
--- a/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
+++ b/core/src/main/java/com/dtstack/flink/sql/ClusterMode.java
@@ -31,4 +31,8 @@ public enum ClusterMode {
ClusterMode(int type){
this.type = type;
}
+
+ public int getType(){
+ return this.type;
+ }
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java
index 610abf21f..aa05528ce 100644
--- a/core/src/main/java/com/dtstack/flink/sql/Main.java
+++ b/core/src/main/java/com/dtstack/flink/sql/Main.java
@@ -16,12 +16,15 @@
* limitations under the License.
*/
-
+
package com.dtstack.flink.sql;
import com.dtstack.flink.sql.classloader.DtClassLoader;
import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
+import com.dtstack.flink.sql.options.LauncherOptionParser;
+import com.dtstack.flink.sql.options.LauncherOptions;
import com.dtstack.flink.sql.parser.*;
import com.dtstack.flink.sql.side.SideSqlExec;
import com.dtstack.flink.sql.side.SideTableInfo;
@@ -34,6 +37,8 @@
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
import com.dtstack.flink.sql.util.FlinkUtil;
import com.dtstack.flink.sql.util.PluginUtil;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.cli.CommandLine;
@@ -41,16 +46,18 @@
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.io.Charsets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
@@ -58,7 +65,6 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +72,7 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLDecoder;
@@ -97,6 +104,11 @@ public class Main {
private static final int delayInterval = 10; //sec
+ private static org.apache.calcite.sql.parser.SqlParser.Config config = org.apache.calcite.sql.parser.SqlParser
+ .configBuilder()
+ .setLex(Lex.MYSQL)
+ .build();
+
public static void main(String[] args) throws Exception {
Options options = new Options();
@@ -127,8 +139,8 @@ public static void main(String[] args) throws Exception {
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-
List addJarFileList = Lists.newArrayList();
+
if(!Strings.isNullOrEmpty(addJarListStr)){
addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
addJarFileList = objMapper.readValue(addJarListStr, List.class);
@@ -185,7 +197,7 @@ public static void main(String[] args) throws Exception {
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
- SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql).parseStmt();
+ SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql,config).parseStmt();
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
tmp.setExecSql(tmpSql);
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
@@ -232,6 +244,12 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set cla
contextEnvironment.getClasspaths().add(url);
}
}
+ int i = 0;
+ for(URL url : classPathSet){
+ String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
+ env.registerCachedFile(url.getPath(), classFileName, true);
+ i++;
+ }
}
private static void registerUDF(SqlTree sqlTree, List jarURList, URLClassLoader parentClassloader,
@@ -246,7 +264,7 @@ private static void registerUDF(SqlTree sqlTree, List jarURList, URLClassLo
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
}
classLoader.loadClass(funcInfo.getClassName());
- FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName().toUpperCase(),
+ FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
tableEnv, classLoader);
}
}
@@ -283,18 +301,23 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
tableEnv.registerTable(tableInfo.getName(), regTable);
registerTableCache.put(tableInfo.getName(), regTable);
- classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
+ if(StringUtils.isNotBlank(remoteSqlPluginPath)){
+ classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
+ }
} else if (tableInfo instanceof TargetTableInfo) {
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
- classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+ if(StringUtils.isNotBlank(remoteSqlPluginPath)){
+ classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+ }
} else if(tableInfo instanceof SideTableInfo){
-
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
- classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+ if(StringUtils.isNotBlank(remoteSqlPluginPath)){
+ classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+ }
}else {
throw new RuntimeException("not support table type:" + tableInfo.getType());
}
@@ -302,20 +325,35 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
//The plug-in information corresponding to the table is loaded into the classPath env
addEnvClassPath(env, classPathSet);
- int i = 0;
- for(URL url : classPathSet){
- String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
- env.registerCachedFile(url.getPath(), classFileName, true);
- i++;
- }
}
- private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
+ private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
StreamExecutionEnvironment.getExecutionEnvironment() :
new MyLocalStreamEnvironment();
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
+ Configuration globalJobParameters = new Configuration();
+ Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
+ method.setAccessible(true);
+
+ confProperties.forEach((key,val) -> {
+ try {
+ method.invoke(globalJobParameters, key, val);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ }
+ });
+
+ ExecutionConfig exeConfig = env.getConfig();
+ if(exeConfig.getGlobalJobParameters() == null){
+ exeConfig.setGlobalJobParameters(globalJobParameters);
+ }else if(exeConfig.getGlobalJobParameters() instanceof Configuration){
+ ((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
+ }
+
if(FlinkUtil.getMaxEnvParallelism(confProperties) > 0){
env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties));
diff --git a/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
similarity index 95%
rename from core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java
rename to core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
index 54ddaa647..c1cea1e14 100644
--- a/core/src/main/java/com/dtstack/flink/sql/MyLocalStreamEnvironment.java
+++ b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package com.dtstack.flink.sql;
+package com.dtstack.flink.sql.environment;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
@@ -100,8 +100,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
- configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+ configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
// add (and override) the settings with what the user defined
configuration.addAll(this.conf);
diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java b/core/src/main/java/com/dtstack/flink/sql/options/LauncherOptionParser.java
similarity index 94%
rename from launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java
rename to core/src/main/java/com/dtstack/flink/sql/options/LauncherOptionParser.java
index 75c5c4f0f..d4b4a19f2 100644
--- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/options/LauncherOptionParser.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package com.dtstack.flink.sql.launcher;
+package com.dtstack.flink.sql.options;
import avro.shaded.com.google.common.collect.Lists;
import org.apache.commons.cli.BasicParser;
@@ -73,7 +73,7 @@ public class LauncherOptionParser {
private LauncherOptions properties = new LauncherOptions();
- public LauncherOptionParser(String[] args) {
+ public LauncherOptionParser(String[] args) throws Exception {
options.addOption(OPTION_MODE, true, "Running mode");
options.addOption(OPTION_SQL, true, "Job sql file");
options.addOption(OPTION_NAME, true, "Job name");
@@ -87,8 +87,6 @@ public LauncherOptionParser(String[] args) {
options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path");
options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint");
options.addOption(OPTION_FLINK_JAR_PATH, true, "flink jar path for submit of perjob mode");
-
- try {
CommandLine cl = parser.parse(options, args);
String mode = cl.getOptionValue(OPTION_MODE, ClusterMode.local.name());
//check mode
@@ -101,21 +99,24 @@ public LauncherOptionParser(String[] args) {
byte[] filecontent = new byte[(int) file.length()];
in.read(filecontent);
String content = new String(filecontent, "UTF-8");
+
String sql = URLEncoder.encode(content, Charsets.UTF_8.name());
properties.setSql(sql);
+
String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH));
properties.setLocalSqlPluginPath(localPlugin);
+
String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH);
- if(!ClusterMode.local.name().equals(mode)){
- Preconditions.checkNotNull(remotePlugin);
- properties.setRemoteSqlPluginPath(remotePlugin);
- }
+ properties.setRemoteSqlPluginPath(remotePlugin);
+
String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
properties.setName(name);
+
String addJar = cl.getOptionValue(OPTION_ADDJAR);
if(StringUtils.isNotBlank(addJar)){
properties.setAddjar(addJar);
}
+
String confProp = cl.getOptionValue(OPTION_CONF_PROP);
if(StringUtils.isNotBlank(confProp)){
properties.setConfProp(confProp);
@@ -144,10 +145,6 @@ public LauncherOptionParser(String[] args) {
if(StringUtils.isNotBlank(flinkJarPath)){
properties.setFlinkJarPath(flinkJarPath);
}
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
}
public LauncherOptions getLauncherOptions(){
@@ -157,6 +154,7 @@ public LauncherOptions getLauncherOptions(){
public List getProgramExeArgList() throws Exception {
Map mapConf = PluginUtil.ObjectToMap(properties);
List args = Lists.newArrayList();
+
for(Map.Entry one : mapConf.entrySet()){
String key = one.getKey();
if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key)
diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java b/core/src/main/java/com/dtstack/flink/sql/options/LauncherOptions.java
similarity index 94%
rename from launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java
rename to core/src/main/java/com/dtstack/flink/sql/options/LauncherOptions.java
index 5cc762ebd..d84637abc 100644
--- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java
+++ b/core/src/main/java/com/dtstack/flink/sql/options/LauncherOptions.java
@@ -16,7 +16,10 @@
* limitations under the License.
*/
-package com.dtstack.flink.sql.launcher;
+package com.dtstack.flink.sql.options;
+
+import com.dtstack.flink.sql.ClusterMode;
+import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
/**
* This class define commandline options for the Launcher program
@@ -26,7 +29,7 @@
*/
public class LauncherOptions {
- private String mode;
+ private String mode = ClusterMode.local.name();
private String name;
@@ -156,4 +159,5 @@ public String getQueue() {
public void setQueue(String queue) {
this.queue = queue;
}
+
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
index 9b7017743..793dd6baa 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
@@ -24,7 +24,7 @@
import java.util.regex.Pattern;
/**
- * 解析创建自定义方法sql
+ * parser register udf sql
* Date: 2018/6/26
* Company: www.dtstack.com
* @author xuchao
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
index d10d6825e..5e126e786 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
@@ -29,7 +29,7 @@
import java.util.regex.Pattern;
/**
- * 解析创建表结构sql
+ * parser create table sql
* Date: 2018/6/26
* Company: www.dtstack.com
* @author xuchao
@@ -54,7 +54,7 @@ public boolean verify(String sql) {
public void parseSql(String sql, SqlTree sqlTree) {
Matcher matcher = PATTERN.matcher(sql);
if(matcher.find()){
- String tableName = matcher.group(1).toUpperCase();
+ String tableName = matcher.group(1);
String fieldsInfoStr = matcher.group(2);
String propsStr = matcher.group(3);
Map props = parseProp(propsStr);
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
index 9bd1374a0..db18986b7 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
@@ -20,6 +20,8 @@
package com.dtstack.flink.sql.parser;
+import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
@@ -31,6 +33,12 @@
import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
+/**
+ * parser create tmp table sql
+ * Date: 2018/6/26
+ * Company: www.dtstack.com
+ * @author yanxi
+ */
public class CreateTmpTableParser implements IParser {
//select table tableName as select
@@ -61,11 +69,16 @@ public void parseSql(String sql, SqlTree sqlTree) {
String tableName = null;
String selectSql = null;
if(matcher.find()) {
- tableName = matcher.group(1).toUpperCase();
+ tableName = matcher.group(1);
selectSql = "select " + matcher.group(2);
}
- SqlParser sqlParser = SqlParser.create(selectSql);
+ SqlParser.Config config = SqlParser
+ .configBuilder()
+ .setLex(Lex.MYSQL)
+ .build();
+ SqlParser sqlParser = SqlParser.create(selectSql,config);
+
SqlNode sqlNode = null;
try {
sqlNode = sqlParser.parseStmt();
@@ -77,7 +90,8 @@ public void parseSql(String sql, SqlTree sqlTree) {
parseNode(sqlNode, sqlParseResult);
sqlParseResult.setTableName(tableName);
- sqlParseResult.setExecSql(selectSql.toUpperCase());
+ String transformSelectSql = DtStringUtil.replaceIgnoreQuota(sqlNode.toString(), "`", "");
+ sqlParseResult.setExecSql(transformSelectSql);
sqlTree.addTmpSql(sqlParseResult);
sqlTree.addTmplTableInfo(tableName, sqlParseResult);
} else {
@@ -87,7 +101,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
String tableName = null;
String fieldsInfoStr = null;
if (matcher.find()){
- tableName = matcher.group(1).toUpperCase();
+ tableName = matcher.group(1);
fieldsInfoStr = matcher.group(2);
}
CreateTmpTableParser.SqlParserResult sqlParseResult = new CreateTmpTableParser.SqlParserResult();
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
index e08540c92..40629b139 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
@@ -20,12 +20,8 @@
package com.dtstack.flink.sql.parser;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlInsert;
-import org.apache.calcite.sql.SqlJoin;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.lang3.StringUtils;
@@ -36,8 +32,7 @@
import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
/**
- * 解析flink sql
- * sql 只支持 insert 开头的
+ * parser flink sql
* Date: 2018/6/22
* Company: www.dtstack.com
* @author xuchao
@@ -57,7 +52,11 @@ public static InsertSqlParser newInstance(){
@Override
public void parseSql(String sql, SqlTree sqlTree) {
- SqlParser sqlParser = SqlParser.create(sql);
+ SqlParser.Config config = SqlParser
+ .configBuilder()
+ .setLex(Lex.MYSQL)
+ .build();
+ SqlParser sqlParser = SqlParser.create(sql,config);
SqlNode sqlNode = null;
try {
sqlNode = sqlParser.parseStmt();
@@ -113,6 +112,10 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
sqlParseResult.addSourceTable(identifierNode.toString());
}
break;
+ case MATCH_RECOGNIZE:
+ SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
+ sqlParseResult.addSourceTable(node.getTableRef().toString());
+ break;
case UNION:
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
index 3ed37c51e..754de0819 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
@@ -29,7 +29,7 @@
import java.util.Map;
/**
- * 解析sql获得的对象结构
+ * parser sql to get the Sql Tree structure
* Date: 2018/6/25
* Company: www.dtstack.com
* @author xuchao
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java b/core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java
index 303260742..a185da1bd 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java
@@ -37,7 +37,7 @@
* @author xuchao
*/
-public abstract class AllReqRow extends RichFlatMapFunction{
+public abstract class AllReqRow extends RichFlatMapFunction implements ISideReqRow {
protected SideInfo sideInfo;
@@ -48,8 +48,6 @@ public AllReqRow(SideInfo sideInfo){
}
- protected abstract Row fillData(Row input, Object sideInput);
-
protected abstract void initCache() throws SQLException;
protected abstract void reloadCache();
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java b/core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
index 6617bb29f..6df1af1d5 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
@@ -40,7 +40,7 @@
* @author xuchao
*/
-public abstract class AsyncReqRow extends RichAsyncFunction {
+public abstract class AsyncReqRow extends RichAsyncFunction implements ISideReqRow {
private static final long serialVersionUID = 2098635244857937717L;
@@ -79,9 +79,6 @@ protected boolean openCache(){
return sideInfo.getSideCache() != null;
}
-
- protected abstract Row fillData(Row input, Object sideInput);
-
protected void dealMissKey(Row input, ResultFuture resultFuture){
if(sideInfo.getJoinType() == JoinType.LEFT){
//Reserved left table data
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/ISideReqRow.java b/core/src/main/java/com/dtstack/flink/sql/side/ISideReqRow.java
new file mode 100644
index 000000000..88066e37f
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/side/ISideReqRow.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.side;
+
+import org.apache.flink.types.Row;
+
+/**
+ *
+ * Date: 2018/12/4
+ * Company: www.dtstack.com
+ * @author xuchao
+ */
+public interface ISideReqRow {
+
+ Row fillData(Row input, Object sideInput);
+
+}
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
index 7f165ac30..b412b1d18 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
@@ -20,7 +20,7 @@
package com.dtstack.flink.sql.side;
-import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
@@ -52,11 +52,14 @@
public class SideSQLParser {
public Queue