diff --git a/backend/server/app.py b/backend/server/app.py index e0c5883..215e87c 100644 --- a/backend/server/app.py +++ b/backend/server/app.py @@ -2,11 +2,11 @@ from eventlet import greenthread from flask import Flask, Response from flask_cors import CORS, cross_origin -from flask_socketio import SocketIO, emit +from flask_socketio import SocketIO from functools import wraps -from kafka import KafkaConsumer import eventlet import json +from kafka import KafkaConsumer import logging import os import server.setup as setup @@ -76,24 +76,8 @@ def get_health(): def kafkaconsumer(): - consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_IP + ":" + KAFKA_PORT) - try: - while True: - msg_pack = consumer.poll() - if not msg_pack: - greenthread.sleep(1) - continue - for _, messages in msg_pack.items(): - for message in messages: - message = json.loads(message.value.decode("utf8")) - log.info("Message: " + str(message)) - try: - socketio.emit("consumer", {"data": message}) - except Exception as error: - log.info(f"`{message}`, {repr(error)}") - continue - except KeyboardInterrupt: - pass + pass + # TODO: Await messages from the Kafka topic @app.before_first_request diff --git a/backend/server/setup.py b/backend/server/setup.py index e00931b..12ee854 100644 --- a/backend/server/setup.py +++ b/backend/server/setup.py @@ -26,6 +26,7 @@ def run(memgraph): memgraph.drop_database() log.info("Setting up PageRank") + memgraph.execute("CALL pagerank_online.set(100, 0.2) YIELD *") memgraph.execute( """CREATE TRIGGER pagerank_trigger @@ -36,6 +37,7 @@ def run(memgraph): ) log.info("Setting up community detection") + memgraph.execute( "CALL community_detection_online.set(False, False, 0.7, 4.0, 0.1, 'weight', 1.0, 100, 5) YIELD *;" ) @@ -49,24 +51,9 @@ def run(memgraph): ) log.info("Creating stream connections on Memgraph") - stream = MemgraphKafkaStream( - name="retweets", - topics=["retweets"], - transform="twitter.tweet", - bootstrap_servers="'kafka:9092'", - ) - memgraph.create_stream(stream) - memgraph.start_stream(stream) - - log.info("Creating triggers on Memgraph") - trigger = MemgraphTrigger( - name="created_trigger", - event_type=TriggerEventType.CREATE, - event_object=TriggerEventObject.ALL, - execution_phase=TriggerExecutionPhase.AFTER, - statement="CALL publisher.create(createdObjects)", - ) - memgraph.create_trigger(trigger) + # TODO Create and start stream + log.info("Creating trigger on Memgraph") + # TODO Create trigger except Exception as e: log.info(f"Error on stream and trigger creation: {e}") diff --git a/memgraph/transformations/twitter.py b/memgraph/transformations/twitter.py index 80e1688..dce5f78 100644 --- a/memgraph/transformations/twitter.py +++ b/memgraph/transformations/twitter.py @@ -1,28 +1,13 @@ +from email.errors import MessageError import mgp import json @mgp.transformation -def tweet(messages: mgp.Messages - ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): +def tweet( + messages: mgp.Messages, +) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): result_queries = [] - for i in range(messages.total_messages()): - message = messages.message_at(i) - tweet_dict = json.loads(message.payload().decode('utf8')) - if tweet_dict["target_username"]: - result_queries.append( - mgp.Record( - query=("MERGE (u1:User {username: $source_username}) " - "MERGE (u2:User {username: $target_username}) " - "MERGE (u1)-[:RETWEETED]-(u2)"), - parameters={ - "source_username": tweet_dict["source_username"], - "target_username": tweet_dict["target_username"]})) - else: - result_queries.append( - mgp.Record( - query=("MERGE (:User {username: $source_username})"), - parameters={ - "source_username": tweet_dict["source_username"]})) + # TODO: Write a transformation module return result_queries