import random import string from datetime import timedelta from apps.product.web.api.v1.viewsets.quota_distribution_api import QuotaDistributionViewSet from apps.pos_device.web.api.v1.serilaizers import device as device_serializer from apps.authentication.exceptions import OrganizationBankAccountException from apps.authorization.api.v1.serializers import UserRelationSerializer from apps.pos_device.web.api.v1.viewsets.client import POSClientViewSet from apps.core.mixins.soft_delete_mixin import SoftDeleteMixin from apps.authentication.api.v1.api import ( Organization, BankAccountInformation, OrganizationSerializer ) from apps.core.mixins.search_mixin import DynamicSearchMixin from apps.core.mixins.admin_mixin import AdminFilterMixin from apps.authorization.models import UserRelations from common.helpers import get_organization_by_user from rest_framework.exceptions import APIException from apps.pos_device import models as pos_models from rest_framework.response import Response from rest_framework.decorators import action from common.tools import CustomOperations from common.helpers import generate_code from django.utils.timezone import now from rest_framework import viewsets from django.db import transaction from rest_framework import status class ProviderCompanyViewSet(SoftDeleteMixin, viewsets.ModelViewSet): # noqa queryset = pos_models.ProviderCompany.objects.all() serializer_class = device_serializer.ProviderCompanySerializer def list(self, request, *args, **kwargs): """ list of provider companies """ # paginate devices page = self.paginate_queryset(self.queryset.order_by('-create_date')) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) @action( methods=['get'], detail=False, url_path='psp_users', url_name='psp_users', name='psp_users' ) def users_with_psp_org(self, request): """ list of psp users """ users = UserRelations.objects.filter( organization__type__key='PSP' ) # paginate devices page = self.paginate_queryset(users) if page is not None: serializer = UserRelationSerializer(page, many=True) return self.get_paginated_response(serializer.data) class DeviceViewSet(SoftDeleteMixin, viewsets.ModelViewSet, AdminFilterMixin): queryset = pos_models.Device.objects.all() serializer_class = device_serializer.DeviceSerializer def create(self, request, *args, **kwargs): """ Custom create of pos devices """ if 'organization' not in request.data.keys(): organization = get_organization_by_user(request.user) request.data.update({'organization': organization.id}) # create device serializer = self.serializer_class(data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_403_FORBIDDEN) @action( methods=['get'], detail=False, url_name='my_devices', url_path='my_devices', name='my_devices' ) @transaction.atomic def my_devices(self, request): """ list of company devices """ # using admin filter mixin to get query devices = self.get_query(self.queryset) # paginate devices page = self.paginate_queryset(devices) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) def activate_device(self, request): """ activations of pos by a temporary code """ device = self.queryset.filter(serial=request.data['serial']).first() if not device or not device.is_activated: code = generate_code() pos_models.DeviceActivationCode.objects.create( code=code, expires_at=now() + timedelta(hours=2) ) return Response({'code': code, 'detail': 'enter code in panel'}, status=status.HTTP_202_ACCEPTED) elif device.is_activated: raise APIException('device is activated', code=403) @action( methods=['get'], detail=False, url_name='psp_organizations', url_path='psp_organizations', name='psp_organizations' ) @transaction.atomic def psp_organizations(self, request): """ list of psp organizations """ print(request.path) organizations = Organization.objects.filter(type__key='PSP') # paginate devices page = self.paginate_queryset(organizations) if page is not None: serializer = OrganizationSerializer(page, many=True) return self.get_paginated_response(serializer.data) @action( methods=['get'], detail=True, url_path='devices_by_psp', url_name='devices_by_psp', name='devices_by_psp', ) @transaction.atomic def devices_by_psp(self, request, pk=None): """ list of devices by their psp """ devices = self.queryset.filter(organization__id=pk).order_by('-create_date') # paginate devices page = self.paginate_queryset(devices) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) class DeviceVersionViewSet(SoftDeleteMixin, viewsets.ModelViewSet): queryset = pos_models.DeviceVersion.objects.all() serializer_class = device_serializer.DeviceVersionSerializer class SessionViewSet(SoftDeleteMixin, viewsets.ModelViewSet): # noqa queryset = pos_models.Sessions.objects.all() serializer_class = device_serializer.SessionSerializer class DeviceAssignmentViewSet(SoftDeleteMixin, viewsets.ModelViewSet): queryset = pos_models.DeviceAssignment.objects.all() serializer_class = device_serializer.DeviceAssignmentSerializer @transaction.atomic def create(self, request, *args, **kwargs): """ assign pos device to client by company """ data = request.data.copy() if 'organization' not in request.data.keys(): organization = get_organization_by_user(request.user) data['organization'] = organization.id client_id = None client_data = data.get('client_data') if client_data and client_data.get('is_organization'): org_id = client_data.get('organization') # check if organization have bank account or raise exception if not BankAccountInformation.objects.filter(organization_id=org_id).exists(): raise OrganizationBankAccountException() # check if organization is a client before client = pos_models.POSClient.objects.filter(organization_id=org_id) if client.exists(): client_id = client.first().id else: # create client client = CustomOperations().custom_create( request=request, view=POSClientViewSet(), data=request.data['client_data'] ) client_id = client['id'] data['client'] = client_id elif client_data and client_data.get('organization') is False: # create client client = CustomOperations().custom_create( request=request, view=POSClientViewSet(), data=request.data['client_data'] ) client_id = client['id'] data['client'] = client_id # create assignment serializer = self.serializer_class(data=data) if serializer.is_valid(): assignment = serializer.save() # set device status to assigned assignment.device.assigned_state = True assignment.device.acceptor = data['acceptor'] assignment.device.terminal = data['terminal'] if not assignment.device.password: assignment.device.password = ''.join(random.choices(string.digits, k=6)) assignment.device.save() # set organization having pos status if assignment.client.organization: assignment.client.organization.has_pos = True assignment.client.organization.save() # after pos device assignment, must set owner # as default stake holder pos_models.StakeHolders.objects.create( assignment=assignment, device=assignment.device, organization=assignment.client.organization, default=True ) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_403_FORBIDDEN) @transaction.atomic def update(self, request, pk=None, *args, **kwargs): """ edit assignment """ assignment = self.get_object() if 'client_data' in request.data.keys(): client = CustomOperations().custom_update( request=request, view=POSClientViewSet(), data_key='client_data', obj_id=request.data['client_data']['id'] ) request.data.update({'client': client['id']}) serializer = self.serializer_class(data=request.data, instance=assignment, partial=True) if serializer.is_valid(): assignment = serializer.save() if assignment.client.organization: # update default stake holder for device (assignment) stake_holder = assignment.stake_holders.filter(default=True).first() stake_holder.organization = assignment.client.organization stake_holder.save() return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.errors, status=status.HTTP_403_FORBIDDEN) @action( methods=['get'], detail=False, url_name='my_assignments', url_path='my_assignments', name='my_assignments' ) def my_assignment(self, request): """ list of company device assignment to clients """ try: organization = get_organization_by_user(request.user) # get device assignment assignments = self.queryset.filter(organization=organization) serializer = self.serializer_class(assignments, many=True) return Response(serializer.data, status=status.HTTP_200_OK) except Exception as e: raise APIException('Non Object Error', code=403) class StakeHoldersViewSet(SoftDeleteMixin, viewsets.ModelViewSet, DynamicSearchMixin): queryset = pos_models.StakeHolders.objects.all() serializer_class = device_serializer.StakeHoldersSerializer @transaction.atomic def create(self, request, *args, **kwargs): """ create shares amount of stakeholders """ stakeholders_data = [] for stakeholder in request.data['stakeholders']: # check if organization have bank account or raise exception if not BankAccountInformation.objects.filter( organization_id=stakeholder['organization'] ).exists(): raise OrganizationBankAccountException() serializer = self.serializer_class(data=stakeholder) if serializer.is_valid(raise_exception=True): serializer.save() stakeholders_data.append(serializer.data) return Response(stakeholders_data, status=status.HTTP_201_CREATED) @action( methods=['get'], detail=True, url_name='list_by_device', url_path='list_by_device', name='list_by_device', ) @transaction.atomic def list_by_device(self, request, pk=None): """ list of stakeholders by device """ device = pos_models.Device.objects.get(id=pk) stakeholders = device.stake_holders.all() query = self.filter_query(stakeholders) # paginate stakeholders page = self.paginate_queryset(query) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) @action( methods=['get'], detail=False, url_path='list_by_organization', url_name='list_by_organization', name='list_by_organization', ) def list_by_organization(self, request): """ list of stakeholders by organization """ org = get_organization_by_user(request.user) stakeholders = self.queryset.filter( assignment__client__organization=org, organization__type__key='CMP' ) # paginate stakeholders page = self.paginate_queryset(stakeholders) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) class StakeHolderShareAmountViewSet(SoftDeleteMixin, viewsets.ModelViewSet, DynamicSearchMixin): queryset = pos_models.StakeHolderShareAmount.objects.select_related('quota_distribution', 'stakeholders') serializer_class = device_serializer.StakeHolderShareAmountSerializer @transaction.atomic def create(self, request, *args, **kwargs): """ create share amount for company stakeholders """ data = request.data.copy() organization = get_organization_by_user(request.user) data.update({'registering_organization': organization.id}) assigner_organization = get_organization_by_user(request.user) data['distribution'].update({'assigner_organization': assigner_organization.id}) # create distribution if 'distribution' in data.keys(): distribution = CustomOperations().custom_create( request=request, view=QuotaDistributionViewSet(), data=data['distribution'] ) data.update({'quota_distribution': distribution['id']}) serializer = self.serializer_class(data=data) if serializer.is_valid(raise_exception=True): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_403_FORBIDDEN) @action( methods=['get'], detail=False, url_path='my_sharing_distributes', url_name='my_sharing_distributes', name='my_sharing_distributes', ) def my_shared_distributes(self, request): """ list of my shared stakeholders with detail """ organization = get_organization_by_user(request.user) stakeholders_sharing = self.queryset.filter( registering_organization=organization ).order_by('-create_date') # paginate stakeholders page = self.paginate_queryset(stakeholders_sharing) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) @action( methods=['get'], detail=True, url_path='shared_by_distribution', url_name='shared_by_distribution', name='shared_by_distribution', ) @transaction.atomic def shared_by_distribution(self, request, pk=None): """ list of shared stakeholder with distribution """ stakeholder_sharing = self.queryset.filter( quota_distribution_id=pk ).order_by('-create_date') # paginate stakeholders page = self.paginate_queryset(stakeholder_sharing) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data)