import datetime import requests import os from lxml import etree import time import hashlib # Fine-grained personal access token HEADERS = {'authorization': 'token '+ os.environ['ACCESS_TOKEN']} USER_NAME = os.environ['USER_NAME'] QUERY_COUNT = {'user_getter': 0, 'follower_getter': 0, 'graph_repos_stars': 0, 'recursive_loc': 0, 'graph_commits': 0, 'loc_query': 0} def simple_request(func_name, query, variables): request = requests.post('https://api.github.com/graphql', json={'query': query, 'variables':variables}, headers=HEADERS) if request.status_code == 200: return request if request.status_code == 502: return request raise Exception(func_name, ' has failed with a', request.status_code, request.text, QUERY_COUNT) def graph_repos_stars(count_type, owner_affiliation, cursor=None): query_count('graph_repos_stars') query = ''' query ($owner_affiliation: [RepositoryAffiliation], $login: String!, $cursor: String) { user(login: $login) { repositories(first: 100, after: $cursor, ownerAffiliations: $owner_affiliation) { totalCount edges { node { ... on Repository { nameWithOwner stargazers { totalCount } } } } pageInfo { endCursor hasNextPage } } } }''' variables = {'owner_affiliation': owner_affiliation, 'login': USER_NAME, 'cursor': cursor} request = simple_request(graph_repos_stars.__name__, query, variables) res = request.json() if count_type == 'repos': return res['data']['user']['repositories']['totalCount'] elif count_type == 'stars': return stars_counter(res['data']['user']['repositories']['edges']) def get_loc_for_repo(owner, repo_name): """ Non-recursive version of recursive_loc using a while loop to avoid RecursionError. """ addition_total = 0 deletion_total = 0 my_commits = 0 cursor = None has_next_page = True while has_next_page: query_count('recursive_loc') query = ''' query ($repo_name: String!, $owner: String!, $cursor: String) { repository(name: $repo_name, owner: $owner) { defaultBranchRef { target { ... on Commit { history(first: 100, after: $cursor) { edges { node { author { user { id } } deletions additions } } pageInfo { endCursor hasNextPage } } } } } } }''' variables = {'repo_name': repo_name, 'owner': owner, 'cursor': cursor} request = requests.post('https://api.github.com/graphql', json={'query': query, 'variables':variables}, headers=HEADERS) if request.status_code != 200: print(f" ⚠️ Skipping {owner}/{repo_name} due to API error ({request.status_code})") break res = request.json() repo_data = res.get('data', {}).get('repository') if not repo_data or not repo_data.get('defaultBranchRef'): break history = repo_data['defaultBranchRef']['target']['history'] for node in history['edges']: if node['node']['author']['user'] == OWNER_ID: my_commits += 1 addition_total += node['node']['additions'] deletion_total += node['node']['deletions'] has_next_page = history['pageInfo']['hasNextPage'] cursor = history['pageInfo']['endCursor'] return addition_total, deletion_total, my_commits def loc_query(owner_affiliation, comment_size=0, force_cache=False): """ Non-recursive version of loc_query. """ edges = [] cursor = None has_next_page = True while has_next_page: query_count('loc_query') query = ''' query ($owner_affiliation: [RepositoryAffiliation], $login: String!, $cursor: String) { user(login: $login) { repositories(first: 60, after: $cursor, ownerAffiliations: $owner_affiliation) { edges { node { ... on Repository { nameWithOwner defaultBranchRef { target { ... on Commit { history { totalCount } } } } } } } pageInfo { endCursor hasNextPage } } } }''' variables = {'owner_affiliation': owner_affiliation, 'login': USER_NAME, 'cursor': cursor} request = simple_request('loc_query', query, variables) res_json = request.json() if 'data' not in res_json or res_json['data']['user'] is None: break repo_batch = res_json['data']['user']['repositories'] edges += repo_batch['edges'] has_next_page = repo_batch['pageInfo']['hasNextPage'] cursor = repo_batch['pageInfo']['endCursor'] return cache_builder(edges, comment_size, force_cache) def cache_builder(edges, comment_size, force_cache, loc_add=0, loc_del=0): filename = 'cache/'+hashlib.sha256(USER_NAME.encode('utf-8')).hexdigest()+'.txt' if not os.path.exists('cache'): os.makedirs('cache') try: with open(filename, 'r') as f: data = f.readlines() except FileNotFoundError: data = ['\n']*comment_size with open(filename, 'w') as f: f.writelines(data) if len(data)-comment_size != len(edges) or force_cache: flush_cache(edges, filename, comment_size) with open(filename, 'r') as f: data = f.readlines() cache_comment = data[:comment_size] data = data[comment_size:] for index in range(len(edges)): repo_hash, commit_count, *__ = data[index].split() repo_name_full = edges[index]['node']['nameWithOwner'] if repo_hash == hashlib.sha256(repo_name_full.encode('utf-8')).hexdigest(): if edges[index]['node']['defaultBranchRef'] is not None: new_commit_count = edges[index]['node']['defaultBranchRef']['target']['history']['totalCount'] if int(commit_count) != new_commit_count: owner, repo_name = repo_name_full.split('/') loc = get_loc_for_repo(owner, repo_name) data[index] = repo_hash + f' {new_commit_count} {loc[2]} {loc[0]} {loc[1]}\n' else: data[index] = repo_hash + ' 0 0 0 0\n' with open(filename, 'w') as f: f.writelines(cache_comment) f.writelines(data) for line in data: loc = line.split() loc_add += int(loc[3]) loc_del += int(loc[4]) return [loc_add, loc_del, loc_add - loc_del, True] def flush_cache(edges, filename, comment_size): try: with open(filename, 'r') as f: data = f.readlines()[:comment_size] except: data = ['\n']*comment_size with open(filename, 'w') as f: f.writelines(data) for node in edges: f.write(hashlib.sha256(node['node']['nameWithOwner'].encode('utf-8')).hexdigest() + ' 0 0 0 0\n') def stars_counter(data): return sum(node['node']['stargazers']['totalCount'] for node in data) def svg_overwrite(filename, commit_data, star_data, repo_data, contrib_data, follower_data, loc_data): tree = etree.parse(filename) root = tree.getroot() justify_format(root, 'commit_data', commit_data, 23) justify_format(root, 'star_data', star_data, 14) justify_format(root, 'repo_data', repo_data, 7) justify_format(root, 'contrib_data', contrib_data) justify_format(root, 'follower_data', follower_data, 10) justify_format(root, 'loc_data', loc_data[2], 13) justify_format(root, 'loc_add', loc_data[0]) justify_format(root, 'loc_del', loc_data[1], 7) tree.write(filename, encoding='utf-8', xml_declaration=True) def justify_format(root, element_id, new_text, length=0): if isinstance(new_text, int): new_text = f"{new_text:,}" find_and_replace(root, element_id, str(new_text)) just_len = max(0, length - len(str(new_text))) dot_string = ' ' + ('.' * just_len) + ' ' if just_len > 2 else ('. ' if just_len == 2 else ' ') find_and_replace(root, f"{element_id}_dots", dot_string) def find_and_replace(root, element_id, new_text): element = root.find(f".//*[@id='{element_id}']") if element is not None: element.text = new_text def commit_counter(comment_size): filename = 'cache/'+hashlib.sha256(USER_NAME.encode('utf-8')).hexdigest()+'.txt' with open(filename, 'r') as f: data = f.readlines()[comment_size:] return sum(int(line.split()[2]) for line in data) def user_getter(username): query_count('user_getter') query = 'query($login: String!){ user(login: $login) { id createdAt } }' request = simple_request('user_getter', query, {'login': username}) return {'id': request.json()['data']['user']['id']}, request.json()['data']['user']['createdAt'] def follower_getter(username): query_count('follower_getter') query = 'query($login: String!){ user(login: $login) { followers { totalCount } } }' request = simple_request('follower_getter', query, {'login': username}) return int(request.json()['data']['user']['followers']['totalCount']) def query_count(funct_id): global QUERY_COUNT QUERY_COUNT[funct_id] += 1 def perf_counter(funct, *args): start = time.perf_counter() return funct(*args), time.perf_counter() - start if __name__ == '__main__': user_data, _ = perf_counter(user_getter, USER_NAME) OWNER_ID, _ = user_data total_loc, _ = perf_counter(loc_query, ['OWNER', 'COLLABORATOR', 'ORGANIZATION_MEMBER'], 7) commit_data, _ = perf_counter(commit_counter, 7) star_data, _ = perf_counter(graph_repos_stars, 'stars', ['OWNER']) repo_data, _ = perf_counter(graph_repos_stars, 'repos', ['OWNER']) contrib_data, _ = perf_counter(graph_repos_stars, 'repos', ['OWNER', 'COLLABORATOR', 'ORGANIZATION_MEMBER']) follower_data, _ = perf_counter(follower_getter, USER_NAME) formatted_loc = [f"{x:,}" if isinstance(x, int) else x for x in total_loc[:-1]] svg_overwrite('dark_mode.svg', commit_data, star_data, repo_data, contrib_data, follower_data, formatted_loc) print("SVG updated successfully.")