import time import redis class Publisher: def __init__(self): print('redis初始化中..') if not self.redis_init(): raise Exception("Redis 初始化失败") self.rds = None def redis_init(self, max_retries=5, retry_delay=1): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) retries = 0 while retries < max_retries: try: self.rds = redis.Redis(connection_pool=pool) if self.rds.ping(): print("Redis 连接成功.") return True else: print("Redis 连接失败,正在尝试重新连接...") except redis.ConnectionError as e: print(f"Connection error (retry {retries + 1} of {max_retries}): {e}") except Exception as e: print(f"An error occurred (retry {retries + 1} of {max_retries}): {e}") # 增加延迟时间,防止过于频繁的重试 time.sleep(retry_delay * (1 ** retries)) retries += 1 print("多次尝试重连失败,检查redis服务是否开启.") return False def pub_plan_result(self, result_dict): """ 发布路径规划结果 """ if self.rds is not None: self.rds.publish('plan_result', result_dict) else: self.redis_init() self.rds.publish('plan_result', result_dict) if __name__ == '__main__': Publisher()