From 4a9dc0f1b9e02e4dfbf5cd3e981555203cc5fc27 Mon Sep 17 00:00:00 2001 From: Ivan Despot Date: Tue, 22 Feb 2022 17:02:35 +0100 Subject: [PATCH 1/6] Create template for workshop --- backend/server/app.py | 29 ++++----------------- backend/server/setup.py | 58 +++-------------------------------------- 2 files changed, 8 insertions(+), 79 deletions(-) diff --git a/backend/server/app.py b/backend/server/app.py index e0c5883..fdba8b6 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -1,10 +1,8 @@ from argparse import ArgumentParser -from eventlet import greenthread from flask import Flask, Response from flask_cors import CORS, cross_origin -from flask_socketio import SocketIO, emit +from flask_socketio import SocketIO from functools import wraps -from kafka import KafkaConsumer import eventlet import json import logging @@ -76,28 +74,11 @@ def get_health(): def kafkaconsumer(): - consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_IP + ":" + KAFKA_PORT) - try: - while True: - msg_pack = consumer.poll() - if not msg_pack: - greenthread.sleep(1) - continue - for _, messages in msg_pack.items(): - for message in messages: - message = json.loads(message.value.decode("utf8")) - log.info("Message: " + str(message)) - try: - socketio.emit("consumer", {"data": message}) - except Exception as error: - log.info(f"`{message}`, {repr(error)}") - continue - except KeyboardInterrupt: - pass + pass + # TODO: Await messages from the Kafka topic @app.before_first_request def execute_this(): - init_log() - greenthread.spawn(set_up_memgraph_and_kafka()) - greenthread.spawn(kafkaconsumer) + pass + # TODO: Set up Memgraph and start emitting messages to the React app diff --git a/backend/server/setup.py b/backend/server/setup.py index e00931b..23912ab 100644 --- a/backend/server/setup.py +++ b/backend/server/setup.py @@ -1,9 +1,4 @@ -from gqlalchemy import Memgraph, MemgraphKafkaStream, MemgraphTrigger -from gqlalchemy.models import ( - TriggerEventType, - TriggerEventObject, - TriggerExecutionPhase, -) +from gqlalchemy import Memgraph from time import sleep import logging @@ -22,52 +17,5 @@ def connect_to_memgraph(memgraph_ip, memgraph_port): def run(memgraph): - try: - memgraph.drop_database() - - log.info("Setting up PageRank") - memgraph.execute("CALL pagerank_online.set(100, 0.2) YIELD *") - memgraph.execute( - """CREATE TRIGGER pagerank_trigger - BEFORE COMMIT - EXECUTE CALL pagerank_online.update(createdVertices, createdEdges, deletedVertices, deletedEdges) YIELD * - SET node.rank = rank - CALL publisher.update_rank(node, rank);""" - ) - - log.info("Setting up community detection") - memgraph.execute( - "CALL community_detection_online.set(False, False, 0.7, 4.0, 0.1, 'weight', 1.0, 100, 5) YIELD *;" - ) - memgraph.execute( - """CREATE TRIGGER labelrankt_trigger - BEFORE COMMIT - EXECUTE CALL community_detection_online.update(createdVertices, createdEdges, updatedVertices, updatedEdges, deletedVertices, deletedEdges) - YIELD node, community_id - SET node.cluster=community_id - CALL publisher.update_cluster(node, community_id);""" - ) - - log.info("Creating stream connections on Memgraph") - stream = MemgraphKafkaStream( - name="retweets", - topics=["retweets"], - transform="twitter.tweet", - bootstrap_servers="'kafka:9092'", - ) - memgraph.create_stream(stream) - memgraph.start_stream(stream) - - log.info("Creating triggers on Memgraph") - trigger = MemgraphTrigger( - name="created_trigger", - event_type=TriggerEventType.CREATE, - event_object=TriggerEventObject.ALL, - execution_phase=TriggerExecutionPhase.AFTER, - statement="CALL publisher.create(createdObjects)", - ) - memgraph.create_trigger(trigger) - - except Exception as e: - log.info(f"Error on stream and trigger creation: {e}") - pass + pass + # TODO: Add the graph analytics setup From 5c16f85e7b267fa80de684f20ebaedf635d469bb Mon Sep 17 00:00:00 2001 From: katarinasupe Date: Wed, 23 Feb 2022 13:23:08 +0100 Subject: [PATCH 2/6] Remove the transormation module code --- memgraph/transformations/twitter.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/memgraph/transformations/twitter.py b/memgraph/transformations/twitter.py index 80e1688..dce5f78 100644 --- a/memgraph/transformations/twitter.py +++ b/memgraph/transformations/twitter.py @@ -1,28 +1,13 @@ +from email.errors import MessageError import mgp import json @mgp.transformation -def tweet(messages: mgp.Messages - ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): +def tweet( + messages: mgp.Messages, +) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): result_queries = [] - for i in range(messages.total_messages()): - message = messages.message_at(i) - tweet_dict = json.loads(message.payload().decode('utf8')) - if tweet_dict["target_username"]: - result_queries.append( - mgp.Record( - query=("MERGE (u1:User {username: $source_username}) " - "MERGE (u2:User {username: $target_username}) " - "MERGE (u1)-[:RETWEETED]-(u2)"), - parameters={ - "source_username": tweet_dict["source_username"], - "target_username": tweet_dict["target_username"]})) - else: - result_queries.append( - mgp.Record( - query=("MERGE (:User {username: $source_username})"), - parameters={ - "source_username": tweet_dict["source_username"]})) + # TODO: Write a transformation module return result_queries From 62decb84d8977a0406c6bf5394a752abf7f78a0b Mon Sep 17 00:00:00 2001 From: katarinasupe Date: Wed, 23 Feb 2022 13:38:10 +0100 Subject: [PATCH 3/6] Add greenthread --- backend/server/app.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/server/app.py b/backend/server/app.py index fdba8b6..01dcbdd 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -1,4 +1,5 @@ from argparse import ArgumentParser +from eventlet import greenthread from flask import Flask, Response from flask_cors import CORS, cross_origin from flask_socketio import SocketIO @@ -80,5 +81,6 @@ def kafkaconsumer(): @app.before_first_request def execute_this(): - pass - # TODO: Set up Memgraph and start emitting messages to the React app + init_log() + greenthread.spawn(set_up_memgraph_and_kafka()) + greenthread.spawn(kafkaconsumer) From 4f4e105adfc719d9e8dbb63cc98dc2120a7b996e Mon Sep 17 00:00:00 2001 From: katarinasupe Date: Wed, 23 Feb 2022 13:42:42 +0100 Subject: [PATCH 4/6] Add setting up alorithms --- backend/server/setup.py | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/backend/server/setup.py b/backend/server/setup.py index 23912ab..f84cde2 100644 --- a/backend/server/setup.py +++ b/backend/server/setup.py @@ -17,5 +17,39 @@ def connect_to_memgraph(memgraph_ip, memgraph_port): def run(memgraph): - pass - # TODO: Add the graph analytics setup + try: + memgraph.drop_database() + + log.info("Setting up PageRank") + + memgraph.execute("CALL pagerank_online.set(100, 0.2) YIELD *") + memgraph.execute( + """CREATE TRIGGER pagerank_trigger + BEFORE COMMIT + EXECUTE CALL pagerank_online.update(createdVertices, createdEdges, deletedVertices, deletedEdges) YIELD * + SET node.rank = rank + CALL publisher.update_rank(node, rank);""" + ) + + log.info("Setting up community detection") + + memgraph.execute( + "CALL community_detection_online.set(False, False, 0.7, 4.0, 0.1, 'weight', 1.0, 100, 5) YIELD *;" + ) + memgraph.execute( + """CREATE TRIGGER labelrankt_trigger + BEFORE COMMIT + EXECUTE CALL community_detection_online.update(createdVertices, createdEdges, updatedVertices, updatedEdges, deletedVertices, deletedEdges) + YIELD node, community_id + SET node.cluster=community_id + CALL publisher.update_cluster(node, community_id);""" + ) + + log.info("Creating stream connections on Memgraph") + # TODO Create and start stream + log.info("Creating trigger on Memgraph") + # TODO Create trigger + + except Exception as e: + log.info(f"Error on stream and trigger creation: {e}") + pass From 714ac6a4cb3aa07cfe2e46734f3248c224b8388a Mon Sep 17 00:00:00 2001 From: katarinasupe Date: Wed, 23 Feb 2022 13:44:56 +0100 Subject: [PATCH 5/6] Add imports --- backend/server/setup.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/backend/server/setup.py b/backend/server/setup.py index f84cde2..12ee854 100644 --- a/backend/server/setup.py +++ b/backend/server/setup.py @@ -1,4 +1,9 @@ -from gqlalchemy import Memgraph +from gqlalchemy import Memgraph, MemgraphKafkaStream, MemgraphTrigger +from gqlalchemy.models import ( + TriggerEventType, + TriggerEventObject, + TriggerExecutionPhase, +) from time import sleep import logging From 4f763ec8f36170dbbd082ba09e2060bf7a134ccc Mon Sep 17 00:00:00 2001 From: katarinasupe Date: Wed, 23 Feb 2022 13:57:23 +0100 Subject: [PATCH 6/6] Import kafka consumer --- backend/server/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/server/app.py b/backend/server/app.py index 01dcbdd..215e87c 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -6,6 +6,7 @@ from functools import wraps import eventlet import json +from kafka import KafkaConsumer import logging import os import server.setup as setup