Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions provider/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,7 @@ def __getOffsetList(self, messages):

return offsets

def __encodeMessageIfNeeded(self, value):
# let's make sure whatever data we're getting is utf-8 encoded
def __getUTF8Encoding(self, value):
try:
value.decode('utf-8')
except UnicodeDecodeError:
Expand All @@ -497,19 +496,24 @@ def __encodeMessageIfNeeded(self, value):
logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
value = unicode(value, errors='replace').encode('utf-8')
except AttributeError:
logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))
return value
logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))

return value


def __encodeMessageIfNeeded(self, value):
value = self.__getUTF8Encoding(value)

if self.encodeValueAsJSON:
try:
parsed = json.loads(value, parse_constant=self.__errorOnJSONConstant, parse_float=self.__parseFloat)
logging.debug('[{}] Successfully encoded a message as JSON.'.format(self.trigger))
return parsed
except ValueError as e:
# no big deal, just return the original value
# message is not a JSON object, return the message as a JSON value
logging.warn('[{}] I was asked to encode a message as JSON, but I failed with "{}".'.format(self.trigger, e))
value = "\"{}\"".format(value)
pass
return value
elif self.encodeValueAsBase64:
try:
parsed = value.encode("base64").strip()
Expand All @@ -532,6 +536,8 @@ def __encodeKeyIfNeeded(self, key):
logging.warn('[{}] Unable to encode a binary key.'.format(self.trigger))
pass

key = self.__getUTF8Encoding(key)

logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
return key

Expand Down