From 3518bc94b155ba0234aa7173d29f14c41cda7c2f Mon Sep 17 00:00:00 2001
From: Mojtaba-z
Date: Wed, 31 Dec 2025 16:30:33 +0330
Subject: [PATCH] import - non parallel version of link rancher to org

---
 .../commands/link_ranchers_on_org.py          | 119 ++++++++++++++++++
 .../commands/link_ranchers_parallel.py        |  22 ++++
 2 files changed, 141 insertions(+)
 create mode 100644 apps/herd/management/commands/link_ranchers_on_org.py

diff --git a/apps/herd/management/commands/link_ranchers_on_org.py b/apps/herd/management/commands/link_ranchers_on_org.py
new file mode 100644
index 0000000..f1ecb24
--- /dev/null
+++ b/apps/herd/management/commands/link_ranchers_on_org.py
@@ -0,0 +1,119 @@
+import time
+
+from django.core.management.base import BaseCommand
+from django.db import transaction
+
+from apps.authentication.models import Organization
+from apps.herd.models import Rancher, RancherOrganizationLink
+
+
+class Command(BaseCommand):
+    """Link each unlinked rancher to the single cooperative of its city.
+
+    Ranchers that already have a link, or whose city has no cooperative,
+    are counted as skipped; cities with several cooperatives are counted
+    as ambiguous and left untouched.
+    """
+
+    help = "Link ranchers to cooperative of their city"
+
+    BATCH_SIZE = 2000
+    PROGRESS_EVERY = 10_000
+    # Organization type key of a cooperative; adjust to your own data.
+    COOPERATIVE_TYPE_KEY = 'CO'
+
+    def handle(self, *args, **options):
+        """Stream ranchers that have a city and bulk-create missing links."""
+        qs = (
+            Rancher.objects
+            .filter(city__isnull=False)
+            .select_related('city')
+        )
+
+        total = qs.count()
+        processed = 0
+        created = 0
+        skipped = 0
+        ambiguous = 0
+
+        start = time.monotonic()
+
+        buffer = []
+        # city pk -> cooperatives of that city (at most two are fetched),
+        # so every city is looked up once instead of once per rancher.
+        orgs_by_city = {}
+
+        for rancher in qs.iterator(chunk_size=self.BATCH_SIZE):
+            processed += 1
+
+            if RancherOrganizationLink.objects.filter(rancher=rancher).exists():
+                # Already linked to an organization.
+                skipped += 1
+            else:
+                orgs = self.cooperatives_for(rancher.city, orgs_by_city)
+                if not orgs:
+                    skipped += 1
+                elif len(orgs) > 1:
+                    ambiguous += 1
+                else:
+                    buffer.append(
+                        RancherOrganizationLink(
+                            rancher=rancher,
+                            organization=orgs[0],
+                        )
+                    )
+                    if len(buffer) >= self.BATCH_SIZE:
+                        created += self.bulk_create(buffer)
+                        buffer.clear()
+
+            # Progress is reported for every rancher seen, not only for the
+            # ones that end up in the buffer.
+            if processed % self.PROGRESS_EVERY == 0:
+                elapsed = time.monotonic() - start
+                speed = processed / elapsed if elapsed else 0
+
+                self.stdout.write(
+                    self.style.NOTICE(
+                        f"[{processed:,}/{total:,}] "
+                        f"created={created:,} "
+                        f"skipped={skipped:,} "
+                        f"ambiguous={ambiguous:,} "
+                        f"{speed:.0f} r/s"
+                    )
+                )
+
+        if buffer:
+            created += self.bulk_create(buffer)
+
+        self.stdout.write(
+            self.style.SUCCESS(
+                f"DONE ✅ created={created}, skipped={skipped}, ambiguous={ambiguous}"
+            )
+        )
+
+    def cooperatives_for(self, city, cache):
+        """Return the cooperatives of *city* (at most two), memoised in *cache*.
+
+        Two rows are enough to tell "none", "exactly one" and "ambiguous" apart.
+        """
+        if city.pk not in cache:
+            cache[city.pk] = list(
+                Organization.objects.filter(
+                    city=city,
+                    type__key=self.COOPERATIVE_TYPE_KEY,
+                ).order_by('pk')[:2]
+            )
+        return cache[city.pk]
+
+    @transaction.atomic
+    def bulk_create(self, objs):
+        """Insert *objs* in one statement and return how many were submitted.
+
+        Conflicting rows are ignored by the database, so the returned number
+        is an upper bound on the rows actually inserted.
+        """
+        RancherOrganizationLink.objects.bulk_create(
+            objs,
+            ignore_conflicts=True
+        )
+        return len(objs)
diff --git a/apps/herd/management/commands/link_ranchers_parallel.py b/apps/herd/management/commands/link_ranchers_parallel.py
index 5acb33d..d0ff9b5 100644
--- a/apps/herd/management/commands/link_ranchers_parallel.py
+++ b/apps/herd/management/commands/link_ranchers_parallel.py
@@ -1,3 +1,4 @@
+import os
 from multiprocessing import cpu_count, Pool
 
 from django.core.management.base import BaseCommand
@@ -5,6 +6,19 @@ from django.db import connection
 from .link_ranchers_parallel_worker import process_city
 
 
+os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
+
+
+def run_worker(chunk):
+    import os
+    import django
+
+    os.environ.setdefault(
+        "DJANGO_SETTINGS_MODULE",
+        "Rasaddam_Backend.settings"
+    )
+    django.setup()
+
 class Command(BaseCommand):
     help = "Parallel link ranchers to cooperative by city"
 
@@ -20,6 +34,14 @@ class Command(BaseCommand):
         from apps.herd.models import Rancher
 
         workers = options['worker']
+        from multiprocessing import get_context
+
+        chunks = self.prepare_chunks()
+
+        ctx = get_context("spawn")
+        with ctx.Pool(processes=workers) as pool:
+            pool.map(run_worker, chunks)
+
         city_ids = (
             Rancher.objects.filter(city__isnull=False)
             .values_list('city_id', flat=True)