Skip to content

Commit abcdd3a

Browse files
authored
Merge pull request #119 from knix-microfunctions/release/0.9.0
Release/0.9.0
2 parents ff6184f + 4066efe commit abcdd3a

File tree

58 files changed

+5017
-660
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+5017
-660
lines changed

DataLayerService/src/main/java/org/microfunctions/data_layer/RiakAccess.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,10 @@ public void connect (Map<String,Integer> riakNodes) {
147147
try {
148148
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, MFN_KEYSPACES);
149149
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();
150+
151+
long t_start = System.currentTimeMillis();
150152
client.execute(props);
153+
this.logExecutionTime("connect(bucketProperties)", System.currentTimeMillis() - t_start);
151154

152155
this.initiateBucketToTypeMapping();
153156
} catch (Exception e) {
@@ -176,6 +179,11 @@ public void close() {
176179
LOGGER.info("Riak client shutdown.");
177180
}
178181

182+
private void logExecutionTime(String commandName, long duration)
183+
{
184+
LOGGER.info(commandName + " execution time: " + duration + " ms.");
185+
}
186+
179187
private boolean detectInvalidName (String str) {
180188
if (str == null) {
181189
return false;
@@ -203,7 +211,11 @@ public boolean createKeyspace (String keyspace, Metadata metadata) {
203211
try {
204212
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, keyspace);
205213
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();
214+
215+
long t_start = System.currentTimeMillis();
206216
client.execute(props);
217+
this.logExecutionTime("createKeyspace(bucketProperties)", System.currentTimeMillis() - t_start);
218+
207219
LOGGER.info("createKeyspace() Keyspace: " + keyspace + " Metadata: replication factor: " + Integer.toString(replicationFactor));
208220
return this.insertRow(MFN_KEYSPACES, null, keyspace, ByteBuffer.allocate(Integer.BYTES).putInt(replicationFactor));
209221
} catch (Exception e) {
@@ -394,7 +406,11 @@ private boolean createTableWithType (String keyspace, String table, String table
394406

395407
Namespace bucket = new Namespace(tableType, keyspace + ";" + table);
396408
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(replicationFactor).withW(replicationFactor).withR(replicationFactor).withNotFoundOk(false).build();
409+
410+
long t_start = System.currentTimeMillis();
397411
client.execute(props);
412+
this.logExecutionTime("createTableWithType(bucketProperties)", System.currentTimeMillis() - t_start);
413+
398414
LOGGER.info("createTableWithType() Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType);
399415
boolean success = this.insertRow(keyspace, null, table, ByteBuffer.wrap(tableType.getBytes(StandardCharsets.UTF_8)));
400416
if (success) {
@@ -534,7 +550,11 @@ public boolean insertRow (String keyspace, String table, String key, ByteBuffer
534550
Location location = new Location(bucket, key);
535551
RiakObject object = new RiakObject().setContentType(Constants.CTYPE_OCTET_STREAM).setValue(BinaryValue.unsafeCreate(value.array()));
536552
StoreValue store = new StoreValue.Builder(object).withLocation(location).build();
553+
554+
long t_start = System.currentTimeMillis();
537555
client.execute(store);
556+
this.logExecutionTime("insertRow()", System.currentTimeMillis() - t_start);
557+
538558
return true;
539559
} catch (Exception e) {
540560
LOGGER.error("insertRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -570,7 +590,10 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> selectRow (String keyspace, S
570590

571591
Location location = new Location(bucket, key);
572592
FetchValue fetch = new FetchValue.Builder(location).build();
593+
594+
long t_start = System.currentTimeMillis();
573595
FetchValue.Response response = client.execute(fetch);
596+
this.logExecutionTime("selectRow()", System.currentTimeMillis() - t_start);
574597

575598
RiakObject object = response.getValue(RiakObject.class);
576599
if (object == null || object.getValue() == null) {
@@ -623,13 +646,20 @@ public boolean updateRow (String keyspace, String table, String key, ByteBuffer
623646

624647
Location location = new Location(bucket, key);
625648
FetchValue fetch = new FetchValue.Builder(location).withOption(FetchValue.Option.DELETED_VCLOCK, true).build();
649+
650+
long t_start = System.currentTimeMillis();
626651
FetchValue.Response response = client.execute(fetch);
652+
this.logExecutionTime("updateRow(fetch)", System.currentTimeMillis() - t_start);
627653

628654
RiakObject object = response.getValue(RiakObject.class);
629655
object.setValue(BinaryValue.unsafeCreate(value.array()));
630656

631657
StoreValue store = new StoreValue.Builder(object).withLocation(location).build();
658+
659+
t_start = System.currentTimeMillis();
632660
client.execute(store);
661+
this.logExecutionTime("updateRow(store)", System.currentTimeMillis() - t_start);
662+
633663
return true;
634664
} catch (Exception e) {
635665
LOGGER.error("updateRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -669,7 +699,11 @@ private boolean deleteRowWithType (String keyspace, String table, String key, St
669699

670700
Location location = new Location(bucket, key);
671701
DeleteValue delete = new DeleteValue.Builder(location).build();
702+
703+
long t_start = System.currentTimeMillis();
672704
client.execute(delete);
705+
this.logExecutionTime("deleteRowWithType()", System.currentTimeMillis() - t_start);
706+
673707
return true;
674708
} catch (Exception e) {
675709
LOGGER.error("deleteRowWithType() failed. Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType, e);
@@ -708,7 +742,10 @@ private List<String> selectKeysWithType (String keyspace, String table, int star
708742
}
709743

710744
ListKeys list = new ListKeys.Builder(bucket).build();
745+
746+
long t_start = System.currentTimeMillis();
711747
ListKeys.Response response = client.execute(list);
748+
this.logExecutionTime("selectKeysWithType()", System.currentTimeMillis() - t_start);
712749

713750
List<String> keys = new ArrayList<String>();
714751
for (Location location: response) {
@@ -744,8 +781,11 @@ private List<String> selectAllKeysWithType (String keyspace, String table, Strin
744781
}
745782

746783
ListKeys list = new ListKeys.Builder(bucket).build();
747-
ListKeys.Response response = client.execute(list);
748784

785+
long t_start = System.currentTimeMillis();
786+
ListKeys.Response response = client.execute(list);
787+
this.logExecutionTime("selectAllKeysWithType()", System.currentTimeMillis() - t_start);
788+
749789
List<String> keys = new ArrayList<String>();
750790
for (Location location: response) {
751791
keys.add(location.getKeyAsString());
@@ -787,7 +827,11 @@ public AbstractMap.SimpleEntry<String, Long> getCounter (String keyspace, String
787827
Location location = new Location(bucket, counterName);
788828

789829
FetchCounter fetch = new FetchCounter.Builder(location).build();
830+
831+
long t_start = System.currentTimeMillis();
790832
FetchCounter.Response response = client.execute(fetch);
833+
this.logExecutionTime("getCounter()", System.currentTimeMillis() - t_start);
834+
791835
RiakCounter counter = response.getDatatype();
792836
Long counterValue = counter.view();
793837
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
@@ -815,7 +859,11 @@ public AbstractMap.SimpleEntry<String, Long> incrementCounter (String keyspace,
815859

816860
CounterUpdate delta = new CounterUpdate(increment);
817861
UpdateCounter update = new UpdateCounter.Builder(location, delta).withReturnDatatype(true).build();
862+
863+
long t_start = System.currentTimeMillis();
818864
UpdateCounter.Response response = client.execute(update);
865+
this.logExecutionTime("incrementCounter()", System.currentTimeMillis() - t_start);
866+
819867
RiakCounter counter = response.getDatatype();
820868
Long counterValue = counter.view();
821869
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
@@ -880,7 +928,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveSet (String keyspace
880928
Location location = new Location(bucket, setName);
881929

882930
FetchSet fetch = new FetchSet.Builder(location).build();
931+
932+
long t_start = System.currentTimeMillis();
883933
FetchSet.Response response = client.execute(fetch);
934+
this.logExecutionTime("retrieveSet()", System.currentTimeMillis() - t_start);
935+
884936
RiakSet rSet = response.getDatatype();
885937
Set<BinaryValue> binarySet = rSet.view();
886938

@@ -907,7 +959,10 @@ public boolean addItemToSet (String keyspace, String table, String setName, Stri
907959

908960
SetUpdate item = new SetUpdate().add(setItem);
909961
UpdateSet update = new UpdateSet.Builder(location, item).build();
962+
long t_start = System.currentTimeMillis();
910963
client.execute(update);
964+
this.logExecutionTime("addItemToSet()", System.currentTimeMillis() - t_start);
965+
911966
return true;
912967
} catch (Exception e) {
913968
LOGGER.error("addItemToSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -926,12 +981,20 @@ public boolean removeItemFromSet (String keyspace, String table, String setName,
926981
Location location = new Location(bucket, setName);
927982

928983
FetchSet fetch = new FetchSet.Builder(location).build();
984+
985+
long t_start = System.currentTimeMillis();
929986
FetchSet.Response response = client.execute(fetch);
987+
this.logExecutionTime("removeItemFromSet(fetch)", System.currentTimeMillis() - t_start);
988+
930989
Context context = response.getContext();
931990

932991
SetUpdate item = new SetUpdate().remove(setItem);
933992
UpdateSet update = new UpdateSet.Builder(location, item).withContext(context).build();
993+
994+
t_start = System.currentTimeMillis();
934995
client.execute(update);
996+
this.logExecutionTime("removeItemFromSet(update)", System.currentTimeMillis() - t_start);
997+
935998
return true;
936999
} catch (Exception e) {
9371000
LOGGER.error("removeItemFromSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -950,7 +1013,11 @@ public boolean containsItemInSet (String keyspace, String table, String setName,
9501013
Location location = new Location(bucket, setName);
9511014

9521015
FetchSet fetch = new FetchSet.Builder(location).build();
1016+
1017+
long t_start = System.currentTimeMillis();
9531018
FetchSet.Response response = client.execute(fetch);
1019+
this.logExecutionTime("containsItemInSet()", System.currentTimeMillis() - t_start);
1020+
9541021
RiakSet rSet = response.getDatatype();
9551022
Set<BinaryValue> binarySet = rSet.view();
9561023
return binarySet.contains(BinaryValue.create(setItem));
@@ -995,7 +1062,11 @@ public int getSizeOfSet (String keyspace, String table, String setName) {
9951062
Location location = new Location(bucket, setName);
9961063

9971064
FetchSet fetch = new FetchSet.Builder(location).build();
1065+
1066+
long t_start = System.currentTimeMillis();
9981067
FetchSet.Response response = client.execute(fetch);
1068+
this.logExecutionTime("getSizeOfSet()", System.currentTimeMillis() - t_start);
1069+
9991070
RiakSet rSet = response.getDatatype();
10001071
Set<BinaryValue> binarySet = rSet.view();
10011072
return binarySet.size();
@@ -1039,7 +1110,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveKeysetFromMap (Strin
10391110
Location location = new Location(bucket, mapName);
10401111

10411112
FetchMap fetch = new FetchMap.Builder(location).build();
1113+
1114+
long t_start = System.currentTimeMillis();
10421115
FetchMap.Response response = client.execute(fetch);
1116+
this.logExecutionTime("retrieveKeysetFromMap()", System.currentTimeMillis() - t_start);
1117+
10431118
RiakMap rMap = response.getDatatype();
10441119
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
10451120

@@ -1065,7 +1140,11 @@ public AbstractMap.SimpleEntry<String, Map<String, ByteBuffer>> retrieveAllEntri
10651140
Location location = new Location(bucket, mapName);
10661141

10671142
FetchMap fetch = new FetchMap.Builder(location).build();
1143+
1144+
long t_start = System.currentTimeMillis();
10681145
FetchMap.Response response = client.execute(fetch);
1146+
this.logExecutionTime("retrieveAllEntriesFromMap()", System.currentTimeMillis() - t_start);
1147+
10691148
RiakMap rMap = response.getDatatype();
10701149
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
10711150

@@ -1096,7 +1175,11 @@ public boolean putEntryToMap (String keyspace, String table, String mapName, Str
10961175
RegisterUpdate register = new RegisterUpdate(BinaryValue.unsafeCreate(entryValue.array()));
10971176
MapUpdate entry = new MapUpdate().update(entryKey, register);
10981177
UpdateMap update = new UpdateMap.Builder(location, entry).build();
1178+
1179+
long t_start = System.currentTimeMillis();
10991180
client.execute(update);
1181+
this.logExecutionTime("putEntryToMap()", System.currentTimeMillis() - t_start);
1182+
11001183
return true;
11011184
} catch (Exception e) {
11021185
LOGGER.error("putEntryToMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -1115,7 +1198,11 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> getEntryFromMap (String keysp
11151198
Location location = new Location(bucket, mapName);
11161199

11171200
FetchMap fetch = new FetchMap.Builder(location).build();
1201+
1202+
long t_start = System.currentTimeMillis();
11181203
FetchMap.Response response = client.execute(fetch);
1204+
this.logExecutionTime("getEntryFromMap()", System.currentTimeMillis() - t_start);
1205+
11191206
RiakMap rMap = response.getDatatype();
11201207
RiakRegister rRegister = rMap.getRegister(entryKey);
11211208
ByteBuffer entryValue = ByteBuffer.wrap(rRegister.view().unsafeGetValue());
@@ -1137,12 +1224,20 @@ public boolean removeEntryFromMap (String keyspace, String table, String mapName
11371224
Location location = new Location(bucket, mapName);
11381225

11391226
FetchMap fetch = new FetchMap.Builder(location).build();
1227+
1228+
long t_start = System.currentTimeMillis();
11401229
FetchMap.Response response = client.execute(fetch);
1230+
this.logExecutionTime("removeEntryFromMap(fetch)", System.currentTimeMillis() - t_start);
1231+
11411232
Context context = response.getContext();
11421233

11431234
MapUpdate entry = new MapUpdate().removeRegister(entryKey);
11441235
UpdateMap update = new UpdateMap.Builder(location, entry).withContext(context).build();
1236+
1237+
t_start = System.currentTimeMillis();
11451238
client.execute(update);
1239+
this.logExecutionTime("removeEntryFromMap(update)", System.currentTimeMillis() - t_start);
1240+
11461241
return true;
11471242
} catch (Exception e) {
11481243
LOGGER.error("removeEntryFromMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
@@ -1161,7 +1256,11 @@ public boolean containsKeyInMap (String keyspace, String table, String mapName,
11611256
Location location = new Location(bucket, mapName);
11621257

11631258
FetchMap fetch = new FetchMap.Builder(location).build();
1259+
1260+
long t_start = System.currentTimeMillis();
11641261
FetchMap.Response response = client.execute(fetch);
1262+
this.logExecutionTime("containsKeyInMap()", System.currentTimeMillis() - t_start);
1263+
11651264
RiakMap rMap = response.getDatatype();
11661265
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
11671266
return entries.containsKey(BinaryValue.create(entryKey));
@@ -1206,7 +1305,11 @@ public int getSizeOfMap (String keyspace, String table, String mapName) {
12061305
Location location = new Location(bucket, mapName);
12071306

12081307
FetchMap fetch = new FetchMap.Builder(location).build();
1308+
1309+
long t_start = System.currentTimeMillis();
12091310
FetchMap.Response response = client.execute(fetch);
1311+
this.logExecutionTime("getSizeOfMap()", System.currentTimeMillis() - t_start);
1312+
12101313
RiakMap rMap = response.getDatatype();
12111314
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
12121315
return entries.size();

FunctionWorker/python/DataLayerClient.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
6969
#print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename))
7070
self.locality = locality
7171

72+
self._is_running = True
73+
7274
self.connect()
7375

7476
if init_tables:
@@ -105,7 +107,7 @@ def _drop_keyspace(self):
105107

106108
def connect(self):
107109
retry = 0.5 #s
108-
while True:
110+
while self._is_running:
109111
try:
110112
host, port = self.dladdress.split(':')
111113
self.socket = TSocket.TSocket(host, int(port))
@@ -607,6 +609,7 @@ def listKeys(self, start, count, tableName=None):
607609
return keys_response
608610

609611
def shutdown(self):
612+
self._is_running = False
610613
try:
611614
self.transport.close()
612615
except Thrift.TException as exc:

0 commit comments

Comments
 (0)