mirror of
https://github.com/portainer/portainer.git
synced 2025-07-23 07:19:41 +02:00
fix(kubernetes): create proxied kubeclient EE-4326 (#7850)
This commit is contained in:
parent
f6d6be90e4
commit
0c995ae1c8
18 changed files with 485 additions and 71 deletions
|
@ -5,8 +5,10 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/pkg/errors"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
|
@ -23,6 +25,8 @@ type (
|
|||
signatureService portainer.DigitalSignatureService
|
||||
instanceID string
|
||||
endpointClients cmap.ConcurrentMap
|
||||
endpointProxyClients *cache.Cache
|
||||
AddrHTTPS string
|
||||
}
|
||||
|
||||
// KubeClient represent a service used to execute Kubernetes operations
|
||||
|
@ -34,14 +38,24 @@ type (
|
|||
)
|
||||
|
||||
// NewClientFactory returns a new instance of a ClientFactory
|
||||
func NewClientFactory(signatureService portainer.DigitalSignatureService, reverseTunnelService portainer.ReverseTunnelService, instanceID string, dataStore dataservices.DataStore) *ClientFactory {
|
||||
func NewClientFactory(signatureService portainer.DigitalSignatureService, reverseTunnelService portainer.ReverseTunnelService, dataStore dataservices.DataStore, instanceID, addrHTTPS, userSessionTimeout string) (*ClientFactory, error) {
|
||||
if userSessionTimeout == "" {
|
||||
userSessionTimeout = portainer.DefaultUserSessionTimeout
|
||||
}
|
||||
timeout, err := time.ParseDuration(userSessionTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ClientFactory{
|
||||
dataStore: dataStore,
|
||||
signatureService: signatureService,
|
||||
reverseTunnelService: reverseTunnelService,
|
||||
instanceID: instanceID,
|
||||
endpointClients: cmap.New(),
|
||||
}
|
||||
endpointProxyClients: cache.New(timeout, timeout),
|
||||
AddrHTTPS: addrHTTPS,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (factory *ClientFactory) GetInstanceID() (instanceID string) {
|
||||
|
@ -59,7 +73,7 @@ func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (porta
|
|||
key := strconv.Itoa(int(endpoint.ID))
|
||||
client, ok := factory.endpointClients.Get(key)
|
||||
if !ok {
|
||||
client, err := factory.createKubeClient(endpoint)
|
||||
client, err := factory.createCachedAdminKubeClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -71,7 +85,49 @@ func (factory *ClientFactory) GetKubeClient(endpoint *portainer.Endpoint) (porta
|
|||
return client.(portainer.KubeClient), nil
|
||||
}
|
||||
|
||||
func (factory *ClientFactory) createKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
|
||||
// GetProxyKubeClient retrieves a KubeClient from the cache. You should be
|
||||
// calling SetProxyKubeClient before first. It is normally, called the
|
||||
// kubernetes middleware.
|
||||
func (factory *ClientFactory) GetProxyKubeClient(endpointID, token string) (portainer.KubeClient, bool) {
|
||||
client, ok := factory.endpointProxyClients.Get(endpointID + "." + token)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return client.(portainer.KubeClient), true
|
||||
}
|
||||
|
||||
// SetProxyKubeClient stores a kubeclient in the cache.
|
||||
func (factory *ClientFactory) SetProxyKubeClient(endpointID, token string, cli portainer.KubeClient) {
|
||||
factory.endpointProxyClients.Set(endpointID+"."+token, cli, 0)
|
||||
}
|
||||
|
||||
// CreateKubeClientFromKubeConfig creates a KubeClient from a clusterID, and
|
||||
// Kubernetes config.
|
||||
func (factory *ClientFactory) CreateKubeClientFromKubeConfig(clusterID string, kubeConfig []byte) (portainer.KubeClient, error) {
|
||||
config, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cliConfig, err := config.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cli, err := kubernetes.NewForConfig(cliConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubecli := &KubeClient{
|
||||
cli: cli,
|
||||
instanceID: factory.instanceID,
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
|
||||
return kubecli, nil
|
||||
}
|
||||
|
||||
func (factory *ClientFactory) createCachedAdminKubeClient(endpoint *portainer.Endpoint) (portainer.KubeClient, error) {
|
||||
cli, err := factory.CreateClient(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -5,11 +5,12 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/portainer/portainer/api/database/models"
|
||||
"github.com/rs/zerolog/log"
|
||||
netv1 "k8s.io/api/networking/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func (kcl *KubeClient) GetIngressControllers() models.K8sIngressControllers {
|
||||
func (kcl *KubeClient) GetIngressControllers() (models.K8sIngressControllers, error) {
|
||||
var controllers []models.K8sIngressController
|
||||
|
||||
// We know that each existing class points to a controller so we can start
|
||||
|
@ -17,19 +18,22 @@ func (kcl *KubeClient) GetIngressControllers() models.K8sIngressControllers {
|
|||
classClient := kcl.cli.NetworkingV1().IngressClasses()
|
||||
classList, err := classClient.List(context.Background(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We want to know which of these controllers is in use.
|
||||
var ingresses []models.K8sIngressInfo
|
||||
namespaces, err := kcl.GetNamespaces()
|
||||
if err != nil {
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
for namespace := range namespaces {
|
||||
t, err := kcl.GetIngresses(namespace)
|
||||
if err != nil {
|
||||
return nil
|
||||
// User might not be able to list ingresses in system/not allowed
|
||||
// namespaces.
|
||||
log.Debug().Err(err).Msg("failed to list ingresses for the current user, skipped sending ingress")
|
||||
continue
|
||||
}
|
||||
ingresses = append(ingresses, t...)
|
||||
}
|
||||
|
@ -58,7 +62,7 @@ func (kcl *KubeClient) GetIngressControllers() models.K8sIngressControllers {
|
|||
}
|
||||
controllers = append(controllers, controller)
|
||||
}
|
||||
return controllers
|
||||
return controllers, nil
|
||||
}
|
||||
|
||||
// GetIngresses gets all the ingresses for a given namespace in a k8s endpoint.
|
||||
|
|
|
@ -85,7 +85,7 @@ func Test_GenerateYAML(t *testing.T) {
|
|||
t.Errorf("generateYamlConfig failed; err=%s", err)
|
||||
}
|
||||
|
||||
if compareYAMLStrings(yaml, ryt.wantYAML) != 0 {
|
||||
if compareYAMLStrings(string(yaml), ryt.wantYAML) != 0 {
|
||||
t.Errorf("generateYamlConfig failed;\ngot=\n%s\nwant=\n%s", yaml, ryt.wantYAML)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -25,6 +25,11 @@ func getPortainerUserDefaultPolicies() []rbacv1.PolicyRule {
|
|||
Resources: []string{"namespaces", "pods", "nodes"},
|
||||
APIGroups: []string{"metrics.k8s.io"},
|
||||
},
|
||||
{
|
||||
Verbs: []string{"list"},
|
||||
Resources: []string{"ingressclasses"},
|
||||
APIGroups: []string{"networking.k8s.io"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue