Skip to content

Commit 9343f80

Browse files
authored
Update exception handling for workloads (#158)
1 parent 800362d commit 9343f80

13 files changed

+247
-194
lines changed

scalardb/src/scalardb/core.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@
171171
(defn rollback-txs
172172
"Given transactions as a vector are rollbacked."
173173
[txs]
174-
(doseq [tx txs] (.rollback tx)))
174+
(doseq [tx txs] (try (.rollback tx)
175+
(catch Exception e (warn (.getMessage e))))))
175176

176177
(defmacro with-retry
177178
"If the result of the body is nil, it retries it"

scalardb/src/scalardb/elle_append.clj

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,23 +97,26 @@
9797
(scalar/prepare-transaction-service! test))))
9898

9999
(invoke! [_ test op]
100-
(let [tx (try (scalar/start-transaction test)
101-
(catch Exception e
102-
(warn (.getMessage e))))
103-
[seq-id txn] (:value op)]
104-
(try
105-
(when (<= @(:table-id test) seq-id)
106-
;; add tables for the next sequence
107-
(add-tables test (inc seq-id)))
108-
(let [txn' (mapv (partial tx-execute seq-id tx) txn)]
109-
(.commit tx)
110-
(assoc op :type :ok :value (independent/tuple seq-id txn')))
111-
(catch UnknownTransactionStatusException _
112-
(swap! (:unknown-tx test) conj (.getId tx))
113-
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
114-
(catch Exception e
115-
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
116-
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))
100+
(if-let [tx (try (scalar/start-transaction test)
101+
(catch Exception e (warn (.getMessage e))))]
102+
(let [[seq-id txn] (:value op)]
103+
(try
104+
(when (<= @(:table-id test) seq-id)
105+
;; add tables for the next sequence
106+
(add-tables test (inc seq-id)))
107+
(let [txn' (mapv (partial tx-execute seq-id tx) txn)]
108+
(.commit tx)
109+
(assoc op :type :ok :value (independent/tuple seq-id txn')))
110+
(catch UnknownTransactionStatusException _
111+
(swap! (:unknown-tx test) conj (.getId tx))
112+
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
113+
(catch Exception e
114+
(scalar/rollback-txs [tx])
115+
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
116+
(assoc op :type :fail :error {:crud-error (.getMessage e)}))))
117+
(do
118+
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
119+
(assoc op :type :fail :error {:tx-error "starting tx failed"}))))
117120

118121
(close! [_ _])
119122

scalardb/src/scalardb/elle_append_2pc.clj

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,30 @@
4141
(let [tx1 (try (scalar/start-2pc test)
4242
(catch Exception e
4343
(warn (.getMessage e))))
44-
tx2 (try (scalar/join-2pc test (.getId tx1))
45-
(catch Exception e
46-
(warn (.getMessage e))))
47-
[seq-id txn] (:value op)]
48-
(try
49-
(when (<= @(:table-id test) seq-id)
50-
;; add tables for the next sequence
51-
(append/add-tables test (inc seq-id)))
52-
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
53-
(scalar/prepare-validate-commit-txs [tx1 tx2])
54-
(assoc op :type :ok :value (independent/tuple seq-id txn')))
55-
(catch UnknownTransactionStatusException _
56-
(swap! (:unknown-tx test) conj (.getId tx1))
57-
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
58-
(catch Exception e
59-
(scalar/rollback-txs [tx1 tx2])
44+
tx2 (when tx1
45+
(try (scalar/join-2pc test (.getId tx1))
46+
(catch Exception e
47+
(warn (.getMessage e)))))]
48+
(if (and tx1 tx2)
49+
(let [[seq-id txn] (:value op)]
50+
(try
51+
(when (<= @(:table-id test) seq-id)
52+
;; add tables for the next sequence
53+
(append/add-tables test (inc seq-id)))
54+
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
55+
(scalar/prepare-validate-commit-txs [tx1 tx2])
56+
(assoc op :type :ok :value (independent/tuple seq-id txn')))
57+
(catch UnknownTransactionStatusException _
58+
(swap! (:unknown-tx test) conj (.getId tx1))
59+
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
60+
(catch Exception e
61+
(scalar/rollback-txs [tx1 tx2])
62+
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
63+
(assoc op :type :fail :error {:crud-error (.getMessage e)}))))
64+
(do
65+
(when tx1 (scalar/rollback-txs [tx1]))
6066
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
61-
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))
67+
(assoc op :type :fail :error {:tx-error "starting tx failed"})))))
6268

6369
(close! [_ _])
6470

scalardb/src/scalardb/elle_write_read.clj

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
(ns scalardb.elle-write-read
2-
(:require [clojure.tools.logging :refer [info]]
2+
(:require [clojure.tools.logging :refer [info warn]]
33
[jepsen.client :as client]
44
[jepsen.generator :as gen]
55
[jepsen.independent :as independent]
@@ -92,21 +92,26 @@
9292
(scalar/prepare-transaction-service! test))))
9393

9494
(invoke! [_ test op]
95-
(let [tx (scalar/start-transaction test)
96-
[seq-id txn] (:value op)]
97-
(try
98-
(when (<= @(:table-id test) seq-id)
99-
;; add tables for the next sequence
100-
(add-tables test (inc seq-id)))
101-
(let [txn' (mapv (partial tx-execute seq-id tx) txn)]
102-
(.commit tx)
103-
(assoc op :type :ok :value (independent/tuple seq-id txn')))
104-
(catch UnknownTransactionStatusException _
105-
(swap! (:unknown-tx test) conj (.getId tx))
106-
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
107-
(catch Exception e
108-
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
109-
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))
95+
(if-let [tx (try (scalar/start-transaction test)
96+
(catch Exception e (warn (.getMessage e))))]
97+
(let [[seq-id txn] (:value op)]
98+
(try
99+
(when (<= @(:table-id test) seq-id)
100+
;; add tables for the next sequence
101+
(add-tables test (inc seq-id)))
102+
(let [txn' (mapv (partial tx-execute seq-id tx) txn)]
103+
(.commit tx)
104+
(assoc op :type :ok :value (independent/tuple seq-id txn')))
105+
(catch UnknownTransactionStatusException _
106+
(swap! (:unknown-tx test) conj (.getId tx))
107+
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
108+
(catch Exception e
109+
(scalar/rollback-txs [tx])
110+
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
111+
(assoc op :type :fail :error {:crud-error (.getMessage e)}))))
112+
(do
113+
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
114+
(assoc op :type :fail :error {:tx-error "starting tx failed"}))))
110115

111116
(close! [_ _])
112117

scalardb/src/scalardb/elle_write_read_2pc.clj

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
(ns scalardb.elle-write-read-2pc
2-
(:require [jepsen.client :as client]
2+
(:require [clojure.tools.logging :refer [warn]]
3+
[jepsen.client :as client]
34
[jepsen.generator :as gen]
45
[jepsen.independent :as independent]
56
[scalardb.core :as scalar :refer [DEFAULT_TABLE_COUNT]]
@@ -29,23 +30,33 @@
2930
(scalar/prepare-2pc-service! test))))
3031

3132
(invoke! [_ test op]
32-
(let [tx1 (scalar/start-2pc test)
33-
tx2 (scalar/join-2pc test (.getId tx1))
34-
[seq-id txn] (:value op)]
35-
(try
36-
(when (<= @(:table-id test) seq-id)
37-
;; add tables for the next sequence
38-
(wr/add-tables test (inc seq-id)))
39-
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
40-
(scalar/prepare-validate-commit-txs [tx1 tx2])
41-
(assoc op :type :ok :value (independent/tuple seq-id txn')))
42-
(catch UnknownTransactionStatusException _
43-
(swap! (:unknown-tx test) conj (.getId tx1))
44-
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
45-
(catch Exception e
46-
(scalar/rollback-txs [tx1 tx2])
33+
(let [tx1 (try (scalar/start-2pc test)
34+
(catch Exception e
35+
(warn (.getMessage e))))
36+
tx2 (when tx1
37+
(try (scalar/join-2pc test (.getId tx1))
38+
(catch Exception e
39+
(warn (.getMessage e)))))]
40+
(if (and tx1 tx2)
41+
(let [[seq-id txn] (:value op)]
42+
(try
43+
(when (<= @(:table-id test) seq-id)
44+
;; add tables for the next sequence
45+
(wr/add-tables test (inc seq-id)))
46+
(let [txn' (mapv (partial tx-execute seq-id tx1 tx2) txn)]
47+
(scalar/prepare-validate-commit-txs [tx1 tx2])
48+
(assoc op :type :ok :value (independent/tuple seq-id txn')))
49+
(catch UnknownTransactionStatusException _
50+
(swap! (:unknown-tx test) conj (.getId tx1))
51+
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
52+
(catch Exception e
53+
(scalar/rollback-txs [tx1 tx2])
54+
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
55+
(assoc op :type :fail :error {:crud-error (.getMessage e)}))))
56+
(do
57+
(when tx1 (scalar/rollback-txs [tx1]))
4758
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
48-
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))
59+
(assoc op :type :fail :error {:tx-error "starting tx failed"})))))
4960

5061
(close! [_ _])
5162

scalardb/src/scalardb/transfer.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
:unknown-tx-status)
114114
(catch Exception e
115115
(warn (.getMessage e))
116+
(scalar/rollback-txs [tx])
116117
:fail))
117118
:start-fail))
118119

scalardb/src/scalardb/transfer_2pc.clj

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,43 @@
1010

1111
(defn- tx-transfer
1212
[tx1 tx2 from to amount]
13-
(try
14-
(infof "Transferring %d from %d to %d (tx: %s)" amount from to (.getId tx1))
15-
(let [fromResult (.get tx1 (transfer/prepare-get from))
16-
toResult (.get tx2 (transfer/prepare-get to))]
17-
(infof "fromID: %d, the balance: %d, the version: %d (tx: %s)" from (transfer/get-balance fromResult) (transfer/get-version fromResult) (.getId tx1))
18-
(->> (transfer/calc-new-balance fromResult (- amount))
19-
(transfer/prepare-put from)
20-
(.put tx1))
21-
(infof "toID: %d, the balance: %d, the version: %d (tx: %s)" to (transfer/get-balance toResult) (transfer/get-version toResult) (.getId tx1))
22-
(->> (transfer/calc-new-balance toResult amount)
23-
(transfer/prepare-put to)
24-
(.put tx2)))
25-
(scalar/prepare-validate-commit-txs [tx1 tx2])
26-
(catch UnknownTransactionStatusException e
27-
(throw e))
28-
(catch Exception e
29-
(scalar/rollback-txs [tx1 tx2])
30-
(throw e))))
13+
(infof "Transferring %d from %d to %d (tx: %s)" amount from to (.getId tx1))
14+
(let [fromResult (.get tx1 (transfer/prepare-get from))
15+
toResult (.get tx2 (transfer/prepare-get to))]
16+
(infof "fromID: %d, the balance: %d, the version: %d (tx: %s)" from (transfer/get-balance fromResult) (transfer/get-version fromResult) (.getId tx1))
17+
(->> (transfer/calc-new-balance fromResult (- amount))
18+
(transfer/prepare-put from)
19+
(.put tx1))
20+
(infof "toID: %d, the balance: %d, the version: %d (tx: %s)" to (transfer/get-balance toResult) (transfer/get-version toResult) (.getId tx1))
21+
(->> (transfer/calc-new-balance toResult amount)
22+
(transfer/prepare-put to)
23+
(.put tx2)))
24+
(scalar/prepare-validate-commit-txs [tx1 tx2]))
3125

3226
(defn- try-tx-transfer
3327
[test {:keys [from to amount]}]
3428
(let [tx1 (try (scalar/start-2pc test)
3529
(catch Exception e
3630
(warn (.getMessage e))))
37-
tx2 (try (scalar/join-2pc test (.getId tx1))
38-
(catch Exception e
39-
(warn (.getMessage e))))]
40-
(try
41-
(tx-transfer tx1 tx2 from to amount)
42-
:commit
43-
(catch UnknownTransactionStatusException _
44-
(swap! (:unknown-tx test) conj (.getId tx1))
45-
(warn "Unknown transaction: " (.getId tx1))
46-
:unknown-tx-status)
47-
(catch Exception e
48-
(warn (.getMessage e))
49-
:fail))))
31+
tx2 (when tx1
32+
(try (scalar/join-2pc test (.getId tx1))
33+
(catch Exception e
34+
(warn (.getMessage e)))))]
35+
(if (and tx1 tx2)
36+
(try
37+
(tx-transfer tx1 tx2 from to amount)
38+
:commit
39+
(catch UnknownTransactionStatusException _
40+
(swap! (:unknown-tx test) conj (.getId tx1))
41+
(warn "Unknown transaction: " (.getId tx1))
42+
:unknown-tx-status)
43+
(catch Exception e
44+
(warn (.getMessage e))
45+
(scalar/rollback-txs [tx1 tx2])
46+
:fail))
47+
(do
48+
(when tx1 (scalar/rollback-txs [tx1]))
49+
:start-fail))))
5050

5151
(defrecord TransferClient [initialized? n initial-balance max-txs]
5252
client/Client

scalardb/src/scalardb/transfer_append.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
:unknown-tx-status)
128128
(catch Exception e
129129
(warn (.getMessage e))
130+
(scalar/rollback-txs [tx])
130131
:fail))
131132
:start-fail))
132133

scalardb/src/scalardb/transfer_append_2pc.clj

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,48 +12,49 @@
1212

1313
(defn- tx-transfer
1414
[tx1 tx2 from to amount]
15-
(try
16-
(infof "Transferring %d from %d to %d (tx: %s)" amount from to (.getId tx1))
17-
(let [^Result from-result
18-
(t-append/scan-for-latest tx1 (t-append/prepare-scan-for-latest from))
19-
^Result to-result
20-
(t-append/scan-for-latest tx2 (t-append/prepare-scan-for-latest to))]
21-
(infof "fromID: %d, the latest balance: %d, the latest age: %d (tx: %s)" from (t-append/get-balance from-result) (t-append/get-age from-result) (.getId tx1))
22-
(->> (t-append/prepare-put from
23-
(t-append/calc-new-age from-result)
24-
(t-append/calc-new-balance from-result
25-
(- amount)))
26-
(.put tx1))
27-
(infof "toID: %d, the latest balance: %d, the latest age: %d (tx: %s)" to (t-append/get-balance to-result) (t-append/get-age to-result) (.getId tx1))
28-
(->> (t-append/prepare-put to
29-
(t-append/calc-new-age to-result)
30-
(t-append/calc-new-balance to-result amount))
31-
(.put tx2)))
32-
(scalar/prepare-validate-commit-txs [tx1 tx2])
33-
(catch UnknownTransactionStatusException e
34-
(throw e))
35-
(catch Exception e
36-
(scalar/rollback-txs [tx1 tx2])
37-
(throw e))))
15+
(infof "Transferring %d from %d to %d (tx: %s)" amount from to (.getId tx1))
16+
(let [^Result from-result
17+
(t-append/scan-for-latest tx1 (t-append/prepare-scan-for-latest from))
18+
^Result to-result
19+
(t-append/scan-for-latest tx2 (t-append/prepare-scan-for-latest to))]
20+
(infof "fromID: %d, the latest balance: %d, the latest age: %d (tx: %s)" from (t-append/get-balance from-result) (t-append/get-age from-result) (.getId tx1))
21+
(->> (t-append/prepare-put from
22+
(t-append/calc-new-age from-result)
23+
(t-append/calc-new-balance from-result
24+
(- amount)))
25+
(.put tx1))
26+
(infof "toID: %d, the latest balance: %d, the latest age: %d (tx: %s)" to (t-append/get-balance to-result) (t-append/get-age to-result) (.getId tx1))
27+
(->> (t-append/prepare-put to
28+
(t-append/calc-new-age to-result)
29+
(t-append/calc-new-balance to-result amount))
30+
(.put tx2)))
31+
(scalar/prepare-validate-commit-txs [tx1 tx2]))
3832

3933
(defn- try-tx-transfer
4034
[test {:keys [from to amount]}]
4135
(let [tx1 (try (scalar/start-2pc test)
4236
(catch Exception e
4337
(warn (.getMessage e))))
44-
tx2 (try (scalar/join-2pc test (.getId tx1))
45-
(catch Exception e
46-
(warn (.getMessage e))))]
47-
(try
48-
(tx-transfer tx1 tx2 from to amount)
49-
:commit
50-
(catch UnknownTransactionStatusException _
51-
(swap! (:unknown-tx test) conj (.getId tx1))
52-
(warn "Unknown transaction: " (.getId tx1))
53-
:unknown-tx-status)
54-
(catch Exception e
55-
(warn (.getMessage e))
56-
:fail))))
38+
tx2 (if tx1
39+
(try (scalar/join-2pc test (.getId tx1))
40+
(catch Exception e
41+
(warn (.getMessage e))))
42+
nil)]
43+
(if (and tx1 tx2)
44+
(try
45+
(tx-transfer tx1 tx2 from to amount)
46+
:commit
47+
(catch UnknownTransactionStatusException _
48+
(swap! (:unknown-tx test) conj (.getId tx1))
49+
(warn "Unknown transaction: " (.getId tx1))
50+
:unknown-tx-status)
51+
(catch Exception e
52+
(warn (.getMessage e))
53+
(scalar/rollback-txs [tx1 tx2])
54+
:fail))
55+
(do
56+
(when tx1 (scalar/rollback-txs [tx1]))
57+
:start-fail))))
5758

5859
(defrecord TransferClient [initialized? n initial-balance max-txs]
5960
client/Client

0 commit comments

Comments
 (0)