Commit 23ffc03

Merge branch 'hotfix_1.10_4.0.x_31268' into '1.10_release_4.0.x'
[fix] Exception when an RDB full-cache (ALL) dimension table uses a column alias. See merge request dt-insight-engine/flinkStreamSQL!184
2 parents 4114079 + 0e59518 commit 23ffc03

2 files changed, 29 insertions(+), 12 deletions(-)

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 11 additions & 5 deletions
@@ -42,8 +42,10 @@
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

@@ -103,7 +105,7 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
         List<Integer> equalValIndex = sideInfo.getEqualValIndex();
         ArrayList<Object> inputParams = equalValIndex.stream()
                 .map(value::getField)
-                .filter(object -> null != object)
+                .filter(Objects::nonNull)
                 .collect(Collectors.toCollection(ArrayList::new));

         if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) {
@@ -121,7 +123,7 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
             Row row = fillData(value, null);
             RowDataComplete.collectRow(out, row);
         } else if (!CollectionUtils.isEmpty(cacheList)) {
-            cacheList.stream().forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one))));
+            cacheList.forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one))));
         }
     }

@@ -166,13 +168,17 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
         ResultSet resultSet = statement.executeQuery(sql);

         String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
-        String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
+        String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
+        Map<String, String> sideFieldNamesAndTypes = Maps.newHashMap();
+        for (int i = 0; i < sideFieldNames.length; i++) {
+            sideFieldNamesAndTypes.put(sideFieldNames[i], sideFieldTypes[i]);
+        }
+
         while (resultSet.next()) {
             Map<String, Object> oneRow = Maps.newHashMap();
             for (String fieldName : sideFieldNames) {
                 Object object = resultSet.getObject(fieldName.trim());
-                int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
-                object = SwitchUtil.getTarget(object, fields[fieldIndex]);
+                object = SwitchUtil.getTarget(object, sideFieldNamesAndTypes.get(fieldName));
                 oneRow.put(fieldName.trim(), object);
             }
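The substantive change in this file is in queryAndFillData: the old code resolved each selected field's type by its position in getFieldList(), which only works when every name in sideSelectFields also appears in that list; the new code pairs sideFieldNames with sideFieldTypes in a map keyed by name. A minimal, hypothetical sketch of why the positional lookup throws once an aliased name is selected (the class and field names below are invented for illustration, not taken from the repository):

// Hypothetical illustration of the failure mode the commit message describes.
import java.util.Arrays;
import java.util.List;

public class IndexLookupFailureDemo {
    public static void main(String[] args) {
        // Field names as declared on the side table.
        List<String> fieldList = Arrays.asList("id", "user_name");
        String[] fieldTypes = {"int", "varchar"};

        // A name from the SELECT field list that is not in fieldList, e.g. because of an alias.
        String selectedField = "uname";

        int fieldIndex = fieldList.indexOf(selectedField);   // returns -1 for the unknown name
        System.out.println(fieldTypes[fieldIndex]);           // throws ArrayIndexOutOfBoundsException
    }
}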
rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllSideInfo.java

Lines changed: 18 additions & 7 deletions
@@ -18,24 +18,24 @@

 package com.dtstack.flink.sql.side.rdb.all;

-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.BaseSideInfo;
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.PredicateInfo;
-import com.dtstack.flink.sql.side.BaseSideInfo;
-import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
 import com.google.common.collect.Lists;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;

 /**
@@ -59,7 +59,18 @@ public RdbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
     @Override
     public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
         RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
-        sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), Arrays.asList(StringUtils.split(sideSelectFields, ",")), sideTableInfo.getPredicateInfoes());
+        List<String> selectFields = Lists.newArrayList();
+        Map<String, String> physicalFields = rdbSideTableInfo.getPhysicalFields();
+        physicalFields.keySet().forEach(
+                item -> {
+                    if (Objects.isNull(physicalFields.get(item))) {
+                        selectFields.add(quoteIdentifier(item));
+                    } else {
+                        selectFields.add(quoteIdentifier(physicalFields.get(item)) + " AS " + quoteIdentifier(item));
+                    }
+                }
+        );
+        sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), selectFields, sideTableInfo.getPredicateInfoes());
         LOG.info("--------dimension sql query-------\n{}" + sqlCondition);
     }

@@ -68,7 +79,7 @@ public String getAdditionalWhereClause() {
     }

     private String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
-        String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
+        String fromClause = String.join(", ", selectFields);
         String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
         String whereClause = buildWhereClause(predicateClause);
         return "SELECT " + fromClause + " FROM " + tableName + whereClause;
