@@ -112,44 +112,48 @@ public void afterImport() {
112112 * https://www.jooq.org/doc/latest/manual/sql-execution/importing/importing-options/importing-option-throttling/
113113 */
114114 @ Override
115- public void importCsv (InputStream in , Table <?> table ) throws IOException {
116- ctx .deleteFrom (table ).execute ();
117-
118- var loader = ctx .loadInto (table )
119- .bulkAfter (BATCH_SIZE ) // Put up to X rows in a single bulk statement.
120- .batchAfter (BATCH_SIZE ) // Put up to X statements (bulk or not) in a single statement batch.
121- .loadCSV (in , StandardCharsets .UTF_8 )
122- .fields (getTableFields (table ))
123- .nullString ("" )
124- .execute ();
125-
126- if (!CollectionUtils .isEmpty (loader .errors ())) {
127- for (LoaderError error : loader .errors ()) {
128- log .error ("Exception happened" , error .exception ());
115+ public void importCsv (InputStream in , Table <?> table ) {
116+ ctx .transaction (configuration -> {
117+ DSLContext ctx = DSL .using (configuration );
118+
119+ ctx .deleteFrom (table ).execute ();
120+
121+ var loader = ctx .loadInto (table )
122+ .bulkAfter (BATCH_SIZE ) // Put up to X rows in a single bulk statement.
123+ .batchAfter (BATCH_SIZE ) // Put up to X statements (bulk or not) in a single statement batch.
124+ .loadCSV (in , StandardCharsets .UTF_8 )
125+ .fields (getTableFields (table ))
126+ .nullString ("" )
127+ .execute ();
128+
129+ if (!CollectionUtils .isEmpty (loader .errors ())) {
130+ for (LoaderError error : loader .errors ()) {
131+ log .error ("Exception happened" , error .exception ());
132+ }
133+ throw new RuntimeException ("There were errors loading data into table " + table .getName ());
129134 }
130- throw new RuntimeException ("There were errors loading data into table " + table .getName ());
131- }
132135
133- int processed = loader .processed ();
136+ int processed = loader .processed ();
134137
135- log .info ("Imported '{}' with processedRows={}, storedRows={}, errorCount={}, errorMessages={}" ,
136- table .getName (),
137- loader .processed (),
138- loader .stored (),
139- loader .errors ().size (),
140- loader .errors ().stream ().map (it -> it .exception ().getMessage ()).toList ()
141- );
138+ log .info ("Imported '{}' with processedRows={}, storedRows={}, errorCount={}, errorMessages={}" ,
139+ table .getName (),
140+ loader .processed (),
141+ loader .stored (),
142+ loader .errors ().size (),
143+ loader .errors ().stream ().map (it -> it .exception ().getMessage ()).toList ()
144+ );
142145
143- // Update the sequence/auto-increment after bulk insert
144- if (processed > 0 ) {
145- resetAutoIncrement (table );
146- }
146+ // Update the sequence/auto-increment after bulk insert
147+ if (processed > 0 ) {
148+ resetAutoIncrement (ctx , table );
149+ }
150+ });
147151 }
148152
149153 /**
150154 * Reset the auto-increment/sequence for a table to the max ID value + 1
151155 */
152- private void resetAutoIncrement (Table <?> table ) {
156+ private static void resetAutoIncrement (DSLContext ctx , Table <?> table ) {
153157 var primaryKey = table .getPrimaryKey ();
154158 if (primaryKey == null ) {
155159 return ;
0 commit comments