# Listing metadata (extraction residue): originally 189 lines, 9.0 KiB, Python.
import logging

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Count, F, Max, Min, Sum

# Dedicated, explicitly named logger for this management command. It gets its
# own stream handler so batch failures are reported with a timestamp even when
# the project has no logging configuration for this logger name.
logger = logging.getLogger("merge_duplicate_ranchers")
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")  # noqa
handler.setFormatter(formatter)
# Attach the handler only when the logger has none yet: re-importing/reloading
# this module (or a logger already configured elsewhere) must not result in
# every record being emitted twice.
if not logger.handlers:
    logger.addHandler(handler)
logger.setLevel(logging.INFO)


class Command(BaseCommand):
    """Merge ``Rancher`` rows that share the same ``national_code``.

    For every duplicated ``national_code`` one rancher is kept (selected by
    ``--keep-strategy``), the herds of the other ranchers are re-pointed to
    it, a fixed list of numeric columns is summed onto it, and the other
    rancher rows are deleted.

    Groups are processed in batches with one database transaction per batch,
    so a failure rolls back only the batch in which it happened; the command
    then continues with the next batch.
    """

    help = "Merge duplicate ranchers by national_code, reassign their herds and delete duplicates."

    # Candidate Rancher columns whose values are summed onto the kept record.
    # Only the names that actually exist as model fields are used.
    _SUMMABLE_FIELDS = ('total_weight', 'animal_count', 'some_other_numeric_field')

    def add_arguments(self, parser):
        """Register the command-line options of the command."""
        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Run without making DB changes (just report what would change).'
        )
        parser.add_argument(
            '--apply',
            action='store_true',
            help='Apply changes to the database (perform updates and deletes).'
        )
        parser.add_argument(
            '--keep-strategy',
            choices=['min_id', 'max_id', 'latest_updated'],
            default='min_id',
            help='Which rancher record to keep among duplicates (default: min_id).'
        )
        parser.add_argument(
            '--min-duplicates',
            type=int,
            default=2,
            help='Only consider national_code groups with at least this many records (default=2).'
        )
        parser.add_argument(
            '--batch-size',
            type=int,
            default=200,
            help='Process this many duplicate groups per DB transaction (default=200).'
        )

    def handle(self, *args, **options):
        """Find duplicated national codes and merge each group, batch by batch.

        Nothing is done unless ``--dry-run`` or ``--apply`` is given. When both
        are given the run is a dry-run (the safe choice).
        """
        dry_run = options['dry_run']
        apply_changes = options['apply']
        keep_strategy = options['keep_strategy']
        # A group with fewer than two records has nothing to merge.
        min_duplicates = max(options['min_duplicates'], 2)
        batch_size = options['batch_size']

        if not dry_run and not apply_changes:
            self.stdout.write(self.style.ERROR(
                "Specify --dry-run to preview or --apply to actually perform changes."
            ))
            return

        if batch_size < 1:
            # range() rejects a zero step and a negative one would silently
            # process nothing, so refuse both up front.
            self.stdout.write(self.style.ERROR("--batch-size must be a positive integer."))
            return

        if dry_run and apply_changes:
            self.stdout.write(self.style.WARNING(
                "Both --dry-run and --apply were given; running as dry-run, nothing will be changed."
            ))

        self.stdout.write(self.style.NOTICE("Collecting duplicate ranchers by national_code..."))

        # Models are imported locally to avoid circular imports.
        from apps.herd.models import Rancher as RancherModel  # adjust import path
        from apps.herd.models import Herd as HerdModel  # adjust import path

        # 1) Find the national_code values that occur more than once.
        #    NULL codes are excluded: they would all fall into one group and
        #    unrelated ranchers would be merged. The explicit order_by keeps
        #    the batches deterministic.
        dup_qs = (
            RancherModel.objects
            .exclude(national_code__isnull=True)
            .values('national_code')
            .annotate(cnt=Count('id'))
            .filter(cnt__gte=min_duplicates)
            .order_by('national_code')
        )
        # Blank codes are dropped for the same reason as NULL ones.
        national_codes = [
            row['national_code'] for row in dup_qs
            if not self._is_blank(row['national_code'])
        ]
        total_groups = len(national_codes)
        self.stdout.write(self.style.SUCCESS(f"Found {total_groups} duplicated national_code groups."))

        # Which of the summable columns exist on the model (loop-invariant).
        model_field_names = {field.name for field in RancherModel._meta.get_fields()}
        numeric_fields = [name for name in self._SUMMABLE_FIELDS if name in model_field_names]

        processed = 0
        errors = 0

        # 2) Process the groups in batches, one transaction per batch.
        for start in range(0, total_groups, batch_size):
            batch = national_codes[start:start + batch_size]
            batch_no = start // batch_size + 1
            if dry_run:
                self.stdout.write(
                    self.style.WARNING(f"Dry-run: processing batch {batch_no} ({len(batch)} groups)"))
            else:
                self.stdout.write(self.style.WARNING(f"Applying batch {batch_no} ({len(batch)} groups)"))

            handled_in_batch = 0
            try:
                with transaction.atomic():
                    for national_code in batch:
                        handled = self._merge_group(
                            national_code,
                            rancher_model=RancherModel,
                            herd_model=HerdModel,
                            keep_strategy=keep_strategy,
                            min_duplicates=min_duplicates,
                            numeric_fields=numeric_fields,
                            dry_run=dry_run,
                        )
                        if handled:
                            handled_in_batch += 1
            except Exception as exc:
                # The transaction of this batch has been rolled back; log the
                # failure and carry on with the next batch.
                errors += 1
                logger.exception("Error processing batch starting at index %s: %s", start, exc)
                continue

            # Count the groups only once their batch is known to have committed.
            processed += handled_in_batch

        self.stdout.write(self.style.SUCCESS(f"Done. Processed groups: {processed}, errors: {errors}"))
        self.stdout.write(
            self.style.NOTICE("IMPORTANT: If you ran with --apply, verify data integrity and related FK constraints."))

    @staticmethod
    def _is_blank(value):
        """Return True for ``None`` and for empty / whitespace-only strings."""
        return value is None or (isinstance(value, str) and not value.strip())

    @staticmethod
    def _choose_keep_id(keep_strategy, candidate_ids, rancher_model):
        """Return the id of the rancher to keep among ``candidate_ids``.

        ``candidate_ids`` must be non-empty. ``latest_updated`` assumes the
        Rancher model has an ``updated_at`` field (TODO confirm); rows whose
        ``updated_at`` is NULL are ranked last. If no row can be determined
        the smallest id is used.
        """
        if keep_strategy == 'max_id':
            return max(candidate_ids)
        if keep_strategy == 'latest_updated':
            latest_id = (
                rancher_model.objects
                .filter(id__in=candidate_ids)
                .order_by(F('updated_at').desc(nulls_last=True), '-id')
                .values_list('id', flat=True)
                .first()
            )
            if latest_id is not None:
                return latest_id
        return min(candidate_ids)

    def _merge_group(self, national_code, *, rancher_model, herd_model,
                     keep_strategy, min_duplicates, numeric_fields, dry_run):
        """Merge (or, in dry-run, report on) the ranchers of one national code.

        Must be called inside ``transaction.atomic()``. Returns True when the
        group was handled and False when it was skipped because it no longer
        contains at least ``min_duplicates`` ranchers.
        """
        self.stdout.write(f"-- handling national_code={national_code}")

        rancher_qs = rancher_model.objects.filter(national_code=national_code).order_by('id')
        if not dry_run:
            # Lock the rows to prevent race conditions. Querysets are lazy, so
            # the lock is only taken when the queryset is evaluated below.
            rancher_qs = rancher_qs.select_for_update()
        all_ids = list(rancher_qs.values_list('id', flat=True))
        if len(all_ids) < min_duplicates:
            return False

        # Pick the record to keep from the rows that exist right now, then
        # everything else in the group is a duplicate to remove.
        keep_id = self._choose_keep_id(keep_strategy, all_ids, rancher_model)
        remove_ids = [rancher_id for rancher_id in all_ids if rancher_id != keep_id]
        self.stdout.write(f" keep_id={keep_id} remove_ids={remove_ids}")

        if dry_run:
            # Dry-run: only report herd statistics.
            herd_count_keep = herd_model.objects.filter(rancher_id=keep_id).count()
            herd_count_remove = herd_model.objects.filter(rancher_id__in=remove_ids).count()
            self.stdout.write(
                f" [DRY] keep_has_herds={herd_count_keep} remove_has_herds={herd_count_remove}"
            )
            return True

        # 1) Move the herds of the duplicates to the kept rancher.
        moved = herd_model.objects.filter(rancher_id__in=remove_ids).update(rancher_id=keep_id)
        self.stdout.write(self.style.SUCCESS(f" moved {moved} herds to rancher {keep_id}"))

        # 2) Sum the numeric columns of all involved ranchers (the kept one
        #    included) onto the kept record. Aliases differ from the field
        #    names so they cannot shadow the columns being summed.
        if numeric_fields:
            totals = rancher_model.objects.filter(id__in=all_ids).aggregate(
                **{f"{name}_sum": Sum(name) for name in numeric_fields}
            )
            update_data = {name: totals[f"{name}_sum"] or 0 for name in numeric_fields}
            rancher_model.objects.filter(id=keep_id).update(**update_data)
            self.stdout.write(
                self.style.SUCCESS(f" aggregated numeric fields on keep {keep_id}: {update_data}"))

        # 3) Delete the duplicates. delete() returns the per-model row counts,
        #    which may also contain cascaded deletions of other models.
        _, deleted_per_model = rancher_model.objects.filter(id__in=remove_ids).delete()
        count_del = deleted_per_model.get(rancher_model._meta.label, 0)
        self.stdout.write(self.style.SUCCESS(f" deleted {count_del} duplicate rancher records"))
        return True