Redis+threading实现多线程消息队列的使用示例

 更新时间:2023年12月18日 11:36:04   作者:Asura_____  
Redis多线程消息队列是一种使用Redis作为存储后端的消息队列实现,它利用Redis的线程并发处理能力来提高消息队列的处理效率,本文主要介绍了Redis+threading实现多线程消息队列的使用示例,感兴趣的可以了解一下

列表

lpush左插入、rpush右插入、lrange查询集合

127.0.0.1:6379> lpush list v1
(integer) 1
127.0.0.1:6379> lpush list v2
(integer) 2
127.0.0.1:6379> lpush list v3
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1
1) "v3"
2) "v2"
3) "v1"
127.0.0.1:6379> LRANGE list 0 1
1) "v3"
2) "v2"
127.0.0.1:6379> LRANGE list 0 0
1) "v3"
127.0.0.1:6379> rpush list rv0
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "v3"
2) "v2"
3) "v1"
4) "rv0"

lpop左移除、rpop右移除

127.0.0.1:6379> lrange list 0 -1
1) "v3"
2) "v2"
3) "v1"
4) "rv0"
127.0.0.1:6379> lpop list
"v3"
127.0.0.1:6379> lrange list 0 -1
1) "v2"
2) "v1"
3) "rv0"
127.0.0.1:6379> rpop list
"rv0"
127.0.0.1:6379> lrange list 0 -1
1) "v2"
2) "v1"

lindex下标查询、llen长度查询

127.0.0.1:6379> lrange list 0 -1
1) "v4"
2) "v3"
3) "v2"
4) "v1"
127.0.0.1:6379> lindex list 1
"v3"
127.0.0.1:6379> lindex list 0
"v4"
127.0.0.1:6379> llen list
(integer) 4

blpop、brpop

BRPOP 是 Redis 的一个阻塞式列表弹出命令,用于从指定的一个或多个列表中弹出最后一个元素。它和BLPOP 不同之处在于它是从列表的尾部弹出元素,而不是从头部。
这种阻塞式弹出操作通常用于实现消息队列。如果列表为空,就会阻塞等待直到有消息可供处理。timeout: 阻塞超时时间,如果所有指定的列表都为空,命令将阻塞直到有元素可弹出或超时。

# 连接到本地 Redis 服务器
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
a = ['item1','item2','item3','item4','item5','item6']
# 将元素推入列表
r.rpush('my_queue',*a)
# 使用 blpop 弹出元素
result = r.blpop('my_queue', timeout=5)  # 设置超时时间为 5 秒

['item1', 'item2', 'item3', 'item4', 'item5', 'item6']
['item2', 'item3', 'item4', 'item5', 'item6']

字符串

set、incr递增、decr递减

  • 虽然输入是int类型,但是set会自动转换为string
r.set('my_queue', 5)

# 对 key 为 'my_queue' 的值执行递减操作
value = r.decr('my_queue')
# 获取递减后的值
print(f'New value: {value}')

new_value = r.incr('my_queue')

print(f'New value: {new_value}')

New value: 4
New value: 5

可以看到就算是赋值也已经改变了my_queue键的值。

keys取键、get取值、delete

# 将元素推入列表
r.setnx('my_queue:ddd:count',123)
r.setnx('my_queue:aaa:count',456)
r.setnx('my_queue',789)
r.setnx('my_queue',101)
r.set('my_queue:kkk:count',159)

print(r.keys('*'))
keys = r.keys('my_queue:*:count')
# 打印匹配的键列表
print(keys)
value = r.get(keys[0])
print(value)
r.delete('my_queue:ddd:count')
print(r.keys('*'))

['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count', 'my_queue:ddd:count']
['my_queue:kkk:count', 'my_queue:aaa:count', 'my_queue:ddd:count']
159
['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count']

