diff --git a/client.go b/client.go index 0ad3f2eb..cd743a8b 100644 --- a/client.go +++ b/client.go @@ -19,7 +19,7 @@ import ( "log" "time" - elastigo "github.com/mattbaird/elastigo/lib" + elastigo "github.com/zaphod-concur/elastigo/lib" ) var ( diff --git a/lib/baserequest.go b/lib/baserequest.go index 27809ae2..6915654b 100644 --- a/lib/baserequest.go +++ b/lib/baserequest.go @@ -73,10 +73,14 @@ func (c *Conn) DoCommand(method string, url string, args map[string]interface{}, httpStatusCode, body, err = req.Do(&response) if err != nil { - return body, err + if httpStatusCode == -1 { + //connection error like *url.Error + return body, err + } + //error reading response body, or something else after we obtained an HTTP status code + return body, ESError{time.Now(), fmt.Sprintf("Error [%v] Status [%v]", err, httpStatusCode), httpStatusCode} } if httpStatusCode > 304 { - jsonErr := json.Unmarshal(body, &response) if jsonErr == nil { if res_err, ok := response["error"]; ok { @@ -84,7 +88,7 @@ func (c *Conn) DoCommand(method string, url string, args map[string]interface{}, return body, ESError{time.Now(), fmt.Sprintf("Error [%s] Status [%v]", res_err, status), httpStatusCode} } } - return body, jsonErr + return body, ESError{time.Now(), fmt.Sprintf("Error [%v] Status [%v]", jsonErr, httpStatusCode), httpStatusCode} } return body, nil } diff --git a/lib/corebulk.go b/lib/corebulk.go index 650e7331..ea76f53a 100644 --- a/lib/corebulk.go +++ b/lib/corebulk.go @@ -39,6 +39,43 @@ type ErrorBuffer struct { Buf *bytes.Buffer } +// An error implementation which contains the actual items in the bulk indexing response. +type BulkIndexingError struct { + Items []map[string]interface{} +} + +func (e BulkIndexingError) Error() string { + return fmt.Sprintf("Bulk Insertion Error. Failed item count [%d]", len(e.Items)) +} + +type DocVersion struct { + // The version to assign to the document + Version int64 + + // The Version Type to assign to the document + VersionType VersionType +} + +// An enum representing the various allowed version types. +// see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types +type VersionType string + +const ( + INTERNAL VersionType = "internal" + EXTERNAL VersionType = "external" + EXTERNAL_GT VersionType = "external_gt" + EXTERNAL_GTE VersionType = "external_gte" + FORCE VersionType = "force" +) + +// Creates a DocVersion struct with the given value and this version type. +func (t VersionType) V(v int64) *DocVersion { + return &DocVersion{ + Version: v, + VersionType: t, + } +} + // A bulk indexer creates goroutines, and channels for connecting and sending data // to elasticsearch in bulk, using buffers. type BulkIndexer struct { @@ -94,6 +131,9 @@ type BulkIndexer struct { mu sync.Mutex // Wait Group for the http sends sendWg *sync.WaitGroup + + // numPendingSends tracks queued buffers pending 'send' in sendBuf + numPendingSends int64 } func (b *BulkIndexer) NumErrors() uint64 { @@ -162,9 +202,15 @@ func (b *BulkIndexer) Stop() { } func (b *BulkIndexer) PendingDocuments() int { + b.mu.Lock() + defer b.mu.Unlock() return b.docCt } +func (b *BulkIndexer) PendingSends() int64 { + return atomic.LoadInt64(&b.numPendingSends) +} + // Flush all current documents to ElasticSearch func (b *BulkIndexer) Flush() { b.mu.Lock() @@ -184,7 +230,7 @@ func (b *BulkIndexer) startHttpSender() { b.sendWg.Add(1) go func() { for buf := range b.sendBuf { - // Copy for the potential re-send. + // Copy so we can put the buffer on the error channel, or potentially re-send it. bufCopy := bytes.NewBuffer(buf.Bytes()) err := b.Sender(buf) @@ -193,18 +239,18 @@ func (b *BulkIndexer) startHttpSender() { // 2. Retry then return error and let runner decide // 3. Retry, then log to disk? retry later? if err != nil { + buf = bufCopy if b.RetryForSeconds > 0 { + //copy again so we can keep the original buffer for the error channel. + bufCopy := bytes.NewBuffer(buf.Bytes()) time.Sleep(time.Second * time.Duration(b.RetryForSeconds)) err = b.Sender(bufCopy) - if err == nil { - // Successfully re-sent with no error - continue - } } - if b.ErrorChannel != nil { + if err != nil && b.ErrorChannel != nil { b.ErrorChannel <- &ErrorBuffer{err, buf} } } + atomic.AddInt64(&b.numPendingSends, -1) } b.sendWg.Done() }() @@ -260,6 +306,7 @@ func (b *BulkIndexer) startDocChannel() { func (b *BulkIndexer) send(buf *bytes.Buffer) { //b2 := *b.buf + atomic.AddInt64(&b.numPendingSends, 1) b.sendBuf <- buf b.buf = new(bytes.Buffer) // b.buf.Reset() @@ -277,9 +324,9 @@ func (b *BulkIndexer) shutdown() { // The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. // it operates by buffering requests, and ocassionally flushing to elasticsearch // http://www.elasticsearch.org/guide/reference/api/bulk.html -func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error { +func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) error { //{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } - by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, data) + by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, version, data) if err != nil { return err } @@ -287,9 +334,9 @@ func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, return nil } -func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error { +func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) error { //{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } - by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, data) + by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, version, data) if err != nil { return err } @@ -297,8 +344,15 @@ func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, return nil } -func (b *BulkIndexer) Delete(index, _type, id string) { - queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q}}\n", index, _type, id) +func (b *BulkIndexer) Delete(index, _type, id string, version *DocVersion) { + verStr := "" + if version != nil { + verStr = fmt.Sprintf(",\"_version\":%d", version.Version) + if version.VersionType != "" { + verStr = verStr + ",\"_version_type\":\"" + string(version.VersionType) + "\"" + } + } + queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q%s}}\n", index, _type, id, verStr) b.bulkChannel <- []byte(queryLine) return } @@ -307,10 +361,10 @@ func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, paren var data map[string]interface{} = make(map[string]interface{}) data["script"] = script - return b.Update(index, _type, id, parent, ttl, date, data) + return b.Update(index, _type, id, parent, ttl, date, nil, data) } -func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error { +func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, partialDoc interface{}, upsert bool) error { var data map[string]interface{} = make(map[string]interface{}) @@ -318,16 +372,16 @@ func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, paren if upsert { data["doc_as_upsert"] = true } - return b.Update(index, _type, id, parent, ttl, date, data) + return b.Update(index, _type, id, parent, ttl, date, version, data) } // This does the actual send of a buffer, which has already been formatted // into bytes of ES formatted bulk data func (b *BulkIndexer) Send(buf *bytes.Buffer) error { type responseStruct struct { - Took int64 `json:"took"` - Errors bool `json:"errors"` - Items []map[string]interface{} `json:"items"` + Took int64 `json:"took"` + Errors bool `json:"errors"` + Items *json.RawMessage `json:"items"` } response := responseStruct{} @@ -341,17 +395,37 @@ func (b *BulkIndexer) Send(buf *bytes.Buffer) error { // check for response errors, bulk insert will give 200 OK but then include errors in response jsonErr := json.Unmarshal(body, &response) if jsonErr == nil { + //unmarshal the errors only if we need to if response.Errors { - atomic.AddUint64(&b.numErrors, uint64(len(response.Items))) - return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]", len(response.Items)) + var items []map[string]interface{} + jsonErr = json.Unmarshal([]byte(*response.Items), &items) + if jsonErr != nil { + return jsonErr + } + + for _, item := range items { + for _, body := range item { + body, ok := body.(map[string]interface{}) + if !ok { + continue + } + if status, ok := body["status"]; ok && status.(float64) > 304 { + atomic.AddUint64(&b.numErrors, 1) + } + } + } + + return BulkIndexingError{items} } + } else { + return jsonErr } return nil } // Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulkd index // http://www.elasticsearch.org/guide/reference/api/bulk.html -func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) ([]byte, error) { +func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, version *DocVersion, data interface{}) ([]byte, error) { // only index and update are currently supported if op != "index" && op != "update" { return nil, errors.New(fmt.Sprintf("Operation '%s' is not yet supported", op)) @@ -376,7 +450,7 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin buf.WriteString(`"`) } - if op == "update" { + if op == "update" && version == nil { buf.WriteString(`,"_retry_on_conflict":3`) } @@ -390,6 +464,15 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin buf.WriteString(strconv.FormatInt(date.UnixNano()/1e6, 10)) buf.WriteString(`"`) } + if version != nil { + buf.WriteString(`,"_version":`) + buf.WriteString(strconv.FormatInt(version.Version, 10)) + if version.VersionType != "" { + buf.WriteString(`,"_version_type":"`) + buf.WriteString(string(version.VersionType)) + buf.WriteString(`"`) + } + } buf.WriteString(`}}`) buf.WriteRune('\n') diff --git a/lib/corebulk_test.go b/lib/corebulk_test.go index e352ae33..83bfbc91 100644 --- a/lib/corebulk_test.go +++ b/lib/corebulk_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "strings" + "github.com/araddon/gou" "github.com/bmizerany/assert" ) @@ -100,7 +102,7 @@ func TestBulkIndexerBasic(t *testing.T) { "date": "yesterday", } - err := indexer.Index(testIndex, "user", "1", "", "", &date, data) + err := indexer.Index(testIndex, "user", "1", "", "", &date, nil, data) waitFor(func() bool { return buffers.Length() > 0 }, 5) @@ -112,13 +114,14 @@ func TestBulkIndexerBasic(t *testing.T) { expectedBytes := 129 assert.T(t, totalBytesSent == expectedBytes, fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent)) - err = indexer.Index(testIndex, "user", "2", "", "", nil, data) + err = indexer.Index(testIndex, "user", "2", "", "", nil, nil, data) waitFor(func() bool { return buffers.Length() > 1 }, 5) // this will test to ensure that Flush actually catches a doc indexer.Flush() + <-time.After(time.Millisecond * 1) // we need to wait for the httpSender to read from the send buffer totalBytesSent = totalBytesSent - len(*eshost) assert.T(t, err == nil, fmt.Sprintf("Should have nil error =%v", err)) assert.T(t, buffers.Length() == 2, fmt.Sprintf("Should have another buffer ct=%d", buffers.Length())) @@ -130,6 +133,79 @@ func TestBulkIndexerBasic(t *testing.T) { indexer.Stop() } +func TestBulkIndexerErrors(t *testing.T) { + testIndex := "users" + InitTests(true) + c := NewTestConn() + + c.DeleteIndex(testIndex) + + sent := make(chan struct{}, 1) + + indexer := c.NewBulkIndexer(3) + indexer.Sender = func(buf *bytes.Buffer) error { + // log.Printf("buffer:%s", string(buf.Bytes())) + ret := indexer.Send(buf) + sent <- struct{}{} + return ret + } + indexer.Start() + errch := make(chan *ErrorBuffer, 1) + indexer.ErrorChannel = errch + + date := time.Unix(1257894000, 0) + + data := map[string]interface{}{ + "name": "smurfettes", + "age": 21, + "date": "today", + } + data2 := map[string]interface{}{ + "name": "smurfs", + "age": "this is not an int", + "date": "yesterday", + } + + _, err := c.DoCommand("PUT", fmt.Sprintf("/%s", testIndex), nil, + `{ + "mappings": { + "user": { + "properties": { + "age": { "type": "integer" } + } + } + } + }`) + assert.T(t, err == nil, fmt.Sprintf("Should not return an error: %v", err)) + + //act + err = indexer.Index(testIndex, "user", "1", "", "", &date, nil, data) + err2 := indexer.Index(testIndex, "user", "2", "", "", &date, nil, data2) + + <-sent + //assert + assert.T(t, err == nil, fmt.Sprintf("Should not return an error: %v", err)) + assert.T(t, err2 == nil, fmt.Sprintf("Should not return an error: %v", err2)) + time.Sleep(1 * time.Microsecond) + assert.T(t, indexer.NumErrors() == 1, fmt.Sprintf("Should have recorded 1 error but saw: %d", indexer.NumErrors())) + + errBuf := <-errch + bulkErr, ok := errBuf.Err.(BulkIndexingError) + assert.T(t, ok, fmt.Sprintf("Error should have been a BulkIndexingError but was %T", errBuf.Err)) + + assert.T(t, len(bulkErr.Items) == 2, fmt.Sprintf("Expected 2 items in response but got: %d", len(bulkErr.Items))) + status1 := int(bulkErr.Items[1]["index"].(map[string]interface{})["status"].(float64)) + assert.T(t, status1 == 400, fmt.Sprintf("Expected second item to have status 400 but was %d", status1)) + status0 := int(bulkErr.Items[0]["index"].(map[string]interface{})["status"].(float64)) + assert.T(t, status0 == 201, fmt.Sprintf("Expected first item to have status 201 but was %d", status0)) + + lines := strings.Split(errBuf.Buf.String(), "\n") + assert.T(t, lines[0] == `{"index":{"_index":"users","_type":"user","_id":"1","_timestamp":"1257894000000"}}`, fmt.Sprintf("Expected index header but got: %s", lines[0])) + assert.T(t, lines[1] == `{"age":21,"date":"today","name":"smurfettes"}`, fmt.Sprintf("Expected document but got: %s", lines[1])) + assert.T(t, lines[2] == `{"index":{"_index":"users","_type":"user","_id":"2","_timestamp":"1257894000000"}}`, fmt.Sprintf("Expected index header but got: %s", lines[2])) + assert.T(t, lines[3] == `{"age":"this is not an int","date":"yesterday","name":"smurfs"}`, fmt.Sprintf("Expected document but got: %s", lines[3])) +} + func TestRefreshParam(t *testing.T) { requrlChan := make(chan *url.URL, 1) InitTests(true) @@ -148,7 +224,7 @@ func TestRefreshParam(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", &date, nil, data) <-time.After(time.Millisecond * 200) // indexer.Flush() @@ -174,7 +250,7 @@ func TestWithoutRefreshParam(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", &date, nil, data) <-time.After(time.Millisecond * 200) // indexer.Flush() @@ -215,7 +291,7 @@ func XXXTestBulkUpdate(t *testing.T) { data := map[string]interface{}{ "script": "ctx._source.count += 2", } - err = indexer.Update("users", "user", "5", "", "", &date, data) + err = indexer.Update("users", "user", "5", "", "", &date, nil, data) // So here's the deal. Flushing does seem to work, you just have to give the // channel a moment to recieve the message ... // <- time.After(time.Millisecond * 20) @@ -261,9 +337,9 @@ func TestBulkSmallBatch(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) - indexer.Index("users", "user", "3", "", "", &date, data) - indexer.Index("users", "user", "4", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", &date, nil, data) + indexer.Index("users", "user", "3", "", "", &date, nil, data) + indexer.Index("users", "user", "4", "", "", &date, nil, data) <-time.After(time.Millisecond * 200) // indexer.Flush() indexer.Stop() @@ -287,7 +363,7 @@ func TestBulkDelete(t *testing.T) { indexer.Start() - indexer.Delete("fake", "fake_type", "1") + indexer.Delete("fake", "fake_type", "1", nil) indexer.Flush() indexer.Stop() @@ -316,7 +392,7 @@ func XXXTestBulkErrors(t *testing.T) { for i := 0; i < 20; i++ { date := time.Unix(1257894000, 0) data := map[string]interface{}{"name": "smurfs", "age": 22, "date": date} - indexer.Index("users", "user", strconv.Itoa(i), "", "", &date, data) + indexer.Index("users", "user", strconv.Itoa(i), "", "", &date, nil, data) } }() var errBuf *ErrorBuffer @@ -331,6 +407,166 @@ func XXXTestBulkErrors(t *testing.T) { indexer.Stop() } +func TestBulkVersioning_Internal(t *testing.T) { + testIndex := "users" + var ( + buffers = NewSharedBuffer() + totalBytesSent int + messageSets int + ) + + InitTests(true) + c := NewTestConn() + c.RequestTracer = func(method, url, body string) { + t.Logf("%s %s HTTP/1.1\n%s", method, url, body) + } + + c.DeleteIndex(testIndex) + + indexer := c.NewBulkIndexer(3) + indexer.Sender = func(buf *bytes.Buffer) error { + messageSets += 1 + totalBytesSent += buf.Len() + buffers.Append(buf) + // log.Printf("buffer:%s", string(buf.Bytes())) + return indexer.Send(buf) + } + errCh := make(chan *ErrorBuffer) + indexer.ErrorChannel = errCh + indexer.Start() + + date := time.Unix(1257894000, 0) + data := map[string]interface{}{ + "name": "smurfs", + "age": 22, + "date": "yesterday", + } + + indexer.Index(testIndex, "user", "1", "", "", &date, nil, data) + + //act + data["extra"] = "1" + indexer.Index(testIndex, "user", "1", "", "", &date, &DocVersion{Version: 1}, data) + + indexer.Update(testIndex, "user", "1", "", "", &date, &DocVersion{Version: 2, VersionType: "internal"}, map[string]interface{}{ + "doc": map[string]interface{}{ + "updated": true, + }, + }) + + data["extra"] = "3" + indexer.Delete(testIndex, "user", "1", &DocVersion{Version: 7}) + + //assert + errBuf := <-errCh + + bulkErr, ok := errBuf.Err.(BulkIndexingError) + assert.T(t, ok, fmt.Sprintf("Expected bulk indexing error but was: %T\n\t%v", errBuf.Err, errBuf.Err)) + + js, _ := json.MarshalIndent(bulkErr.Items, "", " ") + t.Logf("Items:%s", string(js)) + + assert.T(t, getStatus(0, bulkErr.Items) == 201, "First should be created") + assert.T(t, getVersion(0, bulkErr.Items) == int64(1), "First should have version 1") + assert.T(t, getStatus(1, bulkErr.Items) == 200, "Should be reindexed with version 2") + assert.T(t, getVersion(1, bulkErr.Items) == int64(2), "Should be reindexed with version 2") + assert.T(t, getStatus(2, bulkErr.Items) == 200, "Should be updated with version 3") + assert.T(t, getVersion(2, bulkErr.Items) == int64(3), "Should be updated with version 3") + assert.T(t, getStatus(3, bulkErr.Items) == 409, "Should fail to delete due to version conflict") + assert.T(t, getError(3, bulkErr.Items) == "VersionConflictEngineException[[users][2] [user][1]: version conflict, current [3], provided [7]]") + +} + +func TestBulkVersioning_External(t *testing.T) { + testIndex := "users" + var ( + buffers = NewSharedBuffer() + totalBytesSent int + messageSets int + ) + + InitTests(true) + c := NewTestConn() + c.RequestTracer = func(method, url, body string) { + t.Logf("%s %s HTTP/1.1\n%s", method, url, body) + } + + c.DeleteIndex(testIndex) + + indexer := c.NewBulkIndexer(3) + indexer.Sender = func(buf *bytes.Buffer) error { + messageSets += 1 + totalBytesSent += buf.Len() + buffers.Append(buf) + // log.Printf("buffer:%s", string(buf.Bytes())) + return indexer.Send(buf) + } + errCh := make(chan *ErrorBuffer) + indexer.ErrorChannel = errCh + indexer.Start() + + date := time.Unix(1257894000, 0) + data := map[string]interface{}{ + "name": "smurfs", + "age": 22, + "date": "yesterday", + } + + now := time.Now().Unix() + indexer.Index(testIndex, "user", "1", "", "", &date, EXTERNAL.V(now), data) + + //act + data["extra"] = "1" + indexer.Index(testIndex, "user", "1", "", "", &date, EXTERNAL_GT.V(now), data) + + data["extra"] = "2" + indexer.Index(testIndex, "user", "1", "", "", &date, EXTERNAL_GT.V(now+2), data) + + data["extra"] = "3" + indexer.Delete(testIndex, "user", "1", EXTERNAL_GT.V(now-1)) + + //assert + errBuf := <-errCh + + bulkErr, ok := errBuf.Err.(BulkIndexingError) + assert.T(t, ok, fmt.Sprintf("Expected bulk indexing error but was: %T\n\t%v", errBuf.Err, errBuf.Err)) + + js, _ := json.MarshalIndent(bulkErr.Items, "", " ") + t.Logf("Items:%s", string(js)) + + assert.T(t, getStatus(0, bulkErr.Items) == 201, "First should be created") + assert.T(t, getVersion(0, bulkErr.Items) == now, "First should have version now") + assert.T(t, getStatus(1, bulkErr.Items) == 409, "Should fail to reindex with same version") + assert.T(t, getStatus(2, bulkErr.Items) == 200, "Should be updated with version now+2") + assert.T(t, getVersion(2, bulkErr.Items) == now+2, "Should be updated with version now+2") + assert.T(t, getStatus(3, bulkErr.Items) == 409, "Should fail to delete due to version conflict") + +} + +func getStatus(index int, items []map[string]interface{}) int { + for _, doc := range items[index] { + doc := doc.(map[string]interface{}) + return int(doc["status"].(float64)) + } + panic(fmt.Sprintf("no properties in %v", items[index])) +} + +func getVersion(index int, items []map[string]interface{}) int64 { + for _, doc := range items[index] { + doc := doc.(map[string]interface{}) + return int64(doc["_version"].(float64)) + } + panic(fmt.Sprintf("no properties in %v", items[index])) +} + +func getError(index int, items []map[string]interface{}) string { + for _, doc := range items[index] { + doc := doc.(map[string]interface{}) + return doc["error"].(string) + } + panic(fmt.Sprintf("no properties in %v", items[index])) +} + /* BenchmarkSend 18:33:00 bulk_test.go:131: Sent 1 messages in 0 sets totaling 0 bytes 18:33:00 bulk_test.go:131: Sent 100 messages in 1 sets totaling 145889 bytes @@ -356,7 +592,7 @@ func BenchmarkSend(b *testing.B) { about := make([]byte, 1000) rand.Read(about) data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "about": about} - indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, data) + indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, nil, data) } log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes) if indexer.NumErrors() != 0 { @@ -390,7 +626,7 @@ func BenchmarkSendBytes(b *testing.B) { return indexer.Send(buf) } for i := 0; i < b.N; i++ { - indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, body) + indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, nil, body) } log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes) if indexer.NumErrors() != 0 { diff --git a/lib/coreexample_test.go b/lib/coreexample_test.go index fed31dfe..943540c8 100644 --- a/lib/coreexample_test.go +++ b/lib/coreexample_test.go @@ -16,7 +16,7 @@ import ( "fmt" "strconv" - elastigo "github.com/mattbaird/elastigo/lib" + elastigo "github.com/zaphod-concur/elastigo/lib" ) // The simplest usage of background bulk indexing @@ -25,7 +25,7 @@ func ExampleBulkIndexer_simple() { indexer := c.NewBulkIndexerErrors(10, 60) indexer.Start() - indexer.Index("twitter", "user", "1", "", "", nil, `{"name":"bob"}`) + indexer.Index("twitter", "user", "1", "", "", nil, nil, `{"name":"bob"}`) indexer.Stop() } @@ -46,7 +46,7 @@ func ExampleBulkIndexer_responses() { } indexer.Start() for i := 0; i < 20; i++ { - indexer.Index("twitter", "user", strconv.Itoa(i), "", "", nil, `{"name":"bob"}`) + indexer.Index("twitter", "user", strconv.Itoa(i), "", "", nil, nil, `{"name":"bob"}`) } indexer.Stop() } diff --git a/lib/coretest_test.go b/lib/coretest_test.go index 37aa3fc9..04e67949 100644 --- a/lib/coretest_test.go +++ b/lib/coretest_test.go @@ -179,7 +179,7 @@ func LoadTestData() { log.Println("HM, already exists? ", ge.Url) } docsm[id] = true - indexer.Index(testIndex, ge.Type, id, "", "", &ge.Created, line) + indexer.Index(testIndex, ge.Type, id, "", "", &ge.Created, nil, line) docCt++ } else { log.Println("ERROR? ", string(line)) diff --git a/lib/request.go b/lib/request.go index 4f72955e..04060d55 100644 --- a/lib/request.go +++ b/lib/request.go @@ -61,8 +61,8 @@ func (r *Request) SetBody(body io.Reader) { func (r *Request) Do(v interface{}) (int, []byte, error) { response, bodyBytes, err := r.DoResponse(v) - if err != nil { - return -1, nil, err + if response == nil { + return -1, bodyBytes, err } return response.StatusCode, bodyBytes, err } @@ -84,18 +84,15 @@ func (r *Request) DoResponse(v interface{}) (*http.Response, []byte, error) { bodyBytes, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, nil, err + return res, nil, err } if res.StatusCode == 404 { - return nil, bodyBytes, RecordNotFound + return res, bodyBytes, RecordNotFound } if res.StatusCode > 304 && v != nil { - jsonErr := json.Unmarshal(bodyBytes, v) - if jsonErr != nil { - return nil, nil, fmt.Errorf("Json response unmarshal error: [%s], response content: [%s]", jsonErr.Error(), string(bodyBytes)) - } + err = json.Unmarshal(bodyBytes, v) } return res, bodyBytes, err } diff --git a/lib/setup_test.go b/lib/setup_test.go index 026f2dc5..a638db8f 100644 --- a/lib/setup_test.go +++ b/lib/setup_test.go @@ -76,7 +76,7 @@ func PopulateTestDB(t *testing.T, c *Conn) { "dob": "19600218", "teams": ["EDM", "BOS", "DAL", "MTL"]}`) // HACK to let the ES magic happen - time.Sleep(time.Second) + time.Sleep(time.Second * 3) } func TearDownTestDB(c *Conn) { diff --git a/tutorial/start_1.go b/tutorial/start_1.go index db807f66..232219fa 100644 --- a/tutorial/start_1.go +++ b/tutorial/start_1.go @@ -14,9 +14,10 @@ package main import ( "flag" "fmt" - elastigo "github.com/mattbaird/elastigo/lib" "log" "os" + + elastigo "github.com/zaphod-concur/elastigo/lib" ) var (