Files
RasadDam_Backend/apps/warehouse/services/transaction_dashboard_service.py

178 lines
7.0 KiB
Python

from collections import defaultdict
from django.db.models import Sum, Count, Case, When, Q, Value
from django.db.models.functions import Coalesce
from apps.authentication.models import Organization
from apps.authentication.services.service import get_all_org_child
from apps.core.services.filter.search import DynamicSearchService
from apps.warehouse.models import InventoryQuotaSaleTransaction, InventoryQuotaSaleItem
class TransactionDashboardService:
@staticmethod
def get_dashboard(org: Organization, start_date: str = None, end_date: str = None, status: str = None):
orgs_child = get_all_org_child(org=org)
orgs_child.append(org)
if org.type.key == 'ADM':
transactions = InventoryQuotaSaleTransaction.objects.all()
items = InventoryQuotaSaleItem.objects.all().select_related("gov_product", "free_product")
else:
transactions = InventoryQuotaSaleTransaction.objects.filter(
seller_organization__in=orgs_child
)
items = InventoryQuotaSaleItem.objects.filter(
transaction__seller_organization__in=orgs_child
).select_related("gov_product", "free_product")
# filter queryset (transactions & items) by date
if start_date and end_date:
transactions = DynamicSearchService(
queryset=transactions,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
items = DynamicSearchService(
queryset=items,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
# filer by transaction status
if status:
transactions = transactions.filter(transaction_status=status)
items = items.filter(transaction__transaction_status=status)
transaction_stats = transactions.aggregate(
total_transactions=Count("id"),
success_transactions=Count("id", filter=Q(transaction_status="success")),
failed_transactions=Count("id", filter=Q(transaction_status="failed")),
waiting_transactions=Count("id", filter=Q(transaction_status="waiting")),
total_amount=Coalesce(
Sum(
"price_paid", filter=Q(transaction_status='success' if not status else status)
), 0
),
unique_ranchers=Count("rancher", distinct=True),
)
transaction_stats['total_weight'] = items.aggregate(
total_weight=Coalesce(Sum(
"weight",
filter=Q(transaction__transaction_status="success" if not status else status)
), 0)
)['total_weight']
products_stats = items.values(
product_id=Case(
When(gov_product__isnull=False, then="gov_product_id"),
When(free_product__isnull=False, then="free_product_id"),
),
product_name=Case(
When(gov_product__isnull=False, then="gov_product__name"),
When(free_product__isnull=False, then="free_product__product__name"),
),
product_type=Case(
When(gov_product__isnull=False, then=Value("gov")),
When(free_product__isnull=False, then=Value("free")),
)
).annotate(
total_sales=Count("id"),
total_weight=Coalesce(Sum("weight"), 0),
# total_price=Coalesce(Sum("total_price"), 0),
# avg_unit_price=Coalesce(Sum("total_price") / Sum("weight"), 0),
success_sales=Count("id", filter=Q(transaction__transaction_status="success")),
failed_sales=Count("id", filter=Q(transaction__transaction_status="failed")),
waiting_sales=Count("id", filter=Q(transaction__transaction_status="waiting")),
card_payments=Count("id", filter=Q(transaction__price_type="card")),
cash_payments=Count("id", filter=Q(transaction__price_type="cash")),
check_payments=Count("id", filter=Q(transaction__price_type="check")),
credit_payments=Count("id", filter=Q(transaction__price_type="credit")),
extra_items=Count("id", filter=Q(is_extra=True)),
pre_sale_items=Count("id", filter=Q(is_pre_sale=True)),
).order_by("-total_sales")
# calculate sum of item share percentage by product
items_by_product = defaultdict(list)
for item in items:
pid = item.gov_product_id or item.free_product_id
items_by_product[pid].append(item)
for product in products_stats:
pid = product["product_id"]
share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
for item in items_by_product.get(pid, []):
if item.item_share:
for share in item.item_share:
# share: {"name": "", "price": "", ....}
name = share.get("name")
price = share.get("price", 0)
shaba = share.get("shaba", 0) # noqa
share_totals[name]["total_price"] += price
share_totals[name]["count"] += 1
share_totals[name]["shaba"] = shaba # noqa
share_totals = merge_by_shaba_single_name(share_totals)
product["item_share_stats"] = sorted(
[
{"name": name, **val}
for name, val in share_totals.items()
],
key=lambda x: -x["total_price"]
)
# calculate total statistic of every broker
share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
for product in products_stats:
for item in product["item_share_stats"]:
name = item["name"]
share_totals[name]["total_price"] += item["total_price"]
share_totals[name]["count"] += item["count"]
dict_share_total = dict(share_totals)
share_totals = [{"name": share, **dict_share_total[share]} for share in list(share_totals)]
return {
"transaction_summary": transaction_stats,
"product_summary": list(products_stats),
"brokers_sharing_summary": share_totals
}
def merge_by_shaba_single_name(data): # noqa
grouped = defaultdict(lambda: {"name": None, "total_price": 0, "count": 0, "shaba": None}) # noqa
for name, info in data.items(): # noqa
shaba = info["shaba"] # noqa
if grouped[shaba]["name"] is None:
grouped[shaba]["name"] = name
grouped[shaba]["total_price"] += info["total_price"]
grouped[shaba]["count"] += info["count"]
grouped[shaba]["shaba"] = shaba # noqa
final_result = {}
for shaba, g in grouped.items(): # noqa
final_result[g["name"]] = {
"total_price": g["total_price"],
"count": g["count"],
"shaba": g["shaba"], # noqa
}
return final_result