-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathenumerateGit.py
116 lines (100 loc) · 4.26 KB
/
enumerateGit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import urllib.request as R
import urllib.error as ER
import re
import json
import itertools as I
import sqlite3 as S
class publicRSAKey:
def __init__(self, key : str):
self.key : str = key
def __eq__(self, other):
return self.key == other.key
def __hash__(self):
return hash(self.key)
def __str__(self):
return self.key
class User:
def __init__(self, username : str):
self.username : str = username
self._cachedProjects : Project = None
self._cachedKeys : [publicRSAKey] = None
def __eq__(self, other):
return self.username == other.username
def __hash__(self):
return hash(self.username)
def __str__(self):
return self.username
def getPublicRSAKey(self, cache : bool, **kwargs):
geturl = 'https://api.github.com/users/{user}/keys'.format(user = self.username)
if "clientID" in kwargs and "clientSecret" in kwargs:
geturl += "?client_id={id}&client_secret={secret}"
geturl = geturl.format(id = kwargs["clientID"], secret = kwargs["clientSecret"])
with R.urlopen(geturl) as remot:
resp = json.loads(remot.read())
keys = [publicRSAKey(keyStr["key"]) for keyStr in resp]
if cache:
self._cachedKeys = keys
return keys
# getProjects is also defined, but later
class Project:
def __init__(self, url : str):
self.url : str = url
matches = re.match("""http[s]{0,1}:\/\/github.com\/(.+)\/(.+)""", url).groups()
self.owner : User = User(matches[0])
self.name : str = matches[1]
self._cachedUsers : [User] = None
def __eq__(self, other):
return self.name == other.name and other.owner == other.owner
def __hash__(self):
return hash((name, owner))
def getUsers(self, cache : bool) -> {User}:
if (self._cachedUsers):
return self._cachedUsers
else:
geturl = 'https://api.github.com/repos/{owner}/{name}/contributors'.format(owner = self.owner.username, name = self.name)
if "clientID" in kwargs and "clientSecret" in kwargs:
geturl += "?client_id={id}&client_secret={secret}"
geturl = geturl.format(id = kwargs["clientID"], secret = kwargs["clientSecret"])
# print(geturl)
with R.urlopen(geturl) as remot:
resp = json.loads(remot.read())
usersStr = map(lambda x: x["login"], resp)
users = set(map(User, usersStr))
if cache:
self._cachedUsers = users
return users
def getProjects(self, cache : bool) -> {Project}:
if (self._cachedProjects):
return self._cachedProjects
else:
geturl = 'https://api.github.com/users/{user}/repos?type=owner'.format(user = self.username)
if "clientID" in kwargs and "clientSecret" in kwargs:
geturl += "?client_id={id}&client_secret={secret}"
geturl = geturl.format(id = kwargs["clientID"], secret = kwargs["clientSecret"])
# print(geturl)
with R.urlopen(geturl) as remot:
resp = json.loads(remot.read())
projectsStr = map(lambda x: x["name"], resp)
projects = [Project("https://github.com/{user}/{repo}".format(user = self.username, repo = x)) for x in projectsStr]
if cache:
self._cachedProjects = projects
return projects
User.getProjects = getProjects
def recursivelyTraverse(seedUser : User) -> User:
currentUser = seedUser
traversedUsers = {seedUser}
currentGeneration = set(I.chain(*[project.getUsers(False) for project in currentUser.getProjects(False)]))
while True:
yield from currentGeneration
for x in currentGeneration: #TODO: make this recurse more effeciently
if x in traversedUsers:
continue
traversedUsers.add(x)
currentUser = x
try:
currentGeneration = set(I.chain(*[project.getUsers(False) for project in currentUser.getProjects(False)]))
except ER.HTTPError as err:
if (err.code == 403):
yield err
else:
raise err