redis_publisher.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import time
  2. import redis
  3. class Publisher:
  4. def __init__(self):
  5. print('redis初始化中..')
  6. if not self.redis_init():
  7. raise Exception("Redis 初始化失败")
  8. self.rds = None
  9. def redis_init(self, max_retries=5, retry_delay=1):
  10. pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
  11. retries = 0
  12. while retries < max_retries:
  13. try:
  14. self.rds = redis.Redis(connection_pool=pool)
  15. if self.rds.ping():
  16. print("Redis 连接成功.")
  17. return True
  18. else:
  19. print("Redis 连接失败,正在尝试重新连接...")
  20. except redis.ConnectionError as e:
  21. print(f"Connection error (retry {retries + 1} of {max_retries}): {e}")
  22. except Exception as e:
  23. print(f"An error occurred (retry {retries + 1} of {max_retries}): {e}")
  24. # 增加延迟时间,防止过于频繁的重试
  25. time.sleep(retry_delay * (1 ** retries))
  26. retries += 1
  27. print("多次尝试重连失败,检查redis服务是否开启.")
  28. return False
  29. def pub_plan_result(self, result_dict):
  30. """
  31. 发布路径规划结果
  32. """
  33. if self.rds is not None:
  34. self.rds.publish('plan_result', result_dict)
  35. else:
  36. self.redis_init()
  37. self.rds.publish('plan_result', result_dict)
  38. if __name__ == '__main__':
  39. Publisher()