前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Django MemCache 缓存使用方法和源码

Django MemCache 缓存使用方法和源码

作者头像
卓越笔记
发布2023-02-18 11:38:08
3350
发布2023-02-18 11:38:08
举报
文章被收录于专栏:卓越笔记

常用方法

代码语言:javascript
复制
from django.core.cache import cache


def view_test(request):
	# 获取 cache 中 my_key 的值
	var = cache.get("my_key")
	if not var:  # 如果 var 为空
		var = "abc"  # 赋值
		# 设置到 cache, 第一个参数是 key,第二个参数是 key 的值, 第三个参数是 缓存时间(单位:秒)
		cache.set("my_key", var, 60)

	if cache.has_key("my_key"):  # 如果有这个 key
		cache.delete("my_key")  # 删除 key

	return HttpResponse("Hello World !")

源码路径

python3.6/site-packages/django/core/cache/backends/locmem.py

locmem.py

代码语言:javascript
复制
"Thread-safe in-memory cache backend."

import time
from contextlib import contextmanager

from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache
from django.utils.synch import RWLock

try:
    from django.utils.six.moves import cPickle as pickle
except ImportError:
    import pickle


# Global in-memory store of cache data. Keyed by name, to provide
# multiple named local memory caches.
_caches = {}
_expire_info = {}
_locks = {}


@contextmanager
def dummy():
    """A context manager that does nothing special."""
    yield


class LocMemCache(BaseCache):
    def __init__(self, name, params):
        BaseCache.__init__(self, params)
        self._cache = _caches.setdefault(name, {})
        self._expire_info = _expire_info.setdefault(name, {})
        self._lock = _locks.setdefault(name, RWLock())

    def add(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        key = self.make_key(key, version=version)
        self.validate_key(key)
        pickled = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        with self._lock.writer():
            if self._has_expired(key):
                self._set(key, pickled, timeout)
                return True
            return False

    def get(self, key, default=None, version=None, acquire_lock=True):
        key = self.make_key(key, version=version)
        self.validate_key(key)
        pickled = None
        with (self._lock.reader() if acquire_lock else dummy()):
            if not self._has_expired(key):
                pickled = self._cache[key]
        if pickled is not None:
            try:
                return pickle.loads(pickled)
            except pickle.PickleError:
                return default

        with (self._lock.writer() if acquire_lock else dummy()):
            try:
                del self._cache[key]
                del self._expire_info[key]
            except KeyError:
                pass
            return default

    def _set(self, key, value, timeout=DEFAULT_TIMEOUT):
        if len(self._cache) >= self._max_entries:
            self._cull()
        self._cache[key] = value
        self._expire_info[key] = self.get_backend_timeout(timeout)

    def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        key = self.make_key(key, version=version)
        self.validate_key(key)
        pickled = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        with self._lock.writer():
            self._set(key, pickled, timeout)

    def incr(self, key, delta=1, version=None):
        with self._lock.writer():
            value = self.get(key, version=version, acquire_lock=False)
            if value is None:
                raise ValueError("Key '%s' not found" % key)
            new_value = value + delta
            key = self.make_key(key, version=version)
            pickled = pickle.dumps(new_value, pickle.HIGHEST_PROTOCOL)
            self._cache[key] = pickled
        return new_value

    def has_key(self, key, version=None):
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.reader():
            if not self._has_expired(key):
                return True

        with self._lock.writer():
            try:
                del self._cache[key]
                del self._expire_info[key]
            except KeyError:
                pass
            return False

    def _has_expired(self, key):
        exp = self._expire_info.get(key, -1)
        if exp is None or exp > time.time():
            return False
        return True

    def _cull(self):
        if self._cull_frequency == 0:
            self.clear()
        else:
            doomed = [k for (i, k) in enumerate(self._cache) if i % self._cull_frequency == 0]
            for k in doomed:
                self._delete(k)

    def _delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            pass
        try:
            del self._expire_info[key]
        except KeyError:
            pass

    def delete(self, key, version=None):
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.writer():
            self._delete(key)

    def clear(self):
        self._cache.clear()
        self._expire_info.clear()
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-11-9 2,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 常用方法
  • 源码路径
  • locmem.py
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档