Skip to content

Commit 541e4c3

Browse files
committed
refactor logic to check copy error. Report copy error first
1 parent 350d451 commit 541e4c3

File tree

1 file changed

+17
-16
lines changed
  • gpdbwriter/src/main/java/cn/hashdata/datax/plugin/writer/gpdbwriter

1 file changed

+17
-16
lines changed

gpdbwriter/src/main/java/cn/hashdata/datax/plugin/writer/gpdbwriter/CopyWorker.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ public Long call() throws Exception {
5555
try {
5656
CopyManager mgr = new CopyManager((BaseConnection) connection);
5757
return mgr.copyIn(sql, pipeIn);
58-
} catch (PSQLException e) {
59-
throw new IOException("无法向目标表写入数据: " + e.getMessage());
6058
} finally {
6159
try {
6260
pipeIn.close();
@@ -72,18 +70,6 @@ public Long call() throws Exception {
7270
copyBackendThread.start();
7371
}
7472

75-
public void write(byte[] record) throws InterruptedException {
76-
queue.put(record);
77-
78-
if (copyResult.isDone()) {
79-
try {
80-
copyResult.get();
81-
} catch (ExecutionException e) {
82-
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
83-
}
84-
}
85-
}
86-
8773
@Override
8874
public Long call() throws Exception {
8975
Thread.currentThread().setName("CopyWorker");
@@ -104,8 +90,6 @@ public Long call() throws Exception {
10490

10591
pipeOut.flush();
10692
pipeOut.close();
107-
Long count = copyResult.get();
108-
return count;
10993
} catch (Exception e) {
11094
try {
11195
((BaseConnection) connection).cancelQuery();
@@ -118,6 +102,16 @@ public Long call() throws Exception {
118102
} catch (SecurityException ignore) {
119103
}
120104

105+
try {
106+
copyResult.get();
107+
} catch (ExecutionException exec) {
108+
if (exec.getCause() instanceof PSQLException) {
109+
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, exec.getCause());
110+
}
111+
// ignore others
112+
} catch (Exception ignore) {
113+
}
114+
121115
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
122116
} finally {
123117
try {
@@ -134,6 +128,13 @@ public Long call() throws Exception {
134128

135129
DBUtil.closeDBResources(null, null, connection);
136130
}
131+
132+
try {
133+
Long count = copyResult.get();
134+
return count;
135+
} catch (Exception e) {
136+
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
137+
}
137138
}
138139

139140
private void changeCsvSizelimit(Connection conn) {

0 commit comments

Comments
 (0)