Add Apache Flink Stateful Functions #7749
|
Comments for some of the checklist points:
|
Diff for 178593e:
failed fetching repo "flink-statefun"
unable to find a manifest named "flink-statefun" (in "/tmp/tmp.3oIQ3D5UtJ/oi/library" or as a remote URL)
diff --git a/_bashbrew-arches b/_bashbrew-arches
index e69de29..9845e82 100644
--- a/_bashbrew-arches
+++ b/_bashbrew-arches
@@ -0,0 +1 @@
+flink-statefun:latest @ amd64
diff --git a/_bashbrew-list b/_bashbrew-list
index e69de29..a8480b4 100644
--- a/_bashbrew-list
+++ b/_bashbrew-list
@@ -0,0 +1,3 @@
+flink-statefun:2.0
+flink-statefun:2.0.0
+flink-statefun:latest
diff --git a/_bashbrew.err b/_bashbrew.err
index 90fbeaf..e69de29 100644
--- a/_bashbrew.err
+++ b/_bashbrew.err
@@ -1,6 +0,0 @@
-failed fetching repo "flink-statefun"
-unable to find a manifest named "flink-statefun" (in "/tmp/tmp.3oIQ3D5UtJ/oi/library" or as a remote URL)
-failed fetching repo "flink-statefun"
-unable to find a manifest named "flink-statefun" (in "/tmp/tmp.3oIQ3D5UtJ/oi/library" or as a remote URL)
-failed fetching repo "flink-statefun"
-unable to find a manifest named "flink-statefun" (in "/tmp/tmp.3oIQ3D5UtJ/oi/library" or as a remote URL)
diff --git a/flink-statefun_latest/Dockerfile b/flink-statefun_latest/Dockerfile
new file mode 100644
index 0000000..3df876c
--- /dev/null
+++ b/flink-statefun_latest/Dockerfile
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink:1.10.0
+
+ENV STATEFUN_VERSION=2.0.0 \
+ GPG_KEY=1C1E2394D3194E1944613488F320986D35C33D6A
+
+ENV ROLE worker
+ENV MASTER_HOST localhost
+ENV STATEFUN_HOME /opt/statefun
+ENV STATEFUN_MODULES $STATEFUN_HOME/modules
+
+# Cleanup flink-lib
+RUN rm -fr $FLINK_HOME/lib/flink-table*jar
+
+# Copy our distribution template
+COPY flink-distribution/ $FLINK_HOME/
+
+# Install Stateful Functions dependencies in Flink lib
+ENV DIST_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar \
+ DIST_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar.asc \
+ CORE_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar \
+ CORE_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar.asc
+
+RUN set -ex; \
+ wget -nv -O statefun-flink-distribution.jar "$DIST_JAR_URL"; \
+ wget -nv -O statefun-flink-distribution.jar.asc "$DIST_ASC_URL"; \
+ wget -nv -O statefun-flink-core.jar "$CORE_JAR_URL"; \
+ wget -nv -O statefun-flink-core.jar.asc "$CORE_ASC_URL"; \
+ \
+ export GNUPGHOME="$(mktemp -d)"; \
+ for server in ha.pool.sks-keyservers.net $(shuf -e \
+ hkp://p80.pool.sks-keyservers.net:80 \
+ keyserver.ubuntu.com \
+ hkp://keyserver.ubuntu.com:80 \
+ pgp.mit.edu) ; do \
+ gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ; \
+ done && \
+ gpg --batch --verify statefun-flink-distribution.jar.asc statefun-flink-distribution.jar; \
+ gpg --batch --verify statefun-flink-core.jar.asc statefun-flink-core.jar; \
+ gpgconf --kill all; \
+ rm -rf "$GNUPGHOME" statefun-flink-distribution.jar.asc statefun-flink-core.jar.asc; \
+ \
+ mkdir -p $FLINK_HOME/lib; \
+ mv statefun-flink-distribution.jar $FLINK_HOME/lib; \
+ mv statefun-flink-core.jar $FLINK_HOME/lib;
+
+# add user modules
+USER root
+
+RUN mkdir -p $STATEFUN_MODULES && \
+ useradd --system --home-dir $STATEFUN_HOME --uid=9998 --gid=flink statefun && \
+ chown -R statefun:flink $STATEFUN_HOME && \
+ chmod -R g+rw $STATEFUN_HOME
+
+# entry point
+ADD docker-entry-point.sh /docker-entry-point.sh
+
+ENTRYPOINT ["/docker-entry-point.sh"]
diff --git a/flink-statefun_latest/docker-entry-point.sh b/flink-statefun_latest/docker-entry-point.sh
new file mode 100755
index 0000000..4e502b4
--- /dev/null
+++ b/flink-statefun_latest/docker-entry-point.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#
+# Role types
+#
+WORKER="worker"
+MASTER="master"
+
+#
+# Environment
+#
+FLINK_HOME=${FLINK_HOME:-"/opt/flink"}
+ROLE=${ROLE:-"worker"}
+MASTER_HOST=${MASTER_HOST:-"localhost"}
+
+#
+# Start a service depending on the role.
+#
+if [[ "${ROLE}" == "${WORKER}" ]]; then
+ #
+ # start the TaskManager (worker role)
+ #
+ exec ${FLINK_HOME}/bin/taskmanager.sh start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST}
+
+elif [[ "${ROLE}" == "${MASTER}" ]]; then
+ #
+ # start the JobManager (master role) with our predefined job.
+ #
+ exec $FLINK_HOME/bin/standalone-job.sh \
+ start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST} \
+ "$@"
+else
+ #
+ # unknown role
+ #
+ echo "unknown role ${ROLE}"
+ exit 1
+fi
diff --git a/flink-statefun_latest/flink-distribution/bin/flink-console.sh b/flink-statefun_latest/flink-distribution/bin/flink-console.sh
new file mode 100755
index 0000000..16f97ad
--- /dev/null
+++ b/flink-statefun_latest/flink-distribution/bin/flink-console.sh
@@ -0,0 +1,78 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start a Flink service as a console application. Must be stopped with Ctrl-C
+# or with SIGTERM by kill or the controlling process.
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|statefun) [args]"
+
+SERVICE=$1
+ARGS=("${@:2}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+case ${SERVICE} in
+ (taskexecutor)
+ CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
+ ;;
+
+ (historyserver)
+ CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+ ;;
+
+ (zookeeper)
+ CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+ ;;
+
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
+ (standalonejob)
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
+ ;;
+
+ (statefun)
+ CLASS_TO_RUN=org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint
+ ;;
+
+ (*)
+ echo "Unknown service '${SERVICE}'. $USAGE."
+ exit 1
+ ;;
+esac
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+
+JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+
+# Only set JVM 8 arguments if we have correctly extracted the version
+if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
+ if [ "$JAVA_VERSION" -lt 18 ]; then
+ JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+ fi
+fi
+
+echo "Starting $SERVICE as a console application on host $HOSTNAME."
+exec $JAVA_RUN ${JVM_ARGS} ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/flink-statefun_latest/flink-distribution/bin/standalone-job.sh b/flink-statefun_latest/flink-distribution/bin/standalone-job.sh
new file mode 100755
index 0000000..e9def43
--- /dev/null
+++ b/flink-statefun_latest/flink-distribution/bin/standalone-job.sh
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start/stop a Flink JobManager.
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
+
+STARTSTOP=$1
+ENTRY_POINT_NAME="statefun"
+
+if [[ ${STARTSTOP} != "start" ]] && [[ ${STARTSTOP} != "start-foreground" ]] && [[ ${STARTSTOP} != "stop" ]]; then
+ echo ${USAGE}
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Startup parameters
+ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}")
+
+if [[ ${STARTSTOP} == "start" ]] || [[ ${STARTSTOP} == "start-foreground" ]]; then
+ if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
+ echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
+ else
+ flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
+ FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
+ fi
+
+ if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
+ echo "[ERROR] Configured memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
+ exit 1
+ fi
+
+ if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
+ export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
+ fi
+
+ # Add cluster entry point specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
+fi
diff --git a/flink-statefun_latest/flink-distribution/conf/flink-conf.yaml b/flink-statefun_latest/flink-distribution/conf/flink-conf.yaml
new file mode 100644
index 0000000..574ccb0
--- /dev/null
+++ b/flink-statefun_latest/flink-distribution/conf/flink-conf.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+#==============================================================================
+# Configurations strictly required by Stateful Functions. Do not change.
+#==============================================================================
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+execution.checkpointing.max-concurrent-checkpoints: 1
+jobmanager.scheduler: legacy
+
+#==============================================================================
+# Recommended configurations. Users may change according to their needs.
+#==============================================================================
+
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+taskmanager.memory.process.size: 4g
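
The role handling in `docker-entry-point.sh` above can be sketched as a small standalone function. This is a minimal illustration, not the script shipped in the image: `resolve_command` is a hypothetical helper name, it prints the command line instead of `exec`-ing it, and it assumes `FLINK_HOME` is the Flink installation root (the `/opt/flink` fallback is an assumption).

```shell
#!/usr/bin/env bash

# Hypothetical helper: print the command the entry point would run
# for a given role, instead of exec-ing it.
resolve_command() {
  local role="${1:-worker}"
  local master_host="${2:-localhost}"
  local flink_home="${FLINK_HOME:-/opt/flink}"  # assumed installation root

  case "${role}" in
    worker)
      # TaskManager, pointed at the JobManager host
      echo "${flink_home}/bin/taskmanager.sh start-foreground -Djobmanager.rpc.address=${master_host}"
      ;;
    master)
      # JobManager started through the modified standalone-job.sh
      echo "${flink_home}/bin/standalone-job.sh start-foreground -Djobmanager.rpc.address=${master_host}"
      ;;
    *)
      echo "unknown role ${role}" >&2
      return 1
      ;;
  esac
}

resolve_command worker jobmanager-host
resolve_command master jobmanager-host
```

Printing the command rather than running it makes the dispatch easy to check without a Flink installation; any role other than `worker` or `master` returns a non-zero status, matching the `exit 1` branch in the diff.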
|
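The `JAVA_VERSION` check in `flink-console.sh` above relies on a `sed` expression that joins the first two components of the reported version string. Below is a small sketch of what it yields, assuming sample first lines of `java -version` output; the sample strings and the `parse_java_version` name are illustrative assumptions, not taken from this PR.

```shell
#!/usr/bin/env bash

# Hypothetical wrapper around the sed expression used in flink-console.sh;
# reads the `java -version` output on stdin and keeps only the first line.
parse_java_version() {
  sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q'
}

echo 'openjdk version "1.8.0_242"' | parse_java_version   # prints 18
echo 'java version "1.7.0_80"' | parse_java_version       # prints 17
```

With a result of 18 the `-lt 18` comparison is false, so `-XX:MaxPermSize=256m` is only appended for Java 7 and older.
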
Hi @tianon / @yosifkit, just wondering if we could get some feedback on this. As @tzulitai mentioned, this is an extension of the relatively popular Flink image (page 3 of the official images list these days!), but with a different deployment model that lends itself to being a separate image. We're looking into alternative ways to build and host this ourselves, but we think it's a good candidate to coexist with Flink as an official image. Thanks!
|
Sorry for the delay, and thank you for your patience with us. 🙏 I have to admit, I'm struggling here with the idea of this being a separate image, especially since it seems to essentially be a framework within Flink, officially supported by the Flink project, and even shares the same official webpage (https://flink.apache.org/). Can you share some additional detail on why you believe it's a good candidate for a separate repository? |
|
@tianon Thank you for getting back to this PR! We're still very excited to have this as an official image. We believe that this should be a separate repository for the following reasons:
I hope that clarifies some things for you! |
Associated docs PR: docker-library/docs#1695.
Checklist for Review
NOTE: This checklist is intended for the use of the Official Images maintainers both to track the status of your PR and to help inform you and others of where we're at. As such, please leave the "checking" of items to the repository maintainers. If there is a point below for which you would like to provide additional information or note completion, please do so by commenting on the PR. Thanks! (and thanks for staying patient with us ❤️)
`foobar` needs Node.js, has `FROM node:...` instead of grabbing `node` via other means been considered?)
`FROM scratch`, tarballs only exist in a single commit within the associated history?