-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMULTI-TEAM-SUPPORT.py
More file actions
1495 lines (1202 loc) · 61.5 KB
/
MULTI-TEAM-SUPPORT.py
File metadata and controls
1495 lines (1202 loc) · 61.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import json
import base64
import requests
import boto3
import traceback
from kubernetes import client
from kubernetes.client import ApiClient, ApiException
from botocore.signers import RequestSigner
import datetime
import secrets
import re
from urllib.parse import urlparse, urlunparse
# --------------------
# Team Configuration Helper
# --------------------
def get_team_config(team_name, platform="github", use_enterprise=None):
"""
Fetches team-specific configuration (base URL, token, and username) from environment variables.
Environment variable naming convention:
For Enterprise instances:
- TEAM_<TEAMNAME>_GITHUB_ENTERPRISE_URL (required for enterprise)
- TEAM_<TEAMNAME>_GITHUB_ENTERPRISE_TOKEN (required for enterprise)
- TEAM_<TEAMNAME>_GITHUB_ENTERPRISE_USERNAME (optional)
- TEAM_<TEAMNAME>_GITLAB_ENTERPRISE_URL (required for enterprise)
- TEAM_<TEAMNAME>_GITLAB_ENTERPRISE_TOKEN (required for enterprise)
- TEAM_<TEAMNAME>_GITLAB_ENTERPRISE_USERNAME (optional)
For Public instances (fallback if enterprise vars not set):
- TEAM_<TEAMNAME>_GITHUB_URL (optional - if not set, uses public GitHub)
- TEAM_<TEAMNAME>_GITHUB_TOKEN (required)
- TEAM_<TEAMNAME>_GITHUB_USERNAME (optional)
- TEAM_<TEAMNAME>_GITLAB_URL (optional - if not set, uses public GitLab)
- TEAM_<TEAMNAME>_GITLAB_TOKEN (required)
- TEAM_<TEAMNAME>_GITLAB_USERNAME (optional)
Args:
team_name: Name of the team (will be uppercased for env var lookup)
platform: "github" or "gitlab"
use_enterprise: Optional boolean to explicitly specify enterprise mode.
If None, auto-detects based on enterprise token presence.
If True, uses enterprise configuration.
If False, uses regular/public configuration.
Returns:
dict: {
"base_url": str or None,
"token": str,
"username": str or None,
"is_enterprise": bool
}
Raises:
ValueError: If team configuration is not found or token is missing
"""
if not team_name:
raise ValueError("teamName parameter is required")
team_upper = team_name.replace("-", "_").replace(" ", "_")
platform_upper = platform.upper()
print(f"[INFO] Fetching configuration for team: {team_name} (platform: {platform}, use_enterprise: {use_enterprise})")
# First check for enterprise-specific environment variables
enterprise_url_key = f"TEAM_{team_upper}_{platform_upper}_ENTERPRISE_URL"
enterprise_token_key = f"TEAM_{team_upper}_{platform_upper}_ENTERPRISE_TOKEN"
enterprise_username_key = f"TEAM_{team_upper}_{platform_upper}_ENTERPRISE_USERNAME"
enterprise_url = os.environ.get(enterprise_url_key)
enterprise_token = os.environ.get(enterprise_token_key)
enterprise_username = os.environ.get(enterprise_username_key)
# Check regular configuration as well
base_url_key = f"TEAM_{team_upper}_{platform_upper}_URL"
token_key = f"TEAM_{team_upper}_{platform_upper}_TOKEN"
username_key = f"TEAM_{team_upper}_{platform_upper}_USERNAME"
regular_base_url = os.environ.get(base_url_key)
regular_token = os.environ.get(token_key)
regular_username = os.environ.get(username_key)
# Determine which configuration to use
if use_enterprise is None:
# Default to public/regular credentials
is_enterprise = False
else:
# Use explicit flag from caller
is_enterprise = use_enterprise
if is_enterprise:
# Use enterprise configuration
base_url = enterprise_url
token = enterprise_token
username = enterprise_username
print(f"[DEBUG] Selected ENTERPRISE credentials - Token key: {enterprise_token_key}, Token exists: {token is not None}, Token length: {len(token) if token else 0}")
if not token or token.strip() == "":
raise ValueError(
f"Enterprise token not found for team '{team_name}' on platform '{platform}'. "
f"Please set environment variable: {enterprise_token_key}"
)
if not base_url or base_url.strip() == "":
raise ValueError(
f"Enterprise URL not found for team '{team_name}' on platform '{platform}'. "
f"Please set environment variable: {enterprise_url_key}"
)
base_url = base_url.rstrip('/')
print(f"[INFO] Using enterprise {platform} URL for team '{team_name}': {base_url}")
if username:
print(f"[INFO] Using enterprise {platform} username for team '{team_name}': {username}")
else:
# Use regular (public) configuration
base_url = regular_base_url
token = regular_token
username = regular_username
print(f"[DEBUG] Selected REGULAR/PUBLIC credentials - Token key: {token_key}, Token exists: {token is not None}, Token length: {len(token) if token else 0}")
if not token:
raise ValueError(
f"Token not found for team '{team_name}' on platform '{platform}'. "
f"Please set environment variable: {token_key} or {enterprise_token_key}"
)
if base_url and base_url.strip() != "":
base_url = base_url.rstrip('/')
print(f"[INFO] Using custom {platform} URL for team '{team_name}': {base_url}")
else:
print(f"[INFO] Using public {platform} for team '{team_name}'")
if username:
print(f"[INFO] Using {platform} username for team '{team_name}': {username}")
return {
"base_url": base_url,
"token": token,
"username": username,
"is_enterprise": is_enterprise
}
# ======================================================================
# GITHUB FUNCTIONS
# ======================================================================
# --------------------
# GitHub API Helper
# --------------------
def get_github_api_base_url(base_url=None):
"""
Returns the appropriate GitHub API base URL.
Args:
base_url: Custom base URL for enterprise GitHub (optional)
Returns:
str: API base URL
"""
if base_url:
# Enterprise URL - expecting format like https://your-ghe-instance.com/api/v3
return base_url.rstrip('/')
else:
return "https://api.github.com"
# --------------------
# GitHub Repo Listing
# --------------------
def github_list_repositories(name, target_type, token, base_url=None, page=1, per_page=100, visibility=None, search=None):
"""
Fetches a list of repositories for a given GitHub user or organization,
supporting both public and enterprise GitHub with visibility filtering.
Args:
name: Username or organization name
target_type: "user" or "org"
token: GitHub access token (team-specific)
base_url: Custom GitHub base URL for enterprise (optional)
page: Page number for pagination
per_page: Number of results per page (default: 100)
visibility: Filter by visibility - "all", "public", "private", or "internal"
search: Optional repository name to filter by (case-insensitive partial match)
"""
is_enterprise = base_url is not None
print(f"[INFO] Fetching GitHub repositories (Enterprise: {is_enterprise}, Visibility: {visibility}, Search: {search})...")
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"X-GitHub-Api-Version": "2022-11-28"
}
api_base_url = get_github_api_base_url(base_url)
formatted_repos = []
page_num = 1
# If search is provided, fetch *all* repos to ensure we don't miss matches
fetch_all = search is not None and search.strip() != ""
print(f"[INFO] Mode: {'Full repository fetch (search enabled)' if fetch_all else 'Paged fetch'}")
while True:
# Build API URL for the current page
if target_type == "user":
repos_url = f"{api_base_url}/user/repos?per_page={per_page}&page={page_num}"
vis_param = visibility if visibility and visibility != "internal" else "all"
repos_url += f"&visibility={vis_param}"
else:
repos_url = f"{api_base_url}/orgs/{name}/repos?per_page={per_page}&page={page_num}"
repos_url += f"&type={visibility or 'all'}"
print(f"[DEBUG] Calling URL: {repos_url}")
response = requests.get(repos_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitHub API error ({response.status_code}): {response.text}")
raise Exception(f"GitHub API error ({response.status_code}): {response.text}")
data = response.json()
if not data:
print(f"[INFO] No more repositories found at page {page_num}.")
break
for repo in data:
formatted_repos.append({
"name": repo.get("full_name"),
"cloneUrl": repo.get("clone_url"),
"htmlUrl": repo.get("html_url"),
"language": "Github",
"private": repo.get("private", False),
"visibility": repo.get("visibility", "public")
})
# Check for next page
link_header = response.headers.get("Link", "")
has_next = 'rel="next"' in link_header
# If search mode is on, always continue until all repos are fetched
if not fetch_all and (not has_next or page_num >= page):
break
page_num += 1
# Optional short-circuit: stop early if a match is already found
if fetch_all and any(search.lower() in repo["name"].lower() for repo in formatted_repos):
pass
# Safety: avoid infinite loops
if page_num > 100: # 100 pages * 100 repos = 10,000 max
print("[WARN] Reached pagination limit (10,000 repos). Stopping early.")
break
# Apply search filter if requested
if search:
search_lower = search.lower()
filtered_repos = [
repo for repo in formatted_repos
if search_lower in repo.get("name", "").lower()
]
print(f"[INFO] Search filter applied: Found {len(filtered_repos)} matching repository(ies)")
return {"content": filtered_repos, "hasNext": False}
print(f"[INFO] Retrieved {len(formatted_repos)} repositories total.")
return {"content": formatted_repos, "hasNext": has_next if not fetch_all else False}
# --------------------
# GitHub Branch Listing
# --------------------
def github_list_branches(repo_url, token, base_url=None):
"""
Fetches all branch names for a given GitHub repository URL,
supporting both public and enterprise GitHub.
Args:
repo_url: GitHub repository URL
token: GitHub access token (team-specific)
base_url: Custom GitHub base URL for enterprise (optional)
"""
is_enterprise = base_url is not None
print(f"[INFO] Fetching branches for repo: {repo_url} (Enterprise: {is_enterprise})")
# Generic regex to extract owner/repo from public or enterprise URLs
match = re.search(r"https://[^/]+/([^/]+)/([^/]+?)(?:\.git)?$", repo_url)
if not match:
raise ValueError("Invalid GitHub repository URL format.")
owner, repo_name = match.groups()
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json"
}
api_base_url = get_github_api_base_url(base_url)
branches = []
page = 1
while True:
branches_url = f"{api_base_url}/repos/{owner}/{repo_name}/branches?per_page=100&page={page}"
response = requests.get(branches_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitHub API error ({response.status_code}): {response.text}")
raise Exception(f"GitHub API error ({response.status_code}): {response.text}")
data = response.json()
if not data:
break
for branch in data:
branches.append(branch.get("name"))
page += 1
print(f"[INFO] Retrieved {len(branches)} branches.")
return {"content": branches}
# --------------------
# GitHub Default Branch
# --------------------
def github_get_default_branch(repo_url, token, base_url=None):
"""
Fetches the default branch for a given GitHub repository URL,
supporting both public and enterprise GitHub.
Args:
repo_url: GitHub repository URL
token: GitHub access token (team-specific)
base_url: Custom GitHub base URL for enterprise (optional)
"""
is_enterprise = base_url is not None
print(f"[INFO] Fetching default branch for repo: {repo_url} (Enterprise: {is_enterprise})")
# Generic regex to extract owner/repo from public or enterprise URLs
match = re.search(r"https://[^/]+/([^/]+)/([^/]+?)(?:\.git)?$", repo_url)
if not match:
raise ValueError("Invalid GitHub repository URL format.")
owner, repo_name = match.groups()
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json"
}
api_base_url = get_github_api_base_url(base_url)
repo_api_url = f"{api_base_url}/repos/{owner}/{repo_name}"
response = requests.get(repo_api_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitHub API error ({response.status_code}): {response.text}")
raise Exception(f"GitHub API error ({response.status_code}): {response.text}")
default_branch = response.json().get("default_branch")
print(f"[INFO] Retrieved default branch: {default_branch}")
return {"content": default_branch}
# ======================================================================
# GITLAB FUNCTIONS
# ======================================================================
# --------------------
# GitLab API Helper
# --------------------
def get_gitlab_api_base_url(base_url=None):
"""
Returns the appropriate GitLab API base URL.
Args:
base_url: Custom base URL for enterprise GitLab (optional)
Returns:
str: API base URL (always includes /api/v4)
"""
if not base_url:
return "https://gitlab.com/api/v4"
base_url = base_url.rstrip('/')
# If the URL is just https://gitlab.com (public GitLab without /api/v4), treat as public
if base_url == "https://gitlab.com":
return "https://gitlab.com/api/v4"
# For any URL, ensure /api/v4 is present
if not base_url.endswith('/api/v4'):
base_url = f"{base_url}/api/v4"
return base_url
# --------------------
# GitLab Repo Listing
# --------------------
def gitlab_list_repositories(name, target_type, token, base_url=None, page=0, per_page=50, search=None):
"""
Fetches a list of repositories for a given GitLab user or group,
supporting both public and enterprise GitLab.
Args:
name: Username or group name
target_type: "user" or "org" (org = group in GitLab)
token: GitLab access token (team-specific)
base_url: Custom GitLab base URL for enterprise (optional)
page: 0-based page number from client (will be converted to 1-based for GitLab)
per_page: Number of items per page (max 100 for GitLab API)
search: Optional repository name to search for (uses GitLab's native search)
"""
# Determine if this is truly enterprise (not just https://gitlab.com)
is_enterprise = base_url is not None and base_url.rstrip('/') not in ["https://gitlab.com", "https://gitlab.com/api/v4"]
print(f"[INFO] Fetching GitLab repositories (Enterprise: {is_enterprise}, Search: {search})...")
headers = {
"PRIVATE-TOKEN": token,
"Accept": "application/json"
}
api_base_url = get_gitlab_api_base_url(base_url)
# Convert 0-based pagination (from client) to 1-based (GitLab API)
gitlab_page = page + 1
# Ensure per_page is within GitLab's limits (1-100)
per_page = min(max(1, per_page), 100)
print(f"[DEBUG] Client page: {page}, GitLab page: {gitlab_page}, per_page: {per_page}")
if target_type == "user":
# For user repos, we can use the /projects endpoint with membership filter
if search and search.strip():
# Use GitLab's native search API for projects
# This searches across all projects the user has access to
search_url = f"{api_base_url}/projects?membership=true&search={requests.utils.quote(search)}&per_page={per_page}&page={gitlab_page}"
print(f"[DEBUG] Using GitLab native search: {search_url}")
response = requests.get(search_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitLab API error ({response.status_code}): {response.text}")
raise Exception(f"GitLab API error ({response.status_code}): {response.text}")
has_next = response.headers.get("X-Next-Page", "").strip() != ""
repos = response.json()
formatted_repos = []
for repo in repos:
formatted_repos.append({
"name": repo.get("path_with_namespace"),
"cloneUrl": repo.get("http_url_to_repo"),
"htmlUrl": repo.get("web_url"),
"language": "Gitlab",
"private": repo.get("visibility") == "private",
"visibility": repo.get("visibility", "public")
})
print(f"[INFO] GitLab search returned {len(formatted_repos)} matching repositories")
return {"content": formatted_repos, "hasNext": has_next}
else:
# No search - fetch owned + contributed projects
# Get user ID first
user_url = f"{api_base_url}/users?username={name}"
user_response = requests.get(user_url, headers=headers)
if user_response.status_code != 200 or not user_response.json():
print(f"[ERROR] GitLab API error fetching user: {user_response.text}")
raise Exception(f"GitLab API error fetching user: {user_response.text}")
user_id = user_response.json()[0]['id']
# Fetch owned projects
owned_repos_url = f"{api_base_url}/users/{user_id}/projects?per_page={per_page}&page={gitlab_page}"
print(f"[DEBUG] Fetching owned projects: {owned_repos_url}")
owned_response = requests.get(owned_repos_url, headers=headers)
if owned_response.status_code != 200:
print(f"[ERROR] GitLab API error fetching owned repos ({owned_response.status_code}): {owned_response.text}")
raise Exception(f"GitLab API error ({owned_response.status_code}): {owned_response.text}")
owned_has_next = owned_response.headers.get("X-Next-Page", "").strip() != ""
owned_repos = owned_response.json()
# Fetch contributed projects
contributed_repos_url = f"{api_base_url}/projects?membership=true&owned=false&per_page={per_page}&page={gitlab_page}"
print(f"[DEBUG] Fetching contributed projects: {contributed_repos_url}")
contributed_response = requests.get(contributed_repos_url, headers=headers)
contributed_repos = []
contributed_has_next = False
if contributed_response.status_code == 200:
contributed_repos = contributed_response.json()
contributed_has_next = contributed_response.headers.get("X-Next-Page", "").strip() != ""
print(f"[INFO] Retrieved {len(contributed_repos)} contributed repositories")
else:
print(f"[WARN] Could not fetch contributed projects ({contributed_response.status_code}): {contributed_response.text}")
# Combine and deduplicate
all_repos = owned_repos + contributed_repos
seen_paths = set()
unique_repos = []
for repo in all_repos:
path = repo.get("path_with_namespace")
if path not in seen_paths:
seen_paths.add(path)
unique_repos.append(repo)
has_next = owned_has_next or contributed_has_next
formatted_repos = []
for repo in unique_repos:
formatted_repos.append({
"name": repo.get("path_with_namespace"),
"cloneUrl": repo.get("http_url_to_repo"),
"htmlUrl": repo.get("web_url"),
"language": "Gitlab",
"private": repo.get("visibility") == "private",
"visibility": repo.get("visibility", "public")
})
print(f"[INFO] Retrieved {len(formatted_repos)} total repositories (owned + contributed, deduplicated)")
return {"content": formatted_repos, "hasNext": has_next}
else:
# For groups (orgs in GitLab)
if search and search.strip():
# Use group search API
# First, we need to get the group ID
group_url = f"{api_base_url}/groups/{requests.utils.quote(name, safe='')}"
group_response = requests.get(group_url, headers=headers)
if group_response.status_code != 200:
print(f"[ERROR] GitLab API error fetching group ({group_response.status_code}): {group_response.text}")
raise Exception(f"GitLab API error ({group_response.status_code}): {group_response.text}")
group_id = group_response.json().get("id")
# Use group search endpoint
search_url = f"{api_base_url}/groups/{group_id}/search?scope=projects&search={requests.utils.quote(search)}&per_page={per_page}&page={gitlab_page}"
print(f"[DEBUG] Using GitLab group search: {search_url}")
response = requests.get(search_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitLab API error ({response.status_code}): {response.text}")
raise Exception(f"GitLab API error ({response.status_code}): {response.text}")
has_next = response.headers.get("X-Next-Page", "").strip() != ""
repos = response.json()
formatted_repos = []
for repo in repos:
formatted_repos.append({
"name": repo.get("path_with_namespace"),
"cloneUrl": repo.get("http_url_to_repo"),
"htmlUrl": repo.get("web_url"),
"language": "Gitlab",
"private": repo.get("visibility") == "private",
"visibility": repo.get("visibility", "public")
})
print(f"[INFO] GitLab group search returned {len(formatted_repos)} matching repositories")
return {"content": formatted_repos, "hasNext": has_next}
else:
# No search - normal group project listing
repos_url = f"{api_base_url}/groups/{name}/projects?per_page={per_page}&page={gitlab_page}&include_subgroups=true"
print(f"[DEBUG] Calling URL: {repos_url}")
response = requests.get(repos_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitLab API error ({response.status_code}): {response.text}")
raise Exception(f"GitLab API error ({response.status_code}): {response.text}")
has_next = response.headers.get("X-Next-Page", "").strip() != ""
formatted_repos = []
for repo in response.json():
formatted_repos.append({
"name": repo.get("path_with_namespace"),
"cloneUrl": repo.get("http_url_to_repo"),
"htmlUrl": repo.get("web_url"),
"language": "Gitlab",
"private": repo.get("visibility") == "private",
"visibility": repo.get("visibility", "public")
})
print(f"[INFO] Retrieved {len(formatted_repos)} repositories")
return {"content": formatted_repos, "hasNext": has_next}
# --------------------
# GitLab Branch Listing
# --------------------
def gitlab_list_branches(repo_url, token, base_url=None):
"""
Fetches all branch names for a given GitLab repository URL,
supporting both public and enterprise GitLab.
Args:
repo_url: GitLab repository URL
token: GitLab access token (team-specific)
base_url: Custom GitLab base URL for enterprise (optional)
"""
# Determine if this is truly enterprise (not just https://gitlab.com)
is_enterprise = base_url is not None and base_url.rstrip('/') not in ["https://gitlab.com", "https://gitlab.com/api/v4"]
print(f"[INFO] Fetching branches for repo: {repo_url} (Enterprise: {is_enterprise})")
# Extract project path from GitLab URL
match = re.search(r"https://[^/]+/(.+?)(?:\.git)?$", repo_url)
if not match:
raise ValueError("Invalid GitLab repository URL format.")
project_path = match.group(1)
# URL encode the project path for API calls
encoded_project_path = requests.utils.quote(project_path, safe='')
headers = {
"PRIVATE-TOKEN": token,
"Accept": "application/json"
}
api_base_url = get_gitlab_api_base_url(base_url)
branches = []
page = 1
while True:
branches_url = f"{api_base_url}/projects/{encoded_project_path}/repository/branches?per_page=100&page={page}"
response = requests.get(branches_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitLab API error ({response.status_code}): {response.text}")
raise Exception(f"GitLab API error ({response.status_code}): {response.text}")
data = response.json()
if not data:
break
for branch in data:
branches.append(branch.get("name"))
# Check if there's a next page
if not response.headers.get("X-Next-Page", "").strip():
break
page += 1
print(f"[INFO] Retrieved {len(branches)} branches.")
return {"content": branches}
# --------------------
# GitLab Default Branch
# --------------------
def gitlab_get_default_branch(repo_url, token, base_url=None):
"""
Fetches the default branch for a given GitLab repository URL,
supporting both public and enterprise GitLab.
Args:
repo_url: GitLab repository URL
token: GitLab access token (team-specific)
base_url: Custom GitLab base URL for enterprise (optional)
"""
# Determine if this is truly enterprise (not just https://gitlab.com)
is_enterprise = base_url is not None and base_url.rstrip('/') not in ["https://gitlab.com", "https://gitlab.com/api/v4"]
print(f"[INFO] Fetching default branch for repo: {repo_url} (Enterprise: {is_enterprise})")
# Extract project path from GitLab URL
match = re.search(r"https://[^/]+/(.+?)(?:\.git)?$", repo_url)
if not match:
raise ValueError("Invalid GitLab repository URL format.")
project_path = match.group(1)
# URL encode the project path for API calls
encoded_project_path = requests.utils.quote(project_path, safe='')
headers = {
"PRIVATE-TOKEN": token,
"Accept": "application/json"
}
api_base_url = get_gitlab_api_base_url(base_url)
repo_api_url = f"{api_base_url}/projects/{encoded_project_path}"
response = requests.get(repo_api_url, headers=headers)
if response.status_code != 200:
print(f"[ERROR] GitLab API error ({response.status_code}): {response.text}")
raise Exception(f"GitLab API error ({response.status_code}): {response.text}")
default_branch = response.json().get("default_branch")
print(f"[INFO] Retrieved default branch: {default_branch}")
return {"content": default_branch}
# ======================================================================
# EKS FUNCTIONS
# ======================================================================
# --------------------
# EKS Token Generator
# --------------------
def get_eks_token(cluster_name, region):
"""
Generates a presigned STS URL for EKS authentication.
"""
print("[INFO] Generating EKS token with custom signing...")
try:
session = boto3.session.Session()
sts_client = session.client('sts', region_name=region)
service_model = sts_client.meta.service_model
signer = RequestSigner(
service_model.service_id, region, 'sts', 'v4',
session.get_credentials(), session.events
)
request_dict = {
'method': 'GET',
'url': f'https://sts.{region}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15',
'body': {}, 'headers': {'x-k8s-aws-id': cluster_name}, 'context': {}
}
presigned_url = signer.generate_presigned_url(
request_dict=request_dict, expires_in=60, operation_name='GetCallerIdentity'
)
token = 'k8s-aws-v1.' + base64.urlsafe_b64encode(
presigned_url.encode('utf-8')
).decode('utf-8').rstrip('=')
print("[INFO] EKS token generated successfully.")
return token
except Exception as e:
print(f"[ERROR] Failed to generate EKS token: {str(e)}")
raise
# --------------------
# EKS API Configuration
# --------------------
def configure_kubernetes_client(cluster_name, region):
"""
Configures the Kubernetes client to communicate with an EKS cluster.
"""
print(f"[INFO] Configuring Kubernetes client for cluster: {cluster_name}")
eks_client = boto3.client('eks', region_name=region)
cluster_info = eks_client.describe_cluster(name=cluster_name)
endpoint = cluster_info['cluster']['endpoint']
cert_authority = cluster_info['cluster']['certificateAuthority']['data']
configuration = client.Configuration()
configuration.host = endpoint
configuration.ssl_ca_cert = write_temp_cert(cert_authority)
configuration.api_key['authorization'] = get_eks_token(cluster_name, region)
configuration.api_key_prefix['authorization'] = 'Bearer'
client.Configuration.set_default(configuration)
print("[INFO] Kubernetes client configured.")
def write_temp_cert(cert_data):
"""
Writes the cluster's certificate authority data to a temporary file.
"""
import tempfile
cert_file = tempfile.NamedTemporaryFile(delete=False)
cert_file.write(base64.b64decode(cert_data))
cert_file.close()
return cert_file.name
# --------------------
# Job Trigger Function
# --------------------
def trigger_eks_job(cluster_name, region, job_name, image, env_vars, args_list):
namespace = "default"
"""
Triggers a Kubernetes Job on an EKS cluster using the Kubernetes Python client.
"""
print("[INFO] Setting up Kubernetes client...")
configure_kubernetes_client(cluster_name, region)
batch_v1 = client.BatchV1Api()
print("[INFO] Defining Kubernetes job spec...")
# Container security context for non-root execution
container_security_context = client.V1SecurityContext(
allow_privilege_escalation=False,
run_as_non_root=True
)
# Resource requests and limits from environment variables
resources = client.V1ResourceRequirements(
requests={
"cpu": os.environ.get("K8_RESOURCES_REQUEST_CPU", "2000m"),
"memory": os.environ.get("K8_RESOURCES_REQUEST_MEMORY", "3Gi"),
},
limits={
"memory": os.environ.get("K8_RESOURCES_LIMIT_MEMORY", "8Gi"),
}
)
container = client.V1Container(
name="scanner", image=image, args=args_list,
env=[client.V1EnvVar(name=k, value=v) for k, v in env_vars.items()],
security_context=container_security_context,
resources=resources
)
# Pod security context for non-root user (UID 1000)
pod_security_context = client.V1PodSecurityContext(
run_as_user=1000,
run_as_group=1000,
fs_group=1000
)
pod_spec = client.V1PodSpec(
restart_policy="Never",
containers=[container],
image_pull_secrets=[client.V1LocalObjectReference(name="image-secret")],
security_context=pod_security_context
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(name=job_name), spec=pod_spec
)
spec = client.V1JobSpec(
template=template, backoff_limit=1, ttl_seconds_after_finished=240
)
job = client.V1Job(
api_version="batch/v1", kind="Job",
metadata=client.V1ObjectMeta(name=job_name), spec=spec
)
try:
print("[INFO] Submitting job to Kubernetes cluster...")
batch_v1.create_namespaced_job(namespace=namespace, body=job)
print(f"[SUCCESS] Job '{job_name}' successfully created in namespace '{namespace}'.")
except ApiException as e:
print(f"[ERROR] Exception when calling BatchV1Api->create_namespaced_job: {e}")
raise
# --------------------
# Job Status Check Function
# --------------------
def check_job_status(cluster_name, region, job_name):
"""
Checks the status of a Kubernetes Job on an EKS cluster.
"""
print(f"[INFO] Checking status for job: {job_name}")
configure_kubernetes_client(cluster_name, region)
batch_v1 = client.BatchV1Api()
try:
job = batch_v1.read_namespaced_job(name=job_name, namespace="default")
status = job.status
result = {
"jobName": job_name,
"active": status.active if status.active else 0,
"succeeded": status.succeeded if status.succeeded else 0,
"failed": status.failed if status.failed else 0,
"startTime": status.start_time.isoformat() if status.start_time else None,
"completionTime": status.completion_time.isoformat() if status.completion_time else None
}
if status.conditions:
result["conditions"] = [
{
"type": c.type,
"status": c.status,
"reason": c.reason,
"message": c.message
} for c in status.conditions
]
print(f"[INFO] Job status retrieved: {result}")
return {"content": result}
except ApiException as e:
if e.status == 404:
print(f"[INFO] Job '{job_name}' not found.")
return {"content": {"jobName": job_name, "status": "Not Found"}}
else:
print(f"[ERROR] Exception when calling BatchV1Api->read_namespaced_job: {e}")
raise
# ======================================================================
# LAMBDA HANDLER
# ======================================================================
def lambda_handler(event, context):
"""
AWS Lambda handler function to process GitHub/GitLab and EKS-related actions.
Supports multi-team configuration via environment variables.
"""
print("[INFO] Lambda invocation started.")
print(f"[DEBUG] Received event: {json.dumps(event, indent=2)}")
try:
# Parse request body
if 'body' in event and isinstance(event['body'], str):
body = json.loads(event['body'])
elif 'body' in event and isinstance(event['body'], dict):
body = event['body']
else:
body = event
print(f"[DEBUG] Parsed body: {json.dumps(body, indent=2)}")
# Extract action
action = body.get("action")
if not action:
return {
"statusCode": 400,
"body": json.dumps({"error": "Missing required parameter: 'action'."})
}
# Extract teamName (required for all actions)
team_name = body.get("teamName")
if not team_name:
return {
"statusCode": 400,
"body": json.dumps({"error": "Missing required parameter: 'teamName'."})
}
# Extract is-enterprise flag (optional, can be passed to control which credentials to use)
# Check both top level and inside 'args' object
use_enterprise = body.get("is-enterprise")
if use_enterprise is None and "args" in body:
use_enterprise = body["args"].get("is-enterprise")
if use_enterprise is not None:
# Convert to boolean if string
if isinstance(use_enterprise, str):
use_enterprise = use_enterprise.lower() in ['true', '1', 'yes']
else:
use_enterprise = bool(use_enterprise)
print(f"[DEBUG] is-enterprise flag from event: {use_enterprise} (checked top level and args)")
print(f"[INFO] Processing action '{action}' for team '{team_name}'")
# ======================
# Action: list_repos
# ======================
if action == "list_repos":
print("[INFO] Action: list_repos")
# Get repo type to determine platform
repo_type = body.get("type")
platform = body.get("platform", "github").lower() # Default to github for backward compatibility
# Get team-specific configuration
try:
team_config = get_team_config(team_name, platform=platform, use_enterprise=use_enterprise)
except ValueError as e:
return {"statusCode": 400, "body": json.dumps({"error": str(e)})}
# Use username from team config, fallback to request body if not set
name = team_config.get("username") or body.get("name", "")
if not name:
return {
"statusCode": 400,
"body": json.dumps({
"error": f"Username not found for team '{team_name}' on platform '{platform}'. "
f"Please set environment variable: TEAM_{team_name.replace('-', '_').replace(' ', '_')}_{platform.upper()}_USERNAME "
f"or provide 'name' in the request body."
})
}
target_type = repo_type if repo_type in ["user", "org"] else "user"
page = int(body.get("page", 0)) # 0-based pagination from client
page_size = int(body.get("pageSize", 50))
search = body.get("search") # Extract search parameter for both platforms
if platform == "github":
visibility = body.get("visibility", "all")
# Validate visibility parameter
valid_visibilities = ["all", "public", "private", "internal"]
if visibility not in valid_visibilities:
return {
"statusCode": 400,
"body": json.dumps({"error": f"Invalid 'visibility' parameter. Must be one of: {', '.join(valid_visibilities)}"})
}