Skip to content

Commit 02e1828

Browse files
author
Istemi Ekin Akkus
authored
Merge pull request #91 from knix-microfunctions/release/0.8.7
Release/0.8.7
2 parents 631b585 + 71b5d7f commit 02e1828

File tree

146 files changed

+8335
-3008
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

146 files changed

+8335
-3008
lines changed

DataLayerService/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
/target/
55
/.settings
66
/.factorypath
7+
/.m2/

DataLayerService/Makefile

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,37 @@ all: build_thrift \
2424
build_thrift:
2525
cd ../build_tools/thrift/$(THRIFT_VERSION)/; ./build_thrift.sh; cd -
2626

27-
src/main/java/org/microfunctions/data_layer/DataLayerService.java: ../build_env.mk \
28-
thrift/DataLayerMessage.thrift \
29-
thrift/DataLayerService.thrift
30-
docker run --rm -it --name datalayer-thrift \
27+
src/main/java/org/microfunctions/data_layer/DataLayerService.java: \
28+
../build_env.mk \
29+
../build_tools/thrift/$(THRIFT_VERSION)/build_thrift.sh \
30+
thrift/DataLayerMessage.thrift \
31+
thrift/DataLayerService.thrift
32+
cd ../build_tools/thrift/$(THRIFT_VERSION)/; ./build_thrift.sh; cd -
33+
docker run --rm -i --name datalayer-thrift \
3134
--user $(UID):$(GID) \
3235
-v $(CURDIR):/usr/src/datalayer -w /usr/src/datalayer \
3336
$(THRIFT_IMAGE_NAME):$(THRIFT_VERSION) \
3437
bash -c 'thrift --gen java -out ./src/main/java/ ./thrift/DataLayerMessage.thrift; thrift --gen java -out ./src/main/java/ ./thrift/DataLayerService.thrift'
3538
touch src/main/java/org/microfunctions/data_layer/DataLayerService.java
3639

37-
PythonClient/data_layer: ../build_env.mk \
38-
thrift/DataLayerMessage.thrift \
39-
thrift/DataLayerService.thrift
40-
docker run --rm -it --name datalayer-thrift \
40+
PythonClient/data_layer: \
41+
../build_env.mk \
42+
../build_tools/thrift/$(THRIFT_VERSION)/build_thrift.sh \
43+
thrift/DataLayerMessage.thrift \
44+
thrift/DataLayerService.thrift
45+
cd ../build_tools/thrift/$(THRIFT_VERSION)/; ./build_thrift.sh; cd -
46+
docker run --rm -i --name datalayer-thrift \
4147
--user $(UID):$(GID) \
4248
-v $(CURDIR):/usr/src/datalayer -w /usr/src/datalayer \
4349
$(THRIFT_IMAGE_NAME):$(THRIFT_VERSION) \
4450
bash -c 'thrift --gen py -out ./PythonClient ./thrift/DataLayerMessage.thrift; thrift --gen py -out ./PythonClient ./thrift/DataLayerService.thrift'
4551

46-
# In case the user doesn't have maven configurations in the home directory, we need to create it
47-
~/.m2/settings-docker.xml: settings-docker.xml.sample
48-
if [ ! -e ~/.m2 ]; then mkdir ~/.m2; fi
52+
# Always create maven configuration and repo
53+
.PHONY: .m2/settings-docker.xml
54+
.m2/settings-docker.xml:
55+
if [ ! -e .m2 ]; then mkdir .m2; fi
4956
cp settings-docker.xml.sample $@.tmp; \
57+
sed "s#<localRepository>[^<]\+</localRepository>#<localRepository>/usr/src/datalayer/.m2/repository</localRepository>#" -i $@.tmp; \
5058
noproxyhosts=$$(echo $$no_proxy|sed 's/,/|/g'); \
5159
http_proxy=$${http_proxy:-$${HTTP_PROXY}}; \
5260
https_proxy=$${https_proxy:-$${HTTPS_PROXY}}; \
@@ -62,15 +70,14 @@ PythonClient/data_layer: ../build_env.mk \
6270
JAVA_SOURCES=$(shell find src/ -type f|grep -v src/main/java/org/microfunctions/data_layer/DataLayerService.java)
6371
GIT_REVISION:=`git describe --tags --always --dirty=-dirty`
6472
target/datalayerservice.jar: $(JAVA_SOURCES) \
65-
src/main/java/org/microfunctions/data_layer/DataLayerService.java \
66-
~/.m2/settings-docker.xml
67-
if [ ! -e ~/.m2/repository ]; then mkdir -p ~/.m2/repository; fi
68-
docker run --rm -it --name datalayer-maven \
73+
src/main/java/org/microfunctions/data_layer/DataLayerService.java \
74+
.m2/settings-docker.xml
75+
if [ ! -e .m2/repository ]; then mkdir -p .m2/repository; fi
76+
docker run --rm -i --name datalayer-maven \
6977
--user $(UID):$(GID) \
70-
-v ~/.m2:/usr/share/maven/ref \
71-
-v $(CURDIR):/usr/src/datalayer -w /usr/src/datalayer \
78+
-e HOME=/usr/src/datalayer -v $(CURDIR):/usr/src/datalayer -w /usr/src/datalayer \
7279
maven:3.6.1-jdk-8 \
73-
mvn -Drevision=$(GIT_REVISION) -Dlibthrift.version=$(MVN_LIBTHRIFT_VERSION) -DargLine="-Duser.home=/tmp" -s /usr/share/maven/ref/settings-docker.xml package
80+
mvn -Drevision=$(GIT_REVISION) -Dlibthrift.version=$(MVN_LIBTHRIFT_VERSION) -DargLine="-Duser.home=/tmp" -s /usr/src/datalayer/.m2/settings-docker.xml package
7481

7582
clean:
7683
rm -rf ./target

DataLayerService/src/main/java/org/microfunctions/data_layer/RiakAccess.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ public class RiakAccess {
7474
private static final Logger LOGGER = LogManager.getLogger(RiakAccess.class);
7575

7676
public static String BUCKET_TYPE_DEFAULT = "default";
77-
public static final String BUCKET_TYPE_STRONG_CONSISTENCY = "strong"; // make sure there is a Riak bucket type called "strong" for strong consistency.
78-
public static final String BUCKET_TYPE_TRIGGERS = "triggers"; // make sure there is a Riak bucket type called "triggers".
77+
//public static final String BUCKET_TYPE_STRONG_CONSISTENCY = "strong"; // make sure there is a Riak bucket type called "strong" for strong consistency.
78+
// public static final String BUCKET_TYPE_TRIGGERS = "triggers"; // make sure there is a Riak bucket type called "triggers".
7979
public static final String BUCKET_TYPE_COUNTERS = "counters"; // make sure there is a Riak bucket type called "counters" with its data type being "counter".
80-
public static final String BUCKET_TYPE_TRIGGERABLE_COUNTERS = "mfn_counter_trigger"; // make sure there is a Riak bucket type called "mfn_counter_trigger".
80+
// public static final String BUCKET_TYPE_TRIGGERABLE_COUNTERS = "mfn_counter_trigger"; // make sure there is a Riak bucket type called "mfn_counter_trigger".
8181
public static final String BUCKET_TYPE_SETS = "sets"; // make sure there is a Riak bucket type called "sets" with its data type being "set".
8282
public static final String BUCKET_TYPE_MAPS = "maps"; // make sure there is a Riak bucket type called "maps" with its data type being "map".
8383
public static final String BUCKET_TYPE_ALL = "all";
@@ -119,28 +119,28 @@ public void connect (Map<String,Integer> riakNodes) {
119119
cluster.start();
120120

121121
ALL_BUCKET_TYPES.add(BUCKET_TYPE_DEFAULT);
122-
ALL_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
123-
ALL_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERS);
122+
//ALL_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
123+
// ALL_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERS);
124124
ALL_BUCKET_TYPES.add(BUCKET_TYPE_COUNTERS);
125-
ALL_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERABLE_COUNTERS);
125+
// ALL_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERABLE_COUNTERS);
126126
ALL_BUCKET_TYPES.add(BUCKET_TYPE_SETS);
127127
ALL_BUCKET_TYPES.add(BUCKET_TYPE_MAPS);
128128
ALL_BUCKET_TYPES.add(BUCKET_TYPE_ALL);
129129

130130
KV_BUCKET_TYPES.add(BUCKET_TYPE_DEFAULT);
131-
KV_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
132-
KV_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERS);
131+
//KV_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
132+
// KV_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERS);
133133

134134
COUNTER_BUCKET_TYPES.add(BUCKET_TYPE_COUNTERS);
135-
COUNTER_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERABLE_COUNTERS);
135+
// COUNTER_BUCKET_TYPES.add(BUCKET_TYPE_TRIGGERABLE_COUNTERS);
136136

137137
CONSISTENCY_BUCKET_TYPES.add(BUCKET_TYPE_DEFAULT);
138-
CONSISTENCY_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
138+
//CONSISTENCY_BUCKET_TYPES.add(BUCKET_TYPE_STRONG_CONSISTENCY);
139139

140140
NUM_NODES = cluster.getNodes().size();
141-
if (NUM_NODES >= MIN_NODES_FOR_STRONG_CONSISTENCY) {
142-
BUCKET_TYPE_DEFAULT = BUCKET_TYPE_STRONG_CONSISTENCY;
143-
}
141+
//if (NUM_NODES >= MIN_NODES_FOR_STRONG_CONSISTENCY) {
142+
// BUCKET_TYPE_DEFAULT = BUCKET_TYPE_STRONG_CONSISTENCY;
143+
//}
144144

145145
client = new RiakClient(cluster);
146146

FunctionWorker/.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
/python/local_queue/
33
/doc/
44
__init__.py
5-
__pycache__
5+
__pycache__
6+
/.m2

FunctionWorker/Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ default: doc/MicroFunctionsAPI.html
2323

2424
PYTHON_DATALAYER=$(shell find python/data_layer -type f -size +1c) python/data_layer
2525
$(PYTHON_DATALAYER): ../build_env.mk \
26-
../DataLayerService/thrift/DataLayerMessage.thrift \
27-
../DataLayerService/thrift/DataLayerService.thrift
26+
../DataLayerService/thrift/DataLayerMessage.thrift \
27+
../DataLayerService/thrift/DataLayerService.thrift \
28+
../build_tools/thrift/$(THRIFT_VERSION)/build_thrift.sh
29+
cd ../build_tools/thrift/$(THRIFT_VERSION)/; ./build_thrift.sh; cd -
2830
docker run --user $(UID):$(GID) --rm -v $(CURDIR)/..:/root -w /root $(THRIFT_IMAGE_NAME):$(THRIFT_VERSION) bash -c '\
2931
thrift --gen py -out FunctionWorker/python/ DataLayerService/thrift/DataLayerMessage.thrift; \
3032
thrift --gen py -out FunctionWorker/python/ DataLayerService/thrift/DataLayerService.thrift'
3133

3234
doc/MicroFunctionsAPI.html: $(PYTHON_DATALAYER)
33-
docker run --user $(UID):$(GID) --rm -it \
35+
docker run --user $(UID):$(GID) --rm -i \
3436
--name mfnapi_doc_gen \
3537
-v $(CURDIR):/opt/mfnapi \
3638
-w /opt/mfnapi \

FunctionWorker/MicroFunctionsAPI.thrift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
namespace java org.microfunctions.mfnapi
1717

1818
service MicroFunctionsAPIService {
19+
20+
string get_context_object_properties(),
21+
1922
string ping(1:i32 n),
2023

2124
void update_metadata(1: string metadata_name, 2: string metadata_value, 3: bool is_privileged_metadata),
@@ -25,7 +28,7 @@ service MicroFunctionsAPIService {
2528
void send_to_all_running_functions_in_session(1: string message, 2: bool send_now), //message
2629
void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now), // message
2730

28-
list<string> get_session_update_messages(1: i32 count),
31+
list<string> get_session_update_messages(1: i32 count, 2: bool blck),
2932

3033
void set_session_alias(1: string als),
3134
void unset_session_alias(),

FunctionWorker/python/DataLayerClient.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
class DataLayerClient:
2929

30-
def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=False, for_mfn=False, connect="127.0.0.1:4998", init_tables=False, drop_keyspace=False):
30+
def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=False, for_mfn=False, connect="127.0.0.1:4998", init_tables=False, drop_keyspace=False, tableName=None):
3131
self.dladdress = connect
3232

3333
if for_mfn:
@@ -52,7 +52,10 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
5252

5353
else:
5454
self.keyspace = "storage_" + suid
55-
self.tablename = "defaultTable"
55+
if tableName is not None:
56+
self.tablename = tableName
57+
else:
58+
self.tablename = "defaultTable"
5659
self.maptablename = "defaultMapTable"
5760
self.settablename = "defaultSetTable"
5861
self.countertablename = "defaultCounterTable"
@@ -84,7 +87,7 @@ def _initialize_tables(self):
8487
self.datalayer.createSetTable(self.keyspace, self.settablename, self.locality)
8588
self.datalayer.createCounterTable(self.keyspace, self.countertablename, Metadata(tableType="counters"), self.locality)
8689
self.datalayer.createTable(self.keyspace, self.triggersinfotablename, Metadata(tableType="default"), self.locality)
87-
self.datalayer.createCounterTable(self.keyspace, self.countertriggerstable, Metadata(tableType="mfn_counter_trigger"), self.locality)
90+
self.datalayer.createCounterTable(self.keyspace, self.countertriggerstable, Metadata(tableType="counters"), self.locality)
8891
self.datalayer.createTable(self.keyspace, self.countertriggersinfotable, Metadata(tableType="default"), self.locality)
8992
except Thrift.TException as exc:
9093
print("Could not initialize tables: " + str(exc))
@@ -263,7 +266,7 @@ def containsMapKey(self, mapname, key):
263266
return ret
264267

265268
def getMapKeys(self, mapname):
266-
keys = None
269+
keys = []
267270
for retry in range(MAX_RETRIES):
268271
try:
269272
keyset = self.datalayer.retrieveKeysetFromMap(self.keyspace, self.maptablename, mapname, self.locality)
@@ -280,7 +283,7 @@ def getMapKeys(self, mapname):
280283
return keys
281284

282285
def retrieveMap(self, mapname):
283-
mapentries = None
286+
mapentries = {}
284287
for retry in range(MAX_RETRIES):
285288
try:
286289
mapentries = self.datalayer.retrieveAllEntriesFromMap(self.keyspace, self.maptablename, mapname, self.locality)
@@ -398,7 +401,7 @@ def containsSetItem(self, setname, item):
398401
return ret
399402

400403
def retrieveSet(self, setname):
401-
items = None
404+
items = []
402405
for retry in range(MAX_RETRIES):
403406
try:
404407
itemsset = self.datalayer.retrieveSet(self.keyspace, self.settablename, setname, self.locality)
@@ -557,7 +560,7 @@ def createTriggerableTable(self, tableName):
557560
status = False
558561
for retry in range(MAX_RETRIES):
559562
try:
560-
status = self.datalayer.createTable(self.keyspace, tableName, Metadata(tableType="triggers"), self.locality)
563+
status = self.datalayer.createTable(self.keyspace, tableName, Metadata(tableType="default"), self.locality)
561564
break
562565
except TTransport.TTransportException as exc:
563566
self.connect()

FunctionWorker/python/FunctionWorker.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import imp
2121
import json
2222
import logging
23+
import random
2324
import socket
2425
import subprocess
2526
import shlex
26-
#import hashlib
27+
import hashlib
2728
from threading import Timer
2829

2930
import thriftpy2
@@ -116,6 +117,10 @@ def __init__(self, args_dict):
116117

117118
signal(SIGCHLD, SIG_IGN)
118119

