mirror of
https://github.com/driftywinds/driftywinds.git
synced 2026-01-30 14:33:26 +00:00
280 lines
11 KiB
Python
280 lines
11 KiB
Python
import datetime
|
|
import requests
|
|
import os
|
|
from lxml import etree
|
|
import time
|
|
import hashlib
|
|
|
|
# Fine-grained personal access token
|
|
HEADERS = {'authorization': 'token '+ os.environ['ACCESS_TOKEN']}
|
|
USER_NAME = os.environ['USER_NAME']
|
|
QUERY_COUNT = {'user_getter': 0, 'follower_getter': 0, 'graph_repos_stars': 0, 'recursive_loc': 0, 'graph_commits': 0, 'loc_query': 0}
|
|
|
|
def simple_request(func_name, query, variables):
|
|
request = requests.post('https://api.github.com/graphql', json={'query': query, 'variables':variables}, headers=HEADERS)
|
|
if request.status_code == 200:
|
|
return request
|
|
if request.status_code == 502:
|
|
return request
|
|
raise Exception(func_name, ' has failed with a', request.status_code, request.text, QUERY_COUNT)
|
|
|
|
def graph_repos_stars(count_type, owner_affiliation, cursor=None):
|
|
query_count('graph_repos_stars')
|
|
query = '''
|
|
query ($owner_affiliation: [RepositoryAffiliation], $login: String!, $cursor: String) {
|
|
user(login: $login) {
|
|
repositories(first: 100, after: $cursor, ownerAffiliations: $owner_affiliation) {
|
|
totalCount
|
|
edges {
|
|
node {
|
|
... on Repository {
|
|
nameWithOwner
|
|
stargazers {
|
|
totalCount
|
|
}
|
|
}
|
|
}
|
|
}
|
|
pageInfo {
|
|
endCursor
|
|
hasNextPage
|
|
}
|
|
}
|
|
}
|
|
}'''
|
|
variables = {'owner_affiliation': owner_affiliation, 'login': USER_NAME, 'cursor': cursor}
|
|
request = simple_request(graph_repos_stars.__name__, query, variables)
|
|
res = request.json()
|
|
if count_type == 'repos':
|
|
return res['data']['user']['repositories']['totalCount']
|
|
elif count_type == 'stars':
|
|
return stars_counter(res['data']['user']['repositories']['edges'])
|
|
|
|
def get_loc_for_repo(owner, repo_name):
|
|
"""
|
|
Non-recursive version of recursive_loc using a while loop to avoid RecursionError.
|
|
"""
|
|
addition_total = 0
|
|
deletion_total = 0
|
|
my_commits = 0
|
|
cursor = None
|
|
has_next_page = True
|
|
|
|
while has_next_page:
|
|
query_count('recursive_loc')
|
|
query = '''
|
|
query ($repo_name: String!, $owner: String!, $cursor: String) {
|
|
repository(name: $repo_name, owner: $owner) {
|
|
defaultBranchRef {
|
|
target {
|
|
... on Commit {
|
|
history(first: 100, after: $cursor) {
|
|
edges {
|
|
node {
|
|
author {
|
|
user { id }
|
|
}
|
|
deletions
|
|
additions
|
|
}
|
|
}
|
|
pageInfo {
|
|
endCursor
|
|
hasNextPage
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}'''
|
|
variables = {'repo_name': repo_name, 'owner': owner, 'cursor': cursor}
|
|
request = requests.post('https://api.github.com/graphql', json={'query': query, 'variables':variables}, headers=HEADERS)
|
|
|
|
if request.status_code != 200:
|
|
print(f" ⚠️ Skipping {owner}/{repo_name} due to API error ({request.status_code})")
|
|
break
|
|
|
|
res = request.json()
|
|
repo_data = res.get('data', {}).get('repository')
|
|
if not repo_data or not repo_data.get('defaultBranchRef'):
|
|
break
|
|
|
|
history = repo_data['defaultBranchRef']['target']['history']
|
|
for node in history['edges']:
|
|
if node['node']['author']['user'] == OWNER_ID:
|
|
my_commits += 1
|
|
addition_total += node['node']['additions']
|
|
deletion_total += node['node']['deletions']
|
|
|
|
has_next_page = history['pageInfo']['hasNextPage']
|
|
cursor = history['pageInfo']['endCursor']
|
|
|
|
return addition_total, deletion_total, my_commits
|
|
|
|
def loc_query(owner_affiliation, comment_size=0, force_cache=False):
|
|
"""
|
|
Non-recursive version of loc_query.
|
|
"""
|
|
edges = []
|
|
cursor = None
|
|
has_next_page = True
|
|
|
|
while has_next_page:
|
|
query_count('loc_query')
|
|
query = '''
|
|
query ($owner_affiliation: [RepositoryAffiliation], $login: String!, $cursor: String) {
|
|
user(login: $login) {
|
|
repositories(first: 60, after: $cursor, ownerAffiliations: $owner_affiliation) {
|
|
edges {
|
|
node {
|
|
... on Repository {
|
|
nameWithOwner
|
|
defaultBranchRef {
|
|
target {
|
|
... on Commit {
|
|
history { totalCount }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
pageInfo {
|
|
endCursor
|
|
hasNextPage
|
|
}
|
|
}
|
|
}
|
|
}'''
|
|
variables = {'owner_affiliation': owner_affiliation, 'login': USER_NAME, 'cursor': cursor}
|
|
request = simple_request('loc_query', query, variables)
|
|
res_json = request.json()
|
|
|
|
if 'data' not in res_json or res_json['data']['user'] is None:
|
|
break
|
|
|
|
repo_batch = res_json['data']['user']['repositories']
|
|
edges += repo_batch['edges']
|
|
has_next_page = repo_batch['pageInfo']['hasNextPage']
|
|
cursor = repo_batch['pageInfo']['endCursor']
|
|
|
|
return cache_builder(edges, comment_size, force_cache)
|
|
|
|
def cache_builder(edges, comment_size, force_cache, loc_add=0, loc_del=0):
|
|
filename = 'cache/'+hashlib.sha256(USER_NAME.encode('utf-8')).hexdigest()+'.txt'
|
|
if not os.path.exists('cache'): os.makedirs('cache')
|
|
|
|
try:
|
|
with open(filename, 'r') as f: data = f.readlines()
|
|
except FileNotFoundError:
|
|
data = ['\n']*comment_size
|
|
with open(filename, 'w') as f: f.writelines(data)
|
|
|
|
if len(data)-comment_size != len(edges) or force_cache:
|
|
flush_cache(edges, filename, comment_size)
|
|
with open(filename, 'r') as f: data = f.readlines()
|
|
|
|
cache_comment = data[:comment_size]
|
|
data = data[comment_size:]
|
|
|
|
for index in range(len(edges)):
|
|
repo_hash, commit_count, *__ = data[index].split()
|
|
repo_name_full = edges[index]['node']['nameWithOwner']
|
|
if repo_hash == hashlib.sha256(repo_name_full.encode('utf-8')).hexdigest():
|
|
if edges[index]['node']['defaultBranchRef'] is not None:
|
|
new_commit_count = edges[index]['node']['defaultBranchRef']['target']['history']['totalCount']
|
|
if int(commit_count) != new_commit_count:
|
|
owner, repo_name = repo_name_full.split('/')
|
|
loc = get_loc_for_repo(owner, repo_name)
|
|
data[index] = repo_hash + f' {new_commit_count} {loc[2]} {loc[0]} {loc[1]}\n'
|
|
else:
|
|
data[index] = repo_hash + ' 0 0 0 0\n'
|
|
|
|
with open(filename, 'w') as f:
|
|
f.writelines(cache_comment)
|
|
f.writelines(data)
|
|
|
|
for line in data:
|
|
loc = line.split()
|
|
loc_add += int(loc[3])
|
|
loc_del += int(loc[4])
|
|
return [loc_add, loc_del, loc_add - loc_del, True]
|
|
|
|
def flush_cache(edges, filename, comment_size):
|
|
try:
|
|
with open(filename, 'r') as f:
|
|
data = f.readlines()[:comment_size]
|
|
except: data = ['\n']*comment_size
|
|
with open(filename, 'w') as f:
|
|
f.writelines(data)
|
|
for node in edges:
|
|
f.write(hashlib.sha256(node['node']['nameWithOwner'].encode('utf-8')).hexdigest() + ' 0 0 0 0\n')
|
|
|
|
def stars_counter(data):
|
|
return sum(node['node']['stargazers']['totalCount'] for node in data)
|
|
|
|
def svg_overwrite(filename, commit_data, star_data, repo_data, contrib_data, follower_data, loc_data):
|
|
tree = etree.parse(filename)
|
|
root = tree.getroot()
|
|
justify_format(root, 'commit_data', commit_data, 23)
|
|
justify_format(root, 'star_data', star_data, 14)
|
|
justify_format(root, 'repo_data', repo_data, 7)
|
|
justify_format(root, 'contrib_data', contrib_data)
|
|
justify_format(root, 'follower_data', follower_data, 10)
|
|
justify_format(root, 'loc_data', loc_data[2], 13)
|
|
justify_format(root, 'loc_add', loc_data[0])
|
|
justify_format(root, 'loc_del', loc_data[1], 7)
|
|
tree.write(filename, encoding='utf-8', xml_declaration=True)
|
|
|
|
def justify_format(root, element_id, new_text, length=0):
|
|
if isinstance(new_text, int): new_text = f"{new_text:,}"
|
|
find_and_replace(root, element_id, str(new_text))
|
|
just_len = max(0, length - len(str(new_text)))
|
|
dot_string = ' ' + ('.' * just_len) + ' ' if just_len > 2 else ('. ' if just_len == 2 else ' ')
|
|
find_and_replace(root, f"{element_id}_dots", dot_string)
|
|
|
|
def find_and_replace(root, element_id, new_text):
|
|
element = root.find(f".//*[@id='{element_id}']")
|
|
if element is not None: element.text = new_text
|
|
|
|
def commit_counter(comment_size):
|
|
filename = 'cache/'+hashlib.sha256(USER_NAME.encode('utf-8')).hexdigest()+'.txt'
|
|
with open(filename, 'r') as f:
|
|
data = f.readlines()[comment_size:]
|
|
return sum(int(line.split()[2]) for line in data)
|
|
|
|
def user_getter(username):
|
|
query_count('user_getter')
|
|
query = 'query($login: String!){ user(login: $login) { id createdAt } }'
|
|
request = simple_request('user_getter', query, {'login': username})
|
|
return {'id': request.json()['data']['user']['id']}, request.json()['data']['user']['createdAt']
|
|
|
|
def follower_getter(username):
|
|
query_count('follower_getter')
|
|
query = 'query($login: String!){ user(login: $login) { followers { totalCount } } }'
|
|
request = simple_request('follower_getter', query, {'login': username})
|
|
return int(request.json()['data']['user']['followers']['totalCount'])
|
|
|
|
def query_count(funct_id):
|
|
global QUERY_COUNT
|
|
QUERY_COUNT[funct_id] += 1
|
|
|
|
def perf_counter(funct, *args):
|
|
start = time.perf_counter()
|
|
return funct(*args), time.perf_counter() - start
|
|
|
|
if __name__ == '__main__':
|
|
user_data, _ = perf_counter(user_getter, USER_NAME)
|
|
OWNER_ID, _ = user_data
|
|
|
|
total_loc, _ = perf_counter(loc_query, ['OWNER', 'COLLABORATOR', 'ORGANIZATION_MEMBER'], 7)
|
|
commit_data, _ = perf_counter(commit_counter, 7)
|
|
star_data, _ = perf_counter(graph_repos_stars, 'stars', ['OWNER'])
|
|
repo_data, _ = perf_counter(graph_repos_stars, 'repos', ['OWNER'])
|
|
contrib_data, _ = perf_counter(graph_repos_stars, 'repos', ['OWNER', 'COLLABORATOR', 'ORGANIZATION_MEMBER'])
|
|
follower_data, _ = perf_counter(follower_getter, USER_NAME)
|
|
|
|
formatted_loc = [f"{x:,}" if isinstance(x, int) else x for x in total_loc[:-1]]
|
|
svg_overwrite('dark_mode.svg', commit_data, star_data, repo_data, contrib_data, follower_data, formatted_loc)
|
|
print("SVG updated successfully.") |