Files
RasadDam_Backend/apps/herd/web/api/v1/api.py
2025-11-11 15:08:50 +03:30

168 lines
5.6 KiB
Python

from django.db import transaction
from rest_framework import status, filters
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from apps.authentication.api.v1.api import UserViewSet
from apps.core.api import BaseViewSet
from apps.core.mixins.search_mixin import DynamicSearchMixin
from apps.core.mixins.soft_delete_mixin import SoftDeleteMixin
from apps.herd.models import Herd, Rancher
from apps.herd.web.api.v1.serializers import HerdSerializer, RancherSerializer
from apps.livestock.web.api.v1.serializers import LiveStockSerializer
from apps.product.web.api.v1.serializers import product_serializers
from common.helpers import get_organization_by_user
from common.tools import CustomOperations
class HerdViewSet(BaseViewSet, SoftDeleteMixin, viewsets.ModelViewSet):
""" Herd ViewSet """
queryset = Herd.objects.all()
serializer_class = HerdSerializer
filter_backends = [filters.SearchFilter]
search_fields = [
"rancher__ranching_farm",
"name",
"code",
"province__name",
"city__name",
"postal",
"institution",
"epidemiologic",
"activity",
"capacity",
]
def list(self, request, *args, **kwargs):
""" list of herds """
search = self.filter_queryset(self.get_queryset().order_by('-modify_date')) # search & filter
page = self.paginate_queryset(search)
if page is not None: # noqa
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
@transaction.atomic
def create(self, request, *args, **kwargs):
""" create herd with user """
organization = get_organization_by_user(request.user)
if 'user' in request.data.keys():
# create user if owner of herd is not exist
user = CustomOperations().custom_create(
request=request,
view=UserViewSet(),
data_key='user'
)
owner = user['id']
request.data.update({'owner': owner})
# if cooperative do not send in request, set my organization in it
if 'cooperative' not in request.data.keys():
request.data.update({'cooperative': organization.id})
serializer = self.serializer_class(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
else:
return Response(serializer.errors, status=status.HTTP_403_FORBIDDEN)
@action(
methods=['get'],
detail=False,
url_name='my_herds',
url_path='my_herds',
name='my_herds'
)
@transaction.atomic
def my_herds(self, request):
""" get current user herds """
serializer = self.serializer_class(self.get_queryset().filter(owner=request.user.id), many=True)
if serializer.data:
return Response(serializer.data, status=status.HTTP_200_OK)
else:
return Response(status=status.HTTP_204_NO_CONTENT)
@action(
methods=['get'],
detail=True,
url_path='live_stocks',
url_name='live_stocks',
name='live_stocks'
)
def live_stocks(self, request, pk=None):
""" list of herd live_stocks"""
herd = self.get_object()
queryset = herd.live_stock_herd.all() # get herd live_stocks
# paginate queryset
page = self.paginate_queryset(queryset)
if page is not None: # noqa
serializer = LiveStockSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
class RancherViewSet(BaseViewSet, SoftDeleteMixin, viewsets.ModelViewSet, DynamicSearchMixin):
queryset = Rancher.objects.all()
serializer_class = RancherSerializer
search_fields = [
"ranching_farm",
"first_name",
"last_name",
"mobile",
"national_code",
"birthdate",
"nationality",
"address",
"province__name",
"city__name",
]
def list(self, request, *args, **kwargs):
""" list of ranchers """
search = self.filter_query(self.get_queryset().order_by('-modify_date')) # search & filter
page = self.paginate_queryset(search)
if page is not None: # noqa
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
@action(
methods=['get'],
detail=True,
url_path='herds',
url_name='herds',
name='herds'
)
def herds_by_rancher(self, request, pk=None):
""" list of rancher herds """
rancher = self.get_object()
queryset = rancher.herd.all().order_by('-modify_date') # get rancher herds
# paginate queryset
page = self.paginate_queryset(queryset)
if page is not None: # noqa
serializer = HerdSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
@action(
methods=['get'],
detail=True,
url_path='rancher_plans',
url_name='rancher_plans',
name='rancher_plans'
)
def rancher_incentive_plans(self, request, pk=None):
""" return list of rancher plans with information """
rancher = self.get_object() # rancher object
page = self.paginate_queryset(rancher.plans.all())
if page is not None: # noqa
serializer = product_serializers.IncentivePlanRancherSerializer(page, many=True)
return self.get_paginated_response(serializer.data)