HashiCorp Consul is a distributed, highly available service that provides service discovery with corresponding health checks, a distributed key/value store, and a service mesh solution, and it runs on a variety of platforms and environments. It is designed so that every node which provides services (things to be registered in service discovery, or that participate in the service mesh) runs a Consul agent, which acts as a sort of intermediary: providing an easy interface for registering services, running local health checks for both services and the node it runs on, and acting as a control plane for service mesh components running on that local node, amongst other things. These agents in turn talk to a small number of Consul servers, typically 3-5, which are clustered to provide a replicated place to store data that must be persisted.
I’ve been using Consul for years and, like most people, only ever interacted with the RESTful HTTP API provided by Consul, which typically listens on port 8500. I knew there was a separate RPC protocol, typically on port 8300, which Consul agents use to talk to servers (and which servers in one Consul datacenter use to talk to Consul servers in other datacenters), but I never gave it much thought. That is, until a few weeks ago, when I was exploring some other part of Consul and dumping network traffic to see what exactly was happening. Whatever those RPCs were, it wasn’t REST.
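For orientation, here's the kind of HTTP call I'd always been making: a minimal sketch in Go against a local agent's KV endpoint on port 8500 (the key name `mykey` is just an example, and error handling is trimmed):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// GET /v1/kv/:key against the local agent's HTTP API (port 8500).
	resp, err := http.Get("http://127.0.0.1:8500/v1/kv/mykey")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The KV endpoint returns a JSON array of entries, with the value
	// base64-encoded in the "Value" field.
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```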
Let’s dig into how the Consul RPC mechanism works by tracing it through the Consul code base. In this article we’ll be referencing the Consul source code, tag v1.9.1, the latest tagged version of the code as of writing this article.
To give us an example to start with, let’s look at the agent code that gets triggered when you make a `GET /v1/kv/:key` call:

```go
	if err := s.agent.RPC(method, &listArgs, &out); err != nil {
		return nil, err
	}
```

(agent/kvs_endpoint.go)
Okay, a function called `RPC`, makes sense so far. `method` is “KVS.Get” and `s` is a pointer of type:

```go
// HTTPHandlers provides an HTTP api for an agent.
type HTTPHandlers struct {
	agent           *Agent
	denylist        *Denylist
	configReloaders []ConfigReloader
	h               http.Handler
	metricsProxyCfg atomic.Value
}
```

(agent/http.go)
`agent` is a pointer of type:

```go
// Agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
```

(agent/agent.go)
This reveals a key point: both clients and servers run the agent; servers run the full Consul server, clients simply forward requests on to one of the Consul servers. This becomes clearer a bit later on. Finding the definition of the `RPC` function:
```go
// RPC is used to make an RPC call to the Consul servers
// This allows the agent to implement the Consul.Interface
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
	a.endpointsLock.RLock()
	// fast path: only translate if there are overrides
	if len(a.endpoints) > 0 {
		p := strings.SplitN(method, ".", 2)
		if e := a.endpoints[p[0]]; e != "" {
			method = e + "." + p[1]
		}
	}
	a.endpointsLock.RUnlock()
	return a.delegate.RPC(method, args, reply)
}
```

(agent/agent.go)
An RPC call takes a method name (say, `KVS.Get`), some arguments, and a reply. What’s that `delegate`? Back in the `Agent` struct:

```go
	// delegate is either a *consul.Server or *consul.Client
	// depending on the configuration
	delegate delegate
```

(agent/agent.go)

A `delegate` is just an interface:
```go
type delegate interface {
	GetLANCoordinate() (lib.CoordinateSet, error)
	Leave() error
	LANMembers() []serf.Member
	LANMembersAllSegments() ([]serf.Member, error)
	LANSegmentMembers(segment string) ([]serf.Member, error)
	LocalMember() serf.Member
	JoinLAN(addrs []string) (n int, err error)
	RemoveFailedNode(node string, prune bool) error
	ResolveToken(secretID string) (acl.Authorizer, error)
	ResolveTokenToIdentity(secretID string) (structs.ACLIdentity, error)
	ResolveTokenAndDefaultMeta(secretID string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error)
	RPC(method string, args interface{}, reply interface{}) error
	UseLegacyACLs() bool
	SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
	Shutdown() error
	Stats() map[string]map[string]string
	ReloadConfig(config *consul.Config) error
	enterpriseDelegate
}
```

(agent/agent.go)
So far we know that when an agent makes an `RPC` call, its delegate is asked to make an `RPC` call, and we have some inkling that the delegate behaves differently depending on whether the agent is operating in client or server mode. Digging a bit more, we find `Start`, which actually starts the agent:
```go
// Start verifies its configuration and runs an agent's various subprocesses.
func (a *Agent) Start(ctx context.Context) error {
```

[...]

```go
	// Setup either the client or the server.
	if c.ServerMode {
		server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul server: %v", err)
		}
		a.delegate = server
	} else {
		client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul client: %v", err)
		}
		a.delegate = client
	}
```

(agent/agent.go)
If we go down the server path, we call `NewServer` and that becomes our delegate; if we go down the client path, we call `NewClient` and that becomes our delegate. Either way, the server or client must implement an `RPC` method to satisfy the `delegate` interface. Why we do all this will become apparent later on.
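To make the shape of this concrete, here is a stripped-down sketch of the delegate idea. This is not Consul's actual code: the interface is reduced to a single method, and names like `clientDelegate` and `serverDelegate` are made up for illustration.

```go
package main

import "fmt"

// delegate is the narrow interface callers depend on.
type delegate interface {
	RPC(method string, args interface{}, reply interface{}) error
}

// clientDelegate forwards every call to a Consul server it knows about.
type clientDelegate struct{ server string }

func (c *clientDelegate) RPC(method string, args, reply interface{}) error {
	fmt.Printf("forwarding %s to %s\n", method, c.server)
	return nil // a real client would send the call over the wire here
}

// serverDelegate handles the call locally (or forwards it to the leader).
type serverDelegate struct{}

func (s *serverDelegate) RPC(method string, args, reply interface{}) error {
	fmt.Printf("handling %s locally\n", method)
	return nil
}

func main() {
	serverMode := false

	// Mirrors Agent.Start: the concrete delegate depends on the mode.
	var d delegate
	if serverMode {
		d = &serverDelegate{}
	} else {
		d = &clientDelegate{server: "10.0.0.1:8300"}
	}

	// Everywhere else, callers just say d.RPC(...) and don't care which.
	_ = d.RPC("KVS.Get", nil, nil)
}
```

The payoff is exactly what the Consul code is after: anywhere in the agent that needs to make an RPC call just calls the delegate, without caring which mode the agent is running in.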
We’re still in client mode, so let’s see how a client handles being a `delegate`.
```go
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
	// This is subtle but we start measuring the time on the client side
	// right at the time of the first request, vs. on the first retry as
	// is done on the server side inside forward(). This is because the
	// servers may already be applying the RPCHoldTimeout up there, so by
	// starting the timer here we won't potentially double up the delay.
	// TODO (slackpad) Plumb a deadline here with a context.
	firstCheck := time.Now()

TRY:
	manager, server := c.router.FindLANRoute()
	if server == nil {
		return structs.ErrNoServers
	}

	// Enforce the RPC limit.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}

	// Make the request.
	rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
	if rpcErr == nil {
		return nil
	}
	[...]
```

(agent/consul/client.go)
After the preamble we see the client pick a server to talk to via `c.router.FindLANRoute()`, then we do some work to enforce RPC limits, and finally we make the call using something called a `connPool`. The rest of the function deals with failures and retrying, and with returning an error if we’re not successful in time.
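As a side note, that `rpcLimiter` is a standard token-bucket limiter from `golang.org/x/time/rate`. A small illustration of what `Allow()` is doing, with made-up numbers (Consul derives the real rate and burst from agent configuration):

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Allow 2 calls per second with a burst of 2 (illustrative values only).
	limiter := rate.NewLimiter(rate.Limit(2), 2)

	for i := 0; i < 5; i++ {
		if limiter.Allow() {
			fmt.Println("request", i, "allowed")
		} else {
			// Consul returns structs.ErrRPCRateExceeded at this point.
			fmt.Println("request", i, "rejected: rate exceeded")
		}
	}
}
```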
Back in our `Client` struct we see:

```go
	// Connection pool to consul servers
	connPool *pool.ConnPool

	// router is responsible for the selection and maintenance of
	// Consul servers this agent uses for RPC requests
	router *router.Router
```

(agent/consul/client.go)
I won’t go too far into the `router` stuff; in brief, it works with memberlist and Serf to keep a list of healthy servers and rotate calls between them. It’s the piece that, when a Consul client needs to “talk to a server”, actually decides which server endpoint to use.
Looking at the `connPool`, let’s read the side of the can:

```go
// ConnPool is used to maintain a connection pool to other Consul
// servers. This is used to reduce the latency of RPC requests between
// servers. It is only used to pool connections in the rpcConsul mode.
// Raft connections are pooled separately. Maintain at most one
// connection per host, for up to MaxTime. When MaxTime connection
// reaping is disabled. MaxStreams is used to control the number of idle
// streams allowed. If TLS settings are provided outgoing connections
// use TLS.
```

(agent/pool/pool.go)
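The “at most one connection per host” idea is simple to sketch. Here is a rough, much-simplified illustration of that shape only (not Consul's implementation, which also handles reaping, stream limits, and TLS):

```go
package pool

import (
	"net"
	"sync"
)

// connPool keeps at most one pooled connection per host; callers reuse it
// and multiplex their calls over it rather than dialing per request.
type connPool struct {
	mu    sync.Mutex
	conns map[string]net.Conn
}

func (p *connPool) acquire(host string) (net.Conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.conns == nil {
		p.conns = make(map[string]net.Conn)
	}
	if c, ok := p.conns[host]; ok {
		return c, nil // reuse the existing connection to this host
	}
	c, err := net.Dial("tcp", host)
	if err != nil {
		return nil, err
	}
	p.conns[host] = c
	return c, nil
}
```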
So, we keep a socket open and multiplex calls over it, using the `RPC` function:
```go
// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(
	dc string,
	nodeName string,
	addr net.Addr,
	method string,
	args interface{},
	reply interface{},
) error {
	if nodeName == "" {
		return fmt.Errorf("pool: ConnPool.RPC requires a node name")
	}

	// TODO (autoconf) probably will want to have a way to invoke the
	// secure or insecure variant depending on whether its an ongoing
	// or first time config request. For now though this is fine until
	// those ongoing requests are implemented.
	if method == "AutoEncrypt.Sign" || method == "AutoConfig.InitialConfiguration" {
		return p.rpcInsecure(dc, addr, method, args, reply)
	} else {
		return p.rpc(dc, nodeName, addr, method, args, reply)
	}
}
```

(agent/pool/pool.go)
Two methods are handled differently, but neither of those is `KVS.Get`, so the `rpc` function is called:
```go
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
	p.once.Do(p.init)

	// Get a usable client
	conn, sc, err := p.getClient(dc, nodeName, addr)
	if err != nil {
		return fmt.Errorf("rpc error getting client: %w", err)
	}

	// Make the RPC call
	err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
	[...]
```

(agent/pool/pool.go)
Ah, now we’re getting somewhere: get a client for the node in question, then use msgpackrpc. This is where we get down to what’s actually going over the wire. From the msgpackrpc repo:

> This library provides the same functions as net/rpc/jsonrpc but
> for communicating with MessagePack instead. The library is modeled
> directly after the Go standard library so it should be easy to use
> and obvious.
What we have here is a way of making Remote Procedure Calls: a client says “make this call”, supplies some arguments, and the RPC library is responsible for marshalling those arguments (here, using msgpack), shoving the call to the server, getting the answer back, unmarshalling the reply, and returning it to the caller.
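To see the whole round trip in miniature, here's a self-contained sketch of the same pattern: a standard `net/rpc` server with a msgpack codec on each connection, and a client calling a method by name with `CallWithCodec`. It assumes the `github.com/hashicorp/net-rpc-msgpackrpc` and `github.com/hashicorp/go-msgpack` packages are available; the `Echo` endpoint and its types are made up for illustration, and error handling is trimmed.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"

	"github.com/hashicorp/go-msgpack/codec"
	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
)

type EchoArgs struct{ Msg string }
type EchoReply struct{ Msg string }

// Echo is an RPC endpoint; exported methods with the right signature become
// routable by net/rpc as "Echo.<Method>".
type Echo struct{}

func (e *Echo) Say(args *EchoArgs, reply *EchoReply) error {
	reply.Msg = "echo: " + args.Msg
	return nil
}

func main() {
	handle := &codec.MsgpackHandle{}

	// Server side: register the endpoint, then serve each accepted
	// connection with a msgpack codec (Consul's handleConsulConn calls
	// ServeRequest in a loop; ServeCodec loops for us here).
	server := rpc.NewServer()
	server.Register(&Echo{})
	ln, _ := net.Listen("tcp", "127.0.0.1:0")
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return
			}
			go server.ServeCodec(msgpackrpc.NewCodecFromHandle(true, true, conn, handle))
		}
	}()

	// Client side: dial, wrap the conn in the same codec, and call a method
	// by name, much as ConnPool.RPC does with CallWithCodec.
	conn, _ := net.Dial("tcp", ln.Addr().String())
	cc := msgpackrpc.NewCodecFromHandle(true, true, conn, handle)
	var reply EchoReply
	if err := msgpackrpc.CallWithCodec(cc, "Echo.Say", &EchoArgs{Msg: "hi"}, &reply); err != nil {
		panic(err)
	}
	fmt.Println(reply.Msg) // "echo: hi"
}
```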
To recap where we are on the client side:

- The client needs to make an RPC call.
- Both client and server run the same Consul agent, just in different modes, and to simplify things a `delegate` is the concrete implementation of how that RPC call is made.
- In the client’s case, when the agent is started up it is configured to use the ‘client’ delegate.
- The client delegate keeps track of the Consul servers and, with the help of memberlist and Serf, keeps track of the healthy ones. It maintains a pool of connections to each server.
- The client delegate uses this information to pick the Consul server to talk to, and then uses an RPC library leveraging msgpack to actually send bits across the wire.
What does a Consul server do to receive and handle these RPC calls? We’ve already been at the start of this path before, in the `Start` function; it’s just that this time we’re going down the `ServerMode` route:
```go
// Start verifies its configuration and runs an agent's various subprocesses.
func (a *Agent) Start(ctx context.Context) error {
```

[...]

```go
	// Setup either the client or the server.
	if c.ServerMode {
		server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul server: %v", err)
		}
		a.delegate = server
	} else {
		client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul client: %v", err)
		}
		a.delegate = client
	}
```

(agent/agent.go)
There are a few things we care about in `NewServer`:

```go
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps) (*Server, error) {
```

(agent/consul/server.go)

[...]

```go
	// Initialize the RPC layer.
	if err := s.setupRPC(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}
```

(agent/consul/server.go)

[...]

```go
	go s.listen(s.Listener)
```

(agent/consul/server.go)

The first is initializing the RPC layer via `setupRPC`, and the second is starting the listener goroutine that actually handles RPC calls.
```go
// endpointFactory is a function that returns an RPC endpoint bound to the given
// server.
type factory func(s *Server) interface{}

// endpoints is a list of registered RPC endpoint factories.
var endpoints []factory

// registerEndpoint registers a new RPC endpoint factory.
func registerEndpoint(fn factory) {
	endpoints = append(endpoints, fn)
}

// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC() error {
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: s.config.RPCMaxConnsPerClient,
	})

	for _, fn := range endpoints {
		s.rpcServer.Register(fn(s))
	}
```

(agent/consul/server.go)
`s.rpcServer` is a `net/rpc` server instance, and its `Register` function registers which interfaces the server is exposing. We loop through all the endpoints and register them with `rpcServer`. How did we add all of those endpoints to begin with?
```go
package consul

import "github.com/hashicorp/consul/logging"

func init() {
	registerEndpoint(func(s *Server) interface{} { return &ACL{s, s.loggers.Named(logging.ACL)} })
	registerEndpoint(func(s *Server) interface{} { return &Catalog{s, s.loggers.Named(logging.Catalog)} })
	registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s, s.logger) })
	registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
	registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })
	registerEndpoint(func(s *Server) interface{} { return &FederationState{s} })
	registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
	registerEndpoint(func(s *Server) interface{} { return &Health{s} })
	registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} })
	registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} })
	registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })
	registerEndpoint(func(s *Server) interface{} { return &Operator{s, s.loggers.Named(logging.Operator)} })
	registerEndpoint(func(s *Server) interface{} { return &PreparedQuery{s, s.loggers.Named(logging.PreparedQuery)} })
	registerEndpoint(func(s *Server) interface{} { return &Session{s, s.loggers.Named(logging.Session)} })
	registerEndpoint(func(s *Server) interface{} { return &Status{s} })
	registerEndpoint(func(s *Server) interface{} { return &Txn{s, s.loggers.Named(logging.Transaction)} })
}
```

(agent/consul/server_register.go)
The `init` function is called when the `consul` package is initialized, and that’s where we register everything. Note the line registering `KVS`; we’ll come back to that. The `listen` function called from `NewServer` is a standard accept/handle loop:
```go
// listen is used to listen for incoming RPC connections
func (s *Server) listen(listener net.Listener) {
	for {
		// Accept a connection
		conn, err := listener.Accept()
		if err != nil {
			if s.shutdown {
				return
			}
			s.rpcLogger().Error("failed to accept RPC conn", "error", err)
			continue
		}

		free, err := s.rpcConnLimiter.Accept(conn)
		if err != nil {
			s.rpcLogger().Error("rejecting RPC conn from because rpc_max_conns_per_client exceeded", "conn", logConn(conn))
			conn.Close()
			continue
		}
		// Wrap conn so it will be auto-freed from conn limiter when it closes.
		conn = connlimit.Wrap(conn, free)

		go s.handleConn(conn, false)
		metrics.IncrCounter([]string{"rpc", "accept_conn"}, 1)
	}
}
```

(agent/consul/rpc.go)
`handleConn` does various things, since several different kinds of traffic may arrive on the same RPC port (Consul RPC calls, Raft RPC calls, etc.), but what we care about is:

```go
	// Switch on the byte
	switch typ {
	case pool.RPCConsul:
		s.handleConsulConn(conn)
```

(agent/consul/rpc.go)
```go
// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
	defer conn.Close()
	rpcCodec := msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
	for {
		select {
		case <-s.shutdownCh:
			return
		default:
		}

		if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
			if err != io.EOF && !strings.Contains(err.Error(), "closed") {
				s.rpcLogger().Error("RPC error",
					"conn", logConn(conn),
					"error", err,
				)
				metrics.IncrCounter([]string{"rpc", "request_error"}, 1)
			}
			return
		}
		metrics.IncrCounter([]string{"rpc", "request"}, 1)
	}
}
```

(agent/consul/rpc.go)
The first thing this does is set up our `rpcCodec`, tying it to the `conn` and telling the `net/rpc` package that the actual bits on the wire are encoded with msgpack. The loop then handles each request with `s.rpcServer.ServeRequest`. If there’s an error, or if the connection has ended, it does the right thing and the function returns. Otherwise, it will keep peeling requests off the connection, unless the server is being told to shut down, in which case it also goes away.

Remember the `KVS` registration back in agent/consul/server_register.go:

```go
	registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })
```

This is what exposes the Key/Value store:
```go
// KVS endpoint is used to manipulate the Key-Value store
type KVS struct {
	srv    *Server
	logger hclog.Logger
}
```

(agent/consul/kvs_endpoint.go)
That’s an exported type, and:
```go
// Get is used to lookup a single key.
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
	if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done {
		return err
	}
	[... actually perform get here]
```

(agent/consul/kvs_endpoint.go)
Since `Get` is an exported method with the appropriate signature, the `net/rpc` layer handles routing “KVS.Get” to this function, unmarshalling the request, and marshalling the answer.
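For reference, the shape `net/rpc` keys on is the standard one from the Go standard library: an exported method on a registered type, taking an args value and a pointer to a reply, and returning `error`. A tiny illustration; the types here are made up and just echo Consul's names:

```go
package main

import "net/rpc"

// Toy types standing in for Consul's request/response structs.
type KeyRequest struct{ Key string }
type IndexedDirEntries struct{ Entries []string }

// KVS here is a toy endpoint, not Consul's; the method shape is what matters:
// exported receiver, exported method, (args, *reply) parameters, error return.
type KVS struct{}

func (k *KVS) Get(args *KeyRequest, reply *IndexedDirEntries) error {
	reply.Entries = []string{"value for " + args.Key}
	return nil
}

func main() {
	// Register makes every suitable method on KVS callable as "KVS.<Method>",
	// which is how the server routes the "KVS.Get" string sent by a client.
	_ = rpc.Register(&KVS{})
}
```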
I’m not going to go over what actually happens under the covers when the get happens, but let’s look at that `ForwardRPC` function call.
```go
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
```

(agent/consul/rpc.go)
There’s a lot in that function, but it boils down to the following (a simplified sketch follows the list):

- Is the RPC call for a remote Consul datacenter? If so, forward the request on to a server there and deliver back the answer (clients only ever talk to their local server cluster; requests for other datacenters are handled by that local cluster).
- Is this a read call, and does the RPC request allow for stale reads? If so, `ForwardRPC` returns `false`, and the caller knows to just handle it locally.
- Am I the current leader? If so, return `false` and again the caller knows to just handle it locally.
- I’m not, so forward the request on to the current leader, and then deliver back the answer (clients can make an RPC call to any server; if that server can’t handle it, it forwards it on to the server which can).
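Here is that decision tree condensed into a runnable sketch. This is not the real `ForwardRPC` (whose signature is shown above); the datacenter and leader checks are stubbed out as plain parameters purely for illustration.

```go
package main

import "fmt"

// forwardRPC reports whether the request was forwarded (true) or should be
// handled locally (false), mirroring ForwardRPC's boolean result.
func forwardRPC(requestDC, localDC string, allowStale, isLeader bool) bool {
	// 1. Request targets another datacenter: relay to a server there.
	if requestDC != localDC {
		fmt.Println("forwarding to datacenter", requestDC)
		return true
	}
	// 2. Stale reads allowed: any server can answer, so handle it locally.
	if allowStale {
		return false
	}
	// 3. We are the leader: handle it locally.
	if isLeader {
		return false
	}
	// 4. Otherwise relay to the current leader and hand back its answer.
	fmt.Println("forwarding to the current leader")
	return true
}

func main() {
	fmt.Println(forwardRPC("dc2", "dc1", false, false)) // cross-DC: forwarded
	fmt.Println(forwardRPC("dc1", "dc1", true, false))  // stale read: local
	fmt.Println(forwardRPC("dc1", "dc1", false, false)) // follower: forwarded
}
```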
Tying everything together:

1. Both Consul servers and clients run the same agent, which acts as a server or as a client based on configuration.
2. Both servers and clients may make RPC calls, so the notion of a `delegate` is used to simplify things.
3. If an agent is started as a client, then it has a client delegate, and that simply makes the appropriate call off to a server.
4. If the agent is started as a server, then it has a server delegate, and it will either handle the request itself, or forward it on to the server which can.
5. Points 3 and 4 both mean that anywhere in the agent code where an RPC call needs to be made, you just make one, and the delegate does the right thing for both client and server mode.
6. The `net/rpc` package in Go allows for any number of ways of actually encoding the bits which go over the wire; in Consul’s case it uses msgpackrpc to encode the data using msgpack.
Addendum: Astute code readers will know that modern Consul agents use something called `RPCMultiplexV2`, which uses Yamux to multiplex multiple streams over a single connection. Functionally this doesn’t matter, as every new stream is a new RPC channel, and those end up being handled by `handleConsulConn` as well.
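If you want to see the multiplexing idea in isolation, here's a sketch using `github.com/hashicorp/yamux` directly: one TCP connection, several independent streams over it, each usable as an ordinary `net.Conn` (and so servable by a per-connection handler just like a fresh connection). The setup and payloads here are illustrative, not Consul's actual `RPCMultiplexV2` handshake.

```go
package main

import (
	"fmt"
	"net"
	"time"

	"github.com/hashicorp/yamux"
)

func main() {
	ln, _ := net.Listen("tcp", "127.0.0.1:0")

	// "Server" side: accept one TCP conn, then accept streams off it.
	go func() {
		conn, _ := ln.Accept()
		session, _ := yamux.Server(conn, nil) // nil means default config
		for {
			stream, err := session.Accept()
			if err != nil {
				return
			}
			// Consul would hand each stream to its RPC handling code here.
			go func(c net.Conn) {
				buf := make([]byte, 32)
				n, _ := c.Read(buf)
				fmt.Printf("server got %q on its own stream\n", buf[:n])
				c.Close()
			}(stream)
		}
	}()

	// "Client" side: one TCP conn, multiple independent streams over it.
	conn, _ := net.Dial("tcp", ln.Addr().String())
	session, _ := yamux.Client(conn, nil)
	for i := 0; i < 3; i++ {
		stream, _ := session.Open()
		fmt.Fprintf(stream, "rpc call %d", i)
		stream.Close()
	}

	time.Sleep(200 * time.Millisecond) // crude wait so the sketch can print
}
```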