Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 103 additions & 51 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/cortexlabs/cortex/cli/cluster"
"github.com/cortexlabs/cortex/cli/types/cliconfig"
"github.com/cortexlabs/cortex/cli/types/flags"
"github.com/cortexlabs/cortex/pkg/lib/archive"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/console"
"github.com/cortexlabs/cortex/pkg/lib/docker"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/files"
libjson "github.com/cortexlabs/cortex/pkg/lib/json"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/prompt"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
Expand Down Expand Up @@ -76,6 +78,7 @@ func clusterInit() {
addClusterConfigFlag(_clusterInfoCmd)
addClusterNameFlag(_clusterInfoCmd)
addClusterRegionFlag(_clusterInfoCmd)
_clusterInfoCmd.Flags().VarP(&_flagOutput, "output", "o", fmt.Sprintf("output format: one of %s", strings.Join(flags.UserOutputTypeStrings(), "|")))
_clusterInfoCmd.Flags().StringVarP(&_flagClusterInfoEnv, "configure-env", "e", "", "name of environment to configure")
_clusterInfoCmd.Flags().BoolVarP(&_flagClusterInfoDebug, "debug", "d", false, "save the current cluster state to a file")
_clusterInfoCmd.Flags().BoolVarP(&_flagClusterDisallowPrompt, "yes", "y", false, "skip prompts")
Expand Down Expand Up @@ -158,7 +161,7 @@ var _clusterUpCmd = &cobra.Command{
exit.Error(err)
}

awsClient, err := newAWSClient(accessConfig.Region)
awsClient, err := newAWSClient(accessConfig.Region, true)
if err != nil {
exit.Error(err)
}
Expand Down Expand Up @@ -276,7 +279,7 @@ var _clusterUpCmd = &cobra.Command{
exit.Error(ErrorClusterUp(out + helpStr))
}

loadBalancer, err := getAWSOperatorLoadBalancer(clusterConfig.ClusterName, awsClient)
loadBalancer, err := getLoadBalancer(clusterConfig.ClusterName, OperatorLoadBalancer, awsClient)
if err != nil {
exit.Error(errors.Append(err, fmt.Sprintf("\n\nyou can attempt to resolve this issue and configure your cli environment by running `cortex cluster info --configure-env %s`", _flagClusterUpEnv)))
}
Expand Down Expand Up @@ -326,7 +329,7 @@ var _clusterScaleCmd = &cobra.Command{
exit.Error(err)
}

awsClient, err := newAWSClient(accessConfig.Region)
awsClient, err := newAWSClient(accessConfig.Region, true)
if err != nil {
exit.Error(err)
}
Expand All @@ -341,7 +344,7 @@ var _clusterScaleCmd = &cobra.Command{
exit.Error(err)
}

clusterConfig := refreshCachedClusterConfig(*awsClient, accessConfig)
clusterConfig := refreshCachedClusterConfig(*awsClient, accessConfig, true)
clusterConfig, err = updateNodeGroupScale(clusterConfig, _flagClusterScaleNodeGroup, scaleMinIntances, scaleMaxInstances, _flagClusterDisallowPrompt)
if err != nil {
exit.Error(err)
Expand Down Expand Up @@ -380,15 +383,18 @@ var _clusterInfoCmd = &cobra.Command{
exit.Error(err)
}

awsClient, err := newAWSClient(accessConfig.Region)
awsClient, err := newAWSClient(accessConfig.Region, _flagOutput == flags.PrettyOutputType)
if err != nil {
exit.Error(err)
}

if _flagClusterInfoDebug {
if _flagOutput != flags.PrettyOutputType {
exit.Error(ErrorJSONOutputNotSupportedWithFlag("--debug"))
}
cmdDebug(awsClient, accessConfig)
} else {
cmdInfo(awsClient, accessConfig, _flagClusterDisallowPrompt)
cmdInfo(awsClient, accessConfig, _flagOutput, _flagClusterDisallowPrompt)
}
},
}
Expand All @@ -410,7 +416,7 @@ var _clusterDownCmd = &cobra.Command{
}

