Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/salesforce"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
Expand Down
48 changes: 48 additions & 0 deletions plugins/inputs/salesforce/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Salesforce Input Plugin

The Salesforce plugin gathers metrics about the limits in your Salesforce organization and the remaining usage.
It fetches its data from the [limits endpoint](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_limits.htm) of Salesforce's REST API.

### Configuration:

```toml
# Gather Metrics about Salesforce limits and remaining usage
[[inputs.salesforce]]
username = "your_username"
password = "your_password"
## (Optional) security tokjen
security_token = "your_security_token"
## (Optional) environment type (sandbox or production)
## default is: production
# environment = "production"
## (Optional) API version (default: "39.0")
# version = "39.0"
```

### Measurements & Fields:

Salesforce provide one measurment named "salesforce".
Each entry is converted to snake\_case and 2 fields are created.

- \<key\>_max represents the limit threshold
- \<key\>_remaining represents the usage remaining before hitting the limit threshold

- salesforce
- \<key\>_max (int)
- \<key\>_remaining (int)
- (...)

### Tags:

- All measurements have the following tags:
- host
- organization_id (t18 char organisation ID)


### Example Output:

```
$./telegraf --config telegraf.conf --input-filter salesforce --test

salesforce,organization_id=XXXXXXXXXXXXXXXXXX,host=xxxxx.salesforce.com daily_workflow_emails_max=546000i,hourly_time_based_workflow_max=50i,daily_async_apex_executions_remaining=250000i,daily_durable_streaming_api_events_remaining=1000000i,streaming_api_concurrent_clients_remaining=2000i,daily_bulk_api_requests_remaining=10000i,hourly_sync_report_runs_remaining=500i,daily_api_requests_max=5000000i,data_storage_mb_remaining=1073i,file_storage_mb_remaining=1069i,daily_generic_streaming_api_events_remaining=10000i,hourly_async_report_runs_remaining=1200i,hourly_time_based_workflow_remaining=50i,daily_streaming_api_events_remaining=1000000i,single_email_max=5000i,hourly_dashboard_refreshes_remaining=200i,streaming_api_concurrent_clients_max=2000i,daily_durable_generic_streaming_api_events_remaining=1000000i,daily_api_requests_remaining=4999998i,hourly_dashboard_results_max=5000i,hourly_async_report_runs_max=1200i,daily_durable_generic_streaming_api_events_max=1000000i,hourly_dashboard_results_remaining=5000i,concurrent_sync_report_runs_max=20i,durable_streaming_api_concurrent_clients_remaining=2000i,daily_workflow_emails_remaining=546000i,hourly_dashboard_refreshes_max=200i,daily_streaming_api_events_max=1000000i,hourly_sync_report_runs_max=500i,hourly_o_data_callout_max=10000i,mass_email_max=5000i,mass_email_remaining=5000i,single_email_remaining=5000i,hourly_dashboard_statuses_max=999999999i,concurrent_async_get_report_instances_max=200i,daily_durable_streaming_api_events_max=1000000i,daily_generic_streaming_api_events_max=10000i,hourly_o_data_callout_remaining=10000i,concurrent_sync_report_runs_remaining=20i,daily_bulk_api_requests_max=10000i,data_storage_mb_max=1073i,hourly_dashboard_statuses_remaining=999999999i,concurrent_async_get_report_instances_remaining=200i,daily_async_apex_executions_max=250000i,durable_streaming_api_concurrent_clients_max=2000i,file_storage_mb_max=1073i 1501565661000000000
```
245 changes: 245 additions & 0 deletions plugins/inputs/salesforce/salesforce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package salesforce

import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

var sampleConfig = `
## specify your credentials
##
username = "your_username"
password = "your_password"
##
## (optional) security token
# security_token = "your_security_token"
##
## (optional) environment type (sandbox or production)
## default is: production
##
# environment = "production"
##
## (optional) API version (default: "39.0")
##
# version = "39.0"
`

type limit struct {
Max int
Remaining int
}

type limits map[string]limit

type Salesforce struct {
Username string
Password string
SecurityToken string
Environment string
SessionID string
ServerURL *url.URL
OrganizationID string
Version string

client *http.Client
}

const defaultVersion = "39.0"
const defaultEnvironment = "production"

// returns a new Salesforce plugin instance
func NewSalesforce() *Salesforce {
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(5 * time.Second),
}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(10 * time.Second),
}
return &Salesforce{
client: client,
Version: defaultVersion,
Environment: defaultEnvironment}
}

func (s *Salesforce) SampleConfig() string {
return sampleConfig
}

func (s *Salesforce) Description() string {
return "Read API usage and limits for a Salesforce organisation"
}

// Reads limits values from Salesforce API
func (s *Salesforce) Gather(acc telegraf.Accumulator) error {
limits, err := s.fetchLimits()
if err != nil {
return err
}

tags := map[string]string{
"organization_id": s.OrganizationID,
"host": s.ServerURL.Host,
}

fields := make(map[string]interface{})
for k, v := range limits {
key := internal.SnakeCase(k)
fields[key+"_max"] = v.Max
fields[key+"_remaining"] = v.Remaining
}

acc.AddFields("salesforce", fields, tags)
return nil
}

// query the limits endpoint
func (s *Salesforce) queryLimits() (*http.Response, error) {
endpoint := fmt.Sprintf("%s://%s/services/data/v%s/limits", s.ServerURL.Scheme, s.ServerURL.Host, s.Version)
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "encoding/json")
req.Header.Add("Authorization", "Bearer "+s.SessionID)
return s.client.Do(req)
}

func (s *Salesforce) isAuthenticated() bool {
return s.SessionID != ""
}

func (s *Salesforce) fetchLimits() (limits, error) {
var l limits
if !s.isAuthenticated() {
if err := s.login(); err != nil {
return l, err
}
}

resp, err := s.queryLimits()
if err != nil {
return l, err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusUnauthorized {
if err = s.login(); err != nil {
return l, err
}
resp, err = s.queryLimits()
if err != nil {
return l, err
}
defer resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return l, fmt.Errorf("Salesforce responded with unexpected status code %d", resp.StatusCode)
}

l = limits{}
err = json.NewDecoder(resp.Body).Decode(&l)
return l, err
}

func (s *Salesforce) getLoginEndpoint() (string, error) {
switch s.Environment {
case "sandbox":
return fmt.Sprintf("https://test.salesforce.com/services/Soap/c/%s/", s.Version), nil
case "production":
return fmt.Sprintf("https://login.salesforce.com/services/Soap/c/%s/", s.Version), nil
default:
return "", fmt.Errorf("unknown environment type: %s", s.Environment)
}
}

// Authenticate with Salesfroce
func (s *Salesforce) login() error {
if s.Username == "" || s.Password == "" {
return errors.New("missing username or password")
}

body := fmt.Sprintf(`<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:urn="urn:enterprise.soap.sforce.com">
<soapenv:Body>
<urn:login>
<urn:username>%s</urn:username>
<urn:password>%s%s</urn:password>
</urn:login>
</soapenv:Body>
</soapenv:Envelope>`,
s.Username, s.Password, s.SecurityToken)

loginEndpoint, err := s.getLoginEndpoint()
if err != nil {
return err
}

req, err := http.NewRequest(http.MethodPost, loginEndpoint, strings.NewReader(body))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/xml")
req.Header.Add("SOAPAction", "login")

resp, err := s.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

soapFault := struct {
Code string `xml:"Body>Fault>faultcode"`
Message string `xml:"Body>Fault>faultstring"`
}{}

err = xml.Unmarshal(respBody, &soapFault)
if err != nil {
return err
}

if soapFault.Code != "" {
return fmt.Errorf("login failed: %s", soapFault.Message)
}

loginResult := struct {
ServerURL string `xml:"Body>loginResponse>result>serverUrl"`
SessionID string `xml:"Body>loginResponse>result>sessionId"`
OrganizationID string `xml:"Body>loginResponse>result>userInfo>organizationId"`
}{}

err = xml.Unmarshal(respBody, &loginResult)
if err != nil {
return err
}

s.SessionID = loginResult.SessionID
s.OrganizationID = loginResult.OrganizationID
s.ServerURL, err = url.Parse(loginResult.ServerURL)

return err
}

func init() {
inputs.Add("salesforce", func() telegraf.Input {
return NewSalesforce()
})
}
Loading