31 drf 认证、权限、频率

392次阅读
没有评论

共计 8929 个字符,预计需要花费 23 分钟才能阅读完成。

引入

1. 什么是认证、权限和限制

  • 认证是对用户身份进行验证, 然后权限、限制组件决定是否拒绝这个请求
  • 简单来说 : 认证确定了你是谁
  • 权限确定你能不能访问某个接口
  • 限制确定你访问的某个接口的频率

一. 认证组件

0. 作用

  • 校验用户, 三种类型 : 游客、合法用户、非法用户
  • 游客:代表校验通过,直接进入下一步校验(权限校验)
  • 非法用户:代表校验失败,抛出异常,返回 403 权限异常结果

1. 源码分析

  • 分析步骤 APIView---->dispatch---->self.initial(request, *args, **kwargs)

  • initial 方法里面认证、权限、频率, 我们先看认证 self.perform_authentication(request)

31 drf 认证、权限、频率

  • perform_authentication 方法里面就一句话 request.user, 调用 Request 类的 user 方法

31 drf 认证、权限、频率

  • 再调用 _authenticate 方法来开始验证

31 drf 认证、权限、频率

def _authenticate(self):
    # 遍历拿到一个认证类对象
    # self.authenticators 是一个个我们配置的认证类产生的的认证类对象组成的列表[obj1,obj2..]
    # 如果我们没有进行配置, 就从默认配置文件中读取配置类
    for authenticator in self.authenticators:
        try:
            # 认证类对象调用 authenticate 方法传入(认证类对象,request 请求对象)
            # 返回登入的用户与认证的信息组成的 tuple
            # try 捕获异常, 捕获到了则代表认证失败
            user_auth_tuple = authenticator.authenticate(self)
        except exceptions.APIException:
            self._not_authenticated()
            raise

        # 如果元组有内容
        if user_auth_tuple is not None:
            self._authenticator = authenticator
            # 将登入用户与登入认证分别解压赋值到 self.user,self.auth 中
            self.user, self.auth = user_auth_tuple
            return
    # 如果元组为空没有登入的用户和认证, 则表示该用户是匿名用户
    self._not_authenticated()
  • 认证类局部配置
# 在某个视图类中添加该认证配置
# self.authenticators 中得到的一个个认证类对象组成的列表就是从该配置中的类实例化出来的

authentication_classes = ['认证类 1','认证类 2','认证类 3']
  • 认证类全局配置
# 在 settings.py 配置文件中进行配置
REST_FRAMEWORK = {'DEFAULT_AUTHENTICATION_CLASSES': ["mydrf.auth.UserAuth",],
}    

2. 认证组件的用法 (自定义 Token 认证)

必须重写 authenticate 方法

  • 创建表模型 : models.py
from django.db import models

# 存放用户名和密码
class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    level = models.IntegerField(choices=((1, " 超级用户 "), (2, " 普通用户 "), (3, " 游客 ")))


# 存放用户名外键和 token
class UserToken(models.Model):
    user = models.OneToOneField(to="User", on_delete=models.CASCADE)
    token = models.CharField(max_length=64)


class Book(models.Model):
    title = models.CharField(max_length=32)
    price = models.IntegerField()
  • 创建自定义的 Response 类 : myresponse.py
from rest_framework.response import Response


# 自定义 Response 对象
class UserResponse(Response):
    def __init__(self, code=100, msg=None, data=None, status=None,
                 template_name=None, headers=None,
                 exception=False, content_type=None, **kwargs):
        dic = {'status': code, 'msg': msg}
        if data:
            dic['data'] = data
        if kwargs:
            dic.update(kwargs)

        super().__init__(data=dic, status=status,
                         template_name=template_name, headers=headers,
                         exception=exception, content_type=content_type)
  • 创建序列化类 : serializer.py
from rest_framework import serializers
from drf_test import models


class BookModelSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Book
        fields = "__all__"


class UserModelSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.User
        fields = "__all__"
  • 创建认证类 : auth.py
from drf_test.models import UserToken
from rest_framework.exceptions import APIException
from rest_framework.authentication import BaseAuthentication

# 继承 BaseAuthentication 类
class UserAuth(BaseAuthentication):
    # 必须重写该方法
    def authenticate(self, request):
        token = request.META.get('HTTP_TOKEN')
        print(token)
        if token:
            token_obj = UserToken.objects.filter(token=token).first()
            if token_obj:
                # 必须返回两个参数
                return token_obj.user,token
            else:
                raise APIException('token 無效')
        raise APIException('token 不存在')
  • views.py
from drf_test import models
from drf_test.myresponse import UserResponse
from rest_framework.viewsets import ViewSetMixin
from drf_test.serializer import UserModelSerializer
from drf_test.serializer import BookModelSerializer
from rest_framework.decorators import action
from rest_framework.generics import ListAPIView, RetrieveAPIView, CreateAPIView
from drf_test.auth import UserAuth
import uuid

