import typing

from django.db import transaction
from rest_framework import status
from rest_framework import viewsets, filters
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, ValidationError
from rest_framework.response import Response

from apps.core.api import BaseViewSet
from apps.core.mixins.search_mixin import DynamicSearchMixin
from apps.core.mixins.soft_delete_mixin import SoftDeleteMixin
from apps.warehouse import models as warehouse_models
from apps.warehouse.services.inventory_entry_dashboard_service import InventoryEntryDashboardService
from apps.warehouse.services.transaction_dashboard_service import TransactionDashboardService
from apps.warehouse.web.api.v1 import serializers as warehouse_serializers
from common.generics import base64_to_image_file
from common.helpers import get_organization_by_user
from common.liara_tools import upload_to_liara

# Values of the ``status`` query parameter that narrow the transaction list.
_TRANSACTION_STATUS_FILTERS = ('waiting', 'success', 'failed')


def _list_response(view, queryset) -> Response:
    """
    Serialize ``queryset`` for ``view`` and return it as a list response.

    Returns the paginated response when the view's paginator yields a page;
    otherwise (pagination disabled) serializes the whole queryset, so the
    calling action never returns ``None``.
    """
    page = view.paginate_queryset(queryset)
    if page is not None:
        serializer = view.get_serializer(page, many=True)
        return view.get_paginated_response(serializer.data)

    serializer = view.get_serializer(queryset, many=True)
    return Response(serializer.data)


