1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| # 应该放在缓存里,存储访问的时间 VISIT_RECORD = { }
class VisitThrottle(object): # 60秒内只能访问3次 def __init__(self): self.history = None
# 是否可以继续访问 def allow_request(self,request,view): # 获取用户ip #remote_addr = self.get_ident(request) remote_addr = request._request.META.get('REMOTE_ADDR') # print(remote_addr) # 如果没有这个ip,将当前的访问时间写入对应的ip中 ctime = time.time() if remote_addr not in VISIT_RECORD: VISIT_RECORD[remote_addr] = [ctime,] return True history = VISIT_RECORD.get(remote_addr) self.history = history # 一分钟只可以访问3次 while history and history[-1] < ctime-60: history.pop() if len(history) < 3: # 如果可以访问的话,将当前的访问时间记录下来 history.insert(0,ctime) return True # return True # return False表示访问频率太高,被限制
def wait(self): # 还需要等待多少秒可以访问 ctime = time.time() return 60-(ctime - self.history[-1])
class AuthView(APIView): """ 用于用户认证 """ # 没有权限控制 authentication_classes = [] permission_classes = [] throttle_classes = [VisitThrottle,]
def post(self,request,*args,**kwargs):
ret= {'code':1000,'msg':None} try: # 获取到原生的那个request,拿到用户名和密码 user = request._request.POST.get('username') pwd = request._request.POST.get('password') obj = models.UserInfo.objects.filter(username=user, password=pwd).first() #如果没有这个,返回错误码 if not obj: ret['code'] = 1001 ret['msg'] = "用户名或密码错误" # 为登录用户创建token token = md5(user) # 存在就更新,不存在就创建 models.UserToken.objects.update_or_create(user=obj, defaults={'token': token}) ret['token'] = token except Exception as e: ret['code'] = 1002 ret['msg'] = '请求异常' return JsonResponse(ret)
|