mirror of
https://github.com/portainer/portainer.git
synced 2025-07-23 07:19:41 +02:00
refactor(k8s): namespace core logic (#12142)
Co-authored-by: testA113 <aliharriss1995@gmail.com> Co-authored-by: Anthony Lapenna <anthony.lapenna@portainer.io> Co-authored-by: James Carppe <85850129+jamescarppe@users.noreply.github.com> Co-authored-by: Ali <83188384+testA113@users.noreply.github.com>
This commit is contained in:
parent
da010f3d08
commit
ea228c3d6d
276 changed files with 9241 additions and 3361 deletions
|
@ -21,12 +21,11 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
DefaultKubeClientQPS = 30
|
||||
DefaultKubeClientBurst = 100
|
||||
defaultKubeClientQPS = 30
|
||||
defaultKubeClientBurst = 100
|
||||
maxConcurrency = 30
|
||||
)
|
||||
|
||||
const maxConcurrency = 30
|
||||
|
||||
type (
|
||||
// ClientFactory is used to create Kubernetes clients
|
||||
ClientFactory struct {
|
||||
|
@ -34,17 +33,17 @@ type (
|
|||
reverseTunnelService portainer.ReverseTunnelService
|
||||
signatureService portainer.DigitalSignatureService
|
||||
instanceID string
|
||||
endpointClients map[string]*KubeClient
|
||||
endpointProxyClients *cache.Cache
|
||||
AddrHTTPS string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// KubeClient represent a service used to execute Kubernetes operations
|
||||
KubeClient struct {
|
||||
cli kubernetes.Interface
|
||||
instanceID string
|
||||
mu sync.Mutex
|
||||
cli kubernetes.Interface
|
||||
instanceID string
|
||||
mu sync.Mutex
|
||||
IsKubeAdmin bool
|
||||
NonAdminNamespaces []string
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -70,7 +69,6 @@ func NewClientFactory(signatureService portainer.DigitalSignatureService, revers
|
|||
signatureService: signatureService,
|
||||
reverseTunnelService: reverseTunnelService,
|
||||
instanceID: instanceID,
|
||||
endpointClients: make(map[string]*KubeClient),
|
||||
endpointProxyClients: cache.New(timeout, timeout),
|
||||
AddrHTTPS: addrHTTPS,
|
||||
}, nil
|
||||
|
@ -80,42 +78,33 @@ func (factory *ClientFactory) GetInstanceID() (instanceID string) {
|
|||
return factory.instanceID
|
||||
}
|
||||
|
||||
// Remove the cached kube client so a new one can be created
|
||||
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
|
||||
factory.mu.Lock()
|
||||
delete(factory.endpointClients, strconv.Itoa(int(endpointID)))
|
||||
factory.mu.Unlock()
|
||||
// Clear removes all cached kube clients
|
||||
func (factory *ClientFactory) ClearClientCache() {
|
||||
log.Debug().Msgf("kubernetes namespace permissions have changed, clearing the client cache")
|
||||
factory.endpointProxyClients.Flush()
|
||||
}
|
||||
|
||||
// GetKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found.
|
||||
// If no client is registered, it will create a new client, register it, and returns it.
|
||||
func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
factory.mu.Lock()
|
||||
key := strconv.Itoa(int(endpoint.ID))
|
||||
if client, ok := factory.endpointClients[key]; ok {
|
||||
factory.mu.Unlock()
|
||||
return client, nil
|
||||
}
|
||||
factory.mu.Unlock()
|
||||
// Remove the cached kube client so a new one can be created
|
||||
func (factory *ClientFactory) RemoveKubeClient(endpointID portainer.EndpointID) {
|
||||
factory.endpointProxyClients.Delete(strconv.Itoa(int(endpointID)))
|
||||
}
|
||||
|
||||
// EE-6901: Do not lock
|
||||
client, err := factory.createCachedAdminKubeClient(endpoint)
|
||||
// GetPrivilegedKubeClient checks if an existing client is already registered for the environment(endpoint) and returns it if one is found.
|
||||
// If no client is registered, it will create a new client, register it, and returns it.
|
||||
func (factory *ClientFactory) GetPrivilegedKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
key := strconv.Itoa(int(endpoint.ID))
|
||||
pcl, ok := factory.endpointProxyClients.Get(key)
|
||||
if ok {
|
||||
return pcl.(*KubeClient), nil
|
||||
}
|
||||
|
||||
kcl, err := factory.createCachedPrivilegedKubeClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
factory.mu.Lock()
|
||||
defer factory.mu.Unlock()
|
||||
|
||||
// The lock was released before the client was created,
|
||||
// so we need to check again
|
||||
if c, ok := factory.endpointClients[key]; ok {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
factory.endpointClients[key] = client
|
||||
|
||||
return client, nil
|
||||
factory.endpointProxyClients.Set(key, kcl, cache.DefaultExpiration)
|
||||
return kcl, nil
|
||||
}
|
||||
|
||||
// GetProxyKubeClient retrieves a KubeClient from the cache. You should be
|
||||
|
@ -123,46 +112,47 @@ func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (*Kube
|
|||
// kubernetes middleware.
|
||||
func (factory *ClientFactory) GetProxyKubeClient(endpointID, userID string) (*KubeClient, bool) {
|
||||
client, ok := factory.endpointProxyClients.Get(endpointID + "." + userID)
|
||||
if !ok {
|
||||
return nil, false
|
||||
if ok {
|
||||
return client.(*KubeClient), true
|
||||
}
|
||||
|
||||
return client.(*KubeClient), true
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// SetProxyKubeClient stores a kubeclient in the cache.
|
||||
func (factory *ClientFactory) SetProxyKubeClient(endpointID, userID string, cli *KubeClient) {
|
||||
factory.endpointProxyClients.Set(endpointID+"."+userID, cli, 0)
|
||||
factory.endpointProxyClients.Set(endpointID+"."+userID, cli, cache.DefaultExpiration)
|
||||
}
|
||||
|
||||
// CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID, and
|
||||
// Kubernetes config.
|
||||
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (*KubeClient, error) {
|
||||
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte, IsKubeAdmin bool, NonAdminNamespaces []string) (*KubeClient, error) {
|
||||
config, err := clientcmd.NewClientConfigFromBytes(kubeConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to create a client config from kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
cliConfig, err := config.ClientConfig()
|
||||
clientConfig, err := config.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to get the complete client config from kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
cliConfig.QPS = DefaultKubeClientQPS
|
||||
cliConfig.Burst = DefaultKubeClientBurst
|
||||
clientConfig.QPS = defaultKubeClientQPS
|
||||
clientConfig.Burst = defaultKubeClientBurst
|
||||
|
||||
cli, err := kubernetes.NewForConfig(cliConfig)
|
||||
cli, err := kubernetes.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to create a new clientset for the given config: %w", err)
|
||||
}
|
||||
|
||||
return &KubeClient{
|
||||
cli: cli,
|
||||
instanceID: factory.instanceID,
|
||||
cli: cli,
|
||||
instanceID: factory.instanceID,
|
||||
IsKubeAdmin: IsKubeAdmin,
|
||||
NonAdminNamespaces: NonAdminNamespaces,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
func (factory *ClientFactory) createCachedPrivilegedKubeClient(endpoint *portainer.Endpoint) (*KubeClient, error) {
|
||||
cli, err := factory.CreateClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -235,8 +225,8 @@ func (factory *ClientFactory) buildAgentConfig(endpoint *portainer.Endpoint) (*r
|
|||
}
|
||||
|
||||
config.Insecure = true
|
||||
config.QPS = DefaultKubeClientQPS
|
||||
config.Burst = DefaultKubeClientBurst
|
||||
config.QPS = defaultKubeClientQPS
|
||||
config.Burst = defaultKubeClientBurst
|
||||
|
||||
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
return &agentHeaderRoundTripper{
|
||||
|
@ -251,7 +241,7 @@ func (factory *ClientFactory) buildAgentConfig(endpoint *portainer.Endpoint) (*r
|
|||
func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*rest.Config, error) {
|
||||
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed activating tunnel")
|
||||
return nil, errors.Wrap(err, "failed to activate the chisel reverse tunnel. check if the tunnel port is open at the portainer instance")
|
||||
}
|
||||
endpointURL := fmt.Sprintf("http://%s/kubernetes", tunnelAddr)
|
||||
|
||||
|
@ -266,8 +256,8 @@ func (factory *ClientFactory) buildEdgeConfig(endpoint *portainer.Endpoint) (*re
|
|||
}
|
||||
|
||||
config.Insecure = true
|
||||
config.QPS = DefaultKubeClientQPS
|
||||
config.Burst = DefaultKubeClientBurst
|
||||
config.QPS = defaultKubeClientQPS
|
||||
config.Burst = defaultKubeClientBurst
|
||||
|
||||
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
||||
return &agentHeaderRoundTripper{
|
||||
|
@ -294,8 +284,8 @@ func buildLocalConfig() (*rest.Config, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
config.QPS = DefaultKubeClientQPS
|
||||
config.Burst = DefaultKubeClientBurst
|
||||
config.QPS = defaultKubeClientQPS
|
||||
config.Burst = defaultKubeClientBurst
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue