首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Actix构建WebSocket客户端

使用Actix构建WebSocket客户端
EN

Stack Overflow用户
提问于 2021-11-26 02:25:54
回答 1查看 1.5K关注 0票数 1

我之前发布了一个关于如何使用Add awc websocket client to add_stream in actix的问题,该问题的重点是如何从AWC客户端向参与者添加流。我有solved that issue,但我仍然需要能够通过发送消息与服务器进行通信。

那么,让我们从一些上下文开始。这是我的演员:

代码语言:javascript
运行
复制
use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;

pub struct RosClient {
    pub address: String,
    pub connection: Option<Framed<BoxedSocket, Codec>>,
    pub hb: Instant,
}

impl RosClient {
    pub fn new(address: &str) -> Self {
        Self {
            address: address.to_string(),
            connection: None,
            hb: Instant::now(),
        }
    }
}

impl Actor for RosClient {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        info!("Connecting to ROS client on {}", &self.address);
        let ssl = {
            let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
            let _ = ssl.set_alpn_protos(b"\x08http/1.1");
            ssl.build()
        };
        let connector = awc::Connector::new().ssl(ssl).finish();
        let ws = awc::ClientBuilder::new()
            .connector(connector)
            .finish()
            .ws(&self.address)
            .set_header("Host", "0.0.0.0:9090");

        let _message = serde_json::json!({
            "op": "subscribe",
            "topic": "/client_count"
        });

        ws.connect()
            .into_actor(self)
            .map(|res, _act, ctx| match res {
                Ok((client_response, frame)) => {
                    info!("Response: {:?}", client_response);
                    let (_r, w) = frame.split();
                    let _ = ctx.add_stream(w);
                }
                Err(err) => {
                    info!("Websocket Client Actor failed to connect: {:?}", err);
                    ctx.stop();
                }
            })
            .wait(ctx);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        Running::Stop
    }
}

impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
    fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
        match item.unwrap() {
            Frame::Text(text_bytes) => {
                let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                info!("Message: {}", text);
            }
            Frame::Binary(_) => {}
            Frame::Continuation(_) => {}
            Frame::Ping(_) => {
                info!("Ping received!");
            }
            Frame::Pong(_) => {
                self.hb = Instant::now();
            }
            Frame::Close(_) => {}
        }
    }
}

如何保存对连接的引用(或句柄、副本或任何工作),以便在实现消息处理程序时,可以通过以下方式向服务器发送数据:

代码语言:javascript
运行
复制
Message::Text(message.to_string()) 
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-01 07:17:34

在调整了一些东西之后,我开始工作了。即使是从您之前的问题,问题是您是如何接近连接创建。一个很好的参考是在板条箱actix-web-actors中,其中的模式如下所示:

代码语言:javascript
运行
复制
pub fn start_with_addr<A, T>(
    actor: A, 
    req: &HttpRequest, 
    stream: T
) -> Result<(Addr<A>, HttpResponse), Error> 

就你而言,这就是我想出来的:

代码语言:javascript
运行
复制
use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;
use log::{error, info};
use openssl::ssl::SslConnector;

type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct RosClient {
    sink: SinkWrite<ws::Message, WsFramedSink>,
}

impl RosClient {
    pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
        RosClient::create(|ctx| {
            ctx.add_stream(stream);
            RosClient {
                sink: SinkWrite::new(sink, ctx),
            }
        })
    }
}
impl Actor for RosClient {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Context<Self>) {
        info!("RosClient started");
    }
}

impl actix::io::WriteHandler<WsProtocolError> for RosClient {}

#[derive(Message, Debug)]
#[rtype(result = "()")]
struct Event {
    op: String,
    topic: String,
}
impl Handler<Event> for RosClient {
    type Result = ();

    fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
        info!("Pushing Message {:?}", msg);
        if let Some(error) = self
            .sink
            .write(ws::Message::Text(format!("{:?}", msg).into()))
        {
            error!("Error RosClient {:?}", error);
        }
    }
}

impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
    fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
        use ws::Frame;
        match item.unwrap() {
            Frame::Text(text_bytes) => {
                let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                info!("Receiving Message: {}", text);
            }
            Frame::Binary(_) => {}
            Frame::Continuation(_) => {}
            Frame::Ping(_) => {
                info!("Ping received!");
            }
            Frame::Pong(_) => {
                //self.hb = Instant::now();
            }
            Frame::Close(_) => {}
        }
    }
}

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::env::set_var("RUST_LOG", "info");
    env_logger::init();
    let _ssl = {
        let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
        let _ = ssl.set_alpn_protos(b"\x08http/1.1");
        ssl.build()
    };
    //let connector = awc::Connector::new().ssl(ssl).finish();
    let (_, framed) = Client::default()
        .ws("http://localhost:8080")
        .connect()
        .await?;
    let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
    let addr = RosClient::start(sink, stream);

    let _res = addr
        .send(Event {
            op: format!("subscribe"),
            topic: "/client_count".to_string(),
        })
        .await
        .unwrap();
    let _ = actix_rt::signal::ctrl_c().await?;
    Ok(())
}

我编写了一个简单的node.js服务器(没有ssl):

代码语言:javascript
运行
复制
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', function connection(ws) {
    ws.on('message', function message(data) {
        console.log('received: %s', data);
    });

    ws.send('something');
});

它工作得很完美:

代码语言:javascript
运行
复制
[2021-12-01T07:08:03Z INFO  actix-wc-client] RosClient started
[2021-12-01T07:08:03Z INFO  actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
[2021-12-01T07:08:03Z INFO  actix-wc-client] Receiving Message: something

您可能需要更新一些actix-*版本。这是我的Cargo.toml文件:

代码语言:javascript
运行
复制
[package]
name = "actix-wc-client"
version = "0.1.0"
edition = "2018"

[dependencies]
awc = "3.0.0-beta.9"
openssl = { version = "0.10" }
log = { version = "0.4" }
futures = "0.3"
actix = "0.11"
actix-web = "4.0.0-beta.10"
serde = "1"
serde_json = "1"
actix-codec = "0.4"
actix-rt = "2.5"
futures-util = "0.3"
actix-http = "3.0.0-beta.11"
env_logger = "0.7"
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70118994

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档