class InventoryEntryViewSet(
    BaseViewSet,
    SoftDeleteMixin,
    viewsets.ModelViewSet,
    DynamicSearchMixin,
    InventoryEntryDashboardService
):
    """
    CRUD, confirmation and dashboard endpoints for inventory entries.
    """

    queryset = warehouse_models.InventoryEntry.objects.all()
    serializer_class = warehouse_serializers.InventoryEntrySerializer
    # filter_backends = [filters.SearchFilter]
    search_fields = [
        "distribution__distribution_id",
        "organization__name",
        "org_quota_stat__quota__product__name",
        "delivery_address",
        "weight",
        "balance",
        "lading_number",
        "is_confirmed",
    ]
    date_field = "create_date"

    def upload_confirmation_document(self, request, inventory: int) -> typing.Any:
        """
        Upload the confirmation document of an inventory entry and confirm it.

        The base64 payload in ``request.data['document']`` is converted to a
        file, uploaded through ``upload_to_liara``, and the returned value is
        stored on the entry, which is then flagged as confirmed.

        :param request: request whose data carries the ``document`` payload
        :param inventory: id of the inventory entry to confirm
        :return: an empty 200 response
        :raises ValidationError: when ``document`` is missing from the request
        :raises NotFound: when no inventory entry matches ``inventory``
        """
        if 'document' not in request.data:
            raise ValidationError({'document': ['This field is required.']})

        try:
            entry = self.queryset.get(id=inventory)
        except (warehouse_models.InventoryEntry.DoesNotExist, ValueError):
            # unknown or malformed id: answer 404 instead of an unhandled 500
            raise NotFound()

        # convert base64 to document file
        document = base64_to_image_file(
            request.data['document'],
            filename=f'inventory_entry_document_{entry.id}'
        )
        file, file_format = document[0], document[1]

        # upload document to liara
        document_url = upload_to_liara(
            file,
            f'inventory_entry_document_{entry.id}.{file_format}'
        )

        entry.document = document_url
        entry.is_confirmed = True
        entry.save()
        return Response(status=status.HTTP_200_OK)

    @transaction.atomic
    def create(self, request, *args, **kwargs):
        """
        Create an inventory entry for the requesting user's organization.

        The initial ``balance`` is set to the submitted ``weight``. When the
        request also carries a ``document``, the entry is confirmed right away.

        :raises ValidationError: when ``weight`` is missing or the serializer
            rejects the payload
        """
        if 'weight' not in request.data:
            raise ValidationError({'weight': ['This field is required.']})

        inventory_balance = request.data['weight']
        organization = get_organization_by_user(request.user)
        # NOTE(review): this mutates request.data in place; that may fail on an
        # immutable QueryDict (form-encoded requests) - confirm expected parsers.
        request.data.update({
            'organization': organization.id,
            'balance': inventory_balance
        })

        serializer = self.serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)
        inventory_entry = serializer.save()

        # upload document for confirmation entry
        if 'document' in request.data:
            self.upload_confirmation_document(request, inventory=inventory_entry.id)

        return Response(serializer.data, status=status.HTTP_201_CREATED)

    @transaction.atomic
    def update(self, request, pk=None, *args, **kwargs):
        """
        Partially update an inventory entry.

        When the request carries a ``document``, the entry is also confirmed.

        :raises ValidationError: when the serializer rejects the payload
            (answered with 400, consistent with ``create``)
        """
        inventory = self.get_object()
        serializer = self.serializer_class(
            data=request.data, instance=inventory, partial=True
        )
        # invalid input is a client error (400), not a permission problem (403)
        serializer.is_valid(raise_exception=True)
        serializer.save()

        # upload document for confirmation entry
        if 'document' in request.data:
            self.upload_confirmation_document(request, inventory=inventory.id)

        return Response(serializer.data, status=status.HTTP_200_OK)

    @action(
        methods=['post'],
        detail=True,
        url_path='confirm_entry',
        url_name='confirm_entry',
        name='confirm_entry'
    )
    @transaction.atomic
    def confirm_inventory_entry(self, request, pk=None):
        """
        Confirm an inventory entry by uploading its confirmation document.

        :raises ValidationError: when ``document`` is missing from the request
        :raises NotFound: when no inventory entry matches ``pk``
        """
        self.upload_confirmation_document(request, inventory=pk)
        return Response(status=status.HTTP_200_OK)

    @action(
        methods=['get'],
        detail=False,
        url_path='inventory_dashboard',
        url_name='inventory_dashboard',
        name='inventory_dashboard'
    )
    def inventory_dashboard(self, request):
        """
        Inventory entry dashboard of the requesting user's organization.

        Optional query parameters ``start``, ``end`` and ``search`` are passed
        to ``get_dashboard`` as ``start_date``, ``end_date`` and
        ``query_string``; absent parameters are passed as ``None``.
        """
        org = get_organization_by_user(request.user)
        query_param = self.request.query_params  # noqa

        return Response(
            self.get_dashboard(
                org=org,
                start_date=query_param.get('start'),
                end_date=query_param.get('end'),
                query_string=query_param.get('search'),
                search_fields=self.search_fields,
            )
        )

    @action(
        methods=['get'],
        detail=True,
        url_path='my_entries_by_quota',
        url_name='my_entries_by_quota',
        name='my_entries_by_quota'
    )
    def my_inventory_entries_by_quota(self, request, pk=None):
        """
        List inventory entries of an organization for the quota ``pk``.

        The organization is taken from the ``org_id`` query parameter when
        present, otherwise from the requesting user.

        :raises NotFound: when ``org_id`` does not match an organization
        """
        params = self.request.query_params  # noqa

        if 'org_id' in params:
            # NOTE(review): any caller can name an arbitrary org_id here; no
            # authorization check is visible in this view - confirm intended.
            try:
                org = warehouse_models.Organization.objects.get(id=params.get('org_id'))
            except (warehouse_models.Organization.DoesNotExist, ValueError):
                raise NotFound()
        else:
            org = get_organization_by_user(request.user)

        if org.free_visibility_by_scope:
            # NOTE(review): the quota filter is not applied on this branch -
            # confirm that is intended.
            entries = self.get_queryset(visibility_by_org_scope=True)
        else:
            entries = self.get_queryset().filter(organization=org, quota_id=pk)

        # return by search param or all objects
        queryset = self.filter_query(self.filter_queryset(entries))

        # paginate & response
        return _list_response(self, queryset)

    @action(
        methods=['get'],
        detail=False,
        url_path='my_entries',
        url_name='my_entries',
        name='my_entries'
    )
    def my_inventory_entries(self, request):
        """
        List inventory entries of the requesting user's organization.

        Organizations with ``free_visibility_by_scope`` get the scope-based
        queryset; all others only get their own entries.
        """
        org = get_organization_by_user(request.user)

        if org.free_visibility_by_scope:
            entries = self.get_queryset(visibility_by_org_scope=True)
        else:
            entries = self.get_queryset().filter(organization=org)

        # return by search param or all objects
        queryset = self.filter_query(self.filter_queryset(entries))

        # paginate & response
        return _list_response(self, queryset)


