python3自动更新缓存类的具体使用

 更新时间:2025年01月08日 10:30:11   作者:言之。  
本文介绍了使用一个自动更新缓存的Python类AutoUpdatingCache,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

这个类会在后台自动更新缓存数据,你只需要调用方法来获取数据即可。

自动更新缓存类

以下是 AutoUpdatingCache 类的实现:

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        """
        初始化缓存类。

        :param update_function: 一个函数,用于生成或更新缓存数据。
        :param expiry_time: 缓存的更新周期(秒)。
        """
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        # 启动后台线程更新缓存
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)  # 每秒检查一次

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

使用说明

定义一个数据生成函数

首先,需要定义一个用于生成或更新缓存数据的函数。这个函数可以是任何耗时的操作,例如从数据库查询、计算复杂结果等。

import time

def generate_cache_data():
    # 模拟耗时操作
    time.sleep(5)
    return {"value": "fresh data", "timestamp": time.time()}

创建缓存类的实例

将数据生成函数传递给 AutoUpdatingCache 类,并设置缓存更新周期。

cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

获取缓存数据

在需要的地方调用 get_data() 方法即可获取缓存数据。

data = cache.get_data()
print(data)

完整示例

将以上步骤组合起来:

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

# 数据生成函数
def generate_cache_data():
    time.sleep(5)  # 模拟耗时操作
    return {"value": "fresh data", "timestamp": time.time()}

# 创建缓存实例
cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

# 模拟获取数据
for _ in range(10):
    data = cache.get_data()
    print(data)
    time.sleep(10)

代码解释

  • AutoUpdatingCache 类

    • init 方法:
      • 初始化缓存,设置数据生成函数和缓存更新周期。
      • 启动后台线程 _update_cache_periodically
    • _update_cache_periodically 方法:
      • 无限循环,每隔一秒检查缓存是否需要更新。
      • 如果当前时间距离上次更新时间超过了 expiry_time,则调用 _update_cache
    • _update_cache 方法:
      • 使用 update_function 更新缓存数据。
      • 使用锁机制 threading.Lock 确保线程安全。
    • get_data 方法:
      • 获取缓存数据。
      • 如果缓存数据为空(初始化中),返回提示信息。
  • 数据生成函数

    • generate_cache_data 函数模拟一个耗时操作,生成新的缓存数据。
  • 使用示例

    • 创建缓存实例并在循环中每隔 10 秒获取一次数据,观察缓存的更新情况。

注意事项

  • 线程安全:

    • 使用 threading.Lock 确保在多线程环境下数据访问的安全性。
  • 异常处理:

    • 在更新缓存时,捕获可能的异常,防止线程崩溃。
  • 后台线程:

    • 将线程设置为守护线程(daemon=True),使得主程序退出时,线程自动结束。

应用场景

你可以将这个缓存类应用在 Web 应用程序中,例如在 Sanic 的路由中:

from sanic import Sanic
from sanic.response import json

app = Sanic("CacheApp")

@app.route("/data")
async def get_cached_data(request):
    data = cache.get_data()
    return json({"data": data})

if __name__ == "__main__":
    # 确保缓存在应用启动前初始化
    cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
    app.run(host="0.0.0.0", port=8000)

这样,用户在访问 /data 路由时,总是能得到缓存中的数据,而缓存会在后台自动更新,不会因为更新缓存而导致请求超时。

到此这篇关于python3自动更新缓存类的具体使用的文章就介绍到这了,更多相关python3自动更新缓存类内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python Celery多队列配置代码实例

    Python Celery多队列配置代码实例

    这篇文章主要介绍了Python Celery多队列配置代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Python图像处理库Pillow的简单实现

    Python图像处理库Pillow的简单实现

    本文主要介绍了Python图像处理库Pillow的简单实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • Python守护进程实现过程详解

    Python守护进程实现过程详解

    这篇文章主要介绍了Python守护进程实现过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • PyCharm2020.1.2社区版安装,配置及使用教程详解(Windows)

    PyCharm2020.1.2社区版安装,配置及使用教程详解(Windows)

    这篇文章主要介绍了PyCharm2020.1.2社区版安装,配置及使用教程(Windows),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Pytorch自定义CNN网络实现猫狗分类详解过程

    Pytorch自定义CNN网络实现猫狗分类详解过程

    PyTorch是一个开源的Python机器学习库,基于Torch,用于自然语言处理等应用程序。它不仅能够实现强大的GPU加速,同时还支持动态神经网络。本文将介绍PyTorch自定义CNN网络实现猫狗分类,感兴趣的可以学习一下
    2022-12-12
  • Python 词典(Dict) 加载与保存示例

    Python 词典(Dict) 加载与保存示例

    今天小编就为大家分享一篇Python 词典(Dict) 加载与保存示例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • 4种Python基于字段的不使用元类的ORM实现方法总结

    4种Python基于字段的不使用元类的ORM实现方法总结

    在 Python 中,ORM(Object-Relational Mapping)是一种将对象和数据库之间的映射关系进行转换的技术,本文为大家整理了4种不使用元类的简单ORM实现方式,需要的可以参考下
    2023-12-12
  • python多进程读图提取特征存npy

    python多进程读图提取特征存npy

    这篇文章主要为大家详细介绍了python多进程读图提取特征存npy,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-05-05
  • python+selenium 鼠标事件操作方法

    python+selenium 鼠标事件操作方法

    今天小编就为大家分享一篇python+selenium 鼠标事件操作方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • python使用selenium登录QQ邮箱(附带滑动解锁)

    python使用selenium登录QQ邮箱(附带滑动解锁)

    这篇文章主要为大家详细介绍了python使用selenium登录QQ邮箱,带滑动解锁登录功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01

最新评论