# 用户登入
class LoginView(ViewSetMixin, CreateAPIView):
    queryset = models.User.objects.all()
    serializer_class = UserModelSerializer
    # 用户认证(局部配置)
    authentication_classes = [UserAuth]

    @action(methods=['POst'], detail=False)
    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user_obj = models.User.objects.filter(username=username, password=password).first()
        if user_obj:
            token = uuid.uuid4()
            models.UserToken.objects.update_or_create(defaults={'token': token}, user=user_obj)
            return UserResponse(handers={'token': token},msg='成功')
        else:
            return UserResponse(code=201, msg='用户名或密码错误!')


# 书籍查询
class BookView(ViewSetMixin, ListAPIView, CreateAPIView,RetrieveAPIView):
    queryset = models.Book.objects.all()
    serializer_class = BookModelSerializer

    @action(detail=True)
    def get(self, request, pk):
        # 查出前几条(不是第几条)
        book_qs = self.get_queryset()[:int(pk)]
        serializer = self.get_serializer(instance=book_qs, many=True)
        return UserResponse(book_info=serializer.data)
  • 全局配置认证
REST_FRAMEWORK = {"DEFAULT_AUTHENTICATION_CLASSES": ["drf_test.auth.UserAuth",],
}

二. 权限组件

0. 作用

  • 权限控制可以限制用户对于视图的访问和对于具体数据对象的访问

  • 认证通过, 可以进行下一步验证 (频率认证)

  • 认证失败, 抛出权限异常结果

1. 源码分析

  • 权限组件入口 : APIView 中 dispatch 的 self.check_permissions(request)
def check_permissions(self, request):
    # 遍历权限对象列表得到一个个权限对象(权限器),进行权限认证
    for permission in self.get_permissions():
        # 权限类一定有一个 has_permission 权限方法,用来做权限认证的
        # 参数:权限对象 self、请求对象 request、视图类对象
        # 返回值:有权限返回 True,无权限返回 False
        if not permission.has_permission(request, self):
            self.permission_denied(
                request,
                message=getattr(permission, 'message', None),
                code=getattr(permission, 'code', None)
            )
# 执行 has_permission 方法, 会执行你全局或局部配置的自定义权限类, 通过返回值来确定是否抛出异常
# True 认证通过, False 认证不通过, 再通过 "self.permission_denied" 抛出异常
# 一旦抛出异常, dispatch 中 try 之下的 diamante 将不再运行

3. 权限组件用法

  • 创建权限类, 需要继承 BasePermission 父类, 必须重写 has_permissionhas_object_permission 方法
class UserPermission(BasePermission):
    # 自定义的提示信息(原本是英文)
    message = '没有权限访问!'
    # 重写该方法
    def has_permission(self, request, view):
        # 权限在认证之后, 所以可以取到 user
        if request.user.level == 1:
            return True
        else:
            self.message = f'你是 {request.user.get_level_display()} 用户, 没有权限!'
            return False
  • 局部配置使用
from drf_test.auth import UserPernission

# 在需要权限认证的视图类中书写
permission_classes = [UserPermission]
  • 全局配置使用
REST_FRAMEWORK = {"DEFAULT_PERMISSION_CLASSES": ["drf_test.auth.UserPermission",],
}

三. 频率组件

0. 作用

  • 限制视图接口被访问的频率次数
  • 限制条件 : IP、ID、唯一键
  • 频率周期 : 时(h)、分(m)、秒(s)
  • 频率次数 : [num] / s
  • 没有达到限制频率可正常访问接口
  • 达到了频率限制次数, 在限制时间内不能进行访问, 超过时间后可以正常访问

1. 源码分析

  • 频率组件入口 : APIView 中 dispatch 的 self.check_throttles(request)

原文内容

def check_throttles(self, request):
    throttle_durations = []
    # 1)遍历配置的频率认证类,初始化得到一个个频率认证类对象(会调用频率认证类的 __init__() 方法)# 2)频率认证类对象调用 allow_request 方法,判断是否限次(没有限次可访问,限次不可访问)# 3)频率认证类对象在限次后,调用 wait 方法,获取还需等待多长时间可以进行下一次访问
    # 注:频率认证类都是继承 SimpleRateThrottle 类
    for throttle in self.get_throttles():
        if not throttle.allow_request(request, self):
            # 只要频率限制了,allow_request 返回 False 了,才会调用 wait
            throttle_durations.append(throttle.wait())

            if throttle_durations:
                # Filter out `None` values which may happen in case of config / rate
                # changes, see #1438
                durations = [
                    duration for duration in throttle_durations
                    if duration is not None
                ]

                duration = max(durations, default=None)
                self.throttled(request, duration)

class SimpleRateThrottle(BaseThrottle):
  def __init__(self):
     if not getattr(self, 'rate', None):
        # 得到 settings 配置的 次数 / 时间 赋值给 rate
        self.rate = self.get_rate()
    # 将切分后的 '次数 / 时间' 解压赋值 num_requests= 次数,duration= 时间
    self.num_requests, self.duration = self.parse_rate(self.rate)
  # 自定义频率限制时 需要我们实现的方法
  def get_cache_key(self, request, view):
      raise NotImplementedError('.get_cache_key() must be overridden')

2. 频率组件的使用

必须重写 allow_request, 由于我们继承了 SimpleRateThrottle 类, 对基类进行了进一步的封装, 所以只需要重写get_cache_key, 返回什么就以什么作为限制条件(ip,用户 id)

  • 先创建一个频率类
class MyThrottles(SimpleRateThrottle):
    scope = 'ip'

    def get_cache_key(self, request, view):
        # 返回什么就以什么作为限制条件
        return self.get_ident(request)  # IP 作为限制(也可以下面写法)
        # return request.Meta.get('REMOTE_ADDR')
        # return request.user.id  # 以用户 id 作为限制
  • view.py
class BookView(ViewSetMixin, ListAPIView, CreateAPIView,RetrieveAPIView):
    queryset = models.Book.objects.all()
    serializer_class = BookModelSerializer
    authentication_classes = [UserAuth]
    permission_classes = [UserPermission]
    throttle_classes = [MyThrottles]


    @action(detail=True)
    def get(self, request, pk):
        # 查出前几条(不是第几条)
        book_qs = self.get_queryset()[:int(pk)]
        serializer = self.get_serializer(instance=book_qs, many=True)
        return UserResponse(book_info=serializer.data)
  • 设置限制次数
# setting.py 文件
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {'ip': '5/m',  #一分钟访问 5 次},
}
  • 局部使用
# 在某个视图类中书写
throttle_classes = [MyThrottles]
  • 全局使用
# 在 settings.py
REST_FRAMEWORK = {"DEFAULT_THROTTLE_CLASSES": ["app01.auth.MyThrottle",],
    'DEFAULT_THROTTLE_RATES': {'ip': '5/m',  #一分钟访问 5 次},
}

3. 自定义频率类

  • 实现步骤 :
  1. 取出访问者 ip
  2. 判断当前 ip 不在访问字典里,添加进去,并且直接返回 True, 表示第一次访问,在字典里,继续往下走
  3. 循环判断当前 ip 的列表,有值,并且当前时间减去列表的最后一个时间大于 60s,把这种数据 pop 掉,这样列表中只有 60s 以内的访问时间
  4. 判断,当列表长度小于 3(代表访问的次数小于 3),说明一分钟以内访问不足三次,再把当前时间插入到列表第一个位置,返回 True,顺利通过
  5. 当大于等于 3,说明一分钟内访问超过三次,返回 False 验证失败
  6. 实现代码
class MyThrottles(BaseThrottle):
    VISIT_RECORD = {}  # 记录访问者的字典
    def __init__(self):
        self.history=None  # 用来存时间戳次数的列表
    def allow_request(self,request, view):
        #(1)取出访问者 ip
        ip=request.META.get('REMOTE_ADDR')
        import time
        ctime=time.time()
        #(2)判断当前 ip 不在访问字典里,添加进去,并且直接返回 True, 表示第一次访问
        if ip not in self.VISIT_RECORD:
            self.VISIT_RECORD[ip]=[ctime,]  # 将当前时间放在列表的第一个位置(第一次访问)
            return True
        self.history=self.VISIT_RECORD.get(ip,[])  # 拿出某个 ip 的时间戳列表
        #(3)循环判断当前 ip 的列表,有值,并且当前时间减去列表的最后一个时间大于 60s,把这种数据 pop 掉,这样列表中只有 60s 以内的访问时间,while self.history and ctime-self.history[-1]>60:
            self.history.pop()
        #(4)判断,当列表小于 3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回 True,顺利通过
        #(5)当大于等于 3,说明一分钟内访问超过三次,返回 False 验证失败
        if len(self.history)<3:
            self.history.insert(0,ctime)
            return True
        else:
            return False
    def wait(self):
        import time
        ctime=time.time()
        return 60-(ctime-self.history[-1])  # 展示还剩多少秒
        # ctime-self.history[-1]  (表示最早一次登入到现在过去了多少时间)

SimpleRateThrottle 内部源码实现方式也是如此, 只不过通过配置, 它的可扩展性更高

四. 接口补充

接口 : 一种规范, 协议, 用来保证交互

  • 上面我们使用认证、权限、频率都必须要重写父类中的方法, 不然直接 raise 异常
  • 在规范子类的行为上, Python 中有 abc 模块来强制规定子类必须要重写父类的某一个方法, 不然就报错
  • 但其实 Python 并不推崇这种做法 (接口), Python 推崇 鸭子类型
  • 鸭子类型 : 即你不需要强制性的规定子类写某个方法, 子类中只要有相同的方法, 那么就可以说他们是一类

例 : 制作 14 根螺丝, 有 4 个 4 厘米长的, 那么我们就可以说这 4 个 4 厘米长的螺丝属于 " 4 厘米螺丝"

  • 而认证、权限、频率是通过抛出异常的方式来进行限制的, 只要不重写, 就直接抛出异常 :

31 drf 认证、权限、频率

正文完
 
shawn
版权声明:本站原创文章,由 shawn 2023-06-16发表,共计8929字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)