Skip to content

Commit 5c6a062

Browse files
committed
fix 修复hbase-side 别名异常
1 parent b7e4aa9 commit 5c6a062

File tree

3 files changed

+33
-19
lines changed

3 files changed

+33
-19
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.commons.io.Charsets;
5757
import org.apache.commons.lang3.StringUtils;
5858
import org.apache.flink.api.common.typeinfo.TypeInformation;
59-
import org.apache.flink.api.java.tuple.Tuple2;
6059
import org.apache.flink.api.java.typeutils.RowTypeInfo;
6160
import org.apache.flink.streaming.api.datastream.DataStream;
6261
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -65,7 +64,6 @@
6564
import org.apache.flink.table.api.TableEnvironment;
6665
import org.apache.flink.table.api.java.StreamTableEnvironment;
6766
import org.apache.flink.table.sinks.TableSink;
68-
import org.apache.flink.types.Row;
6967
import org.slf4j.Logger;
7068
import org.slf4j.LoggerFactory;
7169

@@ -81,9 +79,10 @@
8179
import java.util.Set;
8280

8381
/**
84-
* 任务执行时的流程方法
82+
* 任务执行时的流程方法
8583
* Date: 2020/2/17
8684
* Company: www.dtstack.com
85+
*
8786
* @author maqi
8887
*/
8988
public class ExecuteProcessHelper {
@@ -126,11 +125,11 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
126125
.setConfProp(confProperties)
127126
.setJarUrlList(jarUrlList)
128127
.build();
129-
130128
}
131129

132130
/**
133-
* 非local模式或者shipfile部署模式,remoteSqlPluginPath必填
131+
* 非local模式或者shipfile部署模式,remoteSqlPluginPath必填
132+
*
134133
* @param remoteSqlPluginPath
135134
* @param deployMode
136135
* @param pluginLoadMode
@@ -147,7 +146,7 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin
147146

148147
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
149148
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
150-
StreamTableEnvironment tableEnv = getStreamTableEnv(env,paramsInfo.getConfProp());
149+
StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp());
151150

152151
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
153152
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
@@ -188,7 +187,7 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
188187

189188
private static void sqlTranslation(String localSqlPluginPath,
190189
StreamTableEnvironment tableEnv,
191-
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
190+
SqlTree sqlTree, Map<String, AbstractSideTableInfo> sideTableMap,
192191
Map<String, Table> registerTableCache) throws Exception {
193192

194193
SideSqlExec sideSqlExec = new SideSqlExec();
@@ -251,13 +250,14 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
251250
}
252251

253252
/**
254-
* 向Flink注册源表和结果表,返回执行时插件包的全路径
253+
* 向Flink注册源表和结果表,返回执行时插件包的全路径
254+
*
255255
* @param sqlTree
256256
* @param env
257257
* @param tableEnv
258258
* @param localSqlPluginPath
259259
* @param remoteSqlPluginPath
260-
* @param pluginLoadMode 插件加载模式 classpath or shipfile
260+
* @param pluginLoadMode 插件加载模式 classpath or shipfile
261261
* @param sideTableMap
262262
* @param registerTableCache
263263
* @return
@@ -322,7 +322,8 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
322322
}
323323

324324
/**
325-
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
325+
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
326+
*
326327
* @param env
327328
* @param classPathSet
328329
*/

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.side.hbase;
2220

2321
import com.dtstack.flink.sql.enums.ECacheContentType;
@@ -42,9 +40,7 @@
4240
import org.slf4j.Logger;
4341
import org.slf4j.LoggerFactory;
4442

45-
import java.util.Collections;
46-
import java.util.List;
47-
import java.util.Map;
43+
import java.util.*;
4844
import java.util.concurrent.ExecutorService;
4945
import java.util.concurrent.LinkedBlockingQueue;
5046
import java.util.concurrent.ThreadPoolExecutor;
@@ -73,9 +69,9 @@ public class HbaseAsyncReqRow extends BaseAsyncReqRow {
7369

7470
private transient AbstractRowKeyModeDealer rowKeyMode;
7571

76-
private String tableName;
72+
private final String tableName;
7773

78-
private String[] colNames;
74+
private final String[] colNames;
7975

8076
public HbaseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8177
super(new HbaseAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -131,7 +127,7 @@ public void asyncInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,R
131127
dealMissKey(inputCopy, resultFuture);
132128
return;
133129
}
134-
refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
130+
refData.put(getAliasFieldsName(sideInfo.getEqualFieldList().get(i), sideInfo.getSideTableInfo().getPhysicalFields()), equalObj);
135131
}
136132

137133
String rowKeyStr = ((HbaseAsyncSideInfo)sideInfo).getRowKeyBuilder().getRowKey(refData);
@@ -189,6 +185,23 @@ public Row fillData(Row input, Object sideInput){
189185
return row;
190186
}
191187

188+
// 根据实际字段名获得对应的别名
189+
public String getAliasFieldsName(String realFieldName, Map<String, String> physicalFields) {
190+
Collection<String> values = physicalFields.values();
191+
Set<String> keySet = physicalFields.keySet();
192+
if (!values.contains(realFieldName)) {
193+
// TODO Error ? or Warn ?
194+
LOG.warn(realFieldName + "不存在别名");
195+
} else {
196+
for (String key : keySet) {
197+
if (physicalFields.get(key).equals(realFieldName)) {
198+
return key;
199+
}
200+
}
201+
}
202+
return realFieldName;
203+
}
204+
192205
@Override
193206
public void close() throws Exception {
194207
super.close();

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public static String[] splitIgnoreQuotaBrackets(String str, String delimiter){
101101
public ReplaceInfo getReplaceInfo(String field){
102102

103103
field = field.trim();
104-
if(field.length() <= 2){
104+
if(field.length() <= 0){
105105
throw new RuntimeException(field + " \n" +
106106
"Format defined exceptions");
107107
}

0 commit comments

Comments
 (0)