|
1 | 1 | (ns scalardb.core |
2 | | - (:require [cassandra.core :as cassandra] |
3 | | - [cheshire.core :as cheshire] |
4 | | - [clojure.string :as string] |
| 2 | + (:require [cheshire.core :as cheshire] |
5 | 3 | [clojure.tools.logging :refer [info warn]] |
6 | 4 | [jepsen.checker :as checker] |
7 | 5 | [jepsen.independent :as independent] |
|
30 | 28 | [r] |
31 | 29 | (Thread/sleep (compute-exponential-backoff r))) |
32 | 30 |
|
33 | | -(defn- get-cassandra-schema |
34 | | - "Only the current test schemata are covered |
35 | | - because this is just a workaround for the schema loader issue." |
36 | | - [schema] |
37 | | - (assert (= (count schema) 1) "The schema should have only 1 entry") |
38 | | - (let [keyspace-table (-> schema keys first name) |
39 | | - schema (-> schema vals first) |
40 | | - [keyspace table] (string/split keyspace-table #"\.") |
41 | | - partition-key (mapv keyword (:partition-key schema)) |
42 | | - clustering-key (mapv keyword (:clustering-key schema)) |
43 | | - columns (assoc (reduce |
44 | | - (fn [r [k t]] |
45 | | - (let [val-type (-> t string/lower-case keyword) |
46 | | - result (assoc r k val-type)] |
47 | | - (if (or (.contains partition-key k) |
48 | | - (.contains clustering-key k)) |
49 | | - result |
50 | | - (assoc result |
51 | | - (->> k name (str "before_") keyword) |
52 | | - val-type)))) |
53 | | - {} |
54 | | - (:columns schema)) |
55 | | - :tx_id :text |
56 | | - :tx_version :int |
57 | | - :tx_state :int |
58 | | - :tx_prepared_at :bigint |
59 | | - :tx_committed_at :bigint |
60 | | - :before_tx_id :text |
61 | | - :before_tx_version :int |
62 | | - :before_tx_state :int |
63 | | - :before_tx_prepared_at :bigint |
64 | | - :before_tx_committed_at :bigint |
65 | | - :primary-key (into partition-key clustering-key))] |
66 | | - {:keyspace keyspace |
67 | | - :table table |
68 | | - :schema columns})) |
69 | | - |
70 | | -(defn- setup-cassandra-tables |
71 | | - [test schemata] |
72 | | - (let [[cluster session] (cassandra/open-cassandra test) |
73 | | - schemata (map get-cassandra-schema schemata)] |
74 | | - (doseq [schema schemata] |
75 | | - (cassandra/create-my-keyspace session test schema) |
76 | | - (cassandra/create-my-table session schema)) |
77 | | - (cassandra/create-my-keyspace session test {:keyspace "coordinator"}) |
78 | | - (cassandra/create-my-table session {:keyspace "coordinator" |
79 | | - :table "state" |
80 | | - :schema {:tx_id :text |
81 | | - :tx_child_ids :text |
82 | | - :tx_state :int |
83 | | - :tx_created_at :bigint |
84 | | - :primary-key [:tx_id]}}) |
85 | | - (cassandra/close-cassandra cluster session))) |
86 | | - |
87 | 31 | (defn setup-transaction-tables |
88 | 32 | [test schemata] |
89 | 33 | (let [properties (ext/create-properties (:db test) test) |
90 | 34 | options (ext/create-table-opts (:db test) test)] |
91 | | - (if (= (ext/get-db-type (:db test)) :cassandra) |
92 | | - ;; Workaround the issue of the schema loader for Cassandra |
93 | | - (setup-cassandra-tables test schemata) |
94 | | - (doseq [schema (map cheshire/generate-string schemata)] |
95 | | - (loop [retries RETRIES] |
96 | | - (when (zero? retries) |
97 | | - (throw (ex-info "Failed to set up tables" {:schema schema}))) |
98 | | - (when (< retries RETRIES) |
99 | | - (exponential-backoff (- RETRIES retries)) |
100 | | - (try |
101 | | - (SchemaLoader/unload properties schema true) |
102 | | - (catch Exception e (warn (.getMessage e)))) |
103 | | - (exponential-backoff (- RETRIES retries))) |
104 | | - (let [result (try |
105 | | - (SchemaLoader/load properties schema options true) |
106 | | - :success |
107 | | - (catch Exception e |
108 | | - (warn (.getMessage e)) |
109 | | - :fail))] |
110 | | - (when (= result :fail) |
111 | | - (recur (dec retries))))))))) |
| 35 | + (doseq [schema (map cheshire/generate-string schemata)] |
| 36 | + (loop [retries RETRIES] |
| 37 | + (when (zero? retries) |
| 38 | + (throw (ex-info "Failed to set up tables" {:schema schema}))) |
| 39 | + (when (< retries RETRIES) |
| 40 | + (exponential-backoff (- RETRIES retries)) |
| 41 | + (try |
| 42 | + (SchemaLoader/repairAll properties schema options true) |
| 43 | + (catch Exception e (warn (.getMessage e)))) |
| 44 | + (exponential-backoff (- RETRIES retries))) |
| 45 | + (let [result (try |
| 46 | + (SchemaLoader/load properties schema options true) |
| 47 | + :success |
| 48 | + (catch Exception e |
| 49 | + (warn (.getMessage e)) |
| 50 | + :fail))] |
| 51 | + (when (= result :fail) |
| 52 | + (recur (dec retries)))))))) |
112 | 53 |
|
113 | 54 | (defn- close-storage! |
114 | 55 | [test] |
|
0 commit comments