From bb279064a881bcbb733291072ad92aee1aae0a4e Mon Sep 17 00:00:00 2001 From: guangning Date: Thu, 1 Apr 2021 18:06:27 +0800 Subject: [PATCH 1/2] Add example for python function --- functions/python/README.md | 47 ++++++++++++++++ functions/python/python_function/__init__.py | 0 .../python_function/custom_object_function.py | 53 +++++++++++++++++++ .../python_function/pyserde/__init__.py | 0 .../python/python_function/pyserde/serde.py | 20 +++++++ functions/python/test-producer-consumer.py | 41 ++++++++++++++ 6 files changed, 161 insertions(+) create mode 100644 functions/python/README.md create mode 100755 functions/python/python_function/__init__.py create mode 100755 functions/python/python_function/custom_object_function.py create mode 100755 functions/python/python_function/pyserde/__init__.py create mode 100755 functions/python/python_function/pyserde/serde.py create mode 100644 functions/python/test-producer-consumer.py diff --git a/functions/python/README.md b/functions/python/README.md new file mode 100644 index 0000000..90174e3 --- /dev/null +++ b/functions/python/README.md @@ -0,0 +1,47 @@ +# python-function + +This is an example of how to use the python function. + +## Project Structure + +``` +python_function +├── __init__.py +├── custom_object_function.py +└── pyserde + ├── __init__.py + └── serde.py +``` + +In python, if a folder contains `__init__` file, we call it a package, python_function and pyserde are both packages. Compressing `python_function` directory will generate `python_function.zip`. + +## How to use + +### Start pulsar standalone + +``` +docker run -d -it \ + -p 6650:6650 \ + -p 8080:8080 \ + --name pulsar-standalone \ + apachepulsar/pulsar-all:2.7.1 \ + bin/pulsar standalone +``` + +### Start python function by use zip package +``` +./bin/pulsar-admin functions create \ + --tenant public --namespace default --name my_function \ + --py /YOUR-PATH/python_function.zip \ + --classname python_function.custom_object_function.CustomObjectFunction \ + --custom-serde-inputs '{"input-topic-1":"python_function.pyserde.serde.CustomSerDe","input-topic-2":"python_function.pyserde.serde.CustomSerDe"}' \ + --output-serde-classname python_function.pyserde.serde.CustomSerDe \ + --output output-topic-3 +``` +Replace `YOUR-PATH` with your file path. + +### Run produce and consume + +``` +python test-producer-consumer.py +``` \ No newline at end of file diff --git a/functions/python/python_function/__init__.py b/functions/python/python_function/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/functions/python/python_function/custom_object_function.py b/functions/python/python_function/custom_object_function.py new file mode 100755 index 0000000..b4b0569 --- /dev/null +++ b/functions/python/python_function/custom_object_function.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from pulsar import Function, SerDe +from python_function.pyserde.serde import MyObject + +#class MyObject(object): +# def __init__(self): +# self.a = 0 +# self.b = 0 + +#class CustomSerDe(SerDe): +# def __init__(self): +# pass + +# def serialize(self, object): +# return ("%d,%d" % (object.a, object.b)).encode('utf-8') + +# def deserialize(self, input_bytes): +# split = str(input_bytes.decode()).split(',') +# retval = MyObject() +# retval.a = int(split[0]) +# retval.b = int(split[1]) +# return retval + +# Function that deals with custom objects +class CustomObjectFunction(Function): + def __init__(self): + pass + + def process(self, input, context): + retval = MyObject() + retval.a = input.a + 11 + retval.b = input.b + 24 + return retval diff --git a/functions/python/python_function/pyserde/__init__.py b/functions/python/python_function/pyserde/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/functions/python/python_function/pyserde/serde.py b/functions/python/python_function/pyserde/serde.py new file mode 100755 index 0000000..5428466 --- /dev/null +++ b/functions/python/python_function/pyserde/serde.py @@ -0,0 +1,20 @@ +from pulsar import SerDe + +class MyObject(object): + def __init__(self): + self.a = 0 + self.b = 0 + +class CustomSerDe(SerDe): + def __init__(self): + pass + + def serialize(self, object): + return ("%d,%d" % (object.a, object.b)).encode('utf-8') + + def deserialize(self, input_bytes): + split = str(input_bytes.decode()).split(',') + retval = MyObject() + retval.a = int(split[0]) + retval.b = int(split[1]) + return retval diff --git a/functions/python/test-producer-consumer.py b/functions/python/test-producer-consumer.py new file mode 100644 index 0000000..9f21955 --- /dev/null +++ b/functions/python/test-producer-consumer.py @@ -0,0 +1,41 @@ +from pulsar import Client, SerDe + +class MyObject(object): + def __init__(self): + self.a = 0 + self.b = 0 + +class CustomSerDe(SerDe): + def __init__(self): + pass + def serialize(self, object): + return ("%d,%d" % (object.a, object.b)).encode('utf-8') + def deserialize(self, input_bytes): + split = str(input_bytes.decode()).split(',') + retval = MyObject() + retval.a = int(split[0]) + retval.b = int(split[1]) + return retval + +client = Client('pulsar://localhost:6650') +producer1 = client.create_producer(topic='input-topic-1') +producer2 = client.create_producer(topic='input-topic-2') +consumer = client.subscribe( + topic='output-topic-3', + subscription_name='my-subscription') + +ser = CustomSerDe() +for i in range(100): + dataObject = MyObject() + dataObject.a = i + dataObject.b = i + 100 + d = ser.serialize(dataObject) + producer1.send(d) + producer2.send(d) + print('send msg "%d", "%d"' % (dataObject.a, dataObject.b)) +for i in range(100): + msg = consumer.receive() + consumer.acknowledge(msg) + ex = msg.value() + d = ser.deserialize(ex) + print('receive and ack msg "%d", "%d"' % (d.a, d.b)) From d114f502af784aa14c52f7fbb48a9271b56676ed Mon Sep 17 00:00:00 2001 From: guangning Date: Thu, 1 Apr 2021 18:09:04 +0800 Subject: [PATCH 2/2] Delete no used code --- .../python_function/custom_object_function.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/functions/python/python_function/custom_object_function.py b/functions/python/python_function/custom_object_function.py index b4b0569..1459fc8 100755 --- a/functions/python/python_function/custom_object_function.py +++ b/functions/python/python_function/custom_object_function.py @@ -22,25 +22,6 @@ from pulsar import Function, SerDe from python_function.pyserde.serde import MyObject -#class MyObject(object): -# def __init__(self): -# self.a = 0 -# self.b = 0 - -#class CustomSerDe(SerDe): -# def __init__(self): -# pass - -# def serialize(self, object): -# return ("%d,%d" % (object.a, object.b)).encode('utf-8') - -# def deserialize(self, input_bytes): -# split = str(input_bytes.decode()).split(',') -# retval = MyObject() -# retval.a = int(split[0]) -# retval.b = int(split[1]) -# return retval - # Function that deals with custom objects class CustomObjectFunction(Function): def __init__(self):