Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Closes the connection.
```Erlang
ered:command(client_ref(), command()) -> reply().
ered:command(client_ref(), command(), timeout()) -> reply().
ered:command(client_ref(), command(), req_opts()) -> reply().
```

Send a command and return the reply.
Expand All @@ -95,10 +96,14 @@ work as expected.
For cluster clients, a key must be provided.
Omitting timeout is the same as setting the timeout to infinity.

### `ered:command_async/3`
The third argument can be a timeout or a map of request options. See [Request
options](#request-options) below.

### `ered:command_async/3,4`

```Erlang
ered:command_async(client_ref(), command(), fun((reply()) -> any())) -> ok.
ered:command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok.
```

Like command/2,3 but asynchronous. Instead of returning the reply, the reply
Expand Down Expand Up @@ -142,6 +147,7 @@ cluster.
```Erlang
ered_cluster:command(cluster_ref(), command(), key()) -> reply().
ered_cluster:command(cluster_ref(), command(), key(), timeout()) -> reply().
ered_cluster:command(cluster_ref(), command(), key(), req_opts()) -> reply().
```

Send a command. The command is routed to
Expand All @@ -154,10 +160,14 @@ then they need to all map to the same slot for things to
work as expected.
Omitting timeout is the same as setting the timeout to infinity.

### `ered_cluster:command_async/4`
The fourth argument can be a timeout or a map of request options. See [Request
options](#request-options) below.

### `ered_cluster:command_async/4,5`

```Erlang
ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok.
ered_cluster:command_async(cluster_ref(), command(), key(), fun((reply()) -> any()), req_opts()) -> ok.
```

Like command/3,4 but asynchronous. Instead of returning the reply, the reply
Expand Down Expand Up @@ -370,6 +380,29 @@ options, as `{client_opts, [{connection_opts, [...]}]}`.
When a timeout happens, the connection is closed and the client attempts to
set up a new connection. See the client option `node_down_timeout` above.

Request options
---------------

The command functions accept an optional map of request options instead of a
plain timeout. The following keys are recognized:

* `timeout => timeout()`

Timeout for the `gen_server:call`. Same as passing a timeout directly.
Default infinity.

* `buffer_time => non_neg_integer()`

Buffer time in milliseconds. When non-zero, the command is not sent
immediately but buffered. A timer is started and the buffered commands are
flushed when the timer fires. If a command with `buffer_time => 0` (the
default) arrives while there are buffered commands, all buffered commands are
flushed immediately together with the new command. This can be used to
coalesce multiple commands into fewer TCP packets and TLS records.

If multiple buffered commands arrive with different buffer times, the shortest
remaining time is used.

Info messages
-------------

Expand Down
14 changes: 11 additions & 3 deletions src/ered.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
-export([connect/3,
close/1,
command/2, command/3,
command_async/3]).
command_async/3, command_async/4]).

-export_type([opt/0,
addr/0,
host/0,
command/0,
reply/0,
reply_fun/0,
req_opts/0,
client_ref/0]).

%%%===================================================================
Expand All @@ -27,6 +28,7 @@
-type command() :: ered_command:command().
-type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}.
-type reply_fun() :: ered_client:reply_fun().
-type req_opts() :: #{timeout => timeout(), buffer_time => non_neg_integer()}.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For opt() we define that in ered_client:opt(), maybe we should define req_opts() in ered_client:req_opts() as well

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I don't really know what's the best style to use.

If we define types as alias of types in other modules that are not part of the public API, then some tools that generate documentation (exdoc, edoc, etc.) don't do this in a nice way. They will display it as an alias of ered_client:req_opts() without saying what that is, since that module is not included in the docs. In docs, I prefer to see the explicit definition rather than alias. We don't use such tools now but we might in the future.

Also when reading the code, I don't like the indirection, but it's good to avoid duplicating the types. Sometimes I think it'd be better move all types to the ered module and let the other modules use that. WDYT?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a .hrl file since we now have two user-facing APIs, ered_cluster.erl for cluster and ered.erl for "standalone",
but since its name would most likely be erl.hrl we can put it in ered.erl directly I guess, sounds fine with me.

-type client_ref() :: gen_server:server_ref().

%%%===================================================================
Expand All @@ -51,7 +53,7 @@ close(Pid) ->

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command(client_ref(), command()) -> reply().
-spec command(client_ref(), command(), timeout()) -> reply().
-spec command(client_ref(), command(), timeout() | req_opts()) -> reply().
%%
%% Send a command.
%% If the command is a single command then it is represented as a
Expand All @@ -64,10 +66,13 @@ close(Pid) ->
command(Pid, Command) ->
ered_client:command(Pid, Command, infinity).
command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
ered_client:command(Pid, Command, Timeout).
ered_client:command(Pid, Command, Timeout);
command(Pid, Command, Opts) when is_map(Opts) ->
ered_client:command(Pid, Command, Opts).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command_async(client_ref(), command(), fun((reply()) -> any())) -> ok.
-spec command_async(client_ref(), command(), fun((reply()) -> any()), req_opts()) -> ok.
%%
%% Like command/2,3 but asynchronous. Instead of returning the reply,
%% the reply function is applied to the reply when it is available.
Expand All @@ -76,3 +81,6 @@ command(Pid, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
command_async(Pid, Command, ReplyFun) when is_function(ReplyFun, 1) ->
ered_client:command_async(Pid, Command, ReplyFun).

command_async(Pid, Command, ReplyFun, Opts) when is_function(ReplyFun, 1), is_map(Opts) ->
ered_client:command_async(Pid, Command, ReplyFun, Opts).
63 changes: 53 additions & 10 deletions src/ered_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
connect/3, close/1,
deactivate/1, reactivate/1,
command/2, command/3,
command_async/3]).
command_async/3, command_async/4]).