// Check AWS access
awsClient, err := newAWSClient(accessConfig.Region)
awsClient, err := newAWSClient(accessConfig.Region, true)
if err != nil {
exit.Error(err)
}
Expand Down Expand Up @@ -447,7 +453,7 @@ var _clusterDownCmd = &cobra.Command{
}

// updating CLI env is best-effort, so ignore errors
loadBalancer, _ := getAWSOperatorLoadBalancer(accessConfig.ClusterName, awsClient)
loadBalancer, _ := getLoadBalancer(accessConfig.ClusterName, OperatorLoadBalancer, awsClient)

if _flagClusterDisallowPrompt {
fmt.Printf("your cluster named \"%s\" in %s will be spun down and all apis will be deleted\n\n", accessConfig.ClusterName, accessConfig.Region)
Expand Down Expand Up @@ -561,7 +567,7 @@ var _clusterExportCmd = &cobra.Command{
}

// Check AWS access
awsClient, err := newAWSClient(accessConfig.Region)
awsClient, err := newAWSClient(accessConfig.Region, true)
if err != nil {
exit.Error(err)
}
Expand All @@ -577,7 +583,7 @@ var _clusterExportCmd = &cobra.Command{
exit.Error(err)
}

loadBalancer, err := getAWSOperatorLoadBalancer(accessConfig.ClusterName, awsClient)
loadBalancer, err := getLoadBalancer(accessConfig.ClusterName, OperatorLoadBalancer, awsClient)
if err != nil {
exit.Error(err)
}
Expand Down Expand Up @@ -663,38 +669,60 @@ var _clusterExportCmd = &cobra.Command{
},
}

