Consul RPC Mechanism

HashiCorp Consul is a distributed, highly available service which provides service discovery with corresponding health checks, a distributed key/value store, and a service mesh solution, and it runs on a variety of platforms and environments. It is designed so that every node which provides services (things to be registered in service discovery, or that participate in the service mesh) runs a Consul agent, which acts as a sort of intermediary: providing an easy interface for registering services, running local health checks for both services and the node it is running on, and acting as a control plane for service mesh components running on that node, amongst other things. These agents in turn talk to a small number of Consul servers, typically 3-5, which are clustered to provide a replicated place to store data which must be persisted.

I’ve been using Consul for years and, like most people, have only interacted with the RESTful HTTP API it provides. That communication typically happens on port 8500. I knew there was a separate RPC protocol, typically on port 8300, which Consul agents use to talk to servers (and which servers in one Consul data center use to talk to Consul servers in other data centers), but I never gave it much thought. That is, until a few weeks ago, when I was exploring some other part of Consul and dumping network traffic to see exactly what was happening. Whatever those RPCs were, they weren’t REST.

Let’s dig into how the Consul RPC mechanism works by tracing it through the Consul code base. In this article we’ll be referencing the Consul source code at tag v1.9.1, the latest tagged version as of this writing.

To give us an example to start with, let’s look at the agent code which gets triggered when you make a GET /v1/kv/:key call:

         if err := s.agent.RPC(method, &listArgs, &out); err != nil {
                 return nil, err
         }
(agent/kvs_endpoint.go)

Okay, a function called RPC; that makes sense so far. method is “KVS.Get” and s is a pointer of type:

// HTTPHandlers provides an HTTP api for an agent.
type HTTPHandlers struct {
	agent           *Agent
	denylist        *Denylist
	configReloaders []ConfigReloader
	h               http.Handler
	metricsProxyCfg atomic.Value
}
(agent/http.go)

agent is a pointer of type:

// Agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either a client, or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
(agent/agent.go)

This reveals a key point: both clients and servers run the agent; servers run the full Consul server, while clients simply forward requests on to one of the Consul servers. This becomes clearer a bit later on. Finding the definition of the RPC function:

// RPC is used to make an RPC call to the Consul servers
// This allows the agent to implement the Consul.Interface
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
	a.endpointsLock.RLock()
	// fast path: only translate if there are overrides
	if len(a.endpoints) > 0 {
		p := strings.SplitN(method, ".", 2)
		if e := a.endpoints[p[0]]; e != "" {
			method = e + "." + p[1]
		}
	}
	a.endpointsLock.RUnlock()
	return a.delegate.RPC(method, args, reply)
}
(agent/agent.go)

An RPC call takes a method name (say, KVS.Get), some arguments and a reply. What’s that delegate? Back in the Agent struct:

	// delegate is either a *consul.Server or *consul.Client
	// depending on the configuration
	delegate delegate
(agent/agent.go)

A delegate is just an interface:

type delegate interface {
	GetLANCoordinate() (lib.CoordinateSet, error)
	Leave() error
	LANMembers() []serf.Member
	LANMembersAllSegments() ([]serf.Member, error)
	LANSegmentMembers(segment string) ([]serf.Member, error)
	LocalMember() serf.Member
	JoinLAN(addrs []string) (n int, err error)
	RemoveFailedNode(node string, prune bool) error
	ResolveToken(secretID string) (acl.Authorizer, error)
	ResolveTokenToIdentity(secretID string) (structs.ACLIdentity, error)
	ResolveTokenAndDefaultMeta(secretID string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error)
	RPC(method string, args interface{}, reply interface{}) error
	UseLegacyACLs() bool
	SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
	Shutdown() error
	Stats() map[string]map[string]string
	ReloadConfig(config *consul.Config) error
	enterpriseDelegate
}
(agent/agent.go)

So far we know that when an agent makes an RPC call, its delegate is asked to make that RPC call, and we have some inkling that the delegate behaves differently depending on whether the agent is operating in client or server mode.
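
To make the shape of that concrete, here is a cut-down sketch of the pattern; the type names below are invented for illustration and are not Consul’s real ones. The agent only ever talks to an interface, and a concrete implementation is chosen once, at startup.

type rpcDelegate interface {
	RPC(method string, args interface{}, reply interface{}) error
}

// clientDelegate would forward the call over the network to a Consul server.
type clientDelegate struct{}

func (c *clientDelegate) RPC(method string, args interface{}, reply interface{}) error {
	// pick a healthy server, send the request, decode the reply
	return nil
}

// serverDelegate would handle the call locally (or forward it on).
type serverDelegate struct{}

func (s *serverDelegate) RPC(method string, args interface{}, reply interface{}) error {
	// service the request using local server state
	return nil
}

type agent struct {
	delegate rpcDelegate
}

func newAgent(serverMode bool) *agent {
	if serverMode {
		return &agent{delegate: &serverDelegate{}}
	}
	return &agent{delegate: &clientDelegate{}}
}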

Digging a bit more, we find Start, which actually starts the agent:

// Start verifies its configuration and runs an agent's various subprocesses.
func (a *Agent) Start(ctx context.Context) error {
[...]
	// Setup either the client or the server.
	if c.ServerMode {
		server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul server: %v", err)
		}
		a.delegate = server
	} else {
		client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul client: %v", err)
		}
		a.delegate = client
	}
(agent/agent.go)

If we go down the server path, we call NewServer and that becomes our delegate; if we go down the client path, we call NewClient and that becomes our delegate. Either way, the server or client must implement an RPC method to satisfy the delegate interface. Why we do all this will become apparent later on.

We’re still in client mode, so let’s see how a client handles being a delegate.

// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
	// This is subtle but we start measuring the time on the client side
	// right at the time of the first request, vs. on the first retry as
	// is done on the server side inside forward(). This is because the
	// servers may already be applying the RPCHoldTimeout up there, so by
	// starting the timer here we won't potentially double up the delay.
	// TODO (slackpad) Plumb a deadline here with a context.
	firstCheck := time.Now()

TRY:
	manager, server := c.router.FindLANRoute()
	if server == nil {
		return structs.ErrNoServers
	}

	// Enforce the RPC limit.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}

	// Make the request.
	rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
	if rpcErr == nil {
		return nil
	}
[...]
(agent/consul/client.go, lines 252-280)

On line 263 we see the client pick a server to talk to, then we enforce the client-side RPC rate limit, and finally, on line 276, we make the call using something called a connPool. The rest of the function deals with failures and retries, returning an error if we’re not successful in time.
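
That rate limit check is golang.org/x/time/rate behind what is presumably an atomic.Value (hence the Load() before Allow()), so a reconfigured limiter can be swapped in without locking every call. A minimal standalone version of just that check, with made-up numbers, might look like this:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"

	"golang.org/x/time/rate"
)

var errRPCRateExceeded = errors.New("RPC rate limit exceeded")

type client struct {
	// Stored behind an atomic.Value so a config reload can swap in a new
	// limiter without taking a lock on every RPC call.
	rpcLimiter atomic.Value
}

func newClient(rps float64, burst int) *client {
	c := &client{}
	c.rpcLimiter.Store(rate.NewLimiter(rate.Limit(rps), burst))
	return c
}

func (c *client) checkRPCLimit() error {
	if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
		return errRPCRateExceeded
	}
	return nil
}

func main() {
	c := newClient(1, 2) // 1 request/second, burst of 2
	for i := 0; i < 4; i++ {
		// The first two calls fit within the burst; later ones are rejected.
		fmt.Println(i, c.checkRPCLimit())
	}
}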

Back in our Client struct we see:

	// Connection pool to consul servers
	connPool *pool.ConnPool

	// router is responsible for the selection and maintenance of
	// Consul servers this agent uses for RPC requests
	router *router.Router
(agent/consul/client.go)

I won’t go too far into the router stuff; in brief, it works with memberlist and Serf to keep a list of healthy servers and rotate calls between them. It’s the bit that, when a Consul client wants to “talk to a server”, actually tells it which server address to use.

Looking at the connPool, let’s read the side of the can:

// ConnPool is used to maintain a connection pool to other Consul
// servers. This is used to reduce the latency of RPC requests between
// servers. It is only used to pool connections in the rpcConsul mode.
// Raft connections are pooled separately. Maintain at most one
// connection per host, for up to MaxTime. When MaxTime connection
// reaping is disabled. MaxStreams is used to control the number of idle
// streams allowed. If TLS settings are provided outgoing connections
// use TLS.
(agent/pool/pool.go)
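
The key idea is to reuse one established connection per server rather than dialling for every request. A toy illustration of that “at most one connection per host” behaviour (nothing like the real ConnPool, which also deals with TLS, stream multiplexing and connection reaping) might look like:

package pool

import (
	"net"
	"sync"
)

type toyPool struct {
	mu    sync.Mutex
	conns map[string]net.Conn
}

func (p *toyPool) get(addr string) (net.Conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.conns == nil {
		p.conns = make(map[string]net.Conn)
	}
	// Reuse the existing connection to this host if we have one.
	if c, ok := p.conns[addr]; ok {
		return c, nil
	}
	// Otherwise dial once and keep the connection around for later callers.
	c, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	p.conns[addr] = c
	return c, nil
}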

So, we keep a socket open and multiplex calls over it, using the RPC function:

// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(
	dc string,
	nodeName string,
	addr net.Addr,
	method string,
	args interface{},
	reply interface{},
) error {
	if nodeName == "" {
		return fmt.Errorf("pool: ConnPool.RPC requires a node name")
	}

	// TODO (autoconf) probably will want to have a way to invoke the
	// secure or insecure variant depending on whether its an ongoing
	// or first time config request. For now though this is fine until
	// those ongoing requests are implemented.
	if method == "AutoEncrypt.Sign" || method == "AutoConfig.InitialConfiguration" {
		return p.rpcInsecure(dc, addr, method, args, reply)
	} else {
		return p.rpc(dc, nodeName, addr, method, args, reply)
	}
}
(agent/pool/pool.go)

Two methods are handled differently, but neither of those is KVS.Get, so the rpc function is called:

func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
	p.once.Do(p.init)

	// Get a usable client
	conn, sc, err := p.getClient(dc, nodeName, addr)
	if err != nil {
		return fmt.Errorf("rpc error getting client: %w", err)
	}

	// Make the RPC call
	err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
        [...]
(agent/pool/pool.go)

Ah, now we’re getting somewhere: get a client for the node in question, then use msgpackrpc. This is what actually goes over the wire. From the msgpackrpc repo:

This library provides the same functions as net/rpc/jsonrpc but
for communicating with MessagePack instead. The library is modeled
directly after the Go standard library so it should be easy to use
and obvious.

What we have here is a way of making Remote Procedure Calls: a client says “make this call”, supplies some arguments, and the RPC library is responsible for marshalling those arguments (here, using msgpack), shoving the call to the server, getting the answer back, unmarshalling the reply and returning it to the caller.
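
To make that concrete, here is a minimal, self-contained sketch using the same building blocks we’ve seen so far: a net/rpc server with a registered endpoint, a msgpack codec on each side of the connection, and CallWithCodec on the client. The KV, GetRequest and GetReply types are invented for illustration; they are not Consul’s.

package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"

	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
)

// KV is a stand-in "endpoint" with one exported RPC method.
type KV struct {
	data map[string]string
}

type GetRequest struct{ Key string }
type GetReply struct{ Value string }

// Get has the method shape net/rpc expects: exported, two arguments with the
// second a pointer, returning error.
func (k *KV) Get(args *GetRequest, reply *GetReply) error {
	reply.Value = k.data[args.Key]
	return nil
}

func main() {
	// Server side: register the endpoint and serve msgpack-encoded requests.
	server := rpc.NewServer()
	if err := server.Register(&KV{data: map[string]string{"greeting": "hello"}}); err != nil {
		log.Fatal(err)
	}

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return
			}
			go func(c net.Conn) {
				defer c.Close()
				// Same idea as handleConsulConn: one codec per connection,
				// then keep peeling requests off it.
				codec := msgpackrpc.NewCodec(true, true, c)
				for {
					if err := server.ServeRequest(codec); err != nil {
						return
					}
				}
			}(conn)
		}
	}()

	// Client side: dial, wrap the conn in a msgpack codec, make the call.
	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	codec := msgpackrpc.NewCodec(true, true, conn)
	var reply GetReply
	if err := msgpackrpc.CallWithCodec(codec, "KV.Get", &GetRequest{Key: "greeting"}, &reply); err != nil {
		log.Fatal(err)
	}
	fmt.Println(reply.Value) // "hello"
}

This is, in miniature, the same division of labour: net/rpc handles the method routing and request/response framing, and the msgpack codec decides what the bytes look like on the wire.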

To recap where we are on the client side:

  1. Client needs to make an RPC call
  2. Both client and server run the same Consul agent, just in different modes, and to simplify things a delegate is the concrete implementation of how that RPC call is made
  3. In the client’s case, when the agent is started up it is configured to use the ‘client’ delegate
  4. The client delegate keeps track of the Consul servers and, with the help of memberlist and Serf, knows which ones are healthy. It maintains a pool of connections to those servers
  5. The client delegate uses this information to pick the Consul server to talk to, and then uses an RPC library leveraging msgpack to actually send bits across the wire

What does a Consul server do to receive and handle these RPC calls? We’ve already been at the start of this path before, in the Start function; only this time we’re going down the ServerMode route:

// Start verifies its configuration and runs an agent's various subprocesses.
func (a *Agent) Start(ctx context.Context) error {
[...]
	// Setup either the client or the server.
	if c.ServerMode {
		server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul server: %v", err)
		}
		a.delegate = server
	} else {
		client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul client: %v", err)
		}
		a.delegate = client
	}
(agent/agent.go)

There are a few things we care about in NewServer:

// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps) (*Server, error) {
(agent/consul/server.go)

[...]

	// Initialize the RPC layer.
	if err := s.setupRPC(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}
(agent/consul/server.go, lines 452-456)

[...]

	go s.listen(s.Listener)
(agent/consul/server.go, line 593)

The first, starting at line 452, initializes the RPC layer; then, on line 593, we start the listener goroutine that actually handles RPC calls.

// endpointFactory is a function that returns an RPC endpoint bound to the given
// server.
type factory func(s *Server) interface{}

// endpoints is a list of registered RPC endpoint factories.
var endpoints []factory

// registerEndpoint registers a new RPC endpoint factory.
func registerEndpoint(fn factory) {
	endpoints = append(endpoints, fn)
}

// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC() error {
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: s.config.RPCMaxConnsPerClient,
	})

	for _, fn := range endpoints {
		s.rpcServer.Register(fn(s))
	}
(agent/consul/server.go)

s.rpcServer is a net/rpc server instance, and its Register function registers the receivers whose methods the server exposes. We loop through all the endpoints and register them with rpcServer. How did we add all of those endpoints to begin with?

package consul

import "github.com/hashicorp/consul/logging"

func init() {
	registerEndpoint(func(s *Server) interface{} { return &ACL{s, s.loggers.Named(logging.ACL)} })
	registerEndpoint(func(s *Server) interface{} { return &Catalog{s, s.loggers.Named(logging.Catalog)} })
	registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s, s.logger) })
	registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
	registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })
	registerEndpoint(func(s *Server) interface{} { return &FederationState{s} })
	registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
	registerEndpoint(func(s *Server) interface{} { return &Health{s} })
	registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} })
	registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} })
	registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })
	registerEndpoint(func(s *Server) interface{} { return &Operator{s, s.loggers.Named(logging.Operator)} })
	registerEndpoint(func(s *Server) interface{} { return &PreparedQuery{s, s.loggers.Named(logging.PreparedQuery)} })
	registerEndpoint(func(s *Server) interface{} { return &Session{s, s.loggers.Named(logging.Session)} })
	registerEndpoint(func(s *Server) interface{} { return &Status{s} })
	registerEndpoint(func(s *Server) interface{} { return &Txn{s, s.loggers.Named(logging.Transaction)} })
}
(agent/consul/server_register.go, lines 1-22)

The init function runs when the consul package is initialized, and that’s where we register everything. Note line 16, registering KVS; we’ll come back to that.
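
As an aside, this is also where the “KVS” half of “KVS.Get” comes from: net/rpc derives the service name from the concrete type of the value you register, so registering a *KVS exposes its methods as “KVS.<Method>”. A hypothetical sketch:

server := rpc.NewServer()

// Methods on *KVS become callable as "KVS.<Method>" over the wire.
server.Register(&KVS{ /* ... */ })

// RegisterName would expose the same methods under an explicit name instead.
server.RegisterName("KeyValue", &KVS{ /* ... */ })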

The listen function called on line 593 of the NewServer function is a standard accept/handle loop:

// listen is used to listen for incoming RPC connections
func (s *Server) listen(listener net.Listener) {
	for {
		// Accept a connection
		conn, err := listener.Accept()
		if err != nil {
			if s.shutdown {
				return
			}
			s.rpcLogger().Error("failed to accept RPC conn", "error", err)
			continue
		}

		free, err := s.rpcConnLimiter.Accept(conn)
		if err != nil {
			s.rpcLogger().Error("rejecting RPC conn from because rpc_max_conns_per_client exceeded", "conn", logConn(conn))
			conn.Close()
			continue
		}
		// Wrap conn so it will be auto-freed from conn limiter when it closes.
		conn = connlimit.Wrap(conn, free)

		go s.handleConn(conn, false)
		metrics.IncrCounter([]string{"rpc", "accept_conn"}, 1)
	}
}
(agent/consul/rpc.go)

handleConn does several things, since many different kinds of traffic can arrive on the same RPC port (plain Consul RPC calls, Raft RPC calls, etc.), but what we care about is:

	// Switch on the byte
	switch typ {
	case pool.RPCConsul:
		s.handleConsulConn(conn)
(agent/consul/rpc.go)
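
That typ byte is the very first byte the dialing side writes on a new connection, which is how a single port can carry several protocols. A rough sketch of the pattern (the constants below are invented; Consul’s real ones are the pool.RPCType values, such as the pool.RPCConsul seen above):

const (
	typeConsulRPC byte = 1
	typeRaftRPC   byte = 2
)

// The dialer announces the protocol before sending any RPC traffic.
func dialRPC(addr string) (net.Conn, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	if _, err := conn.Write([]byte{typeConsulRPC}); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}

// The accepting side peels that byte off and routes the rest of the stream.
func handleConn(conn net.Conn) {
	buf := make([]byte, 1)
	if _, err := io.ReadFull(conn, buf); err != nil {
		conn.Close()
		return
	}
	switch buf[0] {
	case typeConsulRPC:
		// hand the connection to the msgpack RPC handler
	case typeRaftRPC:
		// hand the connection to Raft
	default:
		conn.Close()
	}
}

For plain Consul RPC traffic, the connection is handed to handleConsulConn: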

// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
	defer conn.Close()
	rpcCodec := msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
	for {
		select {
		case <-s.shutdownCh:
			return
		default:
		}

		if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
			if err != io.EOF && !strings.Contains(err.Error(), "closed") {
				s.rpcLogger().Error("RPC error",
					"conn", logConn(conn),
					"error", err,
				)
				metrics.IncrCounter([]string{"rpc", "request_error"}, 1)
			}
			return
		}
		metrics.IncrCounter([]string{"rpc", "request"}, 1)
	}
}
(agent/consul/rpc.go, lines 401-424)

Line 404 sets up our rpcCodec, tying it to our conn and telling the net/rpc package that the actual bits on the wire are encoded with msgpack. Lines 412-422 handle each request, processing it on line 412. If there’s an error, or the connection has ended, it does the right thing and the function returns. Otherwise, it keeps peeling requests off the connection, unless the Server is being told to shut down, in which case it also goes away.

Remember line 16 back in agent/consul/server_register.go:

	registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })

This is what exposes the Key/Value Store:

// KVS endpoint is used to manipulate the Key-Value store
type KVS struct {
	srv    *Server
	logger hclog.Logger
}
(agent/consul/kvs_endpoint.go)

That’s an exported type, and

// Get is used to lookup a single key.
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
	if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done {
		return err
	}
	[... actually perform get here]
(agent/consul/kvs_endpoint.go, lines 139-144)

since Get is an exported method with the appropriate signature, the net/rpc layer handles routing “KVS.Get” to this function, unmarshalling the request, and marshalling the answer.
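
For reference, the signature net/rpc requires is: an exported method with exactly two arguments, both exported (or builtin) types, the second of which is a pointer, returning error. Schematically, with placeholder types:

func (t *T) MethodName(args *Args, reply *Reply) error {
	// fill in *reply; return a non-nil error to report failure to the caller
	return nil
}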

I’m not going to go over what actually happens under the covers when the get is performed, but let’s look at line 141, that ForwardRPC function call.

// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
(agent/consul/rpc.go)

There’s a lot in that function, but it boils down to the following (sketched in code after this list):

  1. Is the RPC call for a remote Consul data center? If so, forward the request to a server there and deliver back the answer (clients only ever talk to servers in their local data center; requests for other data centers are forwarded on by those servers)
  2. Is this a read call, and does the RPC request allow stale reads? If so, ForwardRPC returns false, and the caller knows to just handle it locally
  3. Am I the current leader? If so, return false and, again, the caller knows to just handle it locally
  4. I’m not, so forward the request on to the current leader and deliver back the answer (clients can make an RPC call to any server; if that server can’t handle it, it forwards it on to the one that can)
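
A condensed sketch of that decision tree follows. This is simplified pseudo-Go, not the real function: forwardToDC and forwardToLeader are stand-ins for Consul’s internal forwarding helpers, and the real ForwardRPC also deals with retries, leader discovery and shutdown.

func (s *Server) forwardRPCSketch(method string, info structs.RPCInfo, args, reply interface{}) (handled bool, err error) {
	// 1. A request for another datacenter gets sent to a server over there.
	if dc := info.RequestDatacenter(); dc != s.config.Datacenter {
		return true, s.forwardToDC(method, dc, args, reply)
	}

	// 2. Stale reads can be answered by any server, including this one.
	if info.IsRead() && info.AllowStaleRead() {
		return false, nil
	}

	// 3. If we are the leader, the caller handles the request locally.
	if s.IsLeader() {
		return false, nil
	}

	// 4. Otherwise, hand the request off to the current leader.
	return true, s.forwardToLeader(method, args, reply)
}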

Tying everything together:

  1. Both Consul servers and clients run the same agent, which acts as a server or as a client based on configuration
  2. Both servers and clients may make RPC calls, so the notion of a delegate is used to simplify things
  3. If an agent is started as a client, then it has a client delegate, and that simply makes the appropriate call off to a server
  4. If the agent is started as a server, then it has a server delegate, and it will either handle the request itself, or forward it on to the server which can
  5. 3 and 4 both mean that anywhere in the agent code where an RPC call needs to be made, you just make one, and the delegate does the right thing for both client and server mode
  6. The net/rpc package in Go allows for any number of ways of actually encoding the bits that go over the wire; in Consul’s case, msgpackrpc is used to encode the data with msgpack

Addendum: Astute code readers will know that modern Consul agents use something called RPCMultiplexV2, which uses Yamux to multiplex multiple streams over a single connection. Functionally this doesn’t matter, as every new stream is a new RPC channel, and those end up being handled by handleConsulConn as well.
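
If you want to see the Yamux idea in isolation, here is a minimal standalone sketch (my own wiring, not Consul’s): one TCP connection carries several independent streams, and each accepted stream can be treated exactly like a fresh connection.

package main

import (
	"fmt"
	"io"
	"log"
	"net"

	"github.com/hashicorp/yamux"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	done := make(chan struct{})

	// Server side: one TCP connection, many logical streams accepted off it.
	go func() {
		defer close(done)
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		session, err := yamux.Server(conn, nil)
		if err != nil {
			return
		}
		for i := 0; i < 3; i++ {
			// Each accepted stream behaves like an independent net.Conn;
			// Consul hands these to the same RPC-handling code.
			stream, err := session.Accept()
			if err != nil {
				return
			}
			buf := make([]byte, 5)
			io.ReadFull(stream, buf)
			fmt.Printf("stream %d says %q\n", i, buf)
			stream.Close()
		}
	}()

	// Client side: dial once, then open as many streams as we like.
	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		log.Fatal(err)
	}
	session, err := yamux.Client(conn, nil)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 3; i++ {
		stream, err := session.Open()
		if err != nil {
			log.Fatal(err)
		}
		stream.Write([]byte("hello"))
		stream.Close()
	}
	<-done
}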