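I ran into a scenario where a browser client needs to subscribe to commands issued by the server.
The server publishes every command to a Redis pub/sub channel, and the messages are then streamed to the browser agent over SSE (Server-Sent Events).
Publishing to Redis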
import json

# redis_client is assumed to be a redis.Redis instance created elsewhere.
def push_property_to_redis(property_data):
    try:
        redis_client.publish('new_properties_channel', json.dumps(property_data))
        print(f"New property published: {property_data}")
    except Exception as e:
        print(f"Error publishing to Redis: {e}")
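Redis pub/sub is fire-and-forget: a message published while no SSE stream is subscribed is simply dropped, and `publish` returns the number of subscribers that received it. The sketch below is one way to surface that case. The names `publish_property` and `FakeRedis` are hypothetical; `FakeRedis` only stands in for a real client so the example runs without a Redis server.

```python
import json
import logging

logger = logging.getLogger(__name__)


def publish_property(client, channel, property_data):
    """Publish property_data as JSON and return the subscriber count."""
    # client is anything with a redis-style publish(channel, message) method.
    receivers = client.publish(channel, json.dumps(property_data))
    if receivers == 0:
        # Nobody was listening, so Redis discarded the message.
        logger.warning("No subscribers on %s; message was dropped", channel)
    return receivers


class FakeRedis:
    """Hypothetical stand-in for a Redis client, used only for illustration."""

    def __init__(self, subscribers):
        self.subscribers = subscribers
        self.sent = []

    def publish(self, channel, message):
        # Record the call and report a fixed number of receivers.
        self.sent.append((channel, message))
        return self.subscribers
```

If dropped messages matter, the receiver count could be used to decide whether to queue the command somewhere durable instead; that choice is outside what the original setup does.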
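Streaming to the frontend
import json
import time

from flask import Response, stream_with_context

# app (the Flask application) and redis_client are assumed to be defined elsewhere.
@app.route('/new_properties_stream')
def new_properties_stream():
    def generate():
        pubsub = redis_client.pubsub()
        pubsub.subscribe('new_properties_channel')
        # Heartbeat interval in seconds
        heartbeat_interval = 2
        last_heartbeat = time.time()
        try:
            while True:
                # Wait up to 1 second for the next message
                message = pubsub.get_message(timeout=1)
                # Send a heartbeat periodically so the connection stays open
                current_time = time.time()
                if current_time - last_heartbeat >= heartbeat_interval:
                    yield f"data: {json.dumps({'heartbeat': 'keep-alive'})}\n\n"
                    last_heartbeat = current_time
                # Forward only real messages, not subscribe confirmations
                if message and message['type'] == 'message':
                    new_property = json.loads(message['data'])
                    yield f"data: {json.dumps(new_property)}\n\n"
        finally:
            # Release the Redis subscription when the client disconnects
            pubsub.close()
    response = Response(stream_with_context(generate()), content_type='text/event-stream')
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    return response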
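The stream above builds each SSE frame by hand as `data: ...\n\n`. A small helper can keep that framing in one place. This is a minimal sketch, not part of the original code: `format_sse` and `sse_comment` are hypothetical names. In the SSE format, a line starting with a colon is a comment that `EventSource` ignores, so it could serve as a heartbeat that never reaches `onmessage`.

```python
import json


def format_sse(data, event=None, event_id=None):
    """Serialize data as JSON and wrap it in one SSE frame."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # Each line of the payload needs its own "data:" prefix.
    for chunk in json.dumps(data).splitlines():
        lines.append(f"data: {chunk}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


def sse_comment(text="keep-alive"):
    """Build an SSE comment frame, which browsers silently discard."""
    return f": {text}\n\n"
```

Note that frames carrying an `event:` field are not delivered to `onmessage`; the browser side would need `addEventListener` with the same event name.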
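Receiving the stream in the frontend
<script>
const eventSource = new EventSource('/new_properties_stream');
eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    // Skip the keep-alive heartbeats sent by the server
    if (data.heartbeat) return;
    console.log("Received new property:", data);
};
</script>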
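To check the endpoint without a browser, the same stream can be consumed from Python. The sketch below only covers the parsing side and assumes the lines come from some HTTP client that streams the response line by line. `iter_sse_data` and `iter_properties` are hypothetical names, and only the `data:` field is handled.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each SSE event found in lines."""
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. a keep-alive
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
        # Other fields (event, id, retry) are ignored in this sketch.


def iter_properties(lines: Iterable[str]) -> Iterator[dict]:
    """Decode each payload as JSON and drop heartbeat messages."""
    for payload in iter_sse_data(lines):
        data = json.loads(payload)
        if isinstance(data, dict) and "heartbeat" in data:
            continue
        yield data
```

For example, feeding it one heartbeat frame followed by one property frame yields only the property.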