Skip to content

Commit 8b52498

Browse files
authored
Merge pull request #1874 from steve-community/1873-data-import-and-export
add impl for data export and imports
2 parents d5fede6 + a6837f6 commit 8b52498

File tree

7 files changed

+612
-0
lines changed

7 files changed

+612
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
3+
* Copyright (C) 2013-2025 SteVe Community Team
4+
* All Rights Reserved.
5+
*
6+
* This program is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU General Public License as published by
8+
* the Free Software Foundation, either version 3 of the License, or
9+
* (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU General Public License
17+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
*/
19+
package de.rwth.idsg.steve.repository;
20+
21+
import org.jooq.Table;
22+
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.Writer;
26+
27+
/**
28+
* @author Sevket Goekay <sevketgokay@gmail.com>
29+
* @since 20.11.2025
30+
*/
31+
public interface DataImportExportRepository {
32+
33+
void exportCsv(Writer writer, Table<?> table) throws IOException;
34+
35+
void beforeImport();
36+
void afterImport();
37+
void importCsv(InputStream in, Table<?> table);
38+
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
3+
* Copyright (C) 2013-2025 SteVe Community Team
4+
* All Rights Reserved.
5+
*
6+
* This program is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU General Public License as published by
8+
* the Free Software Foundation, either version 3 of the License, or
9+
* (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU General Public License
17+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
*/
19+
package de.rwth.idsg.steve.repository.impl;
20+
21+
import de.rwth.idsg.steve.repository.DataImportExportRepository;
22+
import lombok.RequiredArgsConstructor;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.lang3.StringUtils;
25+
import org.jooq.CSVFormat;
26+
import org.jooq.Converter;
27+
import org.jooq.Cursor;
28+
import org.jooq.DSLContext;
29+
import org.jooq.Field;
30+
import org.jooq.LoaderError;
31+
import org.jooq.SQLDialect;
32+
import org.jooq.Table;
33+
import org.jooq.TableField;
34+
import org.jooq.impl.AbstractConverter;
35+
import org.jooq.impl.DSL;
36+
import org.jooq.impl.SQLDataType;
37+
import org.springframework.stereotype.Repository;
38+
import org.springframework.util.CollectionUtils;
39+
40+
import java.io.IOException;
41+
import java.io.InputStream;
42+
import java.io.Writer;
43+
import java.nio.charset.StandardCharsets;
44+
import java.sql.Timestamp;
45+
import java.time.Instant;
46+
import java.util.Arrays;
47+
import java.util.List;
48+
49+
/**
50+
* @author Sevket Goekay <sevketgokay@gmail.com>
51+
* @since 20.11.2025
52+
*/
53+
@Slf4j
54+
@Repository
55+
@RequiredArgsConstructor
56+
public class DataImportExportRepositoryImpl implements DataImportExportRepository {
57+
58+
private static final int BATCH_SIZE = 1000;
59+
60+
private final DSLContext ctx;
61+
62+
private final CSVFormat csvFormatWithHeader = new CSVFormat().nullString("");
63+
private final CSVFormat csvFormatNoHeader = csvFormatWithHeader.header(false);
64+
private final Converter<String, Timestamp> isoTimestampConverter = new IsoTimestampConverter();
65+
66+
/**
67+
* DateTime will be exported via its toString method in the else-block of {@link org.jooq.impl.AbstractResult#format0(Object, boolean, boolean)}
68+
* because nothing else matches. The serialized values will be ISO8601 format.
69+
*/
70+
@Override
71+
public void exportCsv(Writer writer, Table<?> table) throws IOException {
72+
// write header line
73+
{
74+
// the csv has two lines: header line and another line for the empty record with empty cells
75+
String csv = ctx.newRecord(table).formatCSV(csvFormatWithHeader);
76+
// get only the first header line
77+
String header = csv.split(csvFormatWithHeader.newline())[0];
78+
writer.write(header);
79+
writer.write(csvFormatNoHeader.newline());
80+
}
81+
82+
try (Cursor<?> cursor = ctx.selectFrom(table).fetchSize(BATCH_SIZE).fetchLazy()) {
83+
while (cursor.hasNext()) {
84+
var row = cursor.fetchNext();
85+
if (row != null) {
86+
row.formatCSV(writer, csvFormatNoHeader);
87+
}
88+
}
89+
}
90+
}
91+
92+
@Override
93+
public void beforeImport() {
94+
SQLDialect dialectFamily = ctx.configuration().dialect().family();
95+
96+
boolean isOk = dialectFamily == SQLDialect.MYSQL || dialectFamily == SQLDialect.MARIADB;
97+
if (!isOk) {
98+
throw new IllegalStateException("Unsupported dialect " + dialectFamily);
99+
}
100+
101+
ctx.execute("set foreign_key_checks=0");
102+
}
103+
104+
@Override
105+
public void afterImport() {
106+
ctx.execute("set foreign_key_checks=1");
107+
}
108+
109+
/**
110+
* https://www.jooq.org/doc/latest/manual/sql-execution/importing/importing-api/
111+
* https://www.jooq.org/doc/latest/manual/sql-execution/importing/importing-sources/importing-source-csv/
112+
* https://www.jooq.org/doc/latest/manual/sql-execution/importing/importing-options/importing-option-throttling/
113+
*/
114+
@Override
115+
public void importCsv(InputStream in, Table<?> table) {
116+
ctx.transaction(configuration -> {
117+
DSLContext ctx = DSL.using(configuration);
118+
119+
ctx.deleteFrom(table).execute();
120+
121+
var loader = ctx.loadInto(table)
122+
.bulkAfter(BATCH_SIZE) // Put up to X rows in a single bulk statement.
123+
.batchAfter(BATCH_SIZE) // Put up to X statements (bulk or not) in a single statement batch.
124+
.loadCSV(in, StandardCharsets.UTF_8)
125+
.fields(getTableFields(table))
126+
.nullString("")
127+
.execute();
128+
129+
if (!CollectionUtils.isEmpty(loader.errors())) {
130+
for (LoaderError error : loader.errors()) {
131+
log.error("Exception happened", error.exception());
132+
}
133+
throw new RuntimeException("There were errors loading data into table " + table.getName());
134+
}
135+
136+
int processed = loader.processed();
137+
138+
log.info("Imported '{}' with processedRows={}, storedRows={}, errorCount={}, errorMessages={}",
139+
table.getName(),
140+
loader.processed(),
141+
loader.stored(),
142+
loader.errors().size(),
143+
loader.errors().stream().map(it -> it.exception().getMessage()).toList()
144+
);
145+
146+
// Update the sequence/auto-increment after bulk insert
147+
if (processed > 0) {
148+
resetAutoIncrement(ctx, table);
149+
}
150+
});
151+
}
152+
153+
/**
154+
* Reset the auto-increment/sequence for a table to the max ID value + 1
155+
*/
156+
private static void resetAutoIncrement(DSLContext ctx, Table<?> table) {
157+
var primaryKey = table.getPrimaryKey();
158+
if (primaryKey == null) {
159+
return;
160+
}
161+
162+
var primaryKeyFields = primaryKey.getFields();
163+
if (primaryKeyFields.isEmpty()) {
164+
return;
165+
} else if (primaryKeyFields.size() > 1) {
166+
log.warn("Found more than one PK for table {}", table.getName());
167+
return;
168+
}
169+
170+
// Get the primary key field (assuming it's the auto-increment field)
171+
TableField<?, ?> pkField = primaryKeyFields.get(0);
172+
173+
// Get the maximum ID value from the table
174+
Object maxId = ctx.select(DSL.max(pkField))
175+
.from(table)
176+
.fetchOne(0);
177+
178+
if (maxId == null) {
179+
return; // Table is empty
180+
}
181+
182+
if (!(maxId instanceof Number)) {
183+
log.debug("Nothing to auto-increment: PK '{}' for table '{}' is not a number, skipping", pkField.getName(), table.getName());
184+
return;
185+
}
186+
187+
long nextVal = ((Number) maxId).longValue() + 1;
188+
189+
ctx.execute(DSL.sql("ALTER TABLE {0} AUTO_INCREMENT = {1}", table, DSL.val(nextVal)));
190+
}
191+
192+
// -------------------------------------------------------------------------
193+
// Loader API cannot import temporal values in ISO8601 UTC format into a
194+
// Timestamp. More context: https://groups.google.com/g/jooq-user/c/VzZdIT7Xdnc
195+
//
196+
// Because of this, we are overriding the default converter of TIMESTAMP
197+
// table fields during the import.
198+
// -------------------------------------------------------------------------
199+
200+
private List<Field<?>> getTableFields(Table<?> table) {
201+
return Arrays.stream(table.fields())
202+
.map(it -> {
203+
if (it.getDataType().isTimestamp()) {
204+
return DSL.field(it.getName(), SQLDataType.VARCHAR(50)).convert(isoTimestampConverter);
205+
} else {
206+
return it;
207+
}
208+
}).toList();
209+
}
210+
211+
private static class IsoTimestampConverter extends AbstractConverter<String, Timestamp> {
212+
213+
private IsoTimestampConverter() {
214+
super(String.class, Timestamp.class);
215+
}
216+
217+
@Override
218+
public Timestamp from(String str) {
219+
if (StringUtils.isEmpty(str)) {
220+
return null;
221+
}
222+
return Timestamp.from(Instant.parse(str));
223+
}
224+
225+
@Override
226+
public String to(Timestamp ts) {
227+
return ts == null ? null : ts.toString();
228+
}
229+
}
230+
231+
}

0 commit comments

Comments
 (0)