关于为什么delete了还能取到值(

delete只是删除了redis队列的键值对,keys是已经赋过值了所以不受影响。

r.setnx(f'rtp_task:{1}:{123}:count',123)
r.setnx(f'rtp_task:{2}:{456}:count',456)
r.setnx(f'rtp_task:{3}:{789}:count',789)
r.setnx(f'rtp_task:{4}:{101}:count',101)
r.setnx(f'rtp_task:{5}:{159}:count',159)


keys = r.keys('rtp_task:*:count')
r.delete('rtp_task:1:123:count')
print(r.keys("*"))
print(keys)

['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:5:159:count']
['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:1:123:count', 'rtp_task:5:159:count']

setnx

含义(setnx = SET if Not eXists):如果不存在,则set。

r.setnx('my_queue:ddd:count',123)
print(r.get('my_queue:ddd:count'))
123
r.setnx('my_queue:ddd:count',123)
r.setnx('my_queue:ddd:count',456)
print(r.get('my_queue:ddd:count'))
123

threading

Thread

创建线程

在创建线程时,传递参数需要是一个可迭代的对象,如果只有一个参数,需要在参数后面添加逗号,以表示它是一个元组而不是一个单一的值。

如果写成 args=(a,),它会被解释为一个包含单一元素的元组,而如果你写成 args=(a),它会被解释为 args=a,这样就不再是一个元组。

a = "this is message"

def iptest(message):
    print(message)

t = threading.Thread(target=iptest, args=(a,))

start、join

a = "this is message"

def iptest(message):
    print(message)

t = threading.Thread(target=iptest, args=(a,))

t.start()

this is message

join方法的作用是确保thread子线程执行完毕后才能执行下一个线程。

没加join前:

def iptest():
    print("message1\n")
    for i in range(10):
        # time.sleep() 函数推迟调用线程的运行,可通过参数secs指秒数,表示进程挂起的时间。
        time.sleep(0.1)
    print('message2')
def main():
    add_thread = threading.Thread(target=iptest, name="T2")
    add_thread.start()
    print("done")
    
if __name__ == '__main__':
    main()


message1
done

message2

加join后

message1

message2
done

消息队列

import redis
import threading
import time
import json

# 连接到本地 Redis 服务器
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

def producer(queue_name):
    # 生产者线程,模拟向队列中推送任务
    for i in range(5):
        message = {'task_id': i, 'data': f'Task {i}'}
        r.rpush(queue_name, json.dumps(message))
        time.sleep(1)

def consumer(queue_name, worker_id):
    while True:
        # 消费者线程,使用 blpop 从队列中阻塞获取任务
        message = r.blpop(queue_name, timeout=10)
        if message:
            task = json.loads(message[1])
            print(f"Worker {worker_id} processing task: {task} \n")
            # 模拟任务处理时间
            time.sleep(2)
            # 模拟任务处理完成后,更新任务计数
            r.decr(f'rtp_task:{task["task_id"]}:count')

            num_task = r.get(f'rtp_task:{task["task_id"]}:count')
            print(f'taskid{task["task_id"]},num_task{num_task}')

if __name__ == '__main__':
    # 设置初始任务计数
    for i in range(5):
        r.setnx(f'rtp_task:{i}:count', 3)

    # 创建一个生产者线程
    producer_thread = threading.Thread(target=producer, args=('product',))
    producer_thread.start()

    # 创建多个消费者线程
    num_consumers = 3
    consumer_threads = []
    for i in range(num_consumers):
        consumer_thread = threading.Thread(target=consumer, args=('product', i + 1))
        consumer_threads.append(consumer_thread)
        consumer_thread.start()

    # 等待生产者线程和消费者线程完成
    producer_thread.join()
    for consumer_thread in consumer_threads:
        consumer_thread.join()

Worker 2 processing task: {'task_id': 0, 'data': 'Task 0'} 

Worker 1 processing task: {'task_id': 1, 'data': 'Task 1'} 

Worker 3 processing task: {'task_id': 2, 'data': 'Task 2'} 

taskid0,num_task2
taskid1,num_task2
Worker 2 processing task: {'task_id': 3, 'data': 'Task 3'} 

taskid2,num_task2
Worker 1 processing task: {'task_id': 4, 'data': 'Task 4'} 

taskid3,num_task2
taskid4,num_task2

到此这篇关于Redis+threading实现多线程消息队列的使用示例的文章就介绍到这了,更多相关Redis threading多线程消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • 解析Redis Cluster原理

    解析Redis Cluster原理

    redis最开始使用主从模式做集群,若master宕机需要手动配置slave转为master;后来为了高可用提出来哨兵模式,该模式下有一个哨兵监视master和slave,若master宕机可自动将slave转为master,但它也有一个问题,就是不能动态扩充;所以在3.x提出cluster集群模式
    2021-06-06
  • Redis中Bloom filter布隆过滤器的学习

    Redis中Bloom filter布隆过滤器的学习

    布隆过滤器是一个非常长的二进制向量和一系列随机哈希函数的组合,可用于检索一个元素是否存在,本文就详细的介绍一下Bloom filter布隆过滤器,具有一定的参考价值,感兴趣的可以了解一下
    2022-12-12
  • 使用Redis实现令牌桶算法原理解析

    使用Redis实现令牌桶算法原理解析

    这篇文章主要介绍了使用Redis实现令牌桶算法,该算法可以应对短暂的突发流量,这对于现实环境中流量不怎么均匀的情况特别有用,不会频繁的触发限流,对调用方比较友好,需要的朋友可以参考下
    2021-12-12
  • 从一个小需求感受Redis的独特魅力(需求设计)

    从一个小需求感受Redis的独特魅力(需求设计)

    Redis在实际应用中使用的非常广泛,本篇文章就从一个简单的需求说起,为你讲述一个需求是如何从头到尾开始做的,又是如何一步步完善的
    2019-12-12
  • Redis 的内存淘汰策略和过期删除策略的区别

    Redis 的内存淘汰策略和过期删除策略的区别

    这篇文章主要介绍了Redis 的内存淘汰策略和过期删除策略的区别,Redis 是可以对 key 设置过期时间的,因此需要有相应的机制将已过期的键值对删除,而做这个工作的就是过期键值删除策略
    2022-07-07
  • RedisTemplate访问Redis的更好方法

    RedisTemplate访问Redis的更好方法

    这篇文章主要为大家介绍了RedisTemplate访问Redis的更好方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • IDEA初次连接Redis配置的实现

    IDEA初次连接Redis配置的实现

    本文主要介绍了IDEA初次连接Redis配置的实现,文中通过图文步骤介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-12-12
  • k8s部署redis cluster集群的实现

    k8s部署redis cluster集群的实现

    在Kubernetes中部署Redis集群面临挑战,因为每个Redis实例都依赖于一个配置文件,该文件可以跟踪其他集群实例及其角色。需要的朋友们下面随着小编来一起学习学习吧
    2021-06-06
  • 使用redis分布式锁解决并发线程资源共享问题

    使用redis分布式锁解决并发线程资源共享问题

    这篇文章主要介绍了使用redis分布式锁解决并发线程资源共享问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • Redis集群详解

    Redis集群详解

    这篇文章主要介绍了Redis集群详解,需要的朋友可以参考下
    2020-07-07

最新评论