1 package kclient
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "golang.org/x/sync/errgroup"
10
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 "k8s.io/apimachinery/pkg/util/runtime"
15 "k8s.io/client-go/discovery"
16 "k8s.io/client-go/dynamic"
17 "k8s.io/klog"
18 )
19
20
21
22
23 func (c *Client) GetAllResourcesFromSelector(selector string, ns string) ([]unstructured.Unstructured, error) {
24 apis, err := findAPIs(c.cachedDiscoveryClient)
25 if err != nil {
26 return nil, err
27 }
28 return getAllResources(c.DynamicClient, apis.list, ns, selector)
29 }
30
31 func getAllResources(client dynamic.Interface, apis []apiResource, ns string, selector string) ([]unstructured.Unstructured, error) {
32 var out []unstructured.Unstructured
33 outChan := make(chan []unstructured.Unstructured)
34
35 var apisOfInterest []apiResource
36 for _, api := range apis {
37 if !api.r.Namespaced {
38 klog.V(5).Infof("[query api] api (%s) is non-namespaced, skipping", api.r.Name)
39 continue
40 }
41 apisOfInterest = append(apisOfInterest, api)
42 }
43
44 start := time.Now()
45 group := new(errgroup.Group)
46 klog.V(2).Infof("starting to concurrently query %d APIs", len(apis))
47
48 for _, api := range apisOfInterest {
49 api := api
50 group.Go(func() error {
51 klog.V(5).Infof("[query api] start: %s", api.GroupVersionResource())
52 v, err := queryAPI(client, api, ns, selector)
53 if err != nil {
54 klog.V(5).Infof("[query api] error querying: %s, error=%v", api.GroupVersionResource(), err)
55 return err
56 }
57 outChan <- v
58 klog.V(5).Infof("[query api] done: %s, found %d apis", api.GroupVersionResource(), len(v))
59 return nil
60 })
61 }
62 klog.V(2).Infof("fired up all goroutines to query APIs")
63
64 errChan := make(chan error)
65 go func() {
66 err := group.Wait()
67 klog.V(2).Infof("all goroutines have returned in %v", time.Since(start))
68 close(outChan)
69 errChan <- err
70 }()
71
72 for v := range outChan {
73 out = append(out, v...)
74 }
75
76 klog.V(2).Infof("query result: objects=%d", len(out))
77 return out, <-errChan
78 }
79
80 func queryAPI(client dynamic.Interface, api apiResource, ns string, selector string) ([]unstructured.Unstructured, error) {
81 var out []unstructured.Unstructured
82
83 var next string
84 for {
85 var intf dynamic.ResourceInterface
86 nintf := client.Resource(api.GroupVersionResource())
87 intf = nintf.Namespace(ns)
88 resp, err := intf.List(context.TODO(), metav1.ListOptions{
89 Limit: 250,
90 Continue: next,
91 LabelSelector: selector,
92 })
93 if err != nil {
94 klog.V(5).Infof("listing resources failed (%s): %v", api.GroupVersionResource(), err)
95 return nil, nil
96 }
97 out = append(out, resp.Items...)
98
99 next = resp.GetContinue()
100 if next == "" {
101 break
102 }
103 }
104 return out, nil
105 }
106
107 type apiResource struct {
108 r metav1.APIResource
109 gv schema.GroupVersion
110 }
111
112 func (a apiResource) GroupVersionResource() schema.GroupVersionResource {
113 return schema.GroupVersionResource{
114 Group: a.gv.Group,
115 Version: a.gv.Version,
116 Resource: a.r.Name,
117 }
118 }
119
120 type resourceNameLookup map[string][]apiResource
121
122 type resourceMap struct {
123 list []apiResource
124 m resourceNameLookup
125 }
126
127 func findAPIs(client discovery.DiscoveryInterface) (*resourceMap, error) {
128 start := time.Now()
129
130 var resList []*metav1.APIResourceList
131
132
133
134
135
136 originalErrorHandlers := runtime.ErrorHandlers
137 runtime.ErrorHandlers = nil
138 groups, err := client.ServerGroups()
139 if groups == nil {
140 resList = nil
141 } else {
142 resList, err = client.ServerPreferredResources()
143 }
144 runtime.ErrorHandlers = originalErrorHandlers
145
146 if err != nil {
147 return nil, fmt.Errorf("failed to fetch api groups from kubernetes: %w", err)
148 }
149 klog.V(5).Infof("queried api discovery in %v", time.Since(start))
150 klog.V(5).Infof("found %d items (groups) in server-preferred APIResourceList", len(resList))
151
152 rm := &resourceMap{
153 m: make(resourceNameLookup),
154 }
155 for _, group := range resList {
156 klog.V(5).Infof("iterating over group %s/%s (%d apis)", group.GroupVersion, group.APIVersion, len(group.APIResources))
157 gv, err := schema.ParseGroupVersion(group.GroupVersion)
158 if err != nil {
159 return nil, fmt.Errorf("%q cannot be parsed into groupversion: %w", group.GroupVersion, err)
160 }
161
162 for _, apiRes := range group.APIResources {
163 klog.V(5).Infof(" api=%s namespaced=%v", apiRes.Name, apiRes.Namespaced)
164 if !contains(apiRes.Verbs, "list") {
165 klog.V(5).Infof(" api (%s) doesn't have required verb, skipping: %v", apiRes.Name, apiRes.Verbs)
166 continue
167 }
168 v := apiResource{
169 gv: gv,
170 r: apiRes,
171 }
172 names := apiNames(apiRes, gv)
173 klog.V(5).Infof("names: %s", strings.Join(names, ", "))
174 for _, name := range names {
175 rm.m[name] = append(rm.m[name], v)
176 }
177 rm.list = append(rm.list, v)
178 }
179 }
180 klog.V(5).Infof(" found %d apis", len(rm.m))
181 return rm, nil
182 }
183
// contains reports whether s is an element of v, using exact
// (case-sensitive) string comparison. A nil or empty v yields false.
func contains(v []string, s string) bool {
	found := false
	for i := range v {
		if v[i] == s {
			found = true
			break
		}
	}
	return found
}
192
193
194 func apiNames(a metav1.APIResource, gv schema.GroupVersion) []string {
195 var out []string
196 singularName := a.SingularName
197 if singularName == "" {
198
199 singularName = strings.ToLower(a.Kind)
200 }
201 pluralName := a.Name
202 shortNames := a.ShortNames
203 names := append([]string{singularName, pluralName}, shortNames...)
204 for _, n := range names {
205 fmtBare := n
206 fmtWithGroup := strings.Join([]string{n, gv.Group}, ".")
207 fmtWithGroupVersion := strings.Join([]string{n, gv.Version, gv.Group}, ".")
208
209 out = append(out,
210 fmtBare, fmtWithGroup, fmtWithGroupVersion)
211 }
212 return out
213 }
214
View as plain text