| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- #!/usr/bin/env python3
- #
- # A script to setup a Working tree from abe git submodules
- DEBUG = 1
- def main():
- global DEBUG
- p = parser()
- args = p.parse_args()
- DEBUG = args.debug
- DEBUG = DEBUG - (1 if args.quiet else 0)
- print(args) if DEBUG>1 else None
- if args.source_url and args.source_url.find('://') < 0 and args.source_url.find( ':' ) > 0:
- args.source_url = 'ssh://' + args.source_url.replace( ':', '/', 1)
- for ret in recurse_abe_submodules( args.output_dir, args.source_url, args.tag, clone_repo_cached(cache_dir=args.cache_dir)):
- pass
- def parser():
- import argparse
- p = argparse.ArgumentParser()
- p.add_argument('-d', '--debug', action='count', default=1)
- p.add_argument('-q', '--quiet', action='store_true')
- p.add_argument('-o', '--output_dir', default='.')
- p.add_argument('-c', '--cache_dir', default=None)
- p.add_argument('-t', '--tag', default='master')
- p.add_argument('-s', '--source_url', default=None)
- return p
- def recurse_abe_submodules(path, remote, ref=None, func=None):
- if func != None:
- yield func( path, remote, ref)
- subm = get_abe_submodules( path )
- import os
- import urllib.parse
- url = urllib.parse.urlparse(remote)
- for su in subm:
- newpath = os.path.normpath(os.path.join( path, su.subdir))
- newurl = url._replace(path=su.remote).geturl()
- print(f"Repo: \"{path}\" has Submodule: \"{newpath}\" Type: \"{su.subtype}\" From: \"{newurl}\"") if DEBUG else None
- for ret in recurse_abe_submodules( newpath, newurl, su.ref, func):
- yield ret
- def cache_path(cache, remote):
- import os.path
- for url in remote:
- return os.path.join( cache, str(url).replace('/','_').replace( '@', '_at_'))
- def real_relpath(dest, source='.'):
- import os.path
- real_dest = os.path.realpath(dest)
- real_source = os.path.realpath(source)
- return os.path.relpath(real_dest, real_source)
- from git import RemoteProgress
- class ProgressPrinter(RemoteProgress):
- def update(self,op_code,cur_count,max_count=None,message=''):
- cur_count_int = round(cur_count)
- max_count_int = round(max_count or 100)
- print(f"\033[2K{cur_count_int}/{max_count_int}", str(round(100*cur_count / (max_count or 100)))+'%', end="\r")
- def clone_repo_cached( cache_dir=None, bare=False):
- def clone_repo(path, remote, ref=None, cache_dir=cache_dir, bare=bare):
- print('Repo:', path) if DEBUG>1 else None
- from git import Repo
- repo = Repo.init(path, bare=bare)
- if 'origin' in repo.remotes:
- origin = repo.remotes['origin']
- else:
- origin = repo.create_remote('origin', remote)
- if cache_dir != None:
- cache_repo_path = cache_path(cache_dir, origin.urls)
- print(f"Cacheing from: {cache_repo_path}") if DEBUG else None
- clone_repo_cached(cache_dir=None, bare=True)( cache_repo_path, remote, ref)
- if 'cache' in repo.remotes:
- cache = repo.remotes['cache']
- else:
- cache = repo.create_remote('cache', real_relpath( cache_repo_path, path))
- cache.set_url("no_push" , '--push')
- cache.fetch(refspec="+refs/remotes/origin/*:refs/remotes/origin/*",progress=ProgressPrinter())
- print()
- else:
- origin.fetch(progress=ProgressPrinter())
- print()
- if not bare:
- print('Refs:', origin.refs) if DEBUG>1 else None
- print('Heads:', repo.heads) if DEBUG>1 else None
- if ref in repo.tags:
- tracking_branch_name = 'local_tag_branch/'+ref
- tracking_ref = repo.tags[ref]
- if tracking_branch_name in repo.heads:
- active_branch = repo.heads[tracking_branch_name]
- else:
- active_branch = repo.create_head('local_tag_branch/'+ref, tracking_ref)
- elif ref in repo.heads:
- tracking_ref = origin.refs[ref]
- active_branch = repo.heads[ref]
- active_branch.set_tracking_branch(tracking_ref)
- elif ref in origin.refs:
- tracking_ref = origin.refs[ref]
- active_branch = repo.create_head(ref, tracking_ref)
- active_branch.set_tracking_branch(tracking_ref)
- elif ref in origin.refs:
- tracking_ref = origin.refs[ref]
- active_branch = repo.create_head(ref, tracking_ref)
- active_branch.set_tracking_branch(tracking_ref)
- else:
- try:
- tracking_ref = ref
- tracking_branch_name = 'local_commit_branch/'+ref
- active_branch = repo.create_head(tracking_branch_name, tracking_ref)
- except Exception:
- raise Exception(f"Branch/Tag/Commit \"{ref}\" not found")
- print('Active Branch:', active_branch) if DEBUG else None
- print('Tracking Ref:', tracking_ref, '\n') if DEBUG else None
- active_branch.checkout()
- repo.head.reset( tracking_ref, index=True, working_tree=False)
- return repo
- return clone_repo
- from enum import Enum
- class AbeSubType(Enum):
- SUBMODULE = 1
- BOOTLOADER = 2
- BSP = 3
- class AbeSubmodule():
- subtype = AbeSubType.SUBMODULE
- def __init__(self, sub_info, subtype=None):
- if subtype != None:
- self.subtype = subtype
- self.subdir = sub_info[0]
- self.remote = sub_info[1]
- self.ref = sub_info[2]
- if self.subtype == AbeSubType.BSP:
- self.remote = 'bsp/' + self.remote
- if self.subtype == AbeSubType.BOOTLOADER:
- self.remote = 'bootloader/' + self.remote
- def __repr__(self):
- return f"AbeSubmodule(['{self.subdir}','{self.remote}','{self.ref}'], {self.subtype})"
- def get_abe_submodules(repo_dir):
- import os
- import itertools
- subfile_generators = []
- subfile_path = os.path.join(repo_dir, '.abe', 'submodules')
- if os.path.isfile(subfile_path):
- subfile_generators.append(parse_abe_subfile(subfile_path, subtype=AbeSubType.SUBMODULE))
- bspfile_path = os.path.join(repo_dir, '.abe', 'bsps')
- if os.path.isfile(bspfile_path):
- subfile_generators.append(parse_abe_subfile(bspfile_path, subtype=AbeSubType.BSP))
- bootloaderfile_path = os.path.join(repo_dir, '.abe', 'bootloaders')
- if os.path.isfile(bootloaderfile_path):
- subfile_generators.append(parse_abe_subfile(bootloaderfile_path, subtype=AbeSubType.BOOTLOADER))
- return itertools.chain(*subfile_generators)
-
- def parse_abe_subfile(subfile_path, subtype=AbeSubType.SUBMODULE):
- with open(subfile_path) as subfile:
- for line in subfile.readlines():
- strline = line.strip()
- if strline.startswith('#'):
- continue
- sline = strline.split()
- if len(sline) < 3:
- continue
- print('Submodule:', sline) if DEBUG>1 else None
- sub = AbeSubmodule(sline, subtype)
- yield sub
- if __name__=='__main__':
- main()
|