diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py
index bfdf64187..93cf8b5da 100644
--- a/elasticsearch/helpers/actions.py
+++ b/elasticsearch/helpers/actions.py
@@ -143,15 +143,18 @@ def _process_bulk_chunk(
         bulk_data, map(methodcaller("popitem"), resp["items"])
     ):
         ok = 200 <= item.get("status", 500) < 300
-        if not ok and raise_on_error:
+        if not ok:
             # include original document source
+            # guard on length, not op_type: an entry may carry no document
             if len(data) > 1:
                 item["data"] = data[1]
-            errors.append({op_type: item})
-
-        if ok or not errors:
-            # if we are not just recording all errors to be able to raise
-            # them all at once, yield items individually
+            if raise_on_error:
+                # recorded so that all errors can be raised at once
+                errors.append({op_type: item})
+            else:
+                # not raising: yield the failed item individually
+                yield ok, {op_type: item}
+        else:
             yield ok, {op_type: item}
 
     if errors:
diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py
index 81fc4f220..88f938e2e 100644
--- a/test_elasticsearch/test_server/test_helpers.py
+++ b/test_elasticsearch/test_server/test_helpers.py
@@ -306,6 +306,50 @@ def test_errors_are_collected_properly(self):
         self.assertEquals(1, success)
         self.assertEquals(1, failed)
 
+    def test_return_docs_when_not_raising_errors(self):
+        """Failed items keep their source document when errors are not raised."""
+        bulk_load_index_name = "test_bulk_load"
+        mappings = {"properties": {"test_field": {"type": "integer"}}}
+        self.client.indices.create(bulk_load_index_name, body={"mappings": mappings})
+        docs = [
+            {"test_field": 1},
+            {"test_field": "string_error"},
+            {"test_field": "second_string_error"},
+            {"test_field": 5},
+            {"test_field": 6},
+        ]
+        success, errors = helpers.bulk(
+            self.client, docs, raise_on_error=False, index=bulk_load_index_name
+        )
+        self.assertEqual(success, 3)
+        self.assertEqual(len(errors), 2)
+        self.assertEqual(errors[0]["index"]["data"], {"test_field": "string_error"})
+        self.assertEqual(
+            errors[1]["index"]["data"], {"test_field": "second_string_error"}
+        )
+
+    def test_return_docs_when_not_raising_errors_passing_docs_as_strings(self):
+        """Same check for documents supplied as raw JSON strings."""
+        bulk_load_index_name = "test_bulk_load_string_docs"
+        mappings = {"properties": {"test_field": {"type": "integer"}}}
+        self.client.indices.create(bulk_load_index_name, body={"mappings": mappings})
+        docs = [
+            '{"test_field": 1}',
+            '{"test_field": "string_error"}',
+            '{"test_field": "second_string_error"}',
+            '{"test_field": 5}',
+            '{"test_field": 6}',
+        ]
+        success, errors = helpers.bulk(
+            self.client, docs, raise_on_error=False, index=bulk_load_index_name
+        )
+        self.assertEqual(success, 3)
+        self.assertEqual(len(errors), 2)
+        self.assertEqual(errors[0]["index"]["data"], '{"test_field": "string_error"}')
+        self.assertEqual(
+            errors[1]["index"]["data"], '{"test_field": "second_string_error"}'
+        )
+
 
 class TestScan(ElasticsearchTestCase):
     mock_scroll_responses = [