#!/usr/bin/env python3
"""Set up a working tree from ABE git submodules.

The script starts at a root repository (``--output_dir``), reads the
``.abe/submodules`` file (and optionally ``.abe/bsps`` and
``.abe/bootloaders``) of every repository it visits, and either

* clones/checks out each listed repository (default), optionally through a
  local bare-repository cache (``--cache_dir``),
* runs a shell command in every repository (``--execute_command``), or
* runs a ``.abe/commands/<name>.cmd`` script of the root repository
  (``--abe_command``).
"""

import argparse
import itertools
import os
import sys
import time
from enum import Enum
from subprocess import CalledProcessError, run
from threading import Thread
from urllib.parse import urlparse

from git import RemoteProgress, Repo

# Verbosity level: 0 = quiet, 1 = default, higher values print more details.
# main() overwrites it from the ``--debug`` / ``--quiet`` options.
DEBUG = 1


def main():
    """Parse the command line and run the selected mode of operation."""
    global DEBUG
    args = parser().parse_args()
    DEBUG = args.debug - (1 if args.quiet else 0)
    if DEBUG > 1:
        print(args)

    if args.abe_command:
        executeAbeCommand(args.output_dir, args.abe_command)
        return

    # A limit below 1 would make the throttling loop below wait forever on an
    # empty job list, so clamp it.
    max_jobs = max(1, args.jobs)
    jobs = []
    count_submodules = 0
    submodules = abe_submodules(
        args.output_dir,
        args.source_url,
        args.tag,
        recurse_bsps=args.recurse_bsps,
        recurse_bootloaders=args.recurse_bootloaders,
    )
    for sub in submodules:
        count_submodules += 1
        if DEBUG > 0:
            print("Count:", count_submodules)

        if args.execute_command:
            try:
                # The command is a shell snippet supplied on the command line
                # by the operator, hence shell=True.
                run(args.execute_command, shell=True, check=True,
                    cwd=sub.fullpath)
            except CalledProcessError as err:
                print(f'Command: "{args.execute_command}" in '
                      f'"{sub.fullpath}" returned exitcode '
                      f'"{err.returncode}"')
                sys.exit(err.returncode)
            continue

        job = Thread(
            target=sub.clone_repo,
            kwargs={
                'cache_dir': args.cache_dir,
                'fetch_origin': not args.no_fetch_origin,
            },
        )
        job.start()
        jobs.append(job)

        # Throttle: while the pool is full, wait briefly on the oldest job and
        # then drop every job that has finished.
        while len(jobs) >= max_jobs:
            jobs[0].join(timeout=0.1)
            jobs = [j for j in jobs if j.is_alive()]

        # The submodule generator reads the ``.abe`` files of a repository
        # lazily, i.e. only once it descends into it.  Waiting for all
        # running clones after the last sibling of a level guarantees those
        # files exist before the next level is listed.
        if sub.last:
            for pending in jobs:
                pending.join()
            jobs.clear()


def parser():
    """Build and return the command line argument parser."""
    p = argparse.ArgumentParser()
    p.add_argument('-d', '--debug', action='count', default=1)
    p.add_argument('-j', '--jobs', default=1, type=int,
                   help='use threaded clone with n jobs')
    p.add_argument('-q', '--quiet', action='store_true')
    p.add_argument('-o', '--output_dir', default='.',
                   help='local directory to operate on')
    p.add_argument('-c', '--cache_dir', default=None,
                   help='use a separate local directory to cache git '
                        'repositories')
    p.add_argument('-n', '--no_fetch_origin', action='store_true',
                   help='prevent fetching from origin')
    p.add_argument('-t', '--tag', default='master',
                   help='commit/branch/tag to be checked out')
    p.add_argument('-s', '--source_url', default=None,
                   help='url to be used for origin remote')
    p.add_argument('--recurse_bootloaders', action='store_true',
                   help='also recurse into .abe/bootloaders')
    p.add_argument('--recurse_bsps', action='store_true',
                   help='also recurse into .abe/bsps')
    p.add_argument('-a', '--abe_command', default=None,
                   help="execute an ABE command in output_dir")
    p.add_argument('-x', '--execute_command', default=None,
                   help='execute command in subdirs recursively')
    return p


def executeAbeCommand(repo_path, command_name):
    """Run ``.abe/commands/<command_name>.cmd`` inside *repo_path*.

    Nothing happens when the command file does not exist.  A non-zero exit
    status of the command is reported on stdout.
    """
    # NOTE(review): a failing command is only reported; its exit code is not
    # propagated to the caller or the process exit status.  Confirm whether
    # that is intended.
    command_file_rel = os.path.join('.abe', 'commands', command_name + ".cmd")
    command_file = os.path.join(repo_path, command_file_rel)
    if not os.path.isfile(command_file):
        return
    try:
        run(['bash', '-c', f"set -e;./{command_file_rel}"],
            check=True, cwd=repo_path)
    except CalledProcessError as err:
        print(command_file_rel, 'in', repo_path,
              'returned exitcode', err.returncode)


def abe_submodules(path, remote, ref, **kwargs):
    """Yield the root repository followed by all of its ABE submodules.

    *path* is the root working tree, *remote* its origin URL (or a false
    value to read it from the existing repository) and *ref* the
    commit/branch/tag to check out.  Keyword arguments are passed on to
    :func:`recurse_abe_submodules`.
    """
    url = sane_origin_url(remote, path)
    root = AbeSubmodule([path, url.geturl(), ref], AbeSubType.SUBMODULE)
    yield root
    yield from recurse_abe_submodules(root, **kwargs)


def recurse_abe_submodules(parent, **kwargs):
    """Yield the submodules of *parent* level by level.

    All direct children of *parent* are yielded first; each one has its
    ``last`` attribute set to ``False`` except the final child, which gets
    ``True``.  Afterwards the function descends into every child in order.
    """
    children = []
    previous = None
    for su in get_abe_subtree(parent.fullpath, **kwargs):
        su.parent = parent
        if DEBUG:
            print(f'Repo: "{parent.fullpath}" has Submodule: '
                  f'"{su.fullpath}" Type: "{su.subtype}" '
                  f'From: "{su.url.geturl()}" Ref: "{su.ref}"')
        children.append(su)
        # One-element look-ahead: an entry is only yielded once we know
        # whether another sibling follows it.
        if previous is not None:
            previous.last = False
            yield previous
        previous = su
    if previous is not None:
        previous.last = True
        yield previous

    for child in children:
        # NOTE(review): the recurse_bsps/recurse_bootloaders flags are not
        # forwarded, so nested levels only follow .abe/submodules.  Confirm
        # that this is intended.
        yield from recurse_abe_submodules(child)


def sane_origin_url(input_url=None, path=None):
    """Return the origin URL as a parsed URL object.

    An object that already has a ``geturl`` attribute is returned unchanged.
    A string without ``://`` but with a ``:`` after its first character
    (scp-like ``user@host:path``) is rewritten to ``ssh://user@host/path``
    before parsing.  When *input_url* is empty the URL is read from the
    ``origin`` remote of the repository at *path*.
    """
    if DEBUG > 1:
        print("Source Url:", input_url)
    if not input_url:
        return urlparse(origin_url(path))
    if hasattr(input_url, "geturl"):
        return input_url
    if input_url.find('://') < 0 and input_url.find(':') > 0:
        return urlparse('ssh://' + input_url.replace(':', '/', 1))
    return urlparse(input_url)


def origin_url(path):
    """Return the first URL of the ``origin`` remote of the repo at *path*.

    Returns ``None`` when the remote reports no URL.
    """
    repo = Repo(path)
    return next(iter(repo.remotes.origin.urls), None)


class ProgressPrinter(RemoteProgress):
    """Progress handler that prints a single, self-overwriting status line."""

    def update(self, op_code, cur_count, max_count=None, message=''):
        """Print ``current/max  percent%  message``; silent when DEBUG is 0."""
        if not DEBUG:
            return
        cur_count_int = round(cur_count)
        if not max_count:
            # Total unknown: show placeholders as wide as the current count.
            div = f"{cur_count_int}/" + '?' * len(str(cur_count_int))
            perc = '???'
        else:
            div = f"{cur_count_int}/{round(max_count)}"
            perc = round(100 * cur_count / max_count)
        # "\033[2K" erases the current terminal line; "\r" returns the cursor
        # so the next update overwrites this one.
        print("\033[2K" + div, f"{perc:>3}%", message, end='\r')
        sys.stdout.flush()


def _select_checkout_target(repo, origin, ref):
    """Return ``(active_branch, tracking_ref)`` used to check out *ref*.

    Lookup order: tag, existing local branch, branch of the origin remote,
    and finally *ref* itself (e.g. a commit id).  Tags and plain refs get a
    dedicated local branch named ``local_tag_branch/<ref>`` respectively
    ``local_commit_branch/<ref>``.

    Raises ``Exception`` when *ref* cannot be resolved at all.
    """
    if ref in repo.tags:
        branch_name = 'local_tag_branch/' + ref
        tracking_ref = repo.tags[ref]
        if branch_name in repo.heads:
            return repo.heads[branch_name], tracking_ref
        return repo.create_head(branch_name, tracking_ref), tracking_ref

    if ref in repo.heads:
        tracking_ref = origin.refs[ref]
        branch = repo.heads[ref]
        branch.set_tracking_branch(tracking_ref)
        return branch, tracking_ref

    if ref in origin.refs:
        tracking_ref = origin.refs[ref]
        branch = repo.create_head(ref, tracking_ref)
        branch.set_tracking_branch(tracking_ref)
        return branch, tracking_ref

    branch_name = 'local_commit_branch/' + ref
    if branch_name in repo.heads:
        return repo.heads[branch_name], ref
    try:
        return repo.create_head(branch_name, ref), ref
    except Exception as err:
        raise Exception(f"Branch/Tag/Commit \"{ref}\" not found") from err


def clone_repo_cached(sub, cache_dir=None, bare=False, fetch_origin=True):
    """Initialise, fetch and check out the repository described by *sub*.

    sub          -- AbeSubmodule providing ``fullpath``, ``url`` and ``ref``.
    cache_dir    -- when given, a bare copy of the repository is maintained
                    below this directory and the working repository fetches
                    from that copy (remote ``cache``) instead of from origin.
    bare         -- create a bare repository and skip the checkout.
    fetch_origin -- fetch from origin even if *ref* is already known locally.

    Returns the ``git.Repo`` object.  Raises ``Exception`` when *ref* cannot
    be resolved to a tag, branch or commit.
    """
    if DEBUG > 1:
        print('Repo:', sub.path)
    ref = sub.ref
    url = sub.url.geturl()
    repo = Repo.init(sub.fullpath, bare=bare)
    if 'origin' in repo.remotes:
        origin = repo.remotes['origin']
    else:
        origin = repo.create_remote('origin', url)

    if cache_dir is not None:
        cache_repo_path = cache_path(cache_dir, origin.urls)
        cache_link = real_relpath(cache_repo_path, sub.fullpath)
        if DEBUG:
            print(f"Cacheing from: {cache_repo_path}, origin: {url}")
        # Bring the bare cache repository up to date first ...
        cache_repo = AbeSubmodule((cache_repo_path, url, sub.ref),
                                  AbeSubType.SUBMODULE, parent=None)
        clone_repo_cached(cache_repo, cache_dir=None, bare=True,
                          fetch_origin=fetch_origin)
        if 'cache' in repo.remotes:
            cache = repo.remotes['cache']
        else:
            if DEBUG > 2:
                print(f"Setting up cache: {cache_link}, "
                      f"for Repo: {sub.path}")
            cache = repo.create_remote('cache', cache_link)
            # cache.set_url("no_push" , '--push')
        # ... then copy the cache's view of origin into this repository.
        cache.fetch(refspec="+refs/remotes/origin/*:refs/remotes/origin/*",
                    progress=ProgressPrinter())
        print()
    elif fetch_origin or (ref not in origin.refs and ref not in repo.tags):
        origin.fetch(progress=ProgressPrinter())
        print()

    if not bare:
        if DEBUG > 1:
            print('Refs:', origin.refs)
            print('Heads:', repo.heads)
        active_branch, tracking_ref = _select_checkout_target(
            repo, origin, ref)
        if DEBUG:
            print('Active Branch:', active_branch)
            print('Tracking Ref:', tracking_ref, '\n')
        active_branch.checkout()
        repo.head.reset(tracking_ref, index=True, working_tree=False)
    return repo


def cache_path(cache, remote):
    """Return the cache directory for the first URL in *remote*.

    The result is ``<cache>/<netloc>/<path>`` where ``/``, ``@`` and ``:`` in
    the network location are replaced by ``_`` and leading slashes of the URL
    path are removed.  Returns ``None`` when *remote* yields no URL.
    """
    for url in remote:
        parsed = urlparse(url)
        netloc = str(parsed.netloc)
        for char in '/@:':
            netloc = netloc.replace(char, '_')
        return os.path.join(cache, netloc, str(parsed.path).lstrip('/'))
    return None


def real_relpath(dest, source='.'):
    """Return *dest* relative to *source* after resolving symlinks in both."""
    return os.path.relpath(os.path.realpath(dest), os.path.realpath(source))


class AbeSubType(Enum):
    """Kind of ``.abe`` file an entry was read from."""

    SUBMODULE = 1
    BOOTLOADER = 2
    BSP = 3


class AbeSubmodule():
    """One repository entry: local path, remote and ref, plus its parent.

    ``sub_info`` is a sequence whose first three items are path, remote and
    ref.  For BSP and BOOTLOADER entries the remote is prefixed with
    ``bsp/`` respectively ``bootloader/``.
    """

    # Class-level defaults; instances override them when needed.
    subtype = AbeSubType.SUBMODULE
    # True when this entry is the final sibling on its level (also the
    # default for entries that were never part of a sibling list).
    last = True

    def __init__(self, sub_info, subtype=None, parent=None):
        if subtype is not None:
            self.subtype = subtype
        self.path = sub_info[0]
        self.remote = sub_info[1]
        self.ref = sub_info[2]
        self.parent = parent
        if self.subtype == AbeSubType.BSP:
            self.remote = 'bsp/' + self.remote
        if self.subtype == AbeSubType.BOOTLOADER:
            self.remote = 'bootloader/' + self.remote

    def __repr__(self):
        return (f"AbeSubmodule(['{self.path}','{self.remote}','{self.ref}'], "
                f"{self.subtype}, {self.parent})")

    @property
    def url(self):
        """Parsed URL of this entry.

        Without a parent the remote is parsed as is.  Otherwise the parent's
        URL is used as the base: its path is replaced by the remote's path,
        and scheme/netloc are replaced only when the remote specifies them.
        """
        own = urlparse(self.remote)
        if not self.parent:
            return own
        merged = self.parent.url._replace(path=own.path)
        if own.netloc:
            merged = merged._replace(netloc=own.netloc)
        if own.scheme:
            merged = merged._replace(scheme=own.scheme)
        return merged

    @property
    def fullpath(self):
        """Path of the working tree, resolved against the parent's path."""
        if not self.parent:
            return self.path
        return os.path.normpath(os.path.join(self.parent.fullpath, self.path))

    def clone_repo(self, cache_dir=None, fetch_origin=True):
        """Clone/check out this entry; see :func:`clone_repo_cached`."""
        return clone_repo_cached(self, cache_dir=cache_dir,
                                 fetch_origin=fetch_origin)


def get_abe_subtree(repo_dir, recurse_bsps=False, recurse_bootloaders=False):
    """Return an iterator over all ``.abe`` entries of *repo_dir*.

    Submodules are always included; bootloaders and BSPs (in that order)
    only when the corresponding flag is set.
    """
    sources = [get_abe_submodules(repo_dir)]
    if recurse_bootloaders:
        sources.append(get_abe_bootloaders(repo_dir))
    if recurse_bsps:
        sources.append(get_abe_bsps(repo_dir))
    return itertools.chain.from_iterable(sources)


def _abe_subfile_entries(repo_dir, file_name, subtype):
    """Return the entries of ``<repo_dir>/.abe/<file_name>`` if it exists.

    The existence check happens immediately; a missing file yields an empty
    iterator.
    """
    subfile_path = os.path.join(repo_dir, '.abe', file_name)
    if os.path.isfile(subfile_path):
        return parse_abe_subfile(subfile_path, subtype=subtype)
    return iter(())


def get_abe_bsps(repo_dir):
    """Return an iterator over the entries of ``.abe/bsps``."""
    return _abe_subfile_entries(repo_dir, 'bsps', AbeSubType.BSP)


def get_abe_bootloaders(repo_dir):
    """Return an iterator over the entries of ``.abe/bootloaders``."""
    return _abe_subfile_entries(repo_dir, 'bootloaders',
                                AbeSubType.BOOTLOADER)


def get_abe_submodules(repo_dir):
    """Return an iterator over the entries of ``.abe/submodules``."""
    return _abe_subfile_entries(repo_dir, 'submodules', AbeSubType.SUBMODULE)


def parse_abe_subfile(subfile_path, subtype=AbeSubType.SUBMODULE):
    """Yield an AbeSubmodule for every entry line of *subfile_path*.

    An entry consists of at least three whitespace-separated fields (path,
    remote, ref).  Lines starting with ``#`` and lines with fewer than three
    fields are skipped.
    """
    with open(subfile_path) as subfile:
        for line in subfile:
            stripped = line.strip()
            if stripped.startswith('#'):
                continue
            fields = stripped.split()
            if len(fields) < 3:
                continue
            if DEBUG > 1:
                print('Submodule:', fields)
            yield AbeSubmodule(fields, subtype)


if __name__ == '__main__':
    main()