import - sync_livestock/herd_rancher_sync/purchase_policy & service_area in organization
This commit is contained in:
39
apps/herd/management/commands/link_ranchers_parallel.py
Normal file
39
apps/herd/management/commands/link_ranchers_parallel.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from multiprocessing import cpu_count, Pool
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import connection
|
||||
|
||||
from apps.herd.models import Rancher
|
||||
from .link_ranchers_parallel_worker import process_city
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Parallel link ranchers to cooperative by city"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'--worker',
|
||||
type=int,
|
||||
default=cpu_count() // 2
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
workers = options['workers']
|
||||
|
||||
city_ids = (
|
||||
Rancher.objects.filter(city__isnull=False)
|
||||
.values_list('city_id', flat=True)
|
||||
.distinct()
|
||||
)
|
||||
|
||||
self.stdout.write(
|
||||
f"Starting parallel sync for {len(city_ids)} cities "
|
||||
f"with {workers} workers"
|
||||
)
|
||||
|
||||
connection.close()
|
||||
|
||||
with Pool(processes=workers) as pool:
|
||||
pool.map(process_city, city_ids)
|
||||
|
||||
self.stdout.write(self.style.SUCCESS("DONE ✅"))
|
||||
@@ -0,0 +1,55 @@
|
||||
from django.db import connection, transaction
|
||||
|
||||
from apps.herd.models import Organization, Rancher, RancherOrganizationLink
|
||||
|
||||
BATCH_SIZE = 2000
|
||||
|
||||
|
||||
def process_city(city_id):
|
||||
connection.close()
|
||||
|
||||
orgs = Organization.objects.filter(
|
||||
city_id=city_id,
|
||||
type__key='CO'
|
||||
)
|
||||
|
||||
if not orgs.exists() or orgs.count() > 1:
|
||||
return
|
||||
|
||||
coop = orgs.first()
|
||||
|
||||
ranchers = (
|
||||
Rancher.objects.filter(city_id=city_id)
|
||||
.only('id')
|
||||
)
|
||||
|
||||
buffer = []
|
||||
|
||||
for rancher in ranchers.iterator(chunk_size=BATCH_SIZE):
|
||||
|
||||
if RancherOrganizationLink.objects.filter(
|
||||
rancher_id=rancher.id
|
||||
).exists():
|
||||
continue
|
||||
|
||||
buffer.append(
|
||||
RancherOrganizationLink(
|
||||
rancher_id=rancher.id,
|
||||
organization_id=coop.id
|
||||
)
|
||||
)
|
||||
|
||||
if len(buffer) >= BATCH_SIZE:
|
||||
bulk_insert(buffer)
|
||||
buffer.clear()
|
||||
|
||||
if buffer:
|
||||
bulk_insert(buffer)
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def bulk_insert(objs):
|
||||
RancherOrganizationLink.objects.bulk_create(
|
||||
objs,
|
||||
ignore_conflicts=True
|
||||
)
|
||||
Reference in New Issue
Block a user