我想发送事件之间的游戏客户端和服务器,我已经让它工作,但我不知道如何做它与贝维。
我依赖于使用tokios异步TcpStream
,因为我必须能够使用stream.into_split()
将流拆分为OwnedWriteHalf
和OwnedReadhalf
。
我的第一个想法是生成一个处理连接的线程,然后使用mpsc::channel
将接收到的事件发送到队列中。
然后,我使用app.insert_resource(Queue)
将这个队列包含到一个贝维资源中,并在游戏循环中从它中提取事件。
排队:
use tokio::sync::mpsc;
pub enum Instruction {
Push(GameEvent),
Pull(mpsc::Sender<Option<GameEvent>>),
}
#[derive(Clone, Debug)]
pub struct Queue {
sender: mpsc::Sender<Instruction>,
}
impl Queue {
pub fn init() -> Self {
let (tx, rx) = mpsc::channel(1024);
init(rx);
Self{sender: tx}
}
pub async fn send(&self, event: GameEvent) {
self.sender.send(Instruction::Push(event)).await.unwrap();
}
pub async fn pull(&self) -> Option<GameEvent> {
println!("new pull");
let (tx, mut rx) = mpsc::channel(1);
self.sender.send(Instruction::Pull(tx)).await.unwrap();
rx.recv().await.unwrap()
}
}
fn init(mut rx: mpsc::Receiver<Instruction>) {
tokio::spawn(async move {
let mut queue: Vec<GameEvent> = Vec::new();
loop {
match rx.recv().await.unwrap() {
Instruction::Push(ev) => {
queue.push(ev);
}
Instruction::Pull(sender) => {
sender.send(queue.pop()).await.unwrap();
}
}
}
});
}
但是由于所有这些都必须是异步的,所以我在同步游戏循环中阻塞了pull()
函数。我使用futures-lite
机箱来完成这个任务:
fn event_pull(
communication: Res<Communication>
) {
let ev = future::block_on(communication.event_queue.pull());
println!("got event: {:?}", ev);
}
这很好,但是大约5秒后整个程序就停止了,没有收到任何更多的事件。
看起来,future::block_on()
确实会无限期地阻塞。
让主函数(在其中构建和运行bevy::prelude::App
)成为异步tokio::main
函数可能也是一个问题。
最好将异步TcpStream
初始化和tokio::sync::mpsc::Sender
以及Queue.pull
封装到同步函数中,但我不知道如何做到这一点。
有人能帮忙吗?
如何繁殖
回购可以找到这里
只需编译server
和client
,然后按相同的顺序运行。
发布于 2022-03-27 07:13:38
我用crossbeam::channel
替换了每个crossbeam::channel
,这可能是个问题,因为它确实会阻塞。
并手动初始化tokio运行时。
所以init代码如下所示:
pub struct Communicator {
pub event_bridge: bridge::Bridge,
pub event_queue: event_queue::Queue,
_runtime: Runtime,
}
impl Communicator {
pub fn init(ip: &str) -> Self {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.build()
.unwrap();
let (bridge, queue, game_rx) = rt.block_on(async move {
let socket = TcpStream::connect(ip).await.unwrap();
let (read, write) = socket.into_split();
let reader = TcpReader::new(read);
let writer = TcpWriter::new(write);
let (bridge, tcp_rx, game_rx) = bridge::Bridge::init();
reader::init(bridge.clone(), reader);
writer::init(tcp_rx, writer);
let event_queue = event_queue::Queue::init();
return (bridge, event_queue, game_rx);
});
// game of game_rx events to queue for game loop
let eq_clone = queue.clone();
rt.spawn(async move {
loop {
let event = game_rx.recv().unwrap();
eq_clone.send(event);
}
});
Self {
event_bridge: bridge,
event_queue: queue,
_runtime: rt,
}
}
}
main.rs
看起来是这样的:
fn main() {
let communicator = communication::Communicator::init("0.0.0.0:8000");
communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});
App::new()
.insert_resource(communicator)
.add_system(event_pull)
.add_plugins(DefaultPlugins)
.run();
}
fn event_pull(
communication: Res<communication::Communicator>
) {
let ev = communication.event_queue.pull();
if let Some(ev) = ev {
println!("ev");
}
}
也许会有更好的解决办法。
https://stackoverflow.com/questions/71636383
复制