%% testing/debugging
-export([state_to_map/1]).
Expand Down Expand Up @@ -83,6 +83,8 @@
status :: init | up | node_down | node_deactivated,
node_down_timer = none :: none | reference(),
connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond)
buffer_until = none :: none | integer(), % erlang:monotonic_time(millisecond)
buffer_timer = none :: none | reference(),
opts = #opts{}

}).
Expand Down Expand Up @@ -258,7 +260,7 @@ reactivate(ServerRef) ->

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command(pid(), ered_command:command()) -> reply().
-spec command(pid(), ered_command:command(), timeout()) -> reply().
-spec command(pid(), ered_command:command(), timeout() | ered:req_opts()) -> reply().
%%
%% Send a command to the connected node. The argument can be a
%% single command as a list of binaries, a pipeline of command as a
Expand All @@ -267,20 +269,30 @@ reactivate(ServerRef) ->
command(ServerRef, Command) ->
command(ServerRef, Command, infinity).

command(ServerRef, Command, Timeout) ->
gen_server:call(ServerRef, {command, ered_command:convert_to(Command)}, Timeout).
command(ServerRef, Command, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
gen_server:call(ServerRef, {command, ered_command:convert_to(Command), 0}, Timeout);
command(ServerRef, Command, Opts) when is_map(Opts) ->
Timeout = maps:get(timeout, Opts, infinity),
BufferTime = maps:get(buffer_time, Opts, 0),
gen_server:call(ServerRef, {command, ered_command:convert_to(Command), BufferTime}, Timeout).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command_async(pid(), ered_command:command(), reply_fun()) -> ok.
-spec command_async(pid(), ered_command:command(), reply_fun(), ered:req_opts()) -> ok.
%%
%% Send a command to the connected node in asynchronous
%% fashion. The provided callback function will be called with the
%% reply. Note that the callback function will executing in the redis
%% client process and should not hang or perform any lengthy task.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
command_async(ServerRef, Command, CallbackFun) ->
gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command),
replyto = CallbackFun}).
command_async(ServerRef, Command, CallbackFun, #{}).

command_async(ServerRef, Command, CallbackFun, Opts) when is_map(Opts) ->
BufferTime = maps:get(buffer_time, Opts, 0),
gen_server:cast(ServerRef, {#command{data = ered_command:convert_to(Command),
replyto = CallbackFun},
BufferTime}).

%% Converts a state record to a map, for easier testing.
%% Used in tests, after calling sys:get_state(EredClientPid).
Expand Down Expand Up @@ -347,16 +359,16 @@ handle_connection_opts(OptsRecord, Opts) ->
timeout = ResponseTimeout,
push_cb = PushCb}.

handle_call({command, Command}, From, State) ->
handle_call({command, Command, BufferTime}, From, State) ->
Fun = fun(Reply) -> gen_server:reply(From, Reply) end,
handle_cast(#command{data = Command, replyto = Fun}, State).
handle_cast({#command{data = Command, replyto = Fun}, BufferTime}, State).


handle_cast(Command = #command{}, State) ->
handle_cast({Command = #command{}, BufferTime}, State) ->
case State#st.status of
Up when Up =:= up; Up =:= init ->
State1 = State#st{waiting = q_in(Command, State#st.waiting)},
State2 = process_commands(State1),
State2 = maybe_buffer(BufferTime, State1),
{noreply, State2, response_timeout(State2)};
NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated ->
reply_command(Command, {error, NodeProblem}),
Expand Down Expand Up @@ -456,6 +468,11 @@ handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.nod
State2 = reply_all({error, node_down}, State1),
{noreply, process_commands(State2#st{status = node_down})};

handle_info({timeout, Ref, flush_buffer}, #st{buffer_timer = Ref} = State) ->
State1 = State#st{buffer_timer = none, buffer_until = none},
State2 = process_commands(State1),
{noreply, State2, response_timeout(State2)};

handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none ->
%% Request timeout
Transport = State#st.opts#opts.transport,
Expand Down Expand Up @@ -807,6 +824,32 @@ response_timeout(State) when not ?q_is_empty(State#st.pending) ->
response_timeout(_State) ->
infinity.

maybe_buffer(0, State) ->
cancel_buffer_timer(process_commands(State));
maybe_buffer(BufferTime, State) ->
Now = erlang:monotonic_time(millisecond),
Until = Now + BufferTime,
case State#st.buffer_until of
none ->
start_buffer_timer(Until, State);
Existing when Until < Existing ->
erlang:cancel_timer(State#st.buffer_timer),
start_buffer_timer(Until, State);
_Later ->
State
end.

start_buffer_timer(Until, State) ->
Ms = max(1, Until - erlang:monotonic_time(millisecond)),
Ref = erlang:start_timer(Ms, self(), flush_buffer),
State#st{buffer_timer = Ref, buffer_until = Until}.

cancel_buffer_timer(#st{buffer_timer = none} = State) ->
State;
cancel_buffer_timer(#st{buffer_timer = Ref} = State) ->
erlang:cancel_timer(Ref),
State#st{buffer_timer = none, buffer_until = none}.

reply_command(#command{replyto = Fun} = _Command, Reply) ->
Fun(Reply).

Expand Down
Loading
Loading