from collections import defaultdict

from django.db.models import Sum, Count, Case, When, Q, Value
from django.db.models.functions import Coalesce

from apps.authentication.models import Organization, BankAccountInformation
from apps.authentication.services.service import get_all_org_child
from apps.core.services.filter.search import DynamicSearchService
from apps.warehouse.models import InventoryQuotaSaleTransaction, InventoryQuotaSaleItem


def _product_key(gov_product_id, free_product_id):
    """Return the ``(product_type, product_id)`` grouping key for a sale item.

    Mirrors the ``Case``/``When`` annotations used for ``product_type`` and
    ``product_id`` in ``get_dashboard``: the gov product wins when present,
    otherwise the free product, otherwise ``(None, None)``.
    """
    if gov_product_id is not None:
        return "gov", gov_product_id
    if free_product_id is not None:
        return "free", free_product_id
    return None, None


def _sum_item_shares(items, bank_accounts_dict):
    """Accumulate ``item_share`` entries of *items* into per-name totals.

    :param items: iterable of objects exposing an ``item_share`` attribute
        (a list of dicts, or a falsy value when there are no shares).
    :param bank_accounts_dict: mapping of sheba -> organization name, used to
        replace the share's own name when its shaba is known.
    :return: mapping ``name -> {"total_price", "count", "shaba"}``.
    """
    share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
    for item in items:
        for share in item.item_share or ():
            # share: {"name": "", "price": "", ....}
            # NOTE(review): a share without "shaba" gets the placeholder "0",
            # so all such shares are later merged under a single name by
            # merge_by_shaba_single_name - confirm this is intended.
            shaba = str(share.get("shaba", 0))  # noqa
            if shaba.startswith("IR"):
                shaba = shaba[2:]  # noqa

            # prefer the organization name registered for this shaba
            name = bank_accounts_dict.get(shaba, share.get("name"))

            totals = share_totals[name]
            # a missing, None or empty price counts as 0 instead of raising
            totals["total_price"] += share.get("price") or 0
            totals["count"] += 1
            totals["shaba"] = shaba  # noqa
    return share_totals


