通过sse分发指令

遇到一个场景:需要让浏览器客户端订阅服务端下发的指令。

服务端将指令统一发布到 Redis,利用 Redis 的发布/订阅(Pub/Sub)机制,再通过 SSE(Server-Sent Events)推流至浏览器端(browser agent)。

发布到redis

def push_property_to_redis(property_data):
    """Publish one property record on the Redis ``new_properties_channel``.

    The record is serialised with ``json.dumps`` and handed to
    ``redis_client.publish``. Any exception raised along the way is caught
    and reported on stdout instead of being propagated, so the function
    always returns ``None``.

    Args:
        property_data: The object to serialise and publish.
    """
    channel = 'new_properties_channel'
    try:
        # Resolve the publisher first, then serialise, then send.
        publish = redis_client.publish
        payload = json.dumps(property_data)
        publish(channel, payload)
        print(f"New property published: {property_data}")
    except Exception as err:
        # Best-effort delivery: report the failure and carry on.
        print(f"Error publishing to Redis: {err}")

推流至前端

@app.route('/new_properties_stream')
def new_properties_stream():
    """Stream messages from the Redis ``new_properties_channel`` as SSE.

    Every Redis message of type ``message`` is decoded as JSON and re-emitted
    as one Server-Sent Events ``data:`` frame. A
    ``{"heartbeat": "keep-alive"}`` frame is emitted whenever at least
    ``heartbeat_interval`` seconds have passed since the previous heartbeat.

    Returns:
        A streaming ``Response`` with content type ``text/event-stream``.
    """
    def generate():
        pubsub = redis_client.pubsub()
        try:
            pubsub.subscribe('new_properties_channel')

            # Heartbeat interval, in seconds.
            heartbeat_interval = 2
            # Use a monotonic clock so wall-clock adjustments cannot stall or
            # burst the heartbeat.
            last_heartbeat = time.monotonic()

            while True:
                # Bounded wait so the heartbeat check below runs regularly
                # even when no message arrives.
                message = pubsub.get_message(timeout=1)

                # Emit a heartbeat when the interval has elapsed.
                current_time = time.monotonic()
                if current_time - last_heartbeat >= heartbeat_interval:
                    yield f"data: {json.dumps({'heartbeat': 'keep-alive'})}\n\n"
                    last_heartbeat = current_time

                if message and message['type'] == 'message':
                    try:
                        new_property = json.loads(message['data'])
                    except (TypeError, ValueError) as e:
                        # One malformed payload must not tear down the whole
                        # stream; report it and keep listening.
                        print(f"Skipping malformed message: {e}")
                        continue
                    yield f"data: {json.dumps(new_property)}\n\n"
        finally:
            # Runs when the generator is closed or raises; release the Redis
            # subscription instead of leaking it per connection.
            pubsub.close()

    response = Response(stream_with_context(generate()), content_type='text/event-stream')
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    return response

前端获取到推流

<script>
    // Open a Server-Sent Events connection to the streaming endpoint.
    const eventSource = new EventSource('/new_properties_stream');

    // Called once per SSE "data:" frame; each frame carries a JSON document.
    eventSource.onmessage = function(event) {
        const data = JSON.parse(event.data);
        // The server interleaves {"heartbeat": "keep-alive"} frames with
        // real payloads; do not report those as new properties.
        if (data && data.heartbeat === 'keep-alive') {
            return;
        }
        console.log("Received new property:", data);
    };
</script>
updated 2024-10-17