120+
# do this once rather than at every forked process
121+
if self._state_utils.isTaskState():
122+
os.chdir(self._function_folder)
123+
119124
self._is_running = False
120125
#self._print_self()
121126

@@ -137,6 +142,7 @@ def _set_args(self, args):
137142
self._datalayer = args["datalayer"]
138143
self._external_endpoint = args["externalendpoint"]
139144
self._internal_endpoint = args["internalendpoint"]
145+
self._management_endpoints = args["managementendpoints"]
140146
self._wf_next = args["fnext"]
141147
self._wf_pot_next = args["fpotnext"]
142148
self._function_runtime = args["fruntime"]
@@ -208,6 +214,7 @@ def _print_self(self):
208214
self._logger.debug("\tself._datalayer: %s", self._datalayer)
209215
self._logger.debug("\tself._external_endpoint: %s", str(self._external_endpoint))
210216
self._logger.debug("\tself._internal_endpoint: %s", str(self._internal_endpoint))
217+
self._logger.debug("\tself._management_endpoints: %s", str(self._management_endpoints))
211218
self._logger.debug("\tself._wf_next: %s", ",".join(self._wf_next))
212219
self._logger.debug("\tself._wf_pot_next: %s", ",".join(self._wf_pot_next))
213220
self._logger.debug("\tself._wf_function_list: %s", ",".join(self._wf_function_list))
@@ -301,10 +308,7 @@ def _fork_and_handle_message(self, key, encapsulated_value):
301308
error_type = "User Input Decapsulation Error"
302309
has_error = True
303310

304-
timestamp_map["t_start_chdir"] = time.time() * 1000.0
305311
signal(SIGCHLD, SIG_DFL)
306-
if self._state_utils.isTaskState():
307-
os.chdir(self._function_folder)
308312

309313
# 2. Decode input. Input (value) must be a valid JSON Text.
310314
# Note: JSON Text is not the same as JSON string. JSON string a one variable type that can be contained inside a JSON Text.
@@ -393,7 +397,7 @@ def _fork_and_handle_message(self, key, encapsulated_value):
393397
# Maybe allow only if the destination is a session function? Requires a list of session functions and passing them to the MicroFunctionsAPI and SessionUtils
394398
# Nonetheless, currently, MicroFunctionsAPI and SessionUtils write warning messages to the workflow log to indicate such problems
395399
# (e.g., when this is not a workflow session or session function, when the destination running function instance does not exist)
396-
sapi = MicroFunctionsAPI(self._storage_userid, self._sandboxid, self._workflowid, self._function_state_name, key, publication_utils, self._is_session_workflow, self._is_session_function, session_utils, self._logger, self._datalayer, self._external_endpoint, self._internal_endpoint, self._userid, self._usertoken)
400+
sapi = MicroFunctionsAPI(self._storage_userid, self._sandboxid, self._workflowid, self._function_state_name, key, publication_utils, self._is_session_workflow, self._is_session_function, session_utils, self._logger, self._datalayer, self._external_endpoint, self._internal_endpoint, self._userid, self._usertoken, self._management_endpoints)
397401
# need this to retrieve and publish the in-memory, transient data (i.e., stored/deleted via is_queued = True)
398402
publication_utils.set_sapi(sapi)
399403
except Exception as exc:
@@ -450,7 +454,10 @@ def _fork_and_handle_message(self, key, encapsulated_value):
450454
elif self._function_runtime == "java":
451455
exec_arguments = {}
452456

453-
api_uds = "/tmp/" + self._function_state_name + "_" + key + ".uds"
457+
random.seed()
458+
name = self._function_state_name + "_" + key + "_" + str(time.time() * 1000.0) + "_" + str(random.uniform(0, 100000))
459+
sha = hashlib.sha256(name.encode()).hexdigest()
460+
api_uds = "/tmp/" + sha + ".uds"
454461

455462
exec_arguments["api_uds"] = api_uds
456463
exec_arguments["thriftAPIService"] = self._api_thrift.MicroFunctionsAPIService

0 commit comments

Comments
 (0)