
Commit c7b1e77

gituser committed
Merge branch 'hotfix_1.10_4.0.x_32001' into 1.10_release_4.0.x
2 parents ee5384c + dd89190

6 files changed: +226 −9 lines

postgresql/postgresql-sink/src/main/java/com/dtstack/flink/sql/sink/postgresql/PostgresqlSink.java

Lines changed: 14 additions & 0 deletions
@@ -20,10 +20,15 @@
 package com.dtstack.flink.sql.sink.postgresql;
 
 
+import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.postgresql.writer.CopyWriter;
 import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
 import com.dtstack.flink.sql.sink.rdb.AbstractRdbSink;
 import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
+import com.dtstack.flink.sql.sink.rdb.writer.AbstractUpsertWriter;
+import com.dtstack.flink.sql.sink.rdb.writer.JDBCWriter;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * @author maqi
@@ -53,6 +58,15 @@ public JDBCUpsertOutputFormat getOutputFormat() {
                 .setKeyFields(primaryKeys)
                 .setAllReplace(allReplace)
                 .setUpdateMode(updateMode)
+                .setJDBCWriter(createJdbcWriter())
                 .build();
     }
+    private JDBCWriter createJdbcWriter() {
+        if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || primaryKeys == null || primaryKeys.size() == 0) {
+            return new CopyWriter(tableName, fieldNames, null);
+        }
+        return AbstractUpsertWriter.create(
+                jdbcDialect, schema, tableName, fieldNames, sqlTypes, primaryKeys.toArray(new String[primaryKeys.size()]), null,
+                true, allReplace, null);
+    }
 }
postgresql/postgresql-sink/src/main/java/com/dtstack/flink/sql/sink/postgresql/writer/CopyWriter.java

Lines changed: 173 additions & 0 deletions

@@ -0,0 +1,173 @@ (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flink.sql.sink.postgresql.writer;

import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.dtstack.flink.sql.sink.rdb.writer.JDBCWriter;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
 * Company: www.dtstack.com
 * Supports PostgreSQL COPY mode.
 *
 * @author dapeng
 * @date 2020-09-21
 */
public class CopyWriter implements JDBCWriter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(CopyWriter.class);

    private static final String COPY_SQL_TEMPL = "copy %s(%s) from stdin DELIMITER '%s' NULL as '%s'";

    private static final String DEFAULT_FIELD_DELIM = "\001";

    private static final String DEFAULT_NULL_DELIM = "\002";

    private static final String LINE_DELIMITER = "\n";

    private CopyManager copyManager;

    private transient List<Row> rows;

    private String copySql;

    private String tableName;

    private String[] fieldNames;

    // used only for metrics
    private transient AbstractDtRichOutputFormat metricOutputFormat;

    public CopyWriter(String tableName, String[] fieldNames, AbstractDtRichOutputFormat metricOutputFormat) {
        this.tableName = tableName;
        this.fieldNames = fieldNames;
        this.metricOutputFormat = metricOutputFormat;
        this.copySql = String.format(COPY_SQL_TEMPL, tableName, String.join(",", fieldNames), DEFAULT_FIELD_DELIM, DEFAULT_NULL_DELIM);
    }

    @Override
    public void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat) {
        this.metricOutputFormat = metricOutputFormat;
    }

    @Override
    public void open(Connection connection) throws SQLException {
        copyManager = new CopyManager((BaseConnection) connection);
        this.rows = Lists.newArrayList();
    }

    @Override
    public void prepareStatement(Connection connection) throws SQLException {
    }

    @Override
    public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
        // Retract messages (f0 == false) are ignored in copy mode.
        if (!record.f0) {
            return;
        }
        rows.add(Row.copy(record.f1));
    }

    @Override
    public void executeBatch(Connection connection) throws SQLException {
        if (CollectionUtils.isEmpty(rows)) {
            return;
        }
        // Serialize the whole batch into one COPY payload.
        StringBuilder sb = new StringBuilder();
        for (Row row : rows) {
            int lastIndex = row.getArity() - 1;
            for (int index = 0; index < row.getArity(); index++) {
                Object rowData = row.getField(index);
                sb.append(rowData == null ? DEFAULT_NULL_DELIM : rowData);
                if (index != lastIndex) {
                    sb.append(DEFAULT_FIELD_DELIM);
                }
            }
            sb.append(LINE_DELIMITER);
        }
        try {
            ByteArrayInputStream bi = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
            copyManager.copyIn(copySql, bi);
            connection.commit();
            rows.clear();
        } catch (Exception e) {
            // The batch failed as a whole; roll back and retry row by row
            // to isolate the dirty records.
            connection.rollback();
            connection.commit();
            executeUpdate(connection);
        }
    }

    @Override
    public void cleanBatchWhenError() throws SQLException {
    }

    @Override
    public void executeUpdate(Connection connection) throws SQLException {
        for (Row row : rows) {
            try {
                // Rebuild the COPY line per row (fresh buffer and index each
                // iteration) so one bad row does not poison the rest.
                StringBuilder sb = new StringBuilder();
                int lastIndex = row.getArity() - 1;
                for (int index = 0; index < row.getArity(); index++) {
                    Object rowData = row.getField(index);
                    sb.append(rowData == null ? DEFAULT_NULL_DELIM : rowData);
                    if (index != lastIndex) {
                        sb.append(DEFAULT_FIELD_DELIM);
                    }
                }
                sb.append(LINE_DELIMITER);
                ByteArrayInputStream bi = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
                copyManager.copyIn(copySql, bi);
                connection.commit();
            } catch (Exception e) {
                connection.rollback();
                connection.commit();
                if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 || LOG.isDebugEnabled()) {
                    LOG.error("record insert failed, this row is {}", row.toString());
                    LOG.error("", e);
                }
                metricOutputFormat.outDirtyRecords.inc();
            }
        }
        rows.clear();
    }

    @Override
    public void close() throws SQLException {
    }
}
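
CopyWriter funnels every batch through PostgreSQL's COPY FROM STDIN protocol via the JDBC driver's CopyManager, which is far cheaper than per-row INSERT statements for bulk loads. A minimal standalone sketch of that same path, kept separate from the commit itself; the connection URL, credentials, and demo_table(id, val) schema are hypothetical:

    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;

    public class CopyInSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/demo", "user", "secret")) {
                conn.setAutoCommit(false);
                // CopyManager needs the driver-level connection, as in CopyWriter.open().
                CopyManager copyManager = new CopyManager(conn.unwrap(BaseConnection.class));
                // Two rows: \001 separates fields, \002 marks NULL, \n ends a row,
                // mirroring CopyWriter's DEFAULT_FIELD_DELIM / DEFAULT_NULL_DELIM.
                String payload = "1\001hello\n2\001\002\n";
                String copySql = "copy demo_table(id, val) from stdin DELIMITER '\001' NULL as '\002'";
                copyManager.copyIn(copySql, new ByteArrayInputStream(
                        payload.getBytes(StandardCharsets.UTF_8)));
                conn.commit();
            }
        }
    }

The control characters \001 and \002 are reasonable defaults because they rarely occur in real field values; data that can contain them would need escaping before COPY.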

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 21 additions & 9 deletions
@@ -86,7 +86,8 @@ public JDBCUpsertOutputFormat(
             int flushMaxSize,
             long flushIntervalMills,
             boolean allReplace,
-            String updateMode) {
+            String updateMode,
+            JDBCWriter jdbcWriter) {
         super(options.getUsername(), options.getPassword(), options.getDriverName(), options.getDbUrl());
         this.schema = options.getSchema();
         this.tableName = options.getTableName();
@@ -99,6 +100,7 @@ public JDBCUpsertOutputFormat(
         this.flushIntervalMills = flushIntervalMills;
         this.allReplace = allReplace;
         this.updateMode = updateMode;
+        this.jdbcWriter = jdbcWriter;
     }
 
     /**
@@ -117,14 +119,18 @@ public void openJdbc() throws IOException {
         try {
             establishConnection();
             initMetric();
-            if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || keyFields == null || keyFields.length == 0) {
-                String insertSql = dialect.getInsertIntoStatement(schema, tableName, fieldNames, partitionFields);
-                LOG.info("execute insert sql: {}", insertSql);
-                jdbcWriter = new AppendOnlyWriter(insertSql, fieldTypes, this);
+            if (jdbcWriter == null) {
+                if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.APPEND.name()) || keyFields == null || keyFields.length == 0) {
+                    String insertSql = dialect.getInsertIntoStatement(schema, tableName, fieldNames, partitionFields);
+                    LOG.info("execute insert sql: {}", insertSql);
+                    jdbcWriter = new AppendOnlyWriter(insertSql, fieldTypes, this);
+                } else {
+                    jdbcWriter = AbstractUpsertWriter.create(
+                            dialect, schema, tableName, fieldNames, fieldTypes, keyFields, partitionFields,
+                            getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(), allReplace, this);
+                }
             } else {
-                jdbcWriter = AbstractUpsertWriter.create(
-                        dialect, schema, tableName, fieldNames, fieldTypes, keyFields, partitionFields,
-                        getRuntimeContext().getExecutionConfig().isObjectReuseEnabled(), allReplace, this);
+                jdbcWriter.initMetricOutput(this);
             }
             jdbcWriter.open(connection);
         } catch (SQLException sqe) {
@@ -242,6 +248,7 @@ public static class Builder {
         protected long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
         protected boolean allReplace = DEFAULT_ALLREPLACE_VALUE;
         protected String updateMode;
+        protected JDBCWriter jdbcWriter;
 
         /**
          * required, jdbc options.
@@ -313,6 +320,11 @@ public Builder setUpdateMode(String updateMode) {
             return this;
         }
 
+        public Builder setJDBCWriter(JDBCWriter jdbcWriter) {
+            this.jdbcWriter = jdbcWriter;
+            return this;
+        }
+
         /**
          * Finalizes the configuration and checks validity.
          *
@@ -322,7 +334,7 @@ public JDBCUpsertOutputFormat build() {
             checkNotNull(options, "No options supplied.");
             checkNotNull(fieldNames, "No fieldNames supplied.");
             return new JDBCUpsertOutputFormat(
-                    options, fieldNames, keyFields, partitionFields, fieldTypes, flushMaxSize, flushIntervalMills, allReplace, updateMode);
+                    options, fieldNames, keyFields, partitionFields, fieldTypes, flushMaxSize, flushIntervalMills, allReplace, updateMode, jdbcWriter);
         }
     }
 }
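
The net effect in this file: openJdbc() keeps the old writer-selection logic as a fallback when no writer was injected, and otherwise only wires the metric sink into the supplied writer. A hedged sketch of the new builder hook from a caller's side; "builder" stands for a JDBCUpsertOutputFormat.Builder set up as in PostgresqlSink.getOutputFormat(), with the required option and field setters elided:

    // Sketch only; the surrounding variables are placeholders.
    JDBCUpsertOutputFormat format = builder
            .setKeyFields(primaryKeys)
            .setAllReplace(allReplace)
            .setUpdateMode(updateMode)
            // Pass a pre-built writer, or null to keep the default selection.
            .setJDBCWriter(new CopyWriter(tableName, fieldNames, null))
            .build();
    // openJdbc() later calls initMetricOutput(this) on the injected writer,
    // which is why its metric sink can safely start out as null here.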

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AbstractUpsertWriter.java

Lines changed: 10 additions & 0 deletions
@@ -226,6 +226,11 @@ private UpsertWriterUsingUpsertStatement(
             this.upsertSql = upsertSql;
         }
 
+        @Override
+        public void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat) {
+
+        }
+
         @Override
         public void open(Connection connection) throws SQLException {
             super.open(connection);
@@ -293,6 +298,11 @@ private UpsertWriterUsingInsertUpdateStatement(
             this.pkFields = pkFields;
         }
 
+        @Override
+        public void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat) {
+
+        }
+
         @Override
         public void open(Connection connection) throws SQLException {
             super.open(connection);

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java

Lines changed: 5 additions & 0 deletions
@@ -56,6 +56,11 @@ public AppendOnlyWriter(String insertSql, int[] fieldTypes, AbstractDtRichOutputFormat metricOutputFormat) {
         this.metricOutputFormat = metricOutputFormat;
     }
 
+    @Override
+    public void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat) {
+        this.metricOutputFormat = metricOutputFormat;
+    }
+
     @Override
     public void open(Connection connection) throws SQLException {
         this.rows = new ArrayList();

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 3 additions & 0 deletions
@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.sink.rdb.writer;
 
+import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Row;
 
@@ -32,6 +33,8 @@ public interface JDBCWriter extends Serializable {
 
     int DIRTYDATA_PRINT_FREQUENTY = 1000;
 
+    void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat);
+
     /**
      * Open the writer by JDBC Connection.
      */
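
initMetricOutput exists because an injected writer is built before the output format it reports to (PostgresqlSink passes null for the metric sink), so the format hands itself to the writer during openJdbc(). A skeleton implementation, assuming the interface's method set matches the overrides seen in CopyWriter above:

    import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
    import com.dtstack.flink.sql.sink.rdb.writer.JDBCWriter;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    // Skeleton only; the method list is inferred from CopyWriter's overrides.
    public class NoOpWriter implements JDBCWriter {

        private transient List<Row> rows;
        private transient AbstractDtRichOutputFormat metricOutputFormat;

        @Override
        public void initMetricOutput(AbstractDtRichOutputFormat metricOutputFormat) {
            // Injected by JDBCUpsertOutputFormat.openJdbc() for external writers.
            this.metricOutputFormat = metricOutputFormat;
        }

        @Override
        public void open(Connection connection) throws SQLException {
            rows = new ArrayList<>();
        }

        @Override
        public void prepareStatement(Connection connection) throws SQLException {
        }

        @Override
        public void addRecord(Tuple2<Boolean, Row> record) throws SQLException {
            if (record.f0) {
                rows.add(Row.copy(record.f1));
            }
        }

        @Override
        public void executeBatch(Connection connection) throws SQLException {
            rows.clear(); // a real writer would flush to the database here
        }

        @Override
        public void cleanBatchWhenError() throws SQLException {
        }

        @Override
        public void executeUpdate(Connection connection) throws SQLException {
        }

        @Override
        public void close() throws SQLException {
        }
    }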
