From c9ddaa9444f316e81253744144e582b5c5f78b4b Mon Sep 17 00:00:00 2001 From: tiezhu Date: Mon, 18 Jan 2021 20:44:50 +0800 Subject: [PATCH 1/4] =?UTF-8?q?[opt-31342][rdb]=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/sql/core/rdb/JdbcResourceCheck.java | 23 ++++++++++--------- .../sql/side/rdb/async/RdbAsyncReqRow.java | 19 ++++++++------- .../sql/side/rdb/table/RdbSideTableInfo.java | 2 +- .../flink/sql/sink/rdb/JDBCOptions.java | 2 +- .../sql/sink/rdb/table/RdbTableInfo.java | 2 +- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java b/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java index 514d4c5d9..2b28258a7 100644 --- a/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java +++ b/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java @@ -20,6 +20,7 @@ import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil; import com.dtstack.flink.sql.resource.ResourceCheck; +import org.apache.commons.lang.StringUtils; import org.apache.flink.runtime.execution.SuppressRestartsException; import java.sql.Connection; @@ -43,9 +44,9 @@ public class JdbcResourceCheck extends ResourceCheck { private static final String UPDATE_STR = "update"; private static final String REPLACE_STR = "replace"; - private static final String CHECK_SELECT_SQL = "select 1 from %s where 1=1;"; - private static final String CHECK_DELETE_SQL = "delete from %s where 1 = 3;"; - private static final String CHECK_INSERT_SQL = "insert into %s (select * from %s where 1 = 2);"; + private static final String CHECK_SELECT_SQL = "select 1 from $table where 1=2;"; + private static final String CHECK_DELETE_SQL = "delete from $table where 1=3;"; + private static final String CHECK_INSERT_SQL = "insert into $table (select * from $table where 1=2);"; private static final Map PRIVILEGE_SQL_MAP = new HashMap<>(); private static final JdbcResourceCheck Instance = new JdbcResourceCheck(); @@ -116,13 +117,13 @@ public void checkPrivilege( statement = connection.createStatement(); for (String s : privilegeList) { privilege = s; - if (privilege.startsWith(SELECT_STR)) { - statement.executeQuery( - String.format(PRIVILEGE_SQL_MAP.get(privilege.toLowerCase()), tableInfo)); - } else { - statement.executeUpdate( - String.format(PRIVILEGE_SQL_MAP.get(privilege.toLowerCase()), tableInfo, tableInfo)); - } + statement.execute( + StringUtils.replace( + PRIVILEGE_SQL_MAP.get(privilege.toLowerCase()), + "$table", + tableName + ) + ); } } catch (SQLException sqlException) { if (sqlException.getMessage().contains("command denied")) { @@ -130,7 +131,7 @@ public void checkPrivilege( String.format("user [%s] don't have [%s] privilege of table [%s]", userName, privilege, tableInfo))); } - throw new SuppressRestartsException(new Throwable(sqlException.getMessage())); + throw new SuppressRestartsException(new IllegalArgumentException(sqlException.getMessage())); } finally { JdbcConnectUtil.closeConnectionResource(null, statement, connection, false); } diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java index d4cd6c5b5..daec5d309 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java @@ -19,11 +19,9 @@ package com.dtstack.flink.sql.side.rdb.async; -import com.dtstack.flink.sql.core.rdb.JdbcCheckKeys; import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck; import com.dtstack.flink.sql.enums.ECacheContentType; import com.dtstack.flink.sql.factory.DTThreadFactory; -import com.dtstack.flink.sql.resource.ResourceCheck; import com.dtstack.flink.sql.side.BaseAsyncReqRow; import com.dtstack.flink.sql.side.BaseSideInfo; import com.dtstack.flink.sql.side.CacheMissVal; @@ -111,7 +109,8 @@ protected void init(BaseSideInfo sideInfo) { } @Override - protected void preInvoke(BaseRow input, ResultFuture resultFuture) { } + protected void preInvoke(BaseRow input, ResultFuture resultFuture) { + } @Override public void handleAsyncInvoke(Map inputParams, BaseRow input, ResultFuture resultFuture) throws Exception { @@ -144,13 +143,13 @@ protected void asyncQueryData(Map inputParams, } final protected void doAsyncQueryData( - Map inputParams, - BaseRow input, - ResultFuture resultFuture, - SQLClient rdbSqlClient, - AtomicLong failCounter, - AtomicBoolean finishFlag, - CountDownLatch latch) { + Map inputParams, + BaseRow input, + ResultFuture resultFuture, + SQLClient rdbSqlClient, + AtomicLong failCounter, + AtomicBoolean finishFlag, + CountDownLatch latch) { rdbSqlClient.getConnection(conn -> { try { if (conn.failed()) { diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java index 6099c583e..19e747c3a 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java @@ -135,7 +135,7 @@ public Map buildCheckProperties() { properties.put(JdbcCheckKeys.TABLE_NAME_KEY, getTableName()); properties.put(JdbcCheckKeys.OPERATION_NAME_KEY, getName()); properties.put(JdbcCheckKeys.TABLE_TYPE_KEY, "side"); - properties.put(JdbcCheckKeys.NEED_CHECK, ResourceCheck.NEED_CHECK+""); + properties.put(JdbcCheckKeys.NEED_CHECK, String.valueOf(ResourceCheck.NEED_CHECK)); return properties; } } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCOptions.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCOptions.java index 686203b2b..eacdc9b37 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCOptions.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/JDBCOptions.java @@ -90,7 +90,7 @@ public Map buildCheckProperties() { properties.put(JdbcCheckKeys.TABLE_NAME_KEY, getTableName()); properties.put(JdbcCheckKeys.OPERATION_NAME_KEY, "jdbcOutputFormat"); properties.put(JdbcCheckKeys.TABLE_TYPE_KEY, "sink"); - properties.put(JdbcCheckKeys.NEED_CHECK, ResourceCheck.NEED_CHECK+""); + properties.put(JdbcCheckKeys.NEED_CHECK, String.valueOf(ResourceCheck.NEED_CHECK)); return properties; } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java index c3ea736a4..f5ca46f6a 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java @@ -233,7 +233,7 @@ public Map buildCheckProperties() { properties.put(JdbcCheckKeys.TABLE_NAME_KEY, getTableName()); properties.put(JdbcCheckKeys.OPERATION_NAME_KEY, getName()); properties.put(JdbcCheckKeys.TABLE_TYPE_KEY, "sink"); - properties.put(JdbcCheckKeys.NEED_CHECK, ResourceCheck.NEED_CHECK+""); + properties.put(JdbcCheckKeys.NEED_CHECK, String.valueOf(ResourceCheck.NEED_CHECK)); return properties; } From 7f277ea579b86cdb27d1d78d2170aa7a54ba9a2a Mon Sep 17 00:00:00 2001 From: chuixue Date: Tue, 19 Jan 2021 15:46:39 +0800 Subject: [PATCH 2/4] [feat-31342][rdb] load Driver class use ClassLoaderManager --- .../flink/sql/core/rdb/util/JdbcConnectUtil.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java b/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java index 41e01f0f4..a5a33d51b 100644 --- a/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java +++ b/rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java @@ -136,7 +136,7 @@ public static Connection getConnectWithRetry( + "\nerror message: "); String errorCause = null; - forName(driverName); + ClassLoaderManager.forName(driverName, JdbcConnectUtil.class.getClassLoader()); Preconditions.checkNotNull(url, "url can't be null!"); for (int i = 0; i < DEFAULT_RETRY_NUM; i++) { @@ -157,16 +157,4 @@ public static Connection getConnectWithRetry( } throw new SuppressRestartsException(new Throwable(errorMessage + errorCause)); } - - /** - * @param clazz - */ - public synchronized static void forName(String clazz) { - try { - Class driverClass = Class.forName(clazz); - driverClass.newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } } From 43a84c7f67a90b38997beb898e764bc31cc557c4 Mon Sep 17 00:00:00 2001 From: chuixue Date: Wed, 20 Jan 2021 09:34:16 +0800 Subject: [PATCH 3/4] [feat-31342][impala] add impala connection check --- .../flink/sql/side/impala/table/ImpalaSideParser.java | 3 +++ .../flink/sql/side/impala/table/ImpalaSideTableInfo.java | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java b/impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java index 4a35162d5..318ef6066 100644 --- a/impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java +++ b/impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java @@ -19,6 +19,7 @@ package com.dtstack.flink.sql.side.impala.table; import com.dtstack.flink.sql.side.rdb.table.RdbSideParser; +import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.util.MathUtil; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,6 +62,8 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map Date: Wed, 20 Jan 2021 15:03:06 +0800 Subject: [PATCH 4/4] [hotfix-34329] optimize sampling print data record. --- .../sink/cassandra/CassandraOutputFormat.java | 8 +-- .../flink/sql/constrant/ConfigConstrant.java | 2 +- .../flink/sql/exec/ExecuteProcessHelper.java | 12 +++- .../format/DeserializationMetricWrapper.java | 14 +++-- .../AbstractDtRichOutputFormat.java | 3 +- .../dtstack/flink/sql/util/SampleUtils.java | 59 +++++++++++++++++++ docs/config.md | 1 + .../sql/sink/hbase/HbaseOutputFormat.java | 12 ++-- .../sql/sink/impala/ImpalaOutputFormat.java | 6 +- .../flink/sql/sink/kudu/KuduOutputFormat.java | 7 +-- .../flink/sql/launcher/LauncherMain.java | 1 - .../sql/sink/mongo/MongoOutputFormat.java | 7 +-- .../rdb/format/JDBCUpsertOutputFormat.java | 10 +--- .../sql/sink/redis/RedisOutputFormat.java | 7 +-- 14 files changed, 104 insertions(+), 45 deletions(-) create mode 100644 core/src/main/java/com/dtstack/flink/sql/util/SampleUtils.java diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java index 4428fbc76..e5ea13ccc 100644 --- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java +++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java @@ -38,6 +38,7 @@ package com.dtstack.flink.sql.sink.cassandra; +import com.dtstack.flink.sql.util.SampleUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; @@ -192,7 +193,7 @@ public void open(int taskNumber, int numTasks) { * @see PreparedStatement */ @Override - public void writeRecord(Tuple2 tuple2) throws IOException { + public void writeRecord(Tuple2 tuple2) { Tuple2 tupleTrans = tuple2; Boolean retract = tupleTrans.getField(0); Row row = tupleTrans.getField(1); @@ -209,10 +210,7 @@ public void writeRecord(Tuple2 tuple2) throws IOException { private void insertWrite(Row row) { try { - - if(outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){ - LOG.info("Receive data : {}", row); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), row.toString()); String cql = buildSql(row); if (cql != null) { diff --git a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java index 69a4bdcf8..76f01aea6 100644 --- a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java +++ b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java @@ -67,7 +67,7 @@ public class ConfigConstrant { public static final String STATE_BACKEND_KEY = "state.backend"; public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir"; public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental"; - + public static final String SAMPLE_INTERVAL_COUNT = "sample.interval.count"; public static final String RESTOREENABLE = "restore.enable"; public static final String LOG_LEVEL_KEY = "logLevel"; diff --git a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java index e438587d8..767dda481 100644 --- a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java +++ b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java @@ -48,6 +48,7 @@ import com.dtstack.flink.sql.util.DataTypeUtils; import com.dtstack.flink.sql.util.DtStringUtil; import com.dtstack.flink.sql.util.PluginUtil; +import com.dtstack.flink.sql.util.SampleUtils; import com.dtstack.flink.sql.util.SqlFormatterUtil; import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner; import com.fasterxml.jackson.databind.ObjectMapper; @@ -92,6 +93,8 @@ import java.util.Set; import java.util.TimeZone; +import static com.dtstack.flink.sql.constrant.ConfigConstrant.SAMPLE_INTERVAL_COUNT; + /** * 任务执行时的流程方法 * Date: 2020/2/17 @@ -177,7 +180,6 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin return true; } - public static StreamTableEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception { ClassLoader envClassLoader = Thread.currentThread().getContextClassLoader(); @@ -187,6 +189,7 @@ public static StreamTableEnvironment getStreamExecution(ParamsInfo paramsInfo) t StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode()); StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp()); + setSamplingIntervalCount(paramsInfo); ResourceCheck.NEED_CHECK = Boolean.parseBoolean(paramsInfo.getConfProp().getProperty(ResourceCheck.CHECK_STR, "true")); String planner = paramsInfo.getPlanner(); @@ -228,6 +231,13 @@ public static StreamTableEnvironment getStreamExecution(ParamsInfo paramsInfo) t return tableEnv; } + private static void setSamplingIntervalCount(ParamsInfo paramsInfo) { + SampleUtils.setSamplingIntervalCount( + Integer.parseInt( + paramsInfo.getConfProp().getProperty(SAMPLE_INTERVAL_COUNT, "0") + ) + ); + } public static List getExternalJarUrls(String addJarListStr) throws java.io.IOException { List jarUrlList = Lists.newArrayList(); diff --git a/core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java b/core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java index 5696c1f61..323defe6e 100644 --- a/core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java +++ b/core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java @@ -20,6 +20,7 @@ import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager; import com.dtstack.flink.sql.metric.MetricConstant; +import com.dtstack.flink.sql.util.SampleUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -45,7 +46,7 @@ public class DeserializationMetricWrapper extends AbstractDeserializationSchema< private static final Logger LOG = LoggerFactory.getLogger(DeserializationMetricWrapper.class); - private static int dataPrintFrequency = 1000; + private int samplingIntervalCount = SampleUtils.getSamplingIntervalCount(); private DeserializationSchema deserializationSchema; @@ -96,11 +97,14 @@ public void initMetric() { } @Override - public Row deserialize(byte[] message) throws IOException { + public Row deserialize(byte[] message) { try { - if (numInRecord.getCount() % dataPrintFrequency == 0) { - LOG.info("receive source data:" + new String(message, StandardCharsets.UTF_8)); - } + SampleUtils.samplingSourcePrint( + samplingIntervalCount, + LOG, + numInRecord.getCount(), + new String(message, StandardCharsets.UTF_8) + ); numInRecord.inc(); numInBytes.inc(message.length); beforeDeserialize(); diff --git a/core/src/main/java/com/dtstack/flink/sql/outputformat/AbstractDtRichOutputFormat.java b/core/src/main/java/com/dtstack/flink/sql/outputformat/AbstractDtRichOutputFormat.java index fbcc86bbd..91b6df46c 100644 --- a/core/src/main/java/com/dtstack/flink/sql/outputformat/AbstractDtRichOutputFormat.java +++ b/core/src/main/java/com/dtstack/flink/sql/outputformat/AbstractDtRichOutputFormat.java @@ -18,6 +18,7 @@ package com.dtstack.flink.sql.outputformat; import com.dtstack.flink.sql.metric.MetricConstant; +import com.dtstack.flink.sql.util.SampleUtils; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; @@ -32,8 +33,8 @@ public abstract class AbstractDtRichOutputFormat extends RichOutputFormat{ public transient Counter outRecords; public transient Counter outDirtyRecords; public transient Meter outRecordsRate; + protected int samplingIntervalCount = SampleUtils.getSamplingIntervalCount(); - protected static int ROW_PRINT_FREQUENCY = 1000; protected static int DIRTY_PRINT_FREQUENCY = 1000; public void initMetric() { diff --git a/core/src/main/java/com/dtstack/flink/sql/util/SampleUtils.java b/core/src/main/java/com/dtstack/flink/sql/util/SampleUtils.java new file mode 100644 index 000000000..b7de6e2d8 --- /dev/null +++ b/core/src/main/java/com/dtstack/flink/sql/util/SampleUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.util; + +import org.slf4j.Logger; + +/** + * @program: flinkStreamSQL + * @author: wuren + * @create: 2021/01/19 + **/ +public class SampleUtils { + + private static int samplingIntervalCount = 0; + + /** + * static变量无法序列化到TaskManager,所以设定此方法,赋值给每个具体使用的类。 + * @return + */ + public static int getSamplingIntervalCount() { + return samplingIntervalCount; + } + + public static void setSamplingIntervalCount(int interval) { + samplingIntervalCount = interval; + } + + public static void samplingSourcePrint(int samplingIntervalCount, Logger logger, long count, String message) { + Runnable func = () -> logger.info("sampling source input data: " + message); + samplingPrint(samplingIntervalCount, count, func); + } + + public static void samplingSinkPrint(int samplingIntervalCount, Logger logger, long count, String message) { + Runnable func = () -> logger.info("sampling sink output data: " + message); + samplingPrint(samplingIntervalCount, count, func); + } + + private static void samplingPrint(int samplingIntervalCount, long count, Runnable func) { + if (samplingIntervalCount > 0 && count % samplingIntervalCount == 0) { + func.run(); + } + } +} diff --git a/docs/config.md b/docs/config.md index aa1860800..f618f86c8 100644 --- a/docs/config.md +++ b/docs/config.md @@ -79,6 +79,7 @@ sh submit.sh -key1 val1 -key2 val2 * savePointPath:任务恢复点的路径(默认无) * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false) * logLevel: 日志级别动态配置(默认info) + * sample.interval.count:间隔一定数据条数后,将本次进入Flink的数据抽样打印到日志中。默认为0,不进行抽样打印。可以设置一个整数,例如:1000000。 * [prometheus 相关参数](./prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例 diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java index 434ba694d..433c90666 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java @@ -22,6 +22,7 @@ import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager; import com.dtstack.flink.sql.factory.DTThreadFactory; import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; +import com.dtstack.flink.sql.util.SampleUtils; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; @@ -241,11 +242,8 @@ protected synchronized void dealBatchOperation(List records) { outRecords.inc(); } } - // 打印结果 - if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) { - // 只打印最后一条数据 - LOG.info(records.get(records.size() - 1).toString()); - } + // 只打印最后一条数据 + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), records.get(records.size() - 1).toString()); } catch (IOException | InterruptedException e) { LOG.error("", e); } finally { @@ -271,9 +269,7 @@ protected void dealInsert(Row record) { outDirtyRecords.inc(); } - if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) { - LOG.info(record.toString()); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), record.toString()); outRecords.inc(); } diff --git a/impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java b/impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java index e0b9eafce..70a021e46 100644 --- a/impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java +++ b/impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java @@ -24,6 +24,7 @@ import com.dtstack.flink.sql.sink.rdb.JDBCTypeConvertUtils; import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.util.KrbUtils; +import com.dtstack.flink.sql.util.SampleUtils; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.java.tuple.Tuple2; @@ -78,7 +79,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat record) throws IOException { return; } - if (outRecords.getCount() % RECEIVE_DATA_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) { - LOG.info("Receive data : {}", record); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), record.toString()); if (updateMode.equalsIgnoreCase(UPDATE_MODE)) { rows.add(Row.copy(record.f1)); diff --git a/kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java b/kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java index e8b508f1f..ea5d68416 100644 --- a/kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java +++ b/kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java @@ -20,6 +20,7 @@ import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; import com.dtstack.flink.sql.util.KrbUtils; +import com.dtstack.flink.sql.util.SampleUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -131,7 +132,7 @@ private void establishConnection() throws IOException { } @Override - public void writeRecord(Tuple2 record) throws IOException { + public void writeRecord(Tuple2 record) { Tuple2 tupleTrans = record; Boolean retract = tupleTrans.getField(0); if (!retract) { @@ -148,9 +149,7 @@ public void writeRecord(Tuple2 record) throws IOException { } try { - if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) { - LOG.info("Receive data : {}", row); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), row.toString()); session.apply(toOperation(writeMode, row)); outRecords.inc(); } catch (KuduException e) { diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java index 07b2fa4a2..1f354fe35 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java @@ -137,7 +137,6 @@ private static String[] parseJson(String[] args) { return list.toArray(new String[0]); } - public static void main(String[] args) throws Exception { JobParamsInfo jobParamsInfo = parseArgs(args); ClusterMode execMode = ClusterMode.valueOf(jobParamsInfo.getMode()); diff --git a/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java b/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java index df8293522..3f1bdebb4 100644 --- a/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java +++ b/mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java @@ -20,6 +20,7 @@ package com.dtstack.flink.sql.sink.mongo; import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; +import com.dtstack.flink.sql.util.SampleUtils; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; @@ -70,7 +71,7 @@ public void open(int taskNumber, int numTasks) throws IOException { } @Override - public void writeRecord(Tuple2 tuple2) throws IOException { + public void writeRecord(Tuple2 tuple2) { Tuple2 tupleTrans = tuple2; Boolean retract = tupleTrans.getField(0); @@ -102,9 +103,7 @@ public void writeRecord(Tuple2 tuple2) throws IOException { dbCollection.insertOne(doc); } - if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){ - LOG.info(record.toString()); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), record.toString()); outRecords.inc(); } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java index 55d9a0ca7..5626def9e 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java @@ -27,6 +27,7 @@ import com.dtstack.flink.sql.sink.rdb.writer.AbstractUpsertWriter; import com.dtstack.flink.sql.sink.rdb.writer.AppendOnlyWriter; import com.dtstack.flink.sql.sink.rdb.writer.JDBCWriter; +import com.dtstack.flink.sql.util.SampleUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Row; @@ -55,8 +56,6 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat tuple2) throws IOException { + public synchronized void writeRecord(Tuple2 tuple2) { checkConnectionOpen(); try { - if (outRecords.getCount() % RECEIVEDATA_PRINT_FREQUENTY == 0 || LOG.isDebugEnabled()) { - LOG.info("Receive data : {}", tuple2); - } - // Receive data + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), tuple2.toString()); outRecords.inc(); jdbcWriter.addRecord(tuple2); diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java index 801eebc08..311676929 100644 --- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java +++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java @@ -20,6 +20,7 @@ import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; import com.dtstack.flink.sql.sink.redis.enums.RedisType; +import com.dtstack.flink.sql.util.SampleUtils; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -148,7 +149,7 @@ private void establishConnection() { } @Override - public void writeRecord(Tuple2 record) throws IOException { + public void writeRecord(Tuple2 record) { Tuple2 tupleTrans = record; Boolean retract = tupleTrans.getField(0); if (!retract) { @@ -163,9 +164,7 @@ public void writeRecord(Tuple2 record) throws IOException { refData.put(fieldNames[i], row.getField(i)); } save(refData); - if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) { - LOG.info(record.toString()); - } + SampleUtils.samplingSinkPrint(samplingIntervalCount, LOG, outRecords.getCount(), row.toString()); outRecords.inc(); }