Files
RasadDam_Backend/apps/warehouse/services/transaction_dashboard_service.py

189 lines
7.5 KiB
Python

from collections import defaultdict
from django.db.models import Sum, Count, Case, When, Q, Value
from django.db.models.functions import Coalesce
from apps.authentication.models import Organization, BankAccountInformation
from apps.authentication.services.service import get_all_org_child
from apps.core.services.filter.search import DynamicSearchService
from apps.warehouse.models import InventoryQuotaSaleTransaction, InventoryQuotaSaleItem
class TransactionDashboardService:
@staticmethod
def get_dashboard(org: Organization, start_date: str = None, end_date: str = None, status: str = None):
orgs_child = get_all_org_child(org=org)
orgs_child.append(org)
if org.type.key == 'ADM':
transactions = InventoryQuotaSaleTransaction.objects.all()
items = InventoryQuotaSaleItem.objects.all().select_related("gov_product", "free_product")
else:
transactions = InventoryQuotaSaleTransaction.objects.filter(
seller_organization__in=orgs_child
)
items = InventoryQuotaSaleItem.objects.filter(
transaction__seller_organization__in=orgs_child
).select_related("gov_product", "free_product")
# filter queryset (transactions & items) by date
if start_date and end_date:
transactions = DynamicSearchService(
queryset=transactions,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
items = DynamicSearchService(
queryset=items,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
# filer by transaction status
if status:
transactions = transactions.filter(transaction_status=status)
items = items.filter(transaction__transaction_status=status)
transaction_stats = transactions.aggregate(
total_transactions=Count("id"),
success_transactions=Count("id", filter=Q(transaction_status="success")),
failed_transactions=Count("id", filter=Q(transaction_status="failed")),
waiting_transactions=Count("id", filter=Q(transaction_status="waiting")),
total_amount=Coalesce(
Sum(
"price_paid", filter=Q(transaction_status='success' if not status else status)
), 0
),
unique_ranchers=Count("rancher", distinct=True),
)
transaction_stats['total_weight'] = items.aggregate(
total_weight=Coalesce(Sum(
"weight",
filter=Q(transaction__transaction_status="success" if not status else status)
), 0)
)['total_weight']
products_stats = items.values(
product_id=Case(
When(gov_product__isnull=False, then="gov_product_id"),
When(free_product__isnull=False, then="free_product_id"),
),
product_name=Case(
When(gov_product__isnull=False, then="gov_product__name"),
When(free_product__isnull=False, then="free_product__product__name"),
),
product_type=Case(
When(gov_product__isnull=False, then=Value("gov")),
When(free_product__isnull=False, then=Value("free")),
)
).annotate(
total_sales=Count("id"),
total_weight=Coalesce(Sum("weight"), 0),
# total_price=Coalesce(Sum("total_price"), 0),
# avg_unit_price=Coalesce(Sum("total_price") / Sum("weight"), 0),
success_sales=Count("id", filter=Q(transaction__transaction_status="success")),
failed_sales=Count("id", filter=Q(transaction__transaction_status="failed")),
waiting_sales=Count("id", filter=Q(transaction__transaction_status="waiting")),
card_payments=Count("id", filter=Q(transaction__price_type="card")),
cash_payments=Count("id", filter=Q(transaction__price_type="cash")),
check_payments=Count("id", filter=Q(transaction__price_type="check")),
credit_payments=Count("id", filter=Q(transaction__price_type="credit")),
extra_items=Count("id", filter=Q(is_extra=True)),
pre_sale_items=Count("id", filter=Q(is_pre_sale=True)),
).order_by("-total_sales")
# calculate sum of item share percentage by product
items_by_product = defaultdict(list)
bank_accounts = BankAccountInformation.objects.all()
bank_accounts_dict = {
f'{account.sheba}': account.organization.name for account in bank_accounts
}
for item in items:
pid = item.gov_product_id or item.free_product_id
items_by_product[pid].append(item)
for product in products_stats:
pid = product["product_id"]
share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
for item in items_by_product.get(pid, []):
if item.item_share:
for share in item.item_share:
# share: {"name": "", "price": "", ....}
name = share.get("name")
shaba = str(share.get("shaba", 0)) # noqa
if shaba.startswith("IR"):
shaba = shaba[2:] # noqa
price = share.get("price", 0)
# get name of org from their shaba
if shaba in bank_accounts_dict.keys():
name = bank_accounts_dict[shaba]
share_totals[name]["total_price"] += price
share_totals[name]["count"] += 1
share_totals[name]["shaba"] = shaba # noqa
share_totals = merge_by_shaba_single_name(share_totals)
product["item_share_stats"] = sorted(
[
{"name": name, **val}
for name, val in share_totals.items()
],
key=lambda x: -x["total_price"]
)
# calculate total statistic of every broker
share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
for product in products_stats:
for item in product["item_share_stats"]:
name = item["name"]
share_totals[name]["total_price"] += item["total_price"]
share_totals[name]["count"] += item["count"]
dict_share_total = dict(share_totals)
share_totals = [{"name": share, **dict_share_total[share]} for share in list(share_totals)]
return {
"transaction_summary": transaction_stats,
"product_summary": list(products_stats),
"brokers_sharing_summary": share_totals
}
def merge_by_shaba_single_name(data): # noqa
grouped = defaultdict(lambda: {"name": None, "total_price": 0, "count": 0, "shaba": None}) # noqa
for name, info in data.items(): # noqa
shaba = info["shaba"] # noqa
if grouped[shaba]["name"] is None:
grouped[shaba]["name"] = name
grouped[shaba]["total_price"] += info["total_price"]
grouped[shaba]["count"] += info["count"]
grouped[shaba]["shaba"] = shaba # noqa
final_result = {}
for shaba, g in grouped.items(): # noqa
final_result[g["name"]] = {
"total_price": g["total_price"],
"count": g["count"],
"shaba": g["shaba"], # noqa
}
return final_result