|
1 | 1 | package cn.hashdata.datax.plugin.writer.gpdbwriter; |
2 | 2 |
|
3 | 3 | import java.sql.Connection; |
| 4 | +import java.util.ArrayList; |
4 | 5 | import java.util.List; |
5 | 6 | import java.util.concurrent.CompletionService; |
6 | 7 | import java.util.concurrent.ExecutionException; |
@@ -48,12 +49,25 @@ public Connection createConnection() { |
48 | 49 | Connection connection = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, username, password); |
49 | 50 | DBUtil.dealWithSessionConfig(connection, writerSliceConfig, this.dataBaseType, BASIC_MESSAGE); |
50 | 51 | return connection; |
| 52 | + } |
| 53 | + |
| 54 | + private String constructColumnNameList(List<String> columnList) { |
| 55 | + List<String> columns = new ArrayList<String>(); |
| 56 | + |
| 57 | + for (String column : columnList) { |
| 58 | + if (column.endsWith("\"") && column.startsWith("\"")) { |
| 59 | + columns.add(column); |
| 60 | + } else { |
| 61 | + columns.add("\"" + column + "\""); |
| 62 | + } |
| 63 | + } |
51 | 64 |
|
| 65 | + return StringUtils.join(columns, ","); |
52 | 66 | } |
53 | 67 |
|
54 | 68 | public String getCopySql(String tableName, List<String> columnList, int segment_reject_limit) { |
55 | 69 | StringBuilder sb = new StringBuilder().append("COPY ").append(tableName).append("(") |
56 | | - .append(StringUtils.join(columnList, ",")) |
| 70 | + .append(constructColumnNameList(columnList)) |
57 | 71 | .append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\'"); |
58 | 72 |
|
59 | 73 | if (segment_reject_limit >= 2) { |
@@ -110,7 +124,7 @@ public void startWrite(RecordReceiver recordReceiver, Configuration writerSliceC |
110 | 124 | try { |
111 | 125 |
|
112 | 126 | this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, |
113 | | - StringUtils.join(this.columns, ",")); |
| 127 | + constructColumnNameList(this.columns)); |
114 | 128 | for (int i = 0; i < numProcessor; i++) { |
115 | 129 | cs.submit(new CopyProcessor(this, this.columnNumber, resultSetMetaData, recordQueue, dataQueue)); |
116 | 130 | } |
|
0 commit comments