Browse Source

added multiprocessing

Tobias Simetsreiter 5 năm trước cách đây
mục cha
commit
ff33804b57
1 tập tin đã thay đổi với 62 bổ sung39 xóa
  1. 62 39
      gitmirror.py

+ 62 - 39
gitmirror.py

@@ -32,53 +32,69 @@ def repos_from_lines(lines):
         yield fields[2]
 
 
-async def run_parallel(target, args=[], procs=4, debug=False):
-    import asyncio
-    loop = asyncio.new_event_loop()
+def run_threaded(target, args=[], procs=4, debug=False):
+    import threading
+    import queue
+    inq = queue.Queue()
+    outq = queue.Queue()
     jobs = []
-    lock = asyncio.Lock()
-    async def addJobs(target, args):
-        for arg in args:
-            print("addJobs:", len(jobs))
-            while len(jobs) >= procs:
-                lock.release()
-                await asyncio.sleep(0.5)
-            await lock.acquire()
-            print("adding Job with arg:", arg)
-            jobs.append(loop.create_task(target(arg)))
-
-    jobmon = loop.create_task(addJobs(target, args))
-    while True:
-        while len(jobs)==0:
-            await lock.acquire()
-            if jobmon.done():
+    j = 0
+    i = 0
+    def worker(j):
+        import time
+        while True:
+            arg = inq.get(block=True)
+            if arg == None:
                 break
-        result = await jobs[0]
-        print("Job done with result:", result, flush=True)
-        jobs.pop(0)
-        lock.release()
-    print(jobmon)
-    await jobmon
+            j += 1
+            print(f"{COM}: mirror task ({j}/{i}): {source_repo(arg)} => {local_repo(arg)}")
+            outq.put(target(source_repo(arg), local_repo(arg)))
+    while len(jobs) < procs:
+        th = threading.Thread(target=worker, args=(j,))
+        th.start()
+        jobs.append(th)
+    i = 0
+    for arg in args:
+        i += 1
+        inq.put(arg)
+    for job in jobs:
+        inq.put(None)
+    for job in jobs:
+        job.join()
 
 
+def main():
+    repos = repos_from_lines(git_info(source_server))
+    from multiprocessing import Pool
+    pool = Pool(processes=4)
+    pool.map(pull_repo, repos)
 
-def main2():
-    import asyncio
-    lines = git_info(source_server)
-    repos = repos_from_lines(lines)
-    asyncio.run(run_parallel(mirror_repo, repos, debug=True))
 
-def main():
+def main3():
+    repos = repos_from_lines(git_info(source_server))
+    run_threaded(mirror_repo, repos, debug=True)
+
+def main2():
     lines = git_info(source_server)
     repos = repos_from_lines(lines)
     for repo in repos:
         mirror_repo(repo)
 
 
-def mirror_repo(repo):
-    print(f"{COM}: checking: {repo}")
-    source_repo = f"{source_server}:{repo}"
-    local_repo = f"{mirror_base}/{repo}"
+def source_repo(repo):
+    return f"{source_server}:{repo}"
+
+def local_repo(repo):
+    return f"{mirror_base}/{repo}.git"
+
+processed = 0
+def pull_repo(repo):
+    global processed
+    processed += 1
+    print(f"{COM}: mirror task {processed}: {source_repo(repo)} => {local_repo(repo)}")
+    return mirror_repo(source_repo(repo), local_repo(repo))
+
+def mirror_repo(source_repo,local_repo):
     if os.path.isdir(f"{local_repo}/refs"):
         update_local_repo(local_repo)
     else:
@@ -86,14 +102,21 @@ def mirror_repo(repo):
     return True
 
 def update_local_repo(local_repo):
-    print(f"{COM}: {local_repo} exists, updating...")
-    run(["git","fetch","-p","origin"], cwd=local_repo)
+    proc = run(["git","fetch","-p","origin"], cwd=local_repo)
+    if not proc.returncode == 0:
+        print(f"Error: {proc}, {local_repo}")
+        _rm(local_repo)
 
 def clone_repo(source_repo, local_repo):
-    print(f"{COM}: Cloning {source_repo} => {local_repo}")
     run(["rm","-rf",local_repo])
     run(["mkdir","-p",local_repo])
-    run(["git","clone","--bare",source_repo,"."], cwd=local_repo)
+    proc = run(["git","clone","--bare",source_repo,"."], cwd=local_repo)
+    if not proc.returncode == 0:
+        print(f"Error: {proc}, {local_repo}")
+        _rm(local_repo)
+
+def _rm(di):
+    run(["rm","-rf",local_repo])
 
 if __name__ == "__main__":
     main()