Skip to content

Commit fcf8a4c

Browse files
Allow Epmd strategy to reconnect after connection failures (#183)
Co-authored-by: Paul Schoenfelder <[email protected]>
1 parent d908239 commit fcf8a4c

File tree

3 files changed

+86
-17
lines changed

3 files changed

+86
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
- Use new cipher names
6+
- Allow Epmd strategy to reconnect after connection failures
67
- Detect Self Signed Certificate Authority for Kubernetes Strategy
78
- Remove calls to deprecated `Logger.warn/2`
89

lib/strategy/epmd.ex

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,56 @@ defmodule Cluster.Strategy.Epmd do
1010
epmd_example: [
1111
strategy: #{__MODULE__},
1212
config: [
13+
timeout: 30_000,
1314
1415
16+
An optional timeout can be specified in the config. This is the timeout that
17+
will be used in the GenServer to connect the nodes. This defaults to
18+
`:infinity` meaning that the connection process will only happen when the
19+
worker is started. Any integer timeout (in milliseconds) will result in the connection process
20+
being triggered again each time that interval passes without the worker receiving a message. In the example above, it has been configured for 30 seconds.
1521
"""
22+
use GenServer
1623
use Cluster.Strategy
1724

1825
alias Cluster.Strategy.State
1926

27+
@impl true
2028
def start_link([%State{config: config} = state]) do
2129
case Keyword.get(config, :hosts, []) do
2230
[] ->
2331
:ignore
2432

2533
nodes when is_list(nodes) ->
26-
Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, nodes)
27-
:ignore
34+
GenServer.start_link(__MODULE__, [state])
2835
end
2936
end
37+
38+
@impl true
39+
def init([state]) do
40+
connect_hosts(state)
41+
{:ok, state, configured_timeout(state)}
42+
end
43+
44+
@impl true
45+
def handle_info(:timeout, state) do
46+
handle_info(:connect, state)
47+
end
48+
49+
def handle_info(:connect, state) do
50+
connect_hosts(state)
51+
{:noreply, state, configured_timeout(state)}
52+
end
53+
54+
@spec configured_timeout(State.t()) :: integer() | :infinity
55+
defp configured_timeout(%State{config: config}) do
56+
Keyword.get(config, :timeout, :infinity)
57+
end
58+
59+
@spec connect_hosts(State.t()) :: State.t()
60+
defp connect_hosts(%State{config: config} = state) do
61+
nodes = Keyword.get(config, :hosts, [])
62+
Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, nodes)
63+
state
64+
end
3065
end

test/epmd_test.exs

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,56 @@ defmodule Cluster.Strategy.EpmdTest do
55

66
alias Cluster.Strategy.Epmd
77

8-
import ExUnit.CaptureLog
8+
require Cluster.Nodes
99

1010
describe "start_link/1" do
11-
test "calls right functions" do
12-
capture_log(fn ->
13-
:ignore =
14-
Epmd.start_link([
15-
%Cluster.Strategy.State{
16-
topology: :name,
17-
config: [hosts: [:foo@bar]],
18-
connect: {Cluster.Nodes, :connect, [self()]},
19-
list_nodes: {Cluster.Nodes, :list_nodes, [[]]}
20-
}
21-
])
22-
23-
assert_receive {:connect, :foo@bar}, 5_000
24-
end)
11+
@tag capture_log: true
12+
test "starts GenServer and connects nodes" do
13+
{:ok, pid} =
14+
Epmd.start_link([
15+
%Cluster.Strategy.State{
16+
topology: :name,
17+
config: [hosts: [:foo@bar]],
18+
connect: {Cluster.Nodes, :connect, [self()]},
19+
list_nodes: {Cluster.Nodes, :list_nodes, [[]]}
20+
}
21+
])
22+
23+
assert is_pid(pid)
24+
25+
assert_receive {:connect, :foo@bar}, 5_000
26+
end
27+
28+
@tag capture_log: true
29+
test "reconnects every time the configured timeout was reached" do
30+
timeout = 500
31+
start_timestamp = NaiveDateTime.utc_now()
32+
33+
{:ok, _pid} =
34+
Epmd.start_link([
35+
%Cluster.Strategy.State{
36+
topology: :name,
37+
config: [hosts: [:foo@bar], timeout: timeout],
38+
connect: {Cluster.Nodes, :connect, [self()]},
39+
list_nodes: {Cluster.Nodes, :list_nodes, [[]]}
40+
}
41+
])
42+
43+
# Initial connect
44+
assert_receive {:connect, :foo@bar}, 5_000
45+
46+
# First reconnect should not have happened right away,
47+
# but it should happen after a timeout
48+
refute_received {:connect, _}
49+
assert_receive {:connect, :foo@bar}, 2 * timeout
50+
51+
# A consecutive reconnect should not have happened right away,
52+
# but it should happen after a timeout
53+
refute_received {:connect, _}
54+
assert_receive {:connect, :foo@bar}, 2 * timeout
55+
56+
duration = NaiveDateTime.diff(NaiveDateTime.utc_now(), start_timestamp, :millisecond)
57+
assert duration > 2 * timeout
2558
end
2659
end
2760
end

0 commit comments

Comments
 (0)