Browse Source

threaded parallel

Tobias Simetsreiter 5 năm trước
mục cha
commit
553b3b2c81
1 tập tin đã thay đổi với 48 bổ sung và 38 xóa
  1. 48 38
      gitmirror.py

+ 48 - 38
gitmirror.py

@@ -12,6 +12,12 @@ mirror_base=f"{local_base}/{source_server}"
 COM=sys.argv[0]
 
 
+def main4():
+    repos = repos_from_lines(git_info(source_server))
+    from multiprocessing import Pool
+    pool = Pool(processes=4)
+    pool.map(pull_repo, repos)
+
 
 def git_info(source_server):
     proc = Popen(
@@ -19,8 +25,10 @@ def git_info(source_server):
         stdout=PIPE,
         stderr=PIPE,
         encoding="utf-8")
-    for line in proc.stdout.readlines():
+    for line in iter(proc.stdout.readline, ""):
         yield line
+    proc.wait()
+    print("GIT_INFO DONE!!!!!!!!!!!!!!!")
 
 def repos_from_lines(lines):
     for line in lines:
@@ -38,41 +46,47 @@ def run_threaded(target, args=[], procs=4, debug=False):
     inq = queue.Queue()
     outq = queue.Queue()
     jobs = []
-    j = 0
-    i = 0
-    def worker(j):
+    def worker():
         import time
         while True:
-            arg = inq.get(block=True)
-            if arg == None:
+            num, arg = inq.get(block=True)
+            if num == None:
                 break
-            j += 1
-            print(f"{COM}: mirror task ({j}/{i}): {source_repo(arg)} => {local_repo(arg)}")
-            outq.put(target(source_repo(arg), local_repo(arg)))
+            print(f"{COM}: threaded task ({num}/{num+inq.qsize()}): {arg}")
+            result = target(arg)
+            outq.put(result)
     while len(jobs) < procs:
-        th = threading.Thread(target=worker, args=(j,))
+        th = threading.Thread(target=worker)
         th.start()
         jobs.append(th)
-    i = 0
-    for arg in args:
-        i += 1
-        inq.put(arg)
-    for job in jobs:
-        inq.put(None)
+    def filler():
+        i = 0
+        for arg in args:
+            i += 1
+            inq.put((i,arg))
+        for job in jobs:
+            inq.put((None,None))
+    fillThread = threading.Thread(target=filler)
+    fillThread.start()
+    while fillThread.is_alive():
+        yield outq.get(block=True)
+    fillThread.join()
     for job in jobs:
         job.join()
+    while not outq.empty():
+        yield outq.get(block=True)
 
+def print_sleep(args):
+    import time
+    print("Job Started: ",args)
+    time.sleep(3)
+    print("Job Done!")
+    return args
 
 def main():
     repos = repos_from_lines(git_info(source_server))
-    from multiprocessing import Pool
-    pool = Pool(processes=4)
-    pool.map(pull_repo, repos)
-
-
-def main3():
-    repos = repos_from_lines(git_info(source_server))
-    run_threaded(mirror_repo, repos, debug=True)
+    for output in run_threaded(pull_repo, repos, debug=True):
+        pass
 
 def main2():
     lines = git_info(source_server)
@@ -87,18 +101,13 @@ def source_repo(repo):
 def local_repo(repo):
     return f"{mirror_base}/{repo}.git"
 
-processed = 0
 def pull_repo(repo):
-    global processed
-    processed += 1
-    print(f"{COM}: mirror task {processed}: {source_repo(repo)} => {local_repo(repo)}")
     return mirror_repo(source_repo(repo), local_repo(repo))
 
 def mirror_repo(source_repo,local_repo):
-    if os.path.isdir(f"{local_repo}/refs"):
-        update_local_repo(local_repo)
-    else:
-        clone_repo(source_repo, local_repo)
+    if not os.path.isdir(f"{local_repo}/refs"):
+        init_repo(source_repo, local_repo)
+    update_local_repo(local_repo)
     return True
 
 def update_local_repo(local_repo):
@@ -107,16 +116,17 @@ def update_local_repo(local_repo):
         print(f"Error: {proc}, {local_repo}")
         _rm(local_repo)
 
-def clone_repo(source_repo, local_repo):
-    run(["rm","-rf",local_repo])
+def init_repo(source_repo, local_repo):
+    _rm(local_repo)
     run(["mkdir","-p",local_repo])
-    proc = run(["git","clone","--bare",source_repo,"."], cwd=local_repo)
-    if not proc.returncode == 0:
-        print(f"Error: {proc}, {local_repo}")
+    proc1 = run(["git","init","--bare"], cwd=local_repo)
+    proc2 = run(["git","remote","add","origin",source_repo], cwd=local_repo)
+    if not proc1.returncode == 0 and proc2.returncode == 0:
+        print(f"Error: {proc1}, {proc2}, {local_repo}")
         _rm(local_repo)
 
 def _rm(di):
-    run(["rm","-rf",local_repo])
+    run(["rm","-rf", di])
 
 if __name__ == "__main__":
     main()