|
74 | 74 | "\n",
|
75 | 75 | "Kafka is primarily a distributed event-streaming platform which provides scalable and fault-tolerant streaming data across data pipelines. It is an essential technical component of a plethora of major enterprises where mission-critical data delivery is a primary requirement.\n",
|
76 | 76 | "\n",
|
77 |
| - "**NOTE:** A basic understanding of the [kafka components](https://kafka.apache.org/documentation/#intro_concepts_and_terms) will help you in following the tutorial with ease.", |
| 77 | + "**NOTE:** A basic understanding of the [kafka components](https://kafka.apache.org/documentation/#intro_concepts_and_terms) will help you in following the tutorial with ease.\n", |
78 | 78 | "\n",
|
79 | 79 | "**NOTE:** A Java runtime environment is required to run this tutorial."
|
80 | 80 | ]
|
|
755 | 755 | "source": [
|
756 | 756 | "### The tfio training dataset for online learning\n",
|
757 | 757 | "\n",
|
758 |
| - "The `streaming.KafkaBatchIODataset` is similar to the `streaming.KafkaGroupIODataset` in it's API. Additionally, it is recommended to utilize the `stream_timeout` parameter to configure the duration for which the dataset will block for new messages before timing out. In the instance below, the dataset is configured with a `stream_timeout` of `30000` milliseconds. This implies that, after all the messages from the topic have been consumed, the dataset will wait for an additional 30 seconds before timing out and disconnecting from the kafka cluster. If new messages are streamed into the topic before timing out, the data consumption and model training resumes for those newly consumed data points. To block indefinitely, set it to `-1`." |
| 758 | + "The `streaming.KafkaBatchIODataset` is similar to the `streaming.KafkaGroupIODataset` in it's API. Additionally, it is recommended to utilize the `stream_timeout` parameter to configure the duration for which the dataset will block for new messages before timing out. In the instance below, the dataset is configured with a `stream_timeout` of `10000` milliseconds. This implies that, after all the messages from the topic have been consumed, the dataset will wait for an additional 10 seconds before timing out and disconnecting from the kafka cluster. If new messages are streamed into the topic before timing out, the data consumption and model training resumes for those newly consumed data points. To block indefinitely, set it to `-1`." |
759 | 759 | ]
|
760 | 760 | },
|
761 | 761 | {
|
|
770 | 770 | " topics=[\"susy-train\"],\n",
|
771 | 771 | " group_id=\"cgonline\",\n",
|
772 | 772 | " servers=\"127.0.0.1:9092\",\n",
|
773 |
| - " stream_timeout=30000, # in milliseconds, to block indefinitely, set it to -1.\n", |
| 773 | + " stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.\n", |
774 | 774 | " configuration=[\n",
|
775 | 775 | " \"session.timeout.ms=7000\",\n",
|
776 | 776 | " \"max.poll.interval.ms=8000\",\n",
|
|
779 | 779 | ")"
|
780 | 780 | ]
|
781 | 781 | },
|
782 |
| - { |
783 |
| - "cell_type": "markdown", |
784 |
| - "metadata": { |
785 |
| - "id": "sJronJPnZhyR" |
786 |
| - }, |
787 |
| - "source": [ |
788 |
| - "In addition to training the model on existing data, a background thread will be started, which will start streaming additional data into the `susy-train` topic after a sleep duration of 30 seconds. This demonstrates the functionality of resuming the training as soons as new data is fed into the topic without the need for building the dataset over and over again." |
789 |
| - ] |
790 |
| - }, |
791 |
| - { |
792 |
| - "cell_type": "code", |
793 |
| - "execution_count": null, |
794 |
| - "metadata": { |
795 |
| - "id": "iaBjhFkmZd1C" |
796 |
| - }, |
797 |
| - "outputs": [], |
798 |
| - "source": [ |
799 |
| - "def error_callback(exc):\n", |
800 |
| - " raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))\n", |
801 |
| - "\n", |
802 |
| - "def write_to_kafka_after_sleep(topic_name, items):\n", |
803 |
| - " time.sleep(30)\n", |
804 |
| - " print(\"#\"*100)\n", |
805 |
| - " print(\"Writing messages into topic: {0} after a nice sleep !\".format(topic_name))\n", |
806 |
| - " print(\"#\"*100)\n", |
807 |
| - " count=0\n", |
808 |
| - " producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])\n", |
809 |
| - " for message, key in items:\n", |
810 |
| - " producer.send(topic_name,\n", |
811 |
| - " key=key.encode('utf-8'),\n", |
812 |
| - " value=message.encode('utf-8')\n", |
813 |
| - " ).add_errback(error_callback)\n", |
814 |
| - " count+=1\n", |
815 |
| - " producer.flush()\n", |
816 |
| - " print(\"#\"*100)\n", |
817 |
| - " print(\"Wrote {0} messages into topic: {1}\".format(count, topic_name))\n", |
818 |
| - " print(\"#\"*100)\n", |
819 |
| - "\n", |
820 |
| - "def decode_kafka_online_item(raw_message, raw_key):\n", |
821 |
| - " message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])\n", |
822 |
| - " key = tf.strings.to_number(raw_key)\n", |
823 |
| - " return (message, key)\n" |
824 |
| - ] |
825 |
| - }, |
826 | 782 | {
|
827 | 783 | "cell_type": "markdown",
|
828 | 784 | "metadata": {
|
|
840 | 796 | },
|
841 | 797 | "outputs": [],
|
842 | 798 | "source": [
|
843 |
| - "thread = threading.Thread(target=write_to_kafka_after_sleep,\n", |
844 |
| - " args=(\"susy-train\", zip(x_train, y_train)))\n", |
845 |
| - "thread.daemon = True\n", |
846 |
| - "thread.start()\n", |
847 |
| - "\n", |
| 799 | + "def decode_kafka_online_item(raw_message, raw_key):\n", |
| 800 | + " message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])\n", |
| 801 | + " key = tf.strings.to_number(raw_key)\n", |
| 802 | + " return (message, key)\n", |
| 803 | + " \n", |
848 | 804 | "for mini_ds in online_train_ds:\n",
|
849 | 805 | " mini_ds = mini_ds.shuffle(buffer_size=32)\n",
|
850 | 806 | " mini_ds = mini_ds.map(decode_kafka_online_item)\n",
|
|
0 commit comments