func cmdInfo(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig, disallowPrompt bool) {
if err := printInfoClusterState(awsClient, accessConfig); err != nil {
exit.Error(err)
func cmdInfo(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig, outputType flags.OutputType, disallowPrompt bool) {
if outputType == flags.PrettyOutputType {
if err := printInfoClusterState(awsClient, accessConfig); err != nil {
exit.Error(err)
}
}

clusterConfig := refreshCachedClusterConfig(*awsClient, accessConfig)
clusterConfig := refreshCachedClusterConfig(*awsClient, accessConfig, outputType == flags.PrettyOutputType)

out, exitCode, err := runManagerWithClusterConfig("/root/info.sh", &clusterConfig, awsClient, nil, nil, nil)
operatorLoadBalancer, err := getLoadBalancer(accessConfig.ClusterName, OperatorLoadBalancer, awsClient)
if err != nil {
exit.Error(err)
}
if exitCode == nil || *exitCode != 0 {
exit.Error(ErrorClusterInfo(out))
apiLoadBalancer, err := getLoadBalancer(accessConfig.ClusterName, APILoadBalancer, awsClient)
if err != nil {
exit.Error(err)
}

fmt.Println()
operatorEndpoint := *operatorLoadBalancer.DNSName
apiEndpoint := *apiLoadBalancer.DNSName

var operatorEndpoint string
for _, line := range strings.Split(out, "\n") {
// before modifying this, search for this prefix
if strings.HasPrefix(line, "operator: ") {
operatorEndpoint = "https://" + strings.TrimSpace(strings.TrimPrefix(line, "operator: "))
break
if outputType == flags.JSONOutputType {
infoResponse, err := getInfoOperatorResponse("https://" + operatorEndpoint)
if err != nil {
exit.Error(err)
}
infoResponse.ClusterConfig.Config = clusterConfig

jsonBytes, err := libjson.Marshal(map[string]interface{}{
"cluster_config": infoResponse.ClusterConfig.Config,
"cluster_metadata": infoResponse.ClusterConfig.OperatorMetadata,
"node_infos": infoResponse.NodeInfos,
"endpoint_operator": operatorEndpoint,
"endpoint_api": apiEndpoint,
})
if err != nil {
exit.Error(err)
}

fmt.Println(string(jsonBytes))
}
if outputType == flags.PrettyOutputType {
fmt.Println(console.Bold("endpoints:"))
fmt.Println("operator: ", operatorEndpoint)
fmt.Println("api load balancer:", apiEndpoint)
fmt.Println()

if err := printInfoOperatorResponse(clusterConfig, operatorEndpoint); err != nil {
exit.Error(err)
if err := printInfoOperatorResponse(clusterConfig, "https://"+operatorEndpoint); err != nil {
exit.Error(err)
}
}

if _flagClusterInfoEnv != "" {
if err := updateAWSCLIEnv(_flagClusterInfoEnv, operatorEndpoint, disallowPrompt); err != nil {
if err := updateCLIEnv(_flagClusterInfoEnv, "https://"+operatorEndpoint, disallowPrompt, outputType == flags.PrettyOutputType); err != nil {
exit.Error(err)
}
}
Expand Down Expand Up @@ -729,13 +757,7 @@ func printInfoOperatorResponse(clusterConfig clusterconfig.Config, operatorEndpo
}
yamlString := string(yamlBytes)

operatorConfig := cluster.OperatorConfig{
Telemetry: isTelemetryEnabled(),
ClientID: clientID(),
OperatorEndpoint: operatorEndpoint,
}

infoResponse, err := cluster.Info(operatorConfig)
infoResponse, err := getInfoOperatorResponse(operatorEndpoint)
if err != nil {
fmt.Println(yamlString)
return err
Expand All @@ -752,6 +774,15 @@ func printInfoOperatorResponse(clusterConfig clusterconfig.Config, operatorEndpo
return nil
}

// getInfoOperatorResponse asks the operator for cluster info.
//
// It assembles a cluster.OperatorConfig from the CLI's telemetry setting, the
// client ID, and the supplied operatorEndpoint, and hands it to cluster.Info,
// returning that call's response and error unchanged. Callers in this file
// pass operatorEndpoint with the "https://" scheme already prepended.
func getInfoOperatorResponse(operatorEndpoint string) (*schema.InfoResponse, error) {
	return cluster.Info(cluster.OperatorConfig{
		Telemetry:        isTelemetryEnabled(),
		ClientID:         clientID(),
		OperatorEndpoint: operatorEndpoint,
	})
}

func printInfoPricing(infoResponse *schema.InfoResponse, clusterConfig clusterconfig.Config) {
eksPrice := aws.EKSPrices[clusterConfig.Region]
operatorInstancePrice := aws.InstanceMetadatas[clusterConfig.Region]["t3.medium"].Price
Expand Down Expand Up @@ -882,7 +913,7 @@ func printInfoNodes(infoResponse *schema.InfoResponse) {
t.MustPrint(&table.Opts{Sort: pointer.Bool(false)})
}

func updateAWSCLIEnv(envName string, operatorEndpoint string, disallowPrompt bool) error {
func updateCLIEnv(envName string, operatorEndpoint string, disallowPrompt bool, printToStdout bool) error {
prevEnv, err := readEnv(envName)
if err != nil {
return err
Expand All @@ -897,14 +928,20 @@ func updateAWSCLIEnv(envName string, operatorEndpoint string, disallowPrompt boo
envWasUpdated := false
if prevEnv == nil {
shouldWriteEnv = true
fmt.Println()
if printToStdout {
fmt.Println()
}
} else if prevEnv.OperatorEndpoint != operatorEndpoint {
envWasUpdated = true
if disallowPrompt {
shouldWriteEnv = true
fmt.Println()
if printToStdout {
if disallowPrompt {
shouldWriteEnv = true
fmt.Println()
} else {
shouldWriteEnv = prompt.YesOrNo(fmt.Sprintf("\nfound an existing environment named \"%s\"; would you like to overwrite it to connect to this cluster?", envName), "", "")
}
} else {
shouldWriteEnv = prompt.YesOrNo(fmt.Sprintf("\nfound an existing environment named \"%s\"; would you like to overwrite it to connect to this cluster?", envName), "", "")
shouldWriteEnv = true
}
}

Expand All @@ -914,10 +951,12 @@ func updateAWSCLIEnv(envName string, operatorEndpoint string, disallowPrompt boo
return err
}

if envWasUpdated {
fmt.Printf(console.Bold("the environment named \"%s\" has been updated to point to this cluster (and was set as the default environment)\n"), envName)
} else {
fmt.Printf(console.Bold("an environment named \"%s\" has been configured to point to this cluster (and was set as the default environment)\n"), envName)
if printToStdout {
if envWasUpdated {
fmt.Printf(console.Bold("the environment named \"%s\" has been updated to point to this cluster (and was set as the default environment)\n"), envName)
} else {
fmt.Printf(console.Bold("an environment named \"%s\" has been configured to point to this cluster (and was set as the default environment)\n"), envName)
}
}
}

Expand Down Expand Up @@ -948,7 +987,7 @@ func cmdDebug(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig) {
return
}

func refreshCachedClusterConfig(awsClient aws.Client, accessConfig *clusterconfig.AccessConfig) clusterconfig.Config {
func refreshCachedClusterConfig(awsClient aws.Client, accessConfig *clusterconfig.AccessConfig, printToStdout bool) clusterconfig.Config {
// add empty file if cached cluster doesn't exist so that the file output by manager container maintains current user permissions
cachedClusterConfigPath := cachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
containerConfigPath := fmt.Sprintf("/out/%s", filepath.Base(cachedClusterConfigPath))
Expand All @@ -960,7 +999,9 @@ func refreshCachedClusterConfig(awsClient aws.Client, accessConfig *clusterconfi
},
}

fmt.Print("syncing cluster configuration ...\n\n")
if printToStdout {
fmt.Print("syncing cluster configuration ...\n\n")
}
out, exitCode, err := runManagerAccessCommand("/root/refresh.sh "+containerConfigPath, *accessConfig, &awsClient, nil, copyFromPaths)
if err != nil {
exit.Error(err)
Expand Down Expand Up @@ -1109,18 +1150,29 @@ func createLogGroupIfNotFound(awsClient *aws.Client, logGroup string, tags map[s
return nil
}

// Will return error if load balancer can't be found
func getAWSOperatorLoadBalancer(clusterName string, awsClient *aws.Client) (*elbv2.LoadBalancer, error) {
// LoadBalancer names one of the cluster's load balancers. Its string value is
// what getLoadBalancer matches against the "cortex.dev/load-balancer" tag.
type LoadBalancer string

// The known load balancer kinds. These are constants rather than variables so
// that they cannot be reassigned at runtime.
const (
	// OperatorLoadBalancer selects the load balancer tagged "operator".
	OperatorLoadBalancer LoadBalancer = "operator"
	// APILoadBalancer selects the load balancer tagged "api".
	APILoadBalancer LoadBalancer = "api"
)

// String returns the underlying tag value of the load balancer kind.
func (lb LoadBalancer) String() string {
	return string(lb)
}

// Will return error if the load balancer can't be found
func getLoadBalancer(clusterName string, whichLB LoadBalancer, awsClient *aws.Client) (*elbv2.LoadBalancer, error) {
loadBalancer, err := awsClient.FindLoadBalancer(map[string]string{
clusterconfig.ClusterNameTag: clusterName,
"cortex.dev/load-balancer": "operator",
"cortex.dev/load-balancer": whichLB.String(),
})
if err != nil {
return nil, errors.Wrap(err, "unable to locate operator load balancer")
return nil, errors.Wrap(err, fmt.Sprintf("unable to locate %s load balancer", whichLB.String()))
}

if loadBalancer == nil {
return nil, ErrorNoOperatorLoadBalancer()
return nil, ErrorNoOperatorLoadBalancer(whichLB.String())
}

return loadBalancer, nil
Expand Down
Loading