class TransactionDashboardService:
    """Builds the aggregated statistics shown on the transactions dashboard."""

    @staticmethod
    def get_dashboard(
            org: Organization,
            free_visibility_tr_objects=None,
            free_visibility_tr_item_objects=None,
            start_date: str = None,
            end_date: str = None,
            status: str = None
    ):
        """Return transaction, product and broker-share statistics for *org*.

        :param org: organization requesting the dashboard. When its type key
            is ``'ADM'`` every transaction and item is included; otherwise
            only those sold by the organization or its children (as returned
            by ``get_all_org_child``).
        :param free_visibility_tr_objects: optional transactions queryset
            that, when truthy, replaces the organization-scoped transactions
            (non-``'ADM'`` organizations only).
        :param free_visibility_tr_item_objects: accepted for interface
            compatibility; not used by this method.
        :param start_date: lower bound of the ``create_date`` filter; applied
            only when ``end_date`` is given too.
        :param end_date: upper bound of the ``create_date`` filter.
        :param status: optional transaction status to restrict both
            querysets to; also selects which status ``total_amount`` and
            ``total_weight`` are summed over (default ``"success"``).
        :return: dict with the keys ``transaction_summary``,
            ``product_summary`` and ``brokers_sharing_summary``.
        """
        orgs_child = get_all_org_child(org=org)
        orgs_child.append(org)

        if org.type.key == 'ADM':
            transactions = InventoryQuotaSaleTransaction.objects.all()
            items = InventoryQuotaSaleItem.objects.all().select_related("gov_product", "free_product")
        # NOTE(review): this is a truthiness test, so an empty queryset falls
        # through to the organization-scoped branch - confirm this is intended.
        elif free_visibility_tr_objects:
            transactions = free_visibility_tr_objects
            items = InventoryQuotaSaleItem.objects.filter(
                transaction__in=transactions
            ).select_related("gov_product", "free_product")
        else:
            transactions = InventoryQuotaSaleTransaction.objects.filter(
                seller_organization__in=orgs_child
            )
            items = InventoryQuotaSaleItem.objects.filter(
                transaction__seller_organization__in=orgs_child
            ).select_related("gov_product", "free_product")

        # filter querysets (transactions & items) by date
        if start_date and end_date:
            transactions = DynamicSearchService(
                queryset=transactions,
                start=start_date,
                end=end_date,
                date_field="create_date",
            ).apply()
            items = DynamicSearchService(
                queryset=items,
                start=start_date,
                end=end_date,
                date_field="create_date",
            ).apply()

        # filter by transaction status
        if status:
            transactions = transactions.filter(transaction_status=status)
            items = items.filter(transaction__transaction_status=status)

        # amounts and weights are summed over the requested status, or over
        # successful transactions when no status was requested
        summed_status = status or 'success'

        transaction_stats = transactions.aggregate(
            total_transactions=Count("id"),
            success_transactions=Count("id", filter=Q(transaction_status="success")),
            failed_transactions=Count("id", filter=Q(transaction_status="failed")),
            waiting_transactions=Count("id", filter=Q(transaction_status="waiting")),
            total_amount=Coalesce(
                Sum("price_paid", filter=Q(transaction_status=summed_status)),
                0
            ),
            unique_ranchers=Count("rancher", distinct=True),
        )
        transaction_stats['total_weight'] = items.aggregate(
            total_weight=Coalesce(
                Sum("weight", filter=Q(transaction__transaction_status=summed_status)),
                0
            )
        )['total_weight']

        products_stats = items.values(
            product_id=Case(
                When(gov_product__isnull=False, then="gov_product_id"),
                When(free_product__isnull=False, then="free_product_id"),
            ),
            product_name=Case(
                When(gov_product__isnull=False, then="gov_product__name"),
                When(free_product__isnull=False, then="free_product__product__name"),
            ),
            product_type=Case(
                When(gov_product__isnull=False, then=Value("gov")),
                When(free_product__isnull=False, then=Value("free")),
            )
        ).annotate(
            total_sales=Count("id"),
            total_weight=Coalesce(Sum("weight"), 0),
            success_sales=Count("id", filter=Q(transaction__transaction_status="success")),
            failed_sales=Count("id", filter=Q(transaction__transaction_status="failed")),
            waiting_sales=Count("id", filter=Q(transaction__transaction_status="waiting")),
            card_payments=Count("id", filter=Q(transaction__price_type="card")),
            cash_payments=Count("id", filter=Q(transaction__price_type="cash")),
            check_payments=Count("id", filter=Q(transaction__price_type="check")),
            credit_payments=Count("id", filter=Q(transaction__price_type="credit")),
            extra_items=Count("id", filter=Q(is_extra=True)),
            pre_sale_items=Count("id", filter=Q(is_pre_sale=True)),
        ).order_by("-total_sales")

        # sheba -> organization name; select_related avoids one extra query
        # per bank account when reading account.organization
        bank_accounts_dict = {
            f'{account.sheba}': account.organization.name
            for account in BankAccountInformation.objects.select_related("organization")
        }

        # group items by (product_type, product_id): gov and free products
        # live in different tables, so the bare id alone can collide
        items_by_product = defaultdict(list)
        for item in items:
            key = _product_key(item.gov_product_id, item.free_product_id)
            items_by_product[key].append(item)

        # materialise the rows once so the per-product additions below are
        # made on the very dicts that get returned
        product_rows = list(products_stats)

        # calculate sum of item share percentage by product
        for product in product_rows:
            key = (product["product_type"], product["product_id"])
            share_totals = merge_by_shaba_single_name(
                _sum_item_shares(items_by_product.get(key, ()), bank_accounts_dict)
            )
            product["item_share_stats"] = sorted(
                [{"name": name, **val} for name, val in share_totals.items()],
                key=lambda row: -row["total_price"]
            )

        # calculate total statistic of every broker across all products
        broker_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
        for product in product_rows:
            for share in product["item_share_stats"]:
                totals = broker_totals[share["name"]]
                totals["total_price"] += share["total_price"]
                totals["count"] += share["count"]

        return {
            "transaction_summary": transaction_stats,
            "product_summary": product_rows,
            "brokers_sharing_summary": [
                {"name": name, **totals} for name, totals in broker_totals.items()
            ]
        }


def merge_by_shaba_single_name(data):  # noqa
    """Merge share totals that point at the same shaba under a single name.

    :param data: mapping ``name -> {"total_price", "count", "shaba"}``.
    :return: mapping ``name -> {"total_price", "count", "shaba"}`` with one
        entry per distinct shaba; the entry keeps the first non-``None`` name
        seen for that shaba and the summed ``total_price`` and ``count``.
    """
    grouped = defaultdict(
        lambda: {"name": None, "total_price": 0, "count": 0, "shaba": None}  # noqa
    )

    for name, info in data.items():
        # entries without a shaba are grouped together under None
        shaba = info.get("shaba")  # noqa
        group = grouped[shaba]
        if group["name"] is None:
            group["name"] = name
        group["total_price"] += info["total_price"]
        group["count"] += info["count"]
        group["shaba"] = shaba  # noqa

    return {
        group["name"]: {
            "total_price": group["total_price"],
            "count": group["count"],
            "shaba": group["shaba"],  # noqa
        }
        for group in grouped.values()
    }