Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ kafka = KafkaClient("localhost:9092")

# To send messages synchronously
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# Send unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

# To send messages asynchronously
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")
Expand Down Expand Up @@ -78,6 +83,8 @@ producer = SimpleProducer(kafka, batch_send=True,
# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
# message is raw byte string -- decode if necessary!
# e.g., for unicode: `message.decode('utf-8')`
print(message)

kafka.close()
Expand Down
21 changes: 21 additions & 0 deletions kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,28 @@ def __init__(self, client, async=False,
def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
@param: topic, name of topic for produce request -- type str
@param: partition, partition number for produce request -- type int
@param: *msg, one or more message payloads -- type str
@returns: ResponseRequest returned by server
raises on error

Note that msg type *must* be encoded to str by user.
Passing unicode message will not work, for example
you should encode before calling send_messages via
something like `unicode_message.encode('utf-8')`

All messages produced via this method will set the message 'key' to Null
"""

# Guarantee that msg is actually a list or tuple (should always be true)
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")

# Raise TypeError if any message is not encoded as a str
if any(not isinstance(m, str) for m in msg):
raise TypeError("all produce message payloads must be type str")

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m))
Expand Down
31 changes: 31 additions & 0 deletions test/test_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-

import logging
import os
import random
import struct
import unittest2

from mock import MagicMock, patch

from kafka import KafkaClient
from kafka.producer import Producer

class TestKafkaProducer(unittest2.TestCase):
def test_producer_message_types(self):

producer = Producer(MagicMock())
topic = "test-topic"
partition = 0

bad_data_types = (u'你怎么样?', 12, ['a','list'], ('a','tuple'), {'a': 'dict'})
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
producer.send_messages(topic, partition, m)

good_data_types = ('a string!',)
for m in good_data_types:
# This should not raise an exception
producer.send_messages(topic, partition, m)