import - whole system of syncing ranchers/herds/livestocks
This commit is contained in:
70
apps/herd/management/commands/sync_herd_rancher.py
Normal file
70
apps/herd/management/commands/sync_herd_rancher.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import transaction
|
||||
|
||||
from apps.herd.models import (
|
||||
HerdRancherTemporary,
|
||||
)
|
||||
from apps.herd.services.herd_rancher_sync import HerdRancherSyncService
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Bulk Sync Rancher & Herd from HerdRancherTemporary"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'--limit',
|
||||
type=int,
|
||||
help='limit number of records'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--dry-run',
|
||||
action='store_true',
|
||||
help='run without saving data'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--batch-size',
|
||||
type=int,
|
||||
default=1000,
|
||||
help='batch size'
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
limit = options['limit']
|
||||
dry_run = options['dry_run']
|
||||
batch_size = options['batch_size']
|
||||
|
||||
qs = HerdRancherTemporary.objects.all()
|
||||
|
||||
if limit:
|
||||
qs = qs[:limit]
|
||||
|
||||
total = qs.count()
|
||||
|
||||
self.stdout.write(
|
||||
self.style.NOTICE(f"Start bulk syncing {total} records")
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
self.stdout.write(
|
||||
self.style.WARNING("Running in DRY-RUN mode (no DB writes)")
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
with transaction.atomic():
|
||||
HerdRancherSyncService.bulk_sync(
|
||||
queryset=qs,
|
||||
batch_size=batch_size
|
||||
)
|
||||
|
||||
qs.update(sync_status='S')
|
||||
|
||||
except Exception as e:
|
||||
qs.update(sync_status='F')
|
||||
raise e
|
||||
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
f"Done. synced={total}"
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.0 on 2025-12-28 05:48
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('herd', '0024_alter_herdranchertemporary_agent_code_and_more'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='herdranchertemporary',
|
||||
name='sync_status',
|
||||
field=models.CharField(max_length=50, null=True),
|
||||
),
|
||||
]
|
||||
18
apps/herd/migrations/0026_herdranchertemporary_city.py
Normal file
18
apps/herd/migrations/0026_herdranchertemporary_city.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.0 on 2025-12-28 06:18
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('herd', '0025_herdranchertemporary_sync_status'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='herdranchertemporary',
|
||||
name='city',
|
||||
field=models.CharField(max_length=150, null=True),
|
||||
),
|
||||
]
|
||||
@@ -0,0 +1,23 @@
|
||||
# Generated by Django 5.0 on 2025-12-28 06:54
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('herd', '0026_herdranchertemporary_city'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='herd',
|
||||
name='name',
|
||||
field=models.CharField(max_length=200),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='herd',
|
||||
name='photo',
|
||||
field=models.CharField(max_length=200, null=True),
|
||||
),
|
||||
]
|
||||
@@ -24,8 +24,8 @@ class Herd(BaseModel):
|
||||
related_name='herd',
|
||||
null=True
|
||||
)
|
||||
name = models.CharField(max_length=50)
|
||||
photo = models.CharField(max_length=50, null=True)
|
||||
name = models.CharField(max_length=200)
|
||||
photo = models.CharField(max_length=200, null=True)
|
||||
code = models.CharField(max_length=20)
|
||||
heavy_livestock_number = models.BigIntegerField(default=0)
|
||||
light_livestock_number = models.BigIntegerField(default=0)
|
||||
@@ -182,6 +182,8 @@ class HerdRancherTemporary(BaseModel):
|
||||
mobile = models.CharField(max_length=150, null=True)
|
||||
agent_code = models.CharField(max_length=150, null=True)
|
||||
registerer_user = models.CharField(max_length=150, null=True)
|
||||
sync_status = models.CharField(max_length=50, null=True)
|
||||
city = models.CharField(max_length=150, null=True)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
return super(HerdRancherTemporary, self).save(*args, **kwargs)
|
||||
|
||||
132
apps/herd/services/herd_rancher_sync.py
Normal file
132
apps/herd/services/herd_rancher_sync.py
Normal file
@@ -0,0 +1,132 @@
|
||||
from django.db import transaction
|
||||
|
||||
from apps.authentication.models import City
|
||||
from apps.herd.models import Rancher, Herd
|
||||
|
||||
|
||||
class HerdRancherSyncService:
|
||||
|
||||
@classmethod
|
||||
def bulk_sync(cls, queryset, batch_size=1000):
|
||||
"""
|
||||
optimized bulk sync for large datasets
|
||||
"""
|
||||
|
||||
# -------------------------
|
||||
# Cache Cities
|
||||
# -------------------------
|
||||
city_map = {
|
||||
name.strip(): id
|
||||
for id, name in City.objects.all().values_list('id', 'name')
|
||||
}
|
||||
|
||||
# -------------------------
|
||||
# Cache existing ranchers
|
||||
# -------------------------
|
||||
rancher_map = {
|
||||
r.national_code: r
|
||||
for r in Rancher.objects.filter(
|
||||
national_code__in=queryset.values_list(
|
||||
'rancher_national_code', flat=True
|
||||
)
|
||||
).only('id', 'national_code')
|
||||
}
|
||||
|
||||
new_ranchers = []
|
||||
new_herds = []
|
||||
|
||||
existing_herds = set(
|
||||
Herd.objects.filter(
|
||||
rancher__national_code__in=queryset.values_list(
|
||||
'rancher_national_code', flat=True
|
||||
)
|
||||
).values_list(
|
||||
'rancher__national_code', 'code'
|
||||
)
|
||||
)
|
||||
|
||||
seen_in_batch = set()
|
||||
|
||||
for temp in queryset.iterator(chunk_size=batch_size):
|
||||
|
||||
# -------------------------
|
||||
# Rancher
|
||||
# -------------------------
|
||||
rancher = rancher_map.get(temp.rancher_national_code)
|
||||
|
||||
if not rancher:
|
||||
rancher = Rancher(
|
||||
first_name=temp.rancher_name,
|
||||
mobile=temp.mobile,
|
||||
national_code=temp.rancher_national_code,
|
||||
rancher_type='N',
|
||||
city_id=city_map.get(temp.city.strip()),
|
||||
province_id=30
|
||||
)
|
||||
new_ranchers.append(rancher)
|
||||
rancher_map[temp.rancher_national_code] = rancher
|
||||
|
||||
# -------------------------
|
||||
# Herd
|
||||
# -------------------------
|
||||
herd_key = (temp.rancher_national_code, temp.herd_code)
|
||||
|
||||
if herd_key in existing_herds:
|
||||
continue
|
||||
|
||||
if herd_key in seen_in_batch:
|
||||
continue
|
||||
|
||||
seen_in_batch.add(herd_key)
|
||||
|
||||
new_herds.append({
|
||||
"rancher_code": temp.rancher_national_code,
|
||||
"herd": Herd(
|
||||
name=temp.herd_name,
|
||||
code=temp.herd_code,
|
||||
epidemiologic=temp.epidemiologic,
|
||||
latitude=temp.latitude,
|
||||
longitude=temp.longitude,
|
||||
postal=temp.postal_code,
|
||||
unit_unique_id=temp.unit_unique_id,
|
||||
city_id=city_map.get(temp.city.strip()),
|
||||
province_id=30
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
# -------------------------
|
||||
# Bulk DB Operations
|
||||
# -------------------------
|
||||
with transaction.atomic():
|
||||
Rancher.objects.bulk_create(
|
||||
new_ranchers,
|
||||
ignore_conflicts=True,
|
||||
batch_size=batch_size
|
||||
)
|
||||
|
||||
# refresh ranchers with ids
|
||||
rancher_map = {
|
||||
r.national_code: r
|
||||
for r in Rancher.objects.filter(
|
||||
national_code__in=rancher_map.keys()
|
||||
)
|
||||
}
|
||||
|
||||
final_herds = []
|
||||
|
||||
for item in new_herds:
|
||||
rancher = rancher_map.get(item["rancher_code"])
|
||||
|
||||
if not rancher:
|
||||
continue # یا raise error
|
||||
|
||||
herd = item["herd"]
|
||||
herd.rancher = rancher
|
||||
final_herds.append(herd)
|
||||
|
||||
Herd.objects.bulk_create(
|
||||
final_herds,
|
||||
ignore_conflicts=True,
|
||||
batch_size=batch_size
|
||||
)
|
||||
173
apps/tag/management/commands/sync_livestock.py
Normal file
173
apps/tag/management/commands/sync_livestock.py
Normal file
@@ -0,0 +1,173 @@
|
||||
import time
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import transaction
|
||||
|
||||
from apps.herd.models import Herd
|
||||
from apps.livestock.models import LiveStock, LiveStockType
|
||||
from apps.tag.models import Tag, TemporaryTags
|
||||
from common.generics import parse_birthdate
|
||||
|
||||
BATCH_SIZE = 5000
|
||||
CHUNK_SIZE = 10000
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Create Livestock if missing and assign Tag if missing"
|
||||
|
||||
def handle(self, *args, **options):
|
||||
|
||||
self.stdout.write(
|
||||
self.style.NOTICE(f"Start bulk syncing .... records")
|
||||
)
|
||||
|
||||
qs = (
|
||||
TemporaryTags.objects
|
||||
.filter(sync_status__isnull=True)
|
||||
.only('herd_code', 'birthdate', 'gender', 'tag')
|
||||
)
|
||||
|
||||
total = qs.count()
|
||||
processed = 0
|
||||
start_time = time.time()
|
||||
|
||||
LOG_EVERY = 10_000 # هر چند رکورد لاگ بده
|
||||
|
||||
buffer = []
|
||||
for temp in qs.iterator(chunk_size=CHUNK_SIZE):
|
||||
buffer.append(temp)
|
||||
|
||||
if len(buffer) >= BATCH_SIZE:
|
||||
self.process_batch(buffer)
|
||||
processed += len(buffer)
|
||||
buffer.clear()
|
||||
|
||||
if processed % LOG_EVERY == 0:
|
||||
elapsed = time.time() - start_time
|
||||
speed = processed / elapsed if elapsed else 0
|
||||
percent = (processed / total) * 100
|
||||
remaining = total - processed
|
||||
eta = remaining / speed if speed else 0
|
||||
|
||||
self.stdout.write(
|
||||
self.style.NOTICE(
|
||||
f"[SYNC] {processed:,}/{total:,} "
|
||||
f"({percent:.2f}%) | "
|
||||
f"{speed:.0f} rec/s | "
|
||||
f"ETA: {eta / 60:.1f} min"
|
||||
)
|
||||
)
|
||||
|
||||
if buffer:
|
||||
self.process_batch(buffer)
|
||||
|
||||
self.stdout.write(self.style.SUCCESS("DONE ✅"))
|
||||
|
||||
# ----------------------------------------------------
|
||||
|
||||
def process_batch(self, temps):
|
||||
herd_codes = {t.herd_code for t in temps if t.herd_code}
|
||||
|
||||
herds = {
|
||||
h.code: h
|
||||
for h in Herd.objects.filter(code__in=herd_codes)
|
||||
}
|
||||
|
||||
livestocks = LiveStock.objects.filter(
|
||||
herd__code__in=herd_codes
|
||||
).select_related('herd').only(
|
||||
'id', 'herd_id', 'birthdate', 'gender', 'tag'
|
||||
)
|
||||
|
||||
livestock_map = {
|
||||
(ls.herd.code, ls.birthdate, ls.gender): ls
|
||||
for ls in livestocks
|
||||
}
|
||||
|
||||
livestock_types = {
|
||||
stock_type.name: stock_type
|
||||
for stock_type in LiveStockType.objects.all()
|
||||
}
|
||||
|
||||
existing_tags = {
|
||||
t.tag_code: t
|
||||
for t in Tag.objects.filter(
|
||||
tag_code__in=[t.tag for t in temps if t.tag]
|
||||
)
|
||||
}
|
||||
|
||||
new_livestock = []
|
||||
updated_livestock = []
|
||||
new_tags = []
|
||||
|
||||
for temp in temps:
|
||||
herd = herds.get(temp.herd_code)
|
||||
if not herd:
|
||||
continue # گله باید وجود داشته باشد
|
||||
|
||||
birthdate = parse_birthdate(temp.birthdate)
|
||||
gender = 1 if temp.gender == 'M' else 2
|
||||
livestock_type = livestock_types.get(temp.type)
|
||||
weight_type = livestock_type.weight_type
|
||||
|
||||
key = (temp.herd_code, birthdate, gender)
|
||||
livestock = livestock_map.get(key)
|
||||
|
||||
# ---------- دام وجود ندارد ----------
|
||||
if not livestock:
|
||||
if not temp.tag:
|
||||
continue
|
||||
|
||||
tag = existing_tags.get(temp.tag)
|
||||
|
||||
if not tag:
|
||||
tag = Tag(tag_code=temp.tag, status='A')
|
||||
new_tags.append(tag)
|
||||
existing_tags[temp.tag] = tag
|
||||
|
||||
livestock = LiveStock(
|
||||
herd=herd,
|
||||
birthdate=birthdate,
|
||||
gender=gender,
|
||||
tag=tag,
|
||||
weight_type=weight_type,
|
||||
type=livestock_type
|
||||
)
|
||||
new_livestock.append(livestock)
|
||||
livestock_map[key] = livestock
|
||||
|
||||
temp.sync_status = 'S'
|
||||
continue
|
||||
|
||||
# ---------- دام وجود دارد ولی پلاک ندارد ----------
|
||||
if livestock.tag is None and temp.tag:
|
||||
tag = existing_tags.get(temp.tag)
|
||||
|
||||
if not tag:
|
||||
tag = Tag(tag_code=temp.tag, status='A')
|
||||
new_tags.append(tag)
|
||||
existing_tags[temp.tag] = tag
|
||||
|
||||
livestock.tag = tag
|
||||
updated_livestock.append(livestock)
|
||||
|
||||
temp.sync_status = 'S'
|
||||
|
||||
# ---------- BULK ----------
|
||||
with transaction.atomic():
|
||||
Tag.objects.bulk_create(new_tags, batch_size=BATCH_SIZE)
|
||||
LiveStock.objects.bulk_create(
|
||||
new_livestock,
|
||||
batch_size=BATCH_SIZE,
|
||||
ignore_conflicts=True
|
||||
)
|
||||
LiveStock.objects.bulk_update(
|
||||
updated_livestock,
|
||||
['tag'],
|
||||
batch_size=BATCH_SIZE
|
||||
)
|
||||
TemporaryTags.objects.bulk_update(
|
||||
temps,
|
||||
['sync_status'],
|
||||
batch_size=BATCH_SIZE
|
||||
)
|
||||
18
apps/tag/migrations/0026_temporarytags_sync_status.py
Normal file
18
apps/tag/migrations/0026_temporarytags_sync_status.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.0 on 2025-12-28 10:50
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('tag', '0025_temporarytags_agriculture_unique_id_and_more'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='temporarytags',
|
||||
name='sync_status',
|
||||
field=models.CharField(max_length=150, null=True),
|
||||
),
|
||||
]
|
||||
@@ -135,6 +135,7 @@ class TemporaryTags(BaseModel):
|
||||
agriculture_unique_id = models.CharField(max_length=150, null=True)
|
||||
keeper_agent = models.CharField(max_length=150, null=True)
|
||||
registerer_user = models.CharField(max_length=150, null=True)
|
||||
sync_status = models.CharField(max_length=150, null=True)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
super(TemporaryTags, self).save(*args, **kwargs)
|
||||
|
||||
105
apps/tag/services/temporary_tags_sync.py
Normal file
105
apps/tag/services/temporary_tags_sync.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from apps.tags.models import Tag
|
||||
from django.db import transaction
|
||||
|
||||
from apps.herd.models import Herd
|
||||
from apps.livestock.models import LiveStock, LiveStockType
|
||||
from apps.tag.models import TemporaryTags
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
class TemporaryTagsSyncService:
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def sync_tags_to_livestock(cls):
|
||||
# Preload herds
|
||||
herds = {h.code: h for h in Herd.objects.all()}
|
||||
|
||||
# Preload existing LiveStocks for speed
|
||||
live_stocks_map = {}
|
||||
for ls in LiveStock.objects.select_related('herd', 'type').all():
|
||||
key = (ls.herd.code, ls.birthdate.date() if ls.birthdate else None, ls.gender)
|
||||
live_stocks_map.setdefault(key, []).append(ls)
|
||||
|
||||
# Preload LiveStockType
|
||||
gender_type_map = {t.name.lower(): t for t in LiveStockType.objects.all()}
|
||||
|
||||
# Preload existing tags
|
||||
existing_tags = {t.tag_code: t for t in Tag.objects.all()}
|
||||
|
||||
new_herds = []
|
||||
new_livestock = []
|
||||
new_tags = []
|
||||
|
||||
for temp in TemporaryTags.objects.iterator():
|
||||
herd = herds.get(temp.herd_code)
|
||||
if not herd:
|
||||
herd = Herd(code=temp.herd_code, name=temp.herd_code, province_id=30)
|
||||
new_herds.append(herd)
|
||||
herds[temp.herd_code] = herd
|
||||
|
||||
# Gender mapping
|
||||
gender_lower = temp.gender.lower() if temp.gender else 'male'
|
||||
ls_type = gender_type_map.get(gender_lower)
|
||||
|
||||
birthdate = temp.birthdate # convert to datetime if needed
|
||||
|
||||
key = (herd.code, birthdate, ls_type.id if ls_type else None)
|
||||
live_stock_list = live_stocks_map.get(key, [])
|
||||
|
||||
if live_stock_list:
|
||||
# دام موجود است، فقط Tag ایجاد میکنیم
|
||||
ls = live_stock_list[0] # میتوانیم چندتا هم داشته باشیم
|
||||
else:
|
||||
# دام جدید ایجاد میکنیم
|
||||
ls = LiveStock(
|
||||
herd=herd,
|
||||
type=ls_type,
|
||||
gender=1 if gender_lower == 'male' else 2,
|
||||
birthdate=birthdate
|
||||
)
|
||||
new_livestock.append(ls)
|
||||
live_stocks_map[key] = [ls]
|
||||
|
||||
# ----------------------
|
||||
# Tag برای هر دام
|
||||
# ----------------------
|
||||
if temp.serial: # یا temp.tag_code
|
||||
tag_code = f"{temp.country_code}{temp.static_code}{temp.ownership_code}{temp.species_code}{temp.serial}"
|
||||
if tag_code not in existing_tags:
|
||||
tag = Tag(
|
||||
serial=temp.serial,
|
||||
country_code=getattr(temp, 'country_code', 364),
|
||||
static_code=getattr(temp, 'static_code', 0),
|
||||
ownership_code=getattr(temp, 'ownership_code', 0),
|
||||
species_code=getattr(temp, 'species_code', 0),
|
||||
tag_code=tag_code,
|
||||
status='F'
|
||||
)
|
||||
new_tags.append(tag)
|
||||
existing_tags[tag_code] = tag
|
||||
else:
|
||||
tag = existing_tags[tag_code]
|
||||
|
||||
# Assign Tag to LiveStock
|
||||
ls.tag = tag
|
||||
|
||||
# Batch insert
|
||||
if len(new_livestock) >= BATCH_SIZE:
|
||||
cls._bulk_insert(new_herds, new_livestock, new_tags)
|
||||
new_herds.clear()
|
||||
new_livestock.clear()
|
||||
new_tags.clear()
|
||||
|
||||
# insert remaining
|
||||
cls._bulk_insert(new_herds, new_livestock, new_tags)
|
||||
|
||||
@staticmethod
|
||||
def _bulk_insert(herds, livestock, tags):
|
||||
if herds:
|
||||
Herd.objects.bulk_create(herds, ignore_conflicts=True, batch_size=BATCH_SIZE)
|
||||
if tags:
|
||||
Tag.objects.bulk_create(tags, ignore_conflicts=True, batch_size=BATCH_SIZE)
|
||||
if livestock:
|
||||
LiveStock.objects.bulk_create(livestock, ignore_conflicts=True, batch_size=BATCH_SIZE)
|
||||
Reference in New Issue
Block a user