class CooldownCache:
    """
    Keeps one `Cooldown` bucket per key derived from a context.

    Parameters
    ----------
    original: `Cooldown`
        Template cooldown. New buckets are produced with its ``copy()``.
    type: `BucketType`
        Called with a context to produce the bucket key.
    """

    # NOTE: the ``type`` parameter shadows the builtin, but it is part of the
    # public signature and is kept so keyword callers do not break.
    def __init__(self, original: "Cooldown", type: "BucketType"):
        self._cache: dict[Union[int, tuple[int, int]], "Cooldown"] = {}
        self._cooldown: "Cooldown" = original
        self._type: "BucketType" = type

    def __repr__(self) -> str:
        return (
            f"<CooldownCache type={self._type!r} rate={self._cooldown.rate} "
            f"per={self._cooldown.per} "
            f"cache={len(self._cache) if self._cache else None}>"
        )

    def _bucket_key(self, ctx: "Context") -> Union[int, tuple[int, int]]:
        """
        Creates a key for the bucket based on the type.

        Parameters
        ----------
        ctx: `Context`
            Context to create the key for.

        Returns
        -------
        `Union[int, tuple[int, int]]`
            Key for the bucket.
        """
        return self._type(ctx)

    def _cleanup_cache(self, current: Optional[float] = None) -> None:
        """
        Cleans up the cache by removing every expired bucket.

        A bucket is expired when ``current`` is strictly greater than the
        bucket's ``_last + per``.

        Parameters
        ----------
        current: `Optional[float]`
            Current time to check the cache for. A falsy value falls back
            to ``time.time()``.
        """
        now = current or time.time()

        # Collect the keys first: deleting while iterating the dict view is
        # not allowed, and every expired bucket must go, not only the first.
        expired = [
            key
            for key, bucket in self._cache.items()
            if now > bucket._last + bucket.per
        ]
        for key in expired:
            del self._cache[key]

    def create_bucket(self) -> "Cooldown":
        """
        `Cooldown`: Creates a new cooldown bucket by copying the template.
        """
        return self._cooldown.copy()

    def get_bucket(
        self,
        ctx: "Context",
        current: Optional[float] = None
    ) -> "Cooldown":
        """
        Gets the cooldown bucket for the given context.

        When the bucket type is ``BucketType.default`` the template cooldown
        itself is returned and the cache is not touched. Otherwise expired
        buckets are removed first, and a new bucket is created and stored
        if the key is not cached.

        Parameters
        ----------
        ctx: `Context`
            Context to get the bucket for.
        current: `Optional[float]`
            Current time to check the bucket for.

        Returns
        -------
        `Cooldown`
            Cooldown bucket for the context.
        """
        if self._type is BucketType.default:
            return self._cooldown

        self._cleanup_cache(current)
        key = self._bucket_key(ctx)

        bucket = self._cache.get(key)
        if bucket is None:
            bucket = self.create_bucket()
            self._cache[key] = bucket

        return bucket

    def update_rate_limit(
        self,
        ctx: "Context",
        current: Optional[float] = None,
        *,
        tokens: int = 1
    ) -> Optional[float]:
        """
        Updates the rate limit for the given context.

        Parameters
        ----------
        ctx: `Context`
            Context to update the rate limit for.
        current: `Optional[float]`
            Current time to update the rate limit for.
        tokens: `int`
            Amount of tokens to remove from the rate limit.

        Returns
        -------
        `Optional[float]`
            Time left before the cooldown resets.
            Returns `None` if the rate limit was not exceeded.
        """
        bucket = self.get_bucket(ctx, current)
        return bucket.update_rate_limit(current, tokens=tokens)
def get_tokens(self, current: Optional[float] = None) -> int:
    """
    Gets the amount of tokens available for the current time.

    Once ``current`` is strictly past ``_window + per`` the full ``rate``
    is available again. Otherwise the stored token count is reported,
    clamped so it is never negative.

    Parameters
    ----------
    current: `Optional[float]`
        The current time to check the tokens for. A falsy value falls
        back to ``time.time()``.

    Returns
    -------
    `int`
        Amount of tokens available.
    """
    now = current or time.time()
    remaining = max(self._tokens, 0)
    window_elapsed = now > self._window + self.per
    return self.rate if window_elapsed else remaining
def get_retry_after(self, current: Optional[float] = None) -> float:
    """
    Gets the time left before the cooldown resets.

    Parameters
    ----------
    current: `Optional[float]`
        The current time to check the retry after for. A falsy value
        falls back to ``time.time()``.

    Returns
    -------
    `float`
        ``per`` minus the time elapsed since the window started when no
        tokens are available, otherwise ``0.0``.
    """
    now = current or time.time()

    # Tokens still available: nothing to wait for.
    if self.get_tokens(now) != 0:
        return 0.0

    elapsed = now - self._window
    return self.per - elapsed
def update_rate_limit(
    self,
    current: Optional[float] = None,
    *,
    tokens: int = 1
) -> Optional[float]:
    """
    Updates the rate limit for the current time.

    Records ``current`` as the last use, starts a new window when the
    full ``rate`` is available, and then removes ``tokens`` from the
    bucket.

    Parameters
    ----------
    current: `Optional[float]`
        The current time to update the rate limit for. A falsy value
        falls back to ``time.time()``.
    tokens: `int`
        Amount of tokens to remove from the rate limit.

    Returns
    -------
    `Optional[float]`
        Time left before the cooldown resets.
        Returns `None` if the rate limit was not exceeded.
    """
    now = current or time.time()
    self._last = now

    available = self.get_tokens(now)
    if available == self.rate:
        # A full bucket means a fresh window begins with this call.
        self._window = now

    self._tokens = available - tokens
    if self._tokens >= 0:
        return None

    # Overdrawn: report how long until the current window ends.
    return self.per - (now - self._window)
def reset(self) -> None:
    """
    Resets the rate limit.

    Restores the token count to ``rate`` and sets the last-use
    timestamp back to ``0.0``.
    """
    self._tokens, self._last = self.rate, 0.0
def copy(self) -> "Cooldown":
    """
    `Cooldown`: Copies the cooldown.

    The new object is built from this cooldown's ``rate`` and ``per``
    only; no other state is passed to the constructor.
    """
    rate, per = self.rate, self.per
    return Cooldown(rate, per)