Skip to content

Commit 8287a3b

Browse files
author
Istemi Ekin Akkus
authored
Merge pull request #100 from knix-microfunctions/release/0.8.9
Release/0.8.9
2 parents 02e1828 + 06c8019 commit 8287a3b

File tree

153 files changed

+12267
-522
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

153 files changed

+12267
-522
lines changed

DataLayerService/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ RUN mkdir -p /opt/mfn/datalayer/logs
2525
COPY target/datalayerservice.jar /opt/mfn/datalayer/
2626
WORKDIR /opt/mfn/datalayer
2727

28-
CMD ["java","-jar","datalayerservice.jar"]
28+
CMD java ${JAVA_OPTS} -jar datalayerservice.jar

DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@
4040
import org.apache.thrift.TException;
4141
import org.apache.thrift.protocol.TCompactProtocol;
4242
import org.apache.thrift.server.TServer;
43-
import org.apache.thrift.server.TThreadedSelectorServer;
43+
import org.apache.thrift.server.TThreadPoolServer;
44+
//import org.apache.thrift.server.TThreadedSelectorServer;
4445
import org.apache.thrift.transport.TFramedTransport;
45-
import org.apache.thrift.transport.TNonblockingServerSocket;
46-
import org.apache.thrift.transport.TNonblockingServerTransport;
46+
import org.apache.thrift.transport.TServerSocket;
47+
import org.apache.thrift.transport.TServerTransport;
4748
import org.apache.thrift.transport.TTransportException;
4849
import org.microfunctions.data_layer.DataLayerService.Iface;
4950

@@ -65,8 +66,9 @@ public class DataLayerServer implements Iface, Callable<Object> {
6566
private static final List<AbstractMap.SimpleEntry<String, Integer>> NO_KEYSPACES = new ArrayList<AbstractMap.SimpleEntry<String, Integer>>(0);
6667
private static final List<String> NO_KEYS = new ArrayList<String>(0);
6768

68-
public static final int DEFAULT_SELECTOR_THREADS = 50; //Math.max(2, 2 * Runtime.getRuntime().availableProcessors());
69-
public static final int DEFAULT_WORKER_THREADS = 100; //Math.max(4, 4 * Runtime.getRuntime().availableProcessors());
69+
// public static final int DEFAULT_SELECTOR_THREADS = 50; //Math.max(2, 2 * Runtime.getRuntime().availableProcessors());
70+
// public static final int DEFAULT_WORKER_THREADS = 100; //Math.max(4, 4 * Runtime.getRuntime().availableProcessors());
71+
public static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE;
7072
public static final int DEFAULT_CLIENT_TIMEOUT = 0;
7173
public static final int DEFAULT_MAX_FRAME_LENGTH = Integer.MAX_VALUE;
7274

@@ -1625,14 +1627,22 @@ public boolean updateTableTypeCache(String action, String table, String tableTyp
16251627

16261628

16271629
public void start(InetSocketAddress bindAddr) throws TTransportException {
1628-
TNonblockingServerTransport transport = new TNonblockingServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1630+
/* TServerTransport transport = new TServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
16291631
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
16301632
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
16311633
.protocolFactory(new TCompactProtocol.Factory())
16321634
.processor(new DataLayerService.Processor<Iface>(this))
16331635
.selectorThreads(DEFAULT_SELECTOR_THREADS)
16341636
.workerThreads(DEFAULT_WORKER_THREADS);
16351637
server = new TThreadedSelectorServer(args);
1638+
*/
1639+
TServerTransport transport = new TServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1640+
TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
1641+
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
1642+
.protocolFactory(new TCompactProtocol.Factory())
1643+
.processor(new DataLayerService.Processor<Iface>(this))
1644+
.maxWorkerThreads(DEFAULT_MAX_WORKER_THREADS);
1645+
server = new TThreadPoolServer(args);
16361646

16371647
LOGGER.info("Listening on "+bindAddr);
16381648
server.serve();

DataLayerService/src/main/java/org/microfunctions/data_layer/RiakAccess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ private boolean createTableWithType (String keyspace, String table, String table
393393
}
394394

395395
Namespace bucket = new Namespace(tableType, keyspace + ";" + table);
396-
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(replicationFactor).build();
396+
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(replicationFactor).withW(replicationFactor).withR(replicationFactor).withNotFoundOk(false).build();
397397
client.execute(props);
398398
LOGGER.info("createTableWithType() Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType);
399399
boolean success = this.insertRow(keyspace, null, table, ByteBuffer.wrap(tableType.getBytes(StandardCharsets.UTF_8)));

FunctionWorker/MicroFunctionsAPI.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ service MicroFunctionsAPIService {
6060
void put(1: string key, 2: string value, 3: bool is_private, 4: bool is_queued),
6161
string get(1: string key, 2: bool is_private),
6262
void remove(1: string key, 2: bool is_private, 3: bool is_queued),
63+
list<string> getKeys(1: i32 start_index, 2: i32 end_index, 3: bool is_private),
6364

6465
void createMap(1: string mapname, 2: bool is_private, 3: bool is_queued),
6566
void putMapEntry(1: string mapname, 2: string key, 3: string value, 4: bool is_private, 5: bool is_queued),

FunctionWorker/python/DataLayerClient.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ def getMapKeys(self, mapname):
271271
try:
272272
keyset = self.datalayer.retrieveKeysetFromMap(self.keyspace, self.maptablename, mapname, self.locality)
273273
if keyset.key != "" and keyset.key == mapname:
274-
keys = keyset.items
274+
keys = list(keyset.items)
275275
break
276276
except TTransport.TTransportException as exc:
277277
print("[DataLayerClient] Reconnecting because of failed getMapKeys: " + str(exc))
@@ -330,7 +330,7 @@ def deleteMap(self, mapname):
330330
return status
331331

332332
def getMapNames(self, start_index=0, end_index=2147483647):
333-
maps = None
333+
maps = []
334334
for retry in range(MAX_RETRIES):
335335
try:
336336
maps = self.datalayer.selectMaps(self.keyspace, self.maptablename, start_index, end_index, self.locality)
@@ -406,7 +406,7 @@ def retrieveSet(self, setname):
406406
try:
407407
itemsset = self.datalayer.retrieveSet(self.keyspace, self.settablename, setname, self.locality)
408408
if itemsset.key != "" and itemsset.key == setname:
409-
items = itemsset.items
409+
items = list(itemsset.items)
410410
break
411411
except TTransport.TTransportException as exc:
412412
print("[DataLayerClient] Reconnecting because of failed retrieveSet: " + str(exc))
@@ -446,7 +446,7 @@ def deleteSet(self, setname):
446446
return status
447447

448448
def getSetNames(self, start_index=0, end_index=2147483647):
449-
sets = None
449+
sets = []
450450
for retry in range(MAX_RETRIES):
451451
try:
452452
sets = self.datalayer.selectSets(self.keyspace, self.settablename, start_index, end_index, self.locality)
@@ -541,7 +541,7 @@ def deleteCounter(self, countername, tableName=None):
541541
return status
542542

543543
def getCounterNames(self, start_index=0, end_index=2147483647, tableName=None):
544-
counters = None
544+
counters = []
545545
table = self.countertablename if tableName is None else tableName
546546
for retry in range(MAX_RETRIES):
547547
try:
@@ -570,14 +570,21 @@ def createTriggerableTable(self, tableName):
570570
return status
571571

572572
def listKeys(self, start, count, tableName=None):
573-
listkeys_response = []
573+
keys_response = []
574574
table = self.tablename if tableName is None else tableName
575575

576576
for retry in range(MAX_RETRIES):
577577
try:
578578
listkeys_response = self.datalayer.selectKeys(self.keyspace, table, start, count, self.locality)
579-
if listkeys_response == None or type(listkeys_response) != type([]):
580-
listkeys_response = []
579+
if listkeys_response is not None or isinstance(listkeys_response, list):
580+
for key in listkeys_response:
581+
if key.startswith("grain_requirements_") or\
582+
key.startswith("grain_source_") or\
583+
key.startswith("workflow_json_") or\
584+
key.endswith("_metadata"):
585+
continue
586+
else:
587+
keys_response.append(key)
581588
break
582589
except TTransport.TTransportException as exc:
583590
print("[DataLayerClient] Reconnecting because of failed selectKeys: " + str(exc))
@@ -586,7 +593,7 @@ def listKeys(self, start, count, tableName=None):
586593
print("[DataLayerClient] failed selectKeys: " + str(exc))
587594
raise
588595

589-
return listkeys_response
596+
return keys_response
590597

591598
def shutdown(self):
592599
try:

FunctionWorker/python/DataLayerOperator.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,25 @@ def delete(self, key, is_private=False, is_queued=False, table=None):
9595
data_layer_client = self._get_data_layer_client(is_private)
9696
data_layer_client.delete(key, tableName=table)
9797

98+
def getKeys(self, start_index, end_index, is_private=False):
99+
keys = set()
100+
101+
# XXX: should follow "read your writes"
102+
# the final result should include:
103+
# 1. all created locally
104+
# 2. all existing globally minus the ones deleted locally
105+
106+
# TODO: 1. check local data layer first: get locally created and deleted
107+
108+
# 2. retrieve all existing globally
109+
dlc = self._get_data_layer_client(is_private)
110+
m2 = dlc.listKeys(start_index, end_index)
111+
if m2 is not None:
112+
# TODO: 3. remove the ones deleted locally
113+
keys = keys.union(m2)
114+
115+
return list(keys)
116+
98117
# map operations
99118
def createMap(self, mapname, is_private=False, is_queued=False):
100119
if is_queued:

FunctionWorker/python/LocalQueueClient.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ def connect(self):
4242
try:
4343
self.socket = TSocket.TSocket(host, int(port))
4444
self.transport = TTransport.TFramedTransport(self.socket)
45-
self.transport.open()
4645
self.protocol = TCompactProtocol.TCompactProtocol(self.transport)
4746
self.queue = LocalQueueService.Client(self.protocol)
47+
self.transport.open()
4848
break
4949
except Thrift.TException as exc:
5050
if retry < 60:

0 commit comments

Comments
 (0)