10
10
import com .dtstack .flink .sql .side .cache .CacheObj ;
11
11
import com .dtstack .flink .sql .side .kudu .table .KuduSideTableInfo ;
12
12
import com .dtstack .flink .sql .side .kudu .utils .KuduUtil ;
13
+ import com .dtstack .flink .sql .util .DateUtil ;
13
14
import com .dtstack .flink .sql .util .KrbUtils ;
14
15
import com .dtstack .flink .sql .util .RowDataComplete ;
15
16
import com .google .common .collect .Lists ;
38
39
import org .slf4j .LoggerFactory ;
39
40
40
41
import java .io .IOException ;
42
+ import java .math .BigDecimal ;
41
43
import java .security .PrivilegedAction ;
44
+ import java .sql .Timestamp ;
45
+ import java .time .Instant ;
42
46
import java .util .Arrays ;
43
47
import java .util .Collections ;
44
48
import java .util .List ;
@@ -91,7 +95,7 @@ private void connKuDu() throws IOException {
91
95
throw new IllegalArgumentException ("Table Open Failed , please check table exists" );
92
96
}
93
97
table = asyncClient .syncClient ().openTable (tableName );
94
- LOG .info ("connect kudu is successed !" );
98
+ LOG .info ("connect kudu is succeed !" );
95
99
}
96
100
scannerBuilder = asyncClient .newScannerBuilder (table );
97
101
Integer batchSizeBytes = kuduSideTableInfo .getBatchSizeBytes ();
@@ -139,12 +143,7 @@ private AsyncKuduClient getClient() throws IOException {
139
143
kuduSideTableInfo .getKrb5conf ()
140
144
);
141
145
return ugi .doAs (
142
- new PrivilegedAction <AsyncKuduClient >() {
143
- @ Override
144
- public AsyncKuduClient run () {
145
- return asyncKuduClientBuilder .build ();
146
- }
147
- });
146
+ (PrivilegedAction <AsyncKuduClient >) asyncKuduClientBuilder ::build );
148
147
} else {
149
148
return asyncKuduClientBuilder .build ();
150
149
}
@@ -160,19 +159,24 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
160
159
connKuDu ();
161
160
Schema schema = table .getSchema ();
162
161
// @wenbaoup fix bug
163
- inputParams .entrySet ().forEach (e ->{
164
- scannerBuilder .addPredicate (KuduPredicate .newInListPredicate (schema .getColumn (e .getKey ()), Collections .singletonList (e .getValue ())));
162
+ inputParams .forEach ((key , value ) -> {
163
+ Object transformValue = transformValue (value );
164
+ scannerBuilder .addPredicate (
165
+ KuduPredicate .newInListPredicate (
166
+ schema .getColumn (key ),
167
+ Collections .singletonList (transformValue )
168
+ )
169
+ );
165
170
});
166
171
167
172
// 填充谓词信息
168
173
List <PredicateInfo > predicateInfoes = sideInfo .getSideTableInfo ().getPredicateInfoes ();
169
174
if (predicateInfoes .size () > 0 ) {
170
- predicateInfoes .stream ().map (info -> {
175
+ predicateInfoes .stream ().peek (info -> {
171
176
KuduPredicate kuduPredicate = KuduUtil .buildKuduPredicate (schema , info );
172
177
if (null != kuduPredicate ) {
173
178
scannerBuilder .addPredicate (kuduPredicate );
174
179
}
175
- return info ;
176
180
}).count ();
177
181
}
178
182
@@ -184,6 +188,42 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
184
188
data .addCallbackDeferring (new GetListRowCB (inputCopy , cacheContent , rowList , asyncKuduScanner , resultFuture , buildCacheKey (inputParams )));
185
189
}
186
190
191
+ /**
192
+ * 将value转化为Java 通用类型
193
+ * @param value value
194
+ * @return 类型转化的value
195
+ */
196
+ private Object transformValue (Object value ) {
197
+ if (value == null ) {
198
+ return null ;
199
+ } else if (value instanceof Number && !(value instanceof BigDecimal )) {
200
+ return value ;
201
+ } else if (value instanceof Boolean ) {
202
+ return value ;
203
+ } else if (value instanceof String ) {
204
+ return value ;
205
+ } else if (value instanceof Character ) {
206
+ return value ;
207
+ } else if (value instanceof CharSequence ) {
208
+ return value ;
209
+ } else if (value instanceof Map ) {
210
+ return value ;
211
+ } else if (value instanceof List ) {
212
+ return value ;
213
+ } else if (value instanceof byte []) {
214
+ return value ;
215
+ } else if (value instanceof Instant ) {
216
+ return value ;
217
+ } else if (value instanceof Timestamp ) {
218
+ value = DateUtil .timestampToString ((Timestamp ) value );
219
+ } else if (value instanceof java .util .Date ) {
220
+ value = DateUtil .dateToString ((java .sql .Date ) value );
221
+ } else {
222
+ value = value .toString ();
223
+ }
224
+ return value ;
225
+ }
226
+
187
227
@ Override
188
228
public String buildCacheKey (Map <String , Object > inputParams ) {
189
229
StringBuilder sb = new StringBuilder ();
0 commit comments