From c9f68a4d8fc1ae5ef1dd4237adee7c3638581015 Mon Sep 17 00:00:00 2001 From: cong meng Date: Fri, 8 Jan 2021 11:55:42 +1300 Subject: [PATCH] fix(kubernetes): removes kube client cache when edge proxy is removed (#4487) (#4574) Co-authored-by: Simon Meng --- .../proxy/factory/kubernetes/transport.go | 73 +++++++++---------- api/http/proxy/manager.go | 5 ++ api/kubernetes/cli/client.go | 5 ++ 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/api/http/proxy/factory/kubernetes/transport.go b/api/http/proxy/factory/kubernetes/transport.go index 4fbacf590..c1d0de13b 100644 --- a/api/http/proxy/factory/kubernetes/transport.go +++ b/api/http/proxy/factory/kubernetes/transport.go @@ -3,6 +3,7 @@ package kubernetes import ( "crypto/tls" "fmt" + "log" "net/http" "github.com/portainer/portainer/api/http/security" @@ -13,14 +14,16 @@ import ( type ( localTransport struct { - httpTransport *http.Transport - tokenManager *tokenManager + httpTransport *http.Transport + tokenManager *tokenManager + endpointIdentifier portainer.EndpointID } agentTransport struct { - httpTransport *http.Transport - tokenManager *tokenManager - signatureService portainer.DigitalSignatureService + httpTransport *http.Transport + tokenManager *tokenManager + signatureService portainer.DigitalSignatureService + endpointIdentifier portainer.EndpointID } edgeTransport struct { @@ -50,21 +53,11 @@ func NewLocalTransport(tokenManager *tokenManager) (*localTransport, error) { // RoundTrip is the implementation of the the http.RoundTripper interface func (transport *localTransport) RoundTrip(request *http.Request) (*http.Response, error) { - tokenData, err := security.RetrieveTokenData(request) + token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier) if err != nil { return nil, err } - var token string - if tokenData.Role == portainer.AdministratorRole { - token = transport.tokenManager.getAdminServiceAccountToken() - } else { - token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID)) - if err != nil { - return nil, err - } - } - request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) return transport.httpTransport.RoundTrip(request) @@ -85,21 +78,11 @@ func NewAgentTransport(signatureService portainer.DigitalSignatureService, tlsCo // RoundTrip is the implementation of the the http.RoundTripper interface func (transport *agentTransport) RoundTrip(request *http.Request) (*http.Response, error) { - tokenData, err := security.RetrieveTokenData(request) + token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier) if err != nil { return nil, err } - var token string - if tokenData.Role == portainer.AdministratorRole { - token = transport.tokenManager.getAdminServiceAccountToken() - } else { - token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID)) - if err != nil { - return nil, err - } - } - request.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token) signature, err := transport.signatureService.CreateSignature(portainer.PortainerAgentSignatureMessage) @@ -127,21 +110,11 @@ func NewEdgeTransport(reverseTunnelService portainer.ReverseTunnelService, endpo // RoundTrip is the implementation of the the http.RoundTripper interface func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response, error) { - tokenData, err := security.RetrieveTokenData(request) + token, err := getRoundTripToken(request, transport.tokenManager, transport.endpointIdentifier) if err != nil { return nil, err } - var token string - if tokenData.Role == portainer.AdministratorRole { - token = transport.tokenManager.getAdminServiceAccountToken() - } else { - token, err = transport.tokenManager.getUserServiceAccountToken(int(tokenData.ID)) - if err != nil { - return nil, err - } - } - request.Header.Set(portainer.PortainerAgentKubernetesSATokenHeader, token) response, err := transport.httpTransport.RoundTrip(request) @@ -154,3 +127,27 @@ func (transport *edgeTransport) RoundTrip(request *http.Request) (*http.Response return response, err } + +func getRoundTripToken( + request *http.Request, + tokenManager *tokenManager, + endpointIdentifier portainer.EndpointID, +) (string, error) { + tokenData, err := security.RetrieveTokenData(request) + if err != nil { + return "", err + } + + var token string + if tokenData.Role == portainer.AdministratorRole { + token = tokenManager.getAdminServiceAccountToken() + } else { + token, err = tokenManager.getUserServiceAccountToken(int(tokenData.ID)) + if err != nil { + log.Printf("Failed retrieving service account token: %v", err) + return "", err + } + } + + return token, nil +} diff --git a/api/http/proxy/manager.go b/api/http/proxy/manager.go index e539d89c2..491a709c9 100644 --- a/api/http/proxy/manager.go +++ b/api/http/proxy/manager.go @@ -21,6 +21,7 @@ type ( proxyFactory *factory.ProxyFactory endpointProxies cmap.ConcurrentMap legacyExtensionProxies cmap.ConcurrentMap + k8sClientFactory *cli.ClientFactory } ) @@ -29,6 +30,7 @@ func NewManager(dataStore portainer.DataStore, signatureService portainer.Digita return &Manager{ endpointProxies: cmap.New(), legacyExtensionProxies: cmap.New(), + k8sClientFactory: kubernetesClientFactory, proxyFactory: factory.NewProxyFactory(dataStore, signatureService, tunnelService, clientFactory, kubernetesClientFactory, kubernetesTokenCacheManager), } } @@ -56,8 +58,11 @@ func (manager *Manager) GetEndpointProxy(endpoint *portainer.Endpoint) http.Hand } // DeleteEndpointProxy deletes the proxy associated to a key +// and cleans the k8s endpoint client cache. DeleteEndpointProxy +// is currently only called for edge connection clean up. func (manager *Manager) DeleteEndpointProxy(endpoint *portainer.Endpoint) { manager.endpointProxies.Remove(string(endpoint.ID)) + manager.k8sClientFactory.RemoveKubeClient(endpoint) } // CreateLegacyExtensionProxy creates a new HTTP reverse proxy for a legacy extension and adds it to the registered proxies diff --git a/api/kubernetes/cli/client.go b/api/kubernetes/cli/client.go index 707bf3924..a268150c9 100644 --- a/api/kubernetes/cli/client.go +++ b/api/kubernetes/cli/client.go @@ -40,6 +40,11 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers } } +// Remove the cached kube client so a new one can be created +func (factory *ClientFactory) RemoveKubeClient(endpoint *portainer.Endpoint) { + factory.endpointClients.Remove(strconv.Itoa(int(endpoint.ID))) +} + // GetKubeClient checks if an existing client is already registered for the endpoint and returns it if one is found. // If no client is registered, it will create a new client, register it, and returns it. func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {