From 7bfd892f7f9b78df3082c54c11dd198f9f5f5067 Mon Sep 17 00:00:00 2001 From: Tequilac <58514571+tequilac@users.noreply.github.com> Date: Sat, 7 Jan 2023 17:38:50 +0100 Subject: [PATCH] Add knative command --- examples/KNative/README.md | 30 ++++++++ examples/KNative/workflow.json | 37 +++++++++ functions/index.js | 2 + functions/knative/kNativeCommand.js | 114 ++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+) create mode 100644 examples/KNative/README.md create mode 100644 examples/KNative/workflow.json create mode 100644 functions/knative/kNativeCommand.js diff --git a/examples/KNative/README.md b/examples/KNative/README.md new file mode 100644 index 0000000..a436c6b --- /dev/null +++ b/examples/KNative/README.md @@ -0,0 +1,30 @@ +# Knative example + +This example demonstrates using the hyperflow engine to deploy and invoke Knative function workflows. + +## Prerequisites + +- [**kind**](https://kind.sigs.k8s.io/docs/user/quick-start) or [**minikube**](https://minikube.sigs.k8s.io/docs/start/) +to run a local Kubernetes cluster +- [**kubectl**](https://kubernetes.io/docs/tasks/tools/) - the Kubernetes CLI +- [**kn**](https://kubernetes.io/docs/tasks/tools/) - the knative CLI + +After installing those tools install [**kn quickstart**](https://kubernetes.io/docs/tasks/tools/) - a Knative quickstart +plugin and run one of the below commands depending on the used K8s distribution: +``` +kn quickstart minikube +``` +``` +kn quickstart kind +``` +Next follow the steps recommended by the command's output. + + +Once the cluster is set up you are ready to run this example workflow. + +## Parameters + +- `name` - name under which the service will be executed +- `image` - address of the image of the application +- `namespace` - Kubernetes namespace in which the service and its' pod will be created +(when none is provided the default namespace is used) diff --git a/examples/KNative/workflow.json b/examples/KNative/workflow.json new file mode 100644 index 0000000..c08e1fc --- /dev/null +++ b/examples/KNative/workflow.json @@ -0,0 +1,37 @@ +{ + "name": "knative-example", + "processes": [ + { + "name": "knative-example", + "type": "dataflow", + "firingLimit": 1, + "function": "kNativeCommand", + "config": { + "image": "tequilac/line-counter", + "namespace": "exns" + }, + "ins": [ + "dataUrl" + ], + "outs": [ + "results" + ] + } + ], + "signals": [ + { + "name": "dataUrl", + "data": [[ + "http://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/apache_logs/apache_logs" + ]] + }, { + "name": "results" + } + ], + "ins": [ + "dataUrl" + ], + "outs": [ + "results" + ] +} diff --git a/functions/index.js b/functions/index.js index 4476ae1..8c8e58c 100644 --- a/functions/index.js +++ b/functions/index.js @@ -6,6 +6,7 @@ var cmd = require('./command.js'), commandLocalMock = require('./commandLocalMock.js'), redisCommand = require('./redisCommand.js').redisCommand, k8sCommand = require('./kubernetes/k8sCommand.js').k8sCommand, + kNativeCommand = require('./knative/kNativeCommand').kNativeCommand, bojK8sCommand = require('./kubernetes/bojK8sCommand.js').bojK8sCommand; function print(ins, outs, context, cb) { @@ -190,4 +191,5 @@ exports.genCollection = genCollection; exports.commandLocalMock = commandLocalMock.commandLocalMock; exports.redisCommand = redisCommand; exports.k8sCommand = k8sCommand; +exports.kNativeCommand = kNativeCommand; exports.bojK8sCommand = bojK8sCommand; diff --git a/functions/knative/kNativeCommand.js b/functions/knative/kNativeCommand.js new file mode 100644 index 0000000..faaa81a --- /dev/null +++ b/functions/knative/kNativeCommand.js @@ -0,0 +1,114 @@ +// Runs a service in a KNative cluster + +const k8s = require('@kubernetes/client-node'); +const yaml = require('js-yaml'); +const axios = require("axios"); + +SERVICE_YAML_TEMPLATE = ` +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: {name} + namespace: {namespace} +spec: + template: + spec: + containers: + - image: docker.io/{image} + env:{dataParams}`; + + +const interpolate = (tpl, args) => tpl.replace(/{(\w+)}/g, (_, v) => args[v]); + +async function kNativeCommand(ins, outs, context, cb) { + + function createData() { + const data = ins[0].data[0]; + const dataString = ` + - name: {key} + value: "{value}"`; + let amountParams = { + key: "DATA_NUM", + value: data.length + } + let result = interpolate(dataString, amountParams); + for (let i = 0; i < data.length; i++) { + let dataParams = { + key: `DATA${i}`, + value: data[i] + } + result += interpolate(dataString, dataParams); + } + return result; + } + + async function execute(spec, client, url) { + const startTime = Date.now(); + axios({ + method: 'get', + url: url, + headers: {'content-type': 'application/json'} + }).then(async response => { + const endTime = Date.now(); + console.log(`Execution time: ${(endTime - startTime) / 1000} seconds`); + const json = response.data; + console.log(json); + await deleteService(spec, client); + outs[0].data = json; + cb(null, outs); + }).catch(() => { + console.log("Waiting for pod to become ready"); + setTimeout(() => execute(spec, client, url), 5000); + }); + } + + async function getCondition(spec, client, name, url) { + response = await client.read({ + apiVersion: "apps/v1", + kind: "Deployment", + metadata: { + name: `${name}-deployment`, + namespace: context.namespace ? context.namespace : "default", + } + }); + const condition = response.body.status.conditions[0].type; + console.log(`Current condition: ${condition}`); + if (condition !== "Available") { + setTimeout(() => getCondition(spec, client, name, url), 1000); + } else { + console.log("Executing..."); + await execute(spec, client, url); + } + } + + async function deleteService(spec, client) { + console.log("Deleting..."); + await client.delete(spec); + console.log("Service deleted"); + } + + async function scheduleExecution(spec, client) { + let response = await client.read(spec); + const url = response.body.status.url; + console.log("Obtained service url: " + url); + setTimeout(() => getCondition(spec, client, response.body.status.latestCreatedRevisionName, url), 1000); + } + + const kubeconfig = new k8s.KubeConfig(); + kubeconfig.loadFromDefault(); + + const params = { + name: context.name, + namespace: context.namespace ? context.namespace : "default", + image: context.image, + dataParams: createData(ins) + } + + const spec = yaml.safeLoad(interpolate(SERVICE_YAML_TEMPLATE, params)); + + const client = k8s.KubernetesObjectApi.makeApiClient(kubeconfig); + let response = await client.create(spec); + setTimeout(() => scheduleExecution(spec, client, outs, cb), 3000); +} + +exports.kNativeCommand = kNativeCommand;