From 1614f4c947b71f176aaa4e3fad840437e82f8e30 Mon Sep 17 00:00:00 2001 From: Jorge Cortes Date: Fri, 21 Feb 2025 12:07:13 -0500 Subject: [PATCH] [Components] langfuse - new components --- .../actions/add-feedback/add-feedback.mjs | 85 ++++++++ .../langfuse/actions/log-trace/log-trace.mjs | 126 +++++++++++ components/langfuse/common/constants.mjs | 40 ++++ components/langfuse/common/utils.mjs | 41 ++++ components/langfuse/langfuse.app.mjs | 196 +++++++++++++++++- components/langfuse/package.json | 6 +- .../langfuse/sources/common/polling.mjs | 109 ++++++++++ .../new-score-received/new-score-received.mjs | 40 ++++ .../sources/new-score-received/test-event.mjs | 22 ++ .../new-trace-received/new-trace-received.mjs | 40 ++++ .../sources/new-trace-received/test-event.mjs | 16 ++ pnpm-lock.yaml | 26 ++- 12 files changed, 734 insertions(+), 13 deletions(-) create mode 100644 components/langfuse/actions/add-feedback/add-feedback.mjs create mode 100644 components/langfuse/actions/log-trace/log-trace.mjs create mode 100644 components/langfuse/common/constants.mjs create mode 100644 components/langfuse/common/utils.mjs create mode 100644 components/langfuse/sources/common/polling.mjs create mode 100644 components/langfuse/sources/new-score-received/new-score-received.mjs create mode 100644 components/langfuse/sources/new-score-received/test-event.mjs create mode 100644 components/langfuse/sources/new-trace-received/new-trace-received.mjs create mode 100644 components/langfuse/sources/new-trace-received/test-event.mjs diff --git a/components/langfuse/actions/add-feedback/add-feedback.mjs b/components/langfuse/actions/add-feedback/add-feedback.mjs new file mode 100644 index 0000000000000..c6dd71fb9438e --- /dev/null +++ b/components/langfuse/actions/add-feedback/add-feedback.mjs @@ -0,0 +1,85 @@ +import constants from "../../common/constants.mjs"; +import app from "../../langfuse.app.mjs"; + +export default { + key: "langfuse-add-feedback", + name: "Add Feedback", + description: "Attach user feedback to an existing trace in Langfuse. [See the documentation](https://api.reference.langfuse.com/#tag/comments/POST/api/public/comments).", + version: "0.0.1", + type: "action", + props: { + app, + projectId: { + propDefinition: [ + app, + "projectId", + ], + }, + objectType: { + propDefinition: [ + app, + "objectType", + ], + }, + objectId: { + propDefinition: [ + app, + "objectId", + ({ objectType }) => ({ + objectType, + }), + ], + }, + content: { + type: "string", + label: "Content", + description: "The content of the comment. May include markdown. Currently limited to 3000 characters.", + }, + }, + methods: { + addFeedback(args = {}) { + return this.app.post({ + path: "/comments", + ...args, + }); + }, + async getObjectId() { + const { + app, + objectType, + objectId, + } = this; + if (objectType == constants.OBJECT_TYPE.PROMPT) { + const prompt = await app.getPrompt({ + promptName: objectId, + }); + return prompt?.id; + } + return objectId; + }, + }, + async run({ $ }) { + const { + getObjectId, + addFeedback, + projectId, + objectType, + content, + } = this; + + const objectId = await getObjectId(); + + const response = await addFeedback({ + $, + data: { + projectId, + objectType, + objectId, + content, + }, + }); + + $.export("$summary", "Successfully added feedback."); + return response; + }, +}; diff --git a/components/langfuse/actions/log-trace/log-trace.mjs b/components/langfuse/actions/log-trace/log-trace.mjs new file mode 100644 index 0000000000000..36584c094619e --- /dev/null +++ b/components/langfuse/actions/log-trace/log-trace.mjs @@ -0,0 +1,126 @@ +import { v4 as uuid } from "uuid"; +import app from "../../langfuse.app.mjs"; +import constants from "../../common/constants.mjs"; +import utils from "../../common/utils.mjs"; + +export default { + key: "langfuse-log-trace", + name: "Log Trace", + description: "Log a new trace in LangFuse with details. [See the documentation](https://api.reference.langfuse.com/#tag/ingestion/POST/api/public/ingestion).", + version: "0.0.1", + type: "action", + props: { + app, + name: { + type: "string", + label: "Name", + description: "The name of the trace", + }, + input: { + type: "string", + label: "Input", + description: "The input of the trace", + }, + output: { + type: "string", + label: "Output", + description: "The output of the trace", + }, + userId: { + type: "string", + label: "User ID", + description: "The ID of the user", + optional: true, + }, + sessionId: { + label: "Session ID", + description: "The ID of the session", + optional: true, + propDefinition: [ + app, + "objectId", + () => ({ + objectType: constants.OBJECT_TYPE.SESSION, + }), + ], + }, + release: { + type: "string", + label: "Release", + description: "The release of the trace", + optional: true, + }, + version: { + type: "string", + label: "Version", + description: "The version of the trace", + optional: true, + }, + metadata: { + type: "string", + label: "Metadata", + description: "The metadata of the trace", + optional: true, + }, + tags: { + type: "string[]", + label: "Tags", + description: "The tags of the trace", + optional: true, + }, + }, + methods: { + batchIngestion(args = {}) { + return this.app.post({ + path: "/ingestion", + ...args, + }); + }, + }, + async run({ $ }) { + const { + batchIngestion, + name, + userId, + input, + output, + sessionId, + release, + version, + metadata, + tags, + } = this; + + const timestamp = new Date().toISOString(); + const id = uuid(); + + const response = await batchIngestion({ + $, + data: { + batch: [ + { + id, + timestamp, + type: constants.INGESTION_TYPE.TRACE_CREATE, + body: { + id, + timestamp, + name, + userId, + input: utils.parseJson(input), + output: utils.parseJson(output), + sessionId, + release, + version, + metadata: utils.parseJson(metadata), + tags, + public: true, + }, + }, + ], + }, + }); + $.export("$summary", "Successfully logged a new trace"); + return response; + }, +}; diff --git a/components/langfuse/common/constants.mjs b/components/langfuse/common/constants.mjs new file mode 100644 index 0000000000000..415403567a1ad --- /dev/null +++ b/components/langfuse/common/constants.mjs @@ -0,0 +1,40 @@ +const REGION_PLACEHOLDER = "{region}"; +const BASE_URL = "https://{region}.langfuse.com"; +const VERSION_PATH = "/api/public"; + +const INGESTION_TYPE = { + TRACE_CREATE: "trace-create", + SCORE_CREATE: "score-create", + SPAN_CREATE: "span-create", + SPAN_UPDATE: "span-update", + GENERATION_CREATE: "generation-create", + GENERATION_UPDATE: "generation-update", + EVENT_CREATE: "event-create", + SDK_LOG: "sdk-log", + OBSERVATION_CREATE: "observation-create", + OBSERVATION_UPDATE: "observation-update", +}; + +const LAST_DATE_AT = "lastDateAt"; +const IS_FIRST_RUN = "isFirstRun"; +const DEFAULT_LIMIT = 100; +const DEFAULT_MAX = 1000; + +const OBJECT_TYPE = { + TRACE: "TRACE", + OBSERVATION: "OBSERVATION", + SESSION: "SESSION", + PROMPT: "PROMPT", +}; + +export default { + REGION_PLACEHOLDER, + BASE_URL, + VERSION_PATH, + INGESTION_TYPE, + LAST_DATE_AT, + IS_FIRST_RUN, + DEFAULT_LIMIT, + DEFAULT_MAX, + OBJECT_TYPE, +}; diff --git a/components/langfuse/common/utils.mjs b/components/langfuse/common/utils.mjs new file mode 100644 index 0000000000000..970052fbeef83 --- /dev/null +++ b/components/langfuse/common/utils.mjs @@ -0,0 +1,41 @@ +async function iterate(iterations) { + const items = []; + for await (const item of iterations) { + items.push(item); + } + return items; +} + +function getNestedProperty(obj, propertyString) { + const properties = propertyString.split("."); + return properties.reduce((prev, curr) => prev?.[curr], obj); +} + +const parseJson = (input) => { + const parse = (value) => { + if (typeof(value) === "string") { + try { + return parseJson(JSON.parse(value)); + } catch (e) { + return value; + } + } else if (typeof(value) === "object" && value !== null) { + return Object.entries(value) + .reduce((acc, [ + key, + val, + ]) => Object.assign(acc, { + [key]: parse(val), + }), {}); + } + return value; + }; + + return parse(input); +}; + +export default { + iterate, + getNestedProperty, + parseJson, +}; diff --git a/components/langfuse/langfuse.app.mjs b/components/langfuse/langfuse.app.mjs index 083bdbf5ace7f..6729c9d0cf1fd 100644 --- a/components/langfuse/langfuse.app.mjs +++ b/components/langfuse/langfuse.app.mjs @@ -1,11 +1,199 @@ +import { axios } from "@pipedream/platform"; +import utils from "./common/utils.mjs"; +import constants from "./common/constants.mjs"; + export default { type: "app", app: "langfuse", - propDefinitions: {}, + propDefinitions: { + projectId: { + type: "string", + label: "Trace ID", + description: "The ID of the trace to attach feedback to or to filter by for events.", + async options() { + const { data } = await this.listProjects(); + return data.map(({ + id: value, name: label, + }) => ({ + label, + value, + })); + }, + }, + objectType: { + type: "string", + label: "Object Type", + description: "The type of object to attach feedback to.", + options: Object.values(constants.OBJECT_TYPE), + }, + objectId: { + type: "string", + label: "Object ID", + description: "The id of the object to attach the comment to. If this does not reference a valid existing object, an error will be thrown.", + async options({ + objectType, page, + }) { + if (!objectType) { + return []; + } + + const resourcesFn = { + [constants.OBJECT_TYPE.TRACE]: this.listTraces, + [constants.OBJECT_TYPE.OBSERVATION]: this.listObservations, + [constants.OBJECT_TYPE.SESSION]: this.listSessions, + [constants.OBJECT_TYPE.PROMPT]: this.listPrompts, + }[objectType]; + + if (!resourcesFn) { + return []; + } + + const { data } = await resourcesFn({ + params: { + page: page + 1, + limit: constants.DEFAULT_LIMIT, + }, + }); + + return data.map(({ + id: value, name: label, + }) => ({ + label, + value: value || label, + })); + }, + }, + }, methods: { - // this.$auth contains connected account data - authKeys() { - console.log(Object.keys(this.$auth)); + getUrl(path) { + const baseUrl = constants.BASE_URL + .replace(constants.REGION_PLACEHOLDER, this.$auth.region); + return `${baseUrl}${constants.VERSION_PATH}${path}`; + }, + getAuth() { + const { + public_key: username, + secret_key: password, + } = this.$auth; + return { + username, + password, + }; + }, + _makeRequest({ + $ = this, path, ...args + } = {}) { + return axios($, { + ...args, + url: this.getUrl(path), + auth: this.getAuth(), + headers: { + "Content-Type": "application/json", + }, + }); + }, + post(args = {}) { + return this._makeRequest({ + method: "POST", + ...args, + }); + }, + listProjects(args = {}) { + return this._makeRequest({ + path: "/projects", + ...args, + }); + }, + listTraces(args = {}) { + return this._makeRequest({ + path: "/traces", + ...args, + }); + }, + listScores(args = {}) { + return this._makeRequest({ + path: "/scores", + ...args, + }); + }, + listObservations(args = {}) { + return this._makeRequest({ + path: "/observations", + ...args, + }); + }, + listSessions(args = {}) { + return this._makeRequest({ + path: "/sessions", + ...args, + }); + }, + listPrompts(args = {}) { + return this._makeRequest({ + path: "/v2/prompts", + ...args, + }); + }, + getPrompt({ + promptName, ...args + } = {}) { + return this._makeRequest({ + path: `/v2/prompts/${encodeURIComponent(promptName)}`, + ...args, + }); + }, + async *getIterations({ + resourcesFn, resourcesFnArgs, resourceName, + lastDateAt, dateField, + max = constants.DEFAULT_MAX, + }) { + let page = 1; + let resourcesCount = 0; + + while (true) { + const response = + await resourcesFn({ + ...resourcesFnArgs, + params: { + ...resourcesFnArgs?.params, + page, + limit: constants.DEFAULT_LIMIT, + }, + }); + + const nextResources = utils.getNestedProperty(response, resourceName); + + if (!nextResources?.length) { + console.log("No more resources found"); + return; + } + + for (const resource of nextResources) { + const isDateGreater = + lastDateAt + && Date.parse(resource[dateField]) >= Date.parse(lastDateAt); + + if (!lastDateAt || isDateGreater) { + yield resource; + resourcesCount += 1; + } + + if (resourcesCount >= max) { + console.log("Reached max resources"); + return; + } + } + + if (nextResources.length < constants.DEFAULT_LIMIT) { + console.log("No next page found"); + return; + } + + page += 1; + } + }, + paginate(args = {}) { + return utils.iterate(this.getIterations(args)); }, }, }; diff --git a/components/langfuse/package.json b/components/langfuse/package.json index e51adcc6c3690..fb64f91fb3225 100644 --- a/components/langfuse/package.json +++ b/components/langfuse/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/langfuse", - "version": "0.0.1", + "version": "0.1.0", "description": "Pipedream Langfuse Components", "main": "langfuse.app.mjs", "keywords": [ @@ -11,5 +11,9 @@ "author": "Pipedream (https://pipedream.com/)", "publishConfig": { "access": "public" + }, + "dependencies": { + "@pipedream/platform": "^3.0.3", + "uuid": "^11.1.0" } } diff --git a/components/langfuse/sources/common/polling.mjs b/components/langfuse/sources/common/polling.mjs new file mode 100644 index 0000000000000..71828dd527e62 --- /dev/null +++ b/components/langfuse/sources/common/polling.mjs @@ -0,0 +1,109 @@ +import { + ConfigurationError, + DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, +} from "@pipedream/platform"; +import app from "../../langfuse.app.mjs"; +import constants from "../../common/constants.mjs"; + +export default { + props: { + app, + db: "$.service.db", + timer: { + type: "$.interface.timer", + label: "Polling Schedule", + description: "How often to poll the API", + default: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + }, + hooks: { + deploy() { + this.setIsFirstRun(true); + }, + }, + methods: { + generateMeta() { + throw new ConfigurationError("generateMeta is not implemented"); + }, + setIsFirstRun(value) { + this.db.set(constants.IS_FIRST_RUN, value); + }, + getIsFirstRun() { + return this.db.get(constants.IS_FIRST_RUN); + }, + setLastDateAt(value) { + this.db.set(constants.LAST_DATE_AT, value); + }, + getLastDateAt() { + return this.db.get(constants.LAST_DATE_AT); + }, + getDateField() { + throw new ConfigurationError("getDateField is not implemented"); + }, + getResourceName() { + throw new ConfigurationError("getResourceName is not implemented"); + }, + getResourcesFn() { + throw new ConfigurationError("getResourcesFn is not implemented"); + }, + getResourcesFnArgs() { + throw new ConfigurationError("getResourcesFnArgs is not implemented"); + }, + processResource(resource) { + const meta = this.generateMeta(resource); + this.$emit(resource, meta); + }, + }, + async run() { + const { + app, + getDateField, + getLastDateAt, + getResourcesFn, + getResourcesFnArgs, + getResourceName, + processResource, + getIsFirstRun, + setIsFirstRun, + setLastDateAt, + } = this; + + const isFirstRun = getIsFirstRun(); + const dateField = getDateField(); + const lastDateAt = getLastDateAt(); + + const otherArgs = isFirstRun + ? { + max: constants.DEFAULT_LIMIT, + } + : { + dateField, + lastDateAt, + }; + + const resources = await app.paginate({ + resourcesFn: getResourcesFn(), + resourcesFnArgs: getResourcesFnArgs(), + resourceName: getResourceName(), + ...otherArgs, + }); + + if (resources.length) { + const [ + firstResource, + ] = Array.from(resources); + if (firstResource) { + setLastDateAt(firstResource[dateField]); + } + } + + Array.from(resources) + .forEach(processResource); + + if (isFirstRun) { + setIsFirstRun(false); + } + }, +}; diff --git a/components/langfuse/sources/new-score-received/new-score-received.mjs b/components/langfuse/sources/new-score-received/new-score-received.mjs new file mode 100644 index 0000000000000..3b5c5e28de368 --- /dev/null +++ b/components/langfuse/sources/new-score-received/new-score-received.mjs @@ -0,0 +1,40 @@ +import common from "../common/polling.mjs"; +import sampleEmit from "./test-event.mjs"; + +export default { + ...common, + key: "langfuse-new-score-received", + name: "New Score Received", + description: "Emit new event when user feedback (score) is submitted on a trace in Langfuse. [See the documentation](https://api.reference.langfuse.com/#tag/score/GET/api/public/scores).", + version: "0.0.1", + type: "source", + dedupe: "unique", + methods: { + ...common.methods, + getDateField() { + return "createdAt"; + }, + getResourceName() { + return "data"; + }, + getResourcesFn() { + return this.app.listScores; + }, + getResourcesFnArgs() { + return { + params: { + orderBy: "createdAt.desc", + fromTimestamp: this.getLastDateAt(), + }, + }; + }, + generateMeta(resource) { + return { + id: resource.id, + summary: `New Score: ${resource.name}`, + ts: Date.parse(resource.createdAt), + }; + }, + }, + sampleEmit, +}; diff --git a/components/langfuse/sources/new-score-received/test-event.mjs b/components/langfuse/sources/new-score-received/test-event.mjs new file mode 100644 index 0000000000000..ef3a13badc55d --- /dev/null +++ b/components/langfuse/sources/new-score-received/test-event.mjs @@ -0,0 +1,22 @@ +export default { + "dataType": "NUMERIC", + "trace": { + "userId": null, + "tags": [ + "…" + ] + }, + "value": 1, + "id": "132", + "traceId": "12321", + "name": "Test 1", + "source": "ANNOTATION", + "observationId": null, + "timestamp": "2025-02-20T16:54:05.109Z", + "createdAt": "2025-02-20T16:54:05.109Z", + "updatedAt": "2025-02-20T16:54:05.109Z", + "authorUserId": null, + "comment": null, + "configId": null, + "queueId": null +}; diff --git a/components/langfuse/sources/new-trace-received/new-trace-received.mjs b/components/langfuse/sources/new-trace-received/new-trace-received.mjs new file mode 100644 index 0000000000000..3e03286f84b05 --- /dev/null +++ b/components/langfuse/sources/new-trace-received/new-trace-received.mjs @@ -0,0 +1,40 @@ +import common from "../common/polling.mjs"; +import sampleEmit from "./test-event.mjs"; + +export default { + ...common, + key: "langfuse-new-trace-received", + name: "New Trace Received", + description: "Emit new event when a new trace is recorded in Langfuse. [See the documentation](https://api.reference.langfuse.com/#tag/trace/GET/api/public/traces).", + version: "0.0.1", + type: "source", + dedupe: "unique", + methods: { + ...common.methods, + getDateField() { + return "timestamp"; + }, + getResourceName() { + return "data"; + }, + getResourcesFn() { + return this.app.listTraces; + }, + getResourcesFnArgs() { + return { + params: { + orderBy: "timestamp.desc", + fromTimestamp: this.getLastDateAt(), + }, + }; + }, + generateMeta(resource) { + return { + id: resource.id, + summary: `New Trace: ${resource.name}`, + ts: Date.parse(resource.timestamp), + }; + }, + }, + sampleEmit, +}; diff --git a/components/langfuse/sources/new-trace-received/test-event.mjs b/components/langfuse/sources/new-trace-received/test-event.mjs new file mode 100644 index 0000000000000..2f172305f0970 --- /dev/null +++ b/components/langfuse/sources/new-trace-received/test-event.mjs @@ -0,0 +1,16 @@ +export default { + id: "123", + timestamp: "2025-02-20T16:54:05.109Z", + name: "New Trace Received", + input: "2+2", + output: "4", + sessionId: "abc", + release: "1.0.0", + version: "1.0.0", + userId: "123", + metadata: "metadata", + tags: [ + "test1" + ], + public: true, +}; \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f2daf306f5cef..7bc7c797e4e90 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -796,8 +796,7 @@ importers: specifier: ^8.3.2 version: 8.3.2 - components/anchor_browser: - specifiers: {} + components/anchor_browser: {} components/annature: {} @@ -3104,8 +3103,7 @@ importers: specifier: ^1.5.1 version: 1.6.6 - components/databricks_oauth: - specifiers: {} + components/databricks_oauth: {} components/datadog: dependencies: @@ -5571,8 +5569,7 @@ importers: specifier: ^3.0.0 version: 3.0.3 - components/griptape: - specifiers: {} + components/griptape: {} components/grist: dependencies: @@ -6831,7 +6828,14 @@ importers: specifier: ^3.0.3 version: 3.0.3 - components/langfuse: {} + components/langfuse: + dependencies: + '@pipedream/platform': + specifier: ^3.0.3 + version: 3.0.3 + uuid: + specifier: ^11.1.0 + version: 11.1.0 components/laposta: dependencies: @@ -28085,6 +28089,10 @@ packages: resolution: {integrity: sha512-508e6IcKLrhxKdBbcA2b4KQZlLVp2+J5UwQ6F7Drckkc5N9ZJwFa4TgWtsww9UG8fGHbm6gbV19TdM5pQ4GaIA==} hasBin: true + uuid@11.1.0: + resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==} + hasBin: true + uuid@3.3.2: resolution: {integrity: sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==} deprecated: Please upgrade to version 7 or higher. Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details. @@ -39451,7 +39459,7 @@ snapshots: jsonwebtoken: 9.0.2 jwks-rsa: 3.1.0 node-forge: 1.3.1 - uuid: 11.0.3 + uuid: 11.0.5 optionalDependencies: '@google-cloud/firestore': 7.11.0 '@google-cloud/storage': 7.14.0 @@ -47297,6 +47305,8 @@ snapshots: uuid@11.0.5: {} + uuid@11.1.0: {} + uuid@3.3.2: {} uuid@3.4.0: {}