class InventoryQuotaSaleTransactionViewSet(
    BaseViewSet,
    SoftDeleteMixin,
    TransactionDashboardService,
    DynamicSearchMixin,
    viewsets.ModelViewSet,
):
    """
    Listing and dashboard endpoints for inventory quota sale transactions.
    """

    queryset = warehouse_models.InventoryQuotaSaleTransaction.objects.all()
    serializer_class = warehouse_serializers.InventoryQuotaSaleTransactionSerializer
    # filter_backends = [filters.SearchFilter]
    search_fields = [
        'rancher_fullname',
        'rancher_mobile',
        'pos_device__device_identity',
        'pos_device__acceptor',
        'pos_device__terminal',
        'pos_device__serial',
        'transaction_id',
        'seller_organization__name',
        'quota_distribution__distribution_id',
        'weight',
        'delivery_address',
        'transaction_price',
        'price_paid',
        'price_type',
        'product_type',
        'transactions_number',
        'transaction_status',
        'transaction_status_code',
        'ref_num',
        'terminal',
        'payer_cart',
        'transaction_date',
        'rancher__national_code'
    ]

    def list(self, request, *args, **kwargs):
        """
        List transactions visible in the caller's organization scope.

        The optional ``status`` query parameter narrows the list to
        ``waiting``, ``success`` or ``failed`` transactions; any other value
        is ignored. Results are ordered newest first by ``create_date`` and
        then passed through ``filter_query``.
        """
        status_param = self.request.query_params.get('status')  # noqa

        queryset = self.get_queryset(visibility_by_org_scope=True)
        if status_param in _TRANSACTION_STATUS_FILTERS:
            queryset = queryset.filter(transaction_status=status_param)
        queryset = self.filter_query(queryset.order_by('-create_date'))

        # paginate & response
        return _list_response(self, queryset)

    @action(
        methods=['get'],
        detail=False,
        url_path='transactions_dashboard',
        url_name='transactions_dashboard',
        name='transactions_dashboard'
    )
    def transactions_dashboard(self, request):
        """
        Transaction dashboard of the requesting user's organization.

        Optional query parameters ``start``, ``end`` and ``status`` are passed
        to ``get_dashboard``; absent parameters are passed as ``None``.
        """
        query_param = self.request.query_params  # noqa
        org = get_organization_by_user(request.user)

        # filter by date & transaction status
        transaction_dashboard_data = self.get_dashboard(
            org,
            start_date=query_param.get('start'),
            end_date=query_param.get('end'),
            status=query_param.get('status')
        )

        return Response(transaction_dashboard_data, status=status.HTTP_200_OK)


class InventoryQuotaSaleItemViewSet(SoftDeleteMixin, BaseViewSet, viewsets.ModelViewSet):
    """
    CRUD endpoints for inventory quota sale items, searchable through DRF's
    ``SearchFilter``.
    """

    queryset = warehouse_models.InventoryQuotaSaleItem.objects.all()
    serializer_class = warehouse_serializers.InventoryQuotaSaleItemSerializer
    filter_backends = [filters.SearchFilter]
    search_fields = [
        'transaction',
        'quota_distribution',
        'gov_product',
        'free_product',
        'name',
        'price_type',
        'delivery_type',
        'paid_type',
        'item_type',
        'unit',
    ]