From 7d3d71cb540c6a9554befae7836057c79d8826fd Mon Sep 17 00:00:00 2001 From: aerosouund Date: Mon, 22 Dec 2025 21:43:54 +0000 Subject: [PATCH 1/2] add seqpacket support Signed-off-by: aerosouund --- Cargo.lock | 126 ++++++++++- Cargo.toml | 1 + src/vmm/Cargo.toml | 2 + src/vmm/src/device_manager/pci_mngr.rs | 3 +- src/vmm/src/devices/virtio/persist.rs | 2 +- .../devices/virtio/vsock/csm/connection.rs | 2 +- src/vmm/src/devices/virtio/vsock/persist.rs | 5 + src/vmm/src/devices/virtio/vsock/unix/mod.rs | 90 +++++++- .../src/devices/virtio/vsock/unix/muxer.rs | 209 ++++++++++++------ .../devices/virtio/vsock/unix/muxer_killq.rs | 7 +- .../devices/virtio/vsock/unix/muxer_rxq.rs | 7 +- .../devices/virtio/vsock/unix/seqpacket.rs | 155 +++++++++++++ src/vmm/src/vmm_config/vsock.rs | 30 ++- tests/framework/utils_vsock.py | 11 +- .../functional/test_vsock.py | 41 +++- 15 files changed, 590 insertions(+), 101 deletions(-) create mode 100644 src/vmm/src/devices/virtio/vsock/unix/seqpacket.rs diff --git a/Cargo.lock b/Cargo.lock index 108d1ce8785..7f63ba2b6b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,7 +72,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -83,7 +83,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -519,7 +519,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -802,6 +802,12 @@ dependencies = [ "vm-memory", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -853,7 +859,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad38eb12aea514a0466ea40a80fd8cc83637065948eb4a426e4aa46261175227" dependencies = [ - "rustix", + "rustix 1.1.2", ] [[package]] @@ -1111,6 +1117,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + [[package]] name = "rustix" version = "1.1.2" @@ -1120,8 +1139,8 @@ dependencies = [ "bitflags 2.10.0", "errno", "libc", - "linux-raw-sys", - "windows-sys", + "linux-raw-sys 0.11.0", + "windows-sys 0.61.2", ] [[package]] @@ -1321,6 +1340,15 @@ dependencies = [ "syn", ] +[[package]] +name = "timerfd" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091" +dependencies = [ + "rustix 0.38.44", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -1370,6 +1398,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" +[[package]] +name = "uds" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "885c31f06fce836457fe3ef09a59f83fe8db95d270b11cd78f40a4666c4d1661" +dependencies = [ + "libc", +] + [[package]] name = "unarray" version = "0.1.4" @@ -1548,6 +1585,8 @@ dependencies = [ "serde_json", "slab", "thiserror 2.0.17", + "timerfd", + "uds", "userfaultfd", "utils", "uuid", @@ -1658,7 +1697,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1673,6 +1712,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -1682,6 +1730,70 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "winnow" version = "0.7.14" diff --git a/Cargo.toml b/Cargo.toml index a1c9ad79621..94755ae176a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ or_fun_call = "warn" [profile.dev] panic = "abort" +incremental = true [profile.release] panic = "abort" diff --git a/src/vmm/Cargo.toml b/src/vmm/Cargo.toml index b591cd29be4..a43bf9f6ff3 100644 --- a/src/vmm/Cargo.toml +++ b/src/vmm/Cargo.toml @@ -46,6 +46,8 @@ serde = { version = "1.0.228", features = ["derive", "rc"] } serde_json = "1.0.145" slab = "0.4.11" thiserror = "2.0.17" +timerfd = "1.5.0" +uds = "0.4.2" userfaultfd = "0.9.0" utils = { path = "../utils" } uuid = "1.19.0" diff --git a/src/vmm/src/device_manager/pci_mngr.rs b/src/vmm/src/device_manager/pci_mngr.rs index d465482d8c1..2b86ed5dd98 100644 --- a/src/vmm/src/device_manager/pci_mngr.rs +++ b/src/vmm/src/device_manager/pci_mngr.rs @@ -665,7 +665,7 @@ mod tests { use crate::vmm_config::memory_hotplug::MemoryHotplugConfig; use crate::vmm_config::net::NetworkInterfaceConfig; use crate::vmm_config::pmem::PmemConfig; - use crate::vmm_config::vsock::VsockDeviceConfig; + use crate::vmm_config::vsock::{VsockDeviceConfig, VsockType}; #[test] fn test_device_manager_persistence() { @@ -723,6 +723,7 @@ mod tests { vsock_id: Some(vsock_dev_id.to_string()), guest_cid: 3, uds_path: tmp_sock_file.as_path().to_str().unwrap().to_string(), + vsock_type: VsockType::Stream, }; insert_vsock_device(&mut vmm, &mut cmdline, &mut event_manager, vsock_config); // Add an entropy device. diff --git a/src/vmm/src/devices/virtio/persist.rs b/src/vmm/src/devices/virtio/persist.rs index 85c4940f305..dd6046b41d1 100644 --- a/src/vmm/src/devices/virtio/persist.rs +++ b/src/vmm/src/devices/virtio/persist.rs @@ -486,7 +486,7 @@ mod tests { // Remove the file so the path can be used by the socket. temp_uds_path.remove().unwrap(); let uds_path = String::from(temp_uds_path.as_path().to_str().unwrap()); - let backend = VsockUnixBackend::new(guest_cid, uds_path).unwrap(); + let backend = VsockUnixBackend::new(guest_cid, uds_path, VsockType::Stream).unwrap(); let vsock = Vsock::new(guest_cid, backend).unwrap(); let vsock = Arc::new(Mutex::new(vsock)); let mmio_transport = diff --git a/src/vmm/src/devices/virtio/vsock/csm/connection.rs b/src/vmm/src/devices/virtio/vsock/csm/connection.rs index a5a2f4aec5b..0a3f4a08ae5 100644 --- a/src/vmm/src/devices/virtio/vsock/csm/connection.rs +++ b/src/vmm/src/devices/virtio/vsock/csm/connection.rs @@ -101,7 +101,7 @@ use crate::utils::wrap_usize_to_u32; /// Used as an alias for `ReadVolatile + Write + WriteVolatile + AsRawFd` /// (sadly, trait aliases are not supported, /// ). -pub trait VsockConnectionBackend: ReadVolatile + Write + WriteVolatile + AsRawFd {} +pub trait VsockConnectionBackend: ReadVolatile + Write + WriteVolatile + AsRawFd + Debug {} /// A self-managing connection object, that handles communication between a guest-side AF_VSOCK /// socket and a host-side `ReadVolatile + Write + WriteVolatile + AsRawFd` stream. diff --git a/src/vmm/src/devices/virtio/vsock/persist.rs b/src/vmm/src/devices/virtio/vsock/persist.rs index acf330a3e71..eb8b554d3c2 100644 --- a/src/vmm/src/devices/virtio/vsock/persist.rs +++ b/src/vmm/src/devices/virtio/vsock/persist.rs @@ -15,6 +15,7 @@ use crate::devices::virtio::persist::VirtioDeviceState; use crate::devices::virtio::queue::FIRECRACKER_MAX_QUEUE_SIZE; use crate::devices::virtio::transport::VirtioInterrupt; use crate::snapshot::Persist; +use crate::vmm_config::vsock::VsockType; use crate::vstate::memory::GuestMemoryMmap; /// The Vsock serializable state. @@ -46,6 +47,7 @@ pub enum VsockBackendState { pub struct VsockUdsState { /// The path for the UDS socket. pub(crate) path: String, + pub(crate) vsock_type: VsockType, } /// A helper structure that holds the constructor arguments for VsockUnixBackend @@ -72,6 +74,7 @@ impl Persist<'_> for VsockUnixBackend { fn save(&self) -> Self::State { VsockBackendState::Uds(VsockUdsState { path: self.host_sock_path.clone(), + vsock_type: self.vsock_type.clone(), }) } @@ -83,6 +86,7 @@ impl Persist<'_> for VsockUnixBackend { VsockBackendState::Uds(uds_state) => Ok(VsockUnixBackend::new( constructor_args.cid, uds_state.path.clone(), + uds_state.vsock_type.clone(), )?), } } @@ -145,6 +149,7 @@ pub(crate) mod tests { fn save(&self) -> Self::State { VsockBackendState::Uds(VsockUdsState { path: "test".to_owned(), + vsock_type: VsockType::Stream, }) } diff --git a/src/vmm/src/devices/virtio/vsock/unix/mod.rs b/src/vmm/src/devices/virtio/vsock/unix/mod.rs index 25fef274fc6..d1cf02b8fe2 100644 --- a/src/vmm/src/devices/virtio/vsock/unix/mod.rs +++ b/src/vmm/src/devices/virtio/vsock/unix/mod.rs @@ -10,10 +10,22 @@ mod muxer; mod muxer_killq; mod muxer_rxq; - +mod seqpacket; +use crate::devices::VsockError; +use crate::devices::virtio::vsock::packet::VsockPacketTx; +use crate::devices::virtio::vsock::{ + VsockChannel as _, VsockEpollListener, + csm::{ConnState, VsockConnectionBackend, VsockCsmError}, + packet::VsockPacketRx, + unix::seqpacket::SeqPacketConn, +}; pub use muxer::VsockMuxer as VsockUnixBackend; - -use crate::devices::virtio::vsock::csm::VsockConnectionBackend; +use std::{ + os::{fd::AsRawFd as _, unix::net::UnixStream}, + time::Instant, +}; +use vm_memory::io::{ReadVolatile, WriteVolatile}; +use vmm_sys_util::epoll::EventSet; mod defs { /// Maximum number of established connections that we can handle. @@ -47,6 +59,74 @@ pub enum VsockUnixBackendError { TooManyConnections, } -type MuxerConnection = super::csm::VsockConnection; +type MuxerStreamConnection = super::csm::VsockConnection; +type MuxerSeqpacketConnetion = super::csm::VsockConnection; + +#[derive(Debug)] +enum MuxerConn { + Stream(MuxerStreamConnection), + Seqpacket(MuxerSeqpacketConnetion), +} + +macro_rules! forward_to_inner { + ($self:ident, $method:ident $(, $args:expr )* ) => { + match $self { + MuxerConn::Stream(inner) => inner.$method($($args),*), + MuxerConn::Seqpacket(inner) => inner.$method($($args),*), + } + }; +} + +impl MuxerConn { + fn has_pending_rx(&self) -> bool { + forward_to_inner!(self, has_pending_rx) + } + + fn as_raw_fd(&self) -> i32 { + forward_to_inner!(self, as_raw_fd) + } + + fn kill(&mut self) { + forward_to_inner!(self, kill) + } + + fn get_polled_evset(&self) -> EventSet { + forward_to_inner!(self, get_polled_evset) + } + + fn will_expire(&self) -> bool { + forward_to_inner!(self, will_expire) + } + + fn has_expired(&self) -> bool { + forward_to_inner!(self, has_expired) + } + + fn send_bytes_raw(&mut self, buf: &[u8]) -> Result { + forward_to_inner!(self, send_bytes_raw, buf) + } + + fn state(&self) -> ConnState { + forward_to_inner!(self, state) + } + + fn expiry(&self) -> Option { + forward_to_inner!(self, expiry) + } + + fn recv_pkt(&mut self, pkt: &mut VsockPacketRx) -> Result<(), VsockError> { + forward_to_inner!(self, recv_pkt, pkt) + } + + fn send_pkt(&mut self, pkt: &VsockPacketTx) -> Result<(), VsockError> { + forward_to_inner!(self, send_pkt, pkt) + } + + fn notify(&mut self, evset: EventSet) { + forward_to_inner!(self, notify, evset) + } +} + +impl VsockConnectionBackend for UnixStream {} -impl VsockConnectionBackend for std::os::unix::net::UnixStream {} +impl VsockConnectionBackend for SeqPacketConn {} diff --git a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs index ad979b4bdeb..b3ef081d5c9 100644 --- a/src/vmm/src/devices/virtio/vsock/unix/muxer.rs +++ b/src/vmm/src/devices/virtio/vsock/unix/muxer.rs @@ -33,10 +33,12 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::io::Read; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::fd::FromRawFd; +use std::os::unix::io::{AsRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::net::{UnixListener, UnixStream}; use log::{debug, error, info, warn}; +use uds::UnixSeqpacketConn; use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; use super::super::csm::ConnState; @@ -44,10 +46,14 @@ use super::super::defs::uapi; use super::super::{VsockBackend, VsockChannel, VsockEpollListener, VsockError}; use super::muxer_killq::MuxerKillQ; use super::muxer_rxq::MuxerRxQ; -use super::{MuxerConnection, VsockUnixBackendError, defs}; +use super::{MuxerStreamConnection, VsockUnixBackendError, defs}; +use crate::devices::virtio::vsock::csm::{VsockConnection, VsockConnectionBackend}; use crate::devices::virtio::vsock::metrics::METRICS; use crate::devices::virtio::vsock::packet::{VsockPacketRx, VsockPacketTx}; +use crate::devices::virtio::vsock::unix::MuxerConn; +use crate::devices::virtio::vsock::unix::seqpacket::{SeqPacketConn, SeqPacketListener, Socket}; use crate::logger::IncMetric; +use crate::vmm_config::vsock::VsockType; /// A unique identifier of a `MuxerConnection` object. Connections are stored in a hash map, /// keyed by a `ConnMapKey` object. @@ -77,7 +83,7 @@ enum EpollListener { HostSock, /// A listener interested in reading host `connect ` commands from a freshly /// connected host socket. - LocalStream(UnixStream), + LocalStream(RawFd), } /// The vsock connection multiplexer. @@ -86,7 +92,9 @@ pub struct VsockMuxer { /// Guest CID. cid: u64, /// A hash map used to store the active connections. - conn_map: HashMap, + conn_map: HashMap, + /// the underlying host socket file descriptor type wrapper + host_sock: Box, /// A hash map used to store epoll event listeners / handlers. listener_map: HashMap, /// The RX queue. Items in this queue are consumed by `VsockMuxer::recv_pkt()`, and @@ -96,8 +104,6 @@ pub struct VsockMuxer { rxq: MuxerRxQ, /// A queue used for terminating connections that are taking too long to shut down. killq: MuxerKillQ, - /// The Unix socket, through which host-initiated connections are accepted. - host_sock: UnixListener, /// The file system path of the host-side Unix socket. This is used to figure out the path /// to Unix sockets listening on specific ports. I.e. `"_"`. pub(crate) host_sock_path: String, @@ -108,8 +114,12 @@ pub struct VsockMuxer { local_port_set: HashSet, /// The last used host-side port. local_port_last: u32, + /// the type of the socket (stream or seqpacket) + pub(crate) vsock_type: VsockType, } +unsafe impl Send for VsockMuxer {} + impl VsockChannel for VsockMuxer { /// Deliver a vsock packet to the guest vsock driver. /// @@ -191,6 +201,7 @@ impl VsockChannel for VsockMuxer { /// always `Ok(())` - the packet has been consumed, and its virtio TX buffers can be /// returned to the guest vsock driver. fn send_pkt(&mut self, pkt: &VsockPacketTx) -> Result<(), VsockError> { + // when we get here the packet is rst let conn_key = ConnMapKey { local_port: pkt.hdr.dst_port(), peer_port: pkt.hdr.src_port(), @@ -303,12 +314,27 @@ impl VsockBackend for VsockMuxer {} impl VsockMuxer { /// Muxer constructor. - pub fn new(cid: u64, host_sock_path: String) -> Result { + pub fn new( + cid: u64, + host_sock_path: String, + vsock_type: VsockType, + ) -> Result { // Open/bind on the host Unix socket, so we can accept host-initiated // connections. - let host_sock = UnixListener::bind(&host_sock_path) - .and_then(|sock| sock.set_nonblocking(true).map(|_| sock)) - .map_err(VsockUnixBackendError::UnixBind)?; + let host_sock: Box = match vsock_type { + VsockType::SeqPacket => { + let sock = uds::UnixSeqpacketListener::bind(&host_sock_path) + .and_then(|sock| sock.set_nonblocking(true).map(|_| sock)) + .map_err(VsockUnixBackendError::UnixBind)?; + Box::new(SeqPacketListener::new(sock)) + } + VsockType::Stream => { + let sock = UnixListener::bind(&host_sock_path) + .and_then(|sock| sock.set_nonblocking(true).map(|_| sock)) + .map_err(VsockUnixBackendError::UnixBind)?; + Box::new(sock) + } + }; let mut muxer = Self { cid, @@ -321,6 +347,7 @@ impl VsockMuxer { killq: MuxerKillQ::new(), local_port_last: (1u32 << 30) - 1, local_port_set: HashSet::with_capacity(defs::MAX_CONNECTIONS), + vsock_type, }; // Listen on the host initiated socket, for incoming connections. @@ -363,21 +390,18 @@ impl VsockMuxer { self.host_sock.accept().map(|_| 0).unwrap_or(0); return; } - self.host_sock + self.host_sock // calling accept on this file descriptor errors? .accept() .map_err(VsockUnixBackendError::UnixAccept) - .and_then(|(stream, _)| { - stream - .set_nonblocking(true) - .map(|_| stream) - .map_err(VsockUnixBackendError::UnixAccept) - }) .and_then(|stream| { // Before forwarding this connection to a listening AF_VSOCK socket on // the guest side, we need to know the destination port. We'll read // that port from a "connect" command received on this socket, so the // next step is to ask to be notified the moment we can read from it. - self.add_listener(stream.as_raw_fd(), EpollListener::LocalStream(stream)) + self.add_listener( + stream.as_raw_fd(), + EpollListener::LocalStream(stream.as_raw_fd()), + ) }) .unwrap_or_else(|err| { warn!("vsock: unable to accept local connection: {:?}", err); @@ -387,27 +411,59 @@ impl VsockMuxer { // Data is ready to be read from a host-initiated connection. That would be the // "connect" command that we're expecting. Some(EpollListener::LocalStream(_)) => { - if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) { - Self::read_local_stream_port(&mut stream) - .map(|peer_port| (self.allocate_local_port(), peer_port)) - .and_then(|(local_port, peer_port)| { - self.add_connection( - ConnMapKey { - local_port, - peer_port, - }, - MuxerConnection::new_local_init( - stream, - uapi::VSOCK_HOST_CID, - self.cid, - local_port, - peer_port, - ), - ) - }) - .unwrap_or_else(|err| { - info!("vsock: error adding local-init connection: {:?}", err); - }) + if let Some(EpollListener::LocalStream(fd)) = self.remove_listener(fd) { + match self.vsock_type { + VsockType::Stream => { + let mut stream = unsafe { UnixStream::from_raw_fd(fd) }; + Self::read_local_stream_port(&mut stream) + .map(|peer_port| (self.allocate_local_port(), peer_port)) + .and_then(|(local_port, peer_port)| { + self.add_connection( + ConnMapKey { + local_port, + peer_port, + }, + MuxerConn::Stream( + VsockConnection::::new_local_init( + stream, + uapi::VSOCK_HOST_CID, + self.cid, + local_port, + peer_port, + ), + ), + ) + }) + .unwrap_or_else(|err| { + info!("vsock: error adding local-init connection: {:?}", err); + }) + } + VsockType::SeqPacket => { + let mut stream = SeqPacketConn::new(fd); + Self::read_local_stream_port(&mut stream) + .map(|peer_port| (self.allocate_local_port(), peer_port)) + .and_then(|(local_port, peer_port)| { + self.add_connection( + ConnMapKey { + local_port, + peer_port, + }, + MuxerConn::Seqpacket( + VsockConnection::::new_local_init( + stream, + uapi::VSOCK_HOST_CID, + self.cid, + local_port, + peer_port, + ), + ), + ) + }) + .unwrap_or_else(|err| { + info!("vsock: error adding local-init connection: {:?}", err); + }) + } + }; } } @@ -422,7 +478,7 @@ impl VsockMuxer { } /// Parse a host "connect" command, and extract the destination vsock port. - fn read_local_stream_port(stream: &mut UnixStream) -> Result { + fn read_local_stream_port(stream: &mut dyn Read) -> Result { let mut buf = [0u8; 32]; // This is the minimum number of bytes that we should be able to read, when parsing a @@ -475,7 +531,7 @@ impl VsockMuxer { fn add_connection( &mut self, key: ConnMapKey, - conn: MuxerConnection, + conn: MuxerConn, ) -> Result<(), VsockUnixBackendError> { // We might need to make room for this new connection, so let's sweep the kill queue // first. It's fine to do this here because: @@ -612,27 +668,52 @@ impl VsockMuxer { /// RST packet will be scheduled for delivery to the guest. fn handle_peer_request_pkt(&mut self, pkt: &VsockPacketTx) { let port_path = format!("{}_{}", self.host_sock_path, pkt.hdr.dst_port()); - - UnixStream::connect(port_path) - .and_then(|stream| stream.set_nonblocking(true).map(|_| stream)) - .map_err(VsockUnixBackendError::UnixConnect) - .and_then(|stream| { - self.add_connection( - ConnMapKey { - local_port: pkt.hdr.dst_port(), - peer_port: pkt.hdr.src_port(), - }, - MuxerConnection::new_peer_init( - stream, - uapi::VSOCK_HOST_CID, - self.cid, - pkt.hdr.dst_port(), - pkt.hdr.src_port(), - pkt.hdr.buf_alloc(), - ), - ) - }) - .unwrap_or_else(|_| self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port())); + match self.vsock_type { + VsockType::Stream => { + UnixStream::connect(port_path) + .and_then(|stream| stream.set_nonblocking(true).map(|_| stream)) + .map_err(VsockUnixBackendError::UnixConnect) + .and_then(|stream| { + self.add_connection( + ConnMapKey { + local_port: pkt.hdr.dst_port(), + peer_port: pkt.hdr.src_port(), + }, + MuxerConn::Stream(VsockConnection::::new_peer_init( + stream, + uapi::VSOCK_HOST_CID, + self.cid, + pkt.hdr.dst_port(), + pkt.hdr.src_port(), + pkt.hdr.buf_alloc(), + )), + ) + }) + .unwrap_or_else(|_| self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port())); + } + VsockType::SeqPacket => { + UnixSeqpacketConn::connect(port_path) + .and_then(|stream| stream.set_nonblocking(true).map(|_| stream)) + .map_err(VsockUnixBackendError::UnixConnect) + .and_then(|stream| { + self.add_connection( + ConnMapKey { + local_port: pkt.hdr.dst_port(), + peer_port: pkt.hdr.src_port(), + }, + MuxerConn::Seqpacket(VsockConnection::::new_peer_init( + SeqPacketConn::new(stream.into_raw_fd()), + uapi::VSOCK_HOST_CID, + self.cid, + pkt.hdr.dst_port(), + pkt.hdr.src_port(), + pkt.hdr.buf_alloc(), + )), + ) + }) + .unwrap_or_else(|_| self.enq_rst(pkt.hdr.dst_port(), pkt.hdr.src_port())); + } + } } /// Perform an action that might mutate a connection's state. @@ -644,7 +725,7 @@ impl VsockMuxer { /// - kill the connection if an unrecoverable error occurs. fn apply_conn_mutation(&mut self, key: ConnMapKey, mut_fn: F) where - F: FnOnce(&mut MuxerConnection), + F: FnOnce(&mut MuxerConn), { if let Some(conn) = self.conn_map.get_mut(&key) { let had_rx = conn.has_pending_rx(); @@ -849,7 +930,7 @@ mod tests { ) .unwrap(); - let muxer = VsockMuxer::new(PEER_CID, get_file(name)).unwrap(); + let muxer = VsockMuxer::new(PEER_CID, get_file(name), VsockType::Stream).unwrap(); Self { _vsock_test_ctx: vsock_test_ctx, rx_pkt, diff --git a/src/vmm/src/devices/virtio/vsock/unix/muxer_killq.rs b/src/vmm/src/devices/virtio/vsock/unix/muxer_killq.rs index 17cc193d120..a028f79b71d 100644 --- a/src/vmm/src/devices/virtio/vsock/unix/muxer_killq.rs +++ b/src/vmm/src/devices/virtio/vsock/unix/muxer_killq.rs @@ -27,8 +27,11 @@ use std::collections::{HashMap, VecDeque}; use std::time::Instant; +use crate::devices::virtio::vsock::csm::{VsockConnection, VsockConnectionBackend}; +use crate::devices::virtio::vsock::unix::MuxerConn; + use super::muxer::ConnMapKey; -use super::{MuxerConnection, defs}; +use super::{MuxerStreamConnection, defs}; /// A kill queue item, holding the connection key and the scheduled time for termination. #[derive(Debug, Clone, Copy)] @@ -66,7 +69,7 @@ impl MuxerKillQ { /// set to expire at some point in the future. /// Note: if more than `Self::SIZE` connections are found, the queue will be created in an /// out-of-sync state, and will be discarded after it is emptied. - pub fn from_conn_map(conn_map: &HashMap) -> Self { + pub fn from_conn_map(conn_map: &HashMap) -> Self { let mut q_buf: Vec = Vec::with_capacity(Self::SIZE); let mut synced = true; for (key, conn) in conn_map.iter() { diff --git a/src/vmm/src/devices/virtio/vsock/unix/muxer_rxq.rs b/src/vmm/src/devices/virtio/vsock/unix/muxer_rxq.rs index 1b888dfa453..f25fef21f02 100644 --- a/src/vmm/src/devices/virtio/vsock/unix/muxer_rxq.rs +++ b/src/vmm/src/devices/virtio/vsock/unix/muxer_rxq.rs @@ -17,9 +17,12 @@ /// out-of-sync is drained, the muxer will discard it, and attempt to rebuild a synced one. use std::collections::{HashMap, VecDeque}; +use crate::devices::virtio::vsock::csm::{VsockConnection, VsockConnectionBackend}; +use crate::devices::virtio::vsock::unix::MuxerConn; + use super::super::VsockChannel; +use super::defs; use super::muxer::{ConnMapKey, MuxerRx}; -use super::{MuxerConnection, defs}; /// The muxer RX queue. #[derive(Debug)] @@ -45,7 +48,7 @@ impl MuxerRxQ { /// Note: the resulting queue may still be desynchronized, if there are too many connections /// that have pending RX data. In that case, the muxer will first drain this queue, and /// then try again to build a synchronized one. - pub fn from_conn_map(conn_map: &HashMap) -> Self { + pub fn from_conn_map(conn_map: &HashMap) -> Self { let mut q = VecDeque::new(); let mut synced = true; diff --git a/src/vmm/src/devices/virtio/vsock/unix/seqpacket.rs b/src/vmm/src/devices/virtio/vsock/unix/seqpacket.rs new file mode 100644 index 00000000000..e25a5e4da0d --- /dev/null +++ b/src/vmm/src/devices/virtio/vsock/unix/seqpacket.rs @@ -0,0 +1,155 @@ +use libc::{EINVAL, F_DUPFD_CLOEXEC, MSG_NOSIGNAL, dup, fcntl}; +use libc::{FIOCLEX, FIONCLEX}; +use libc::{ + MSG_EOR, MSG_PEEK, SOCK_CLOEXEC, SOCK_NONBLOCK, SOCK_SEQPACKET, c_void, close, recv, send, +}; +use libc::{SO_ERROR, SOL_SOCKET, getsockopt}; +use std::io; +use std::io::ErrorKind; +use std::io::{Read, Write}; +use std::os::fd::RawFd; +use std::os::fd::{AsRawFd, IntoRawFd}; +use std::os::unix::net::UnixListener; +use uds::{UnixSeqpacketConn, UnixSeqpacketListener}; +use vm_memory::{ReadVolatile, VolatileMemoryError, WriteVolatile}; + +use crate::devices::virtio::vsock::csm::VsockConnectionBackend; + +#[derive(Debug)] +pub struct SeqPacketConn(std::os::fd::RawFd); + +impl SeqPacketConn { + pub fn new(fd: RawFd) -> Self { + SeqPacketConn { 0: fd } + } +} + +/// Get errno as io::Error on -1 and retry on EINTR. +macro_rules! cvt_r { + ($syscall:expr) => { + loop { + let result = $syscall; + if result != -1 { + break Ok(result); + } + let err = io::Error::last_os_error(); + if err.kind() != ErrorKind::Interrupted { + break Err(err); + } + } + }; +} + +impl AsRawFd for SeqPacketConn { + fn as_raw_fd(&self) -> i32 { + self.0.as_raw_fd() + } +} + +impl Read for SeqPacketConn { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let ptr = buf.as_ptr() as *mut c_void; + let received = cvt_r!(unsafe { recv(self.0.as_raw_fd(), ptr, buf.len(), MSG_NOSIGNAL) })?; + Ok(received as usize) + } +} + +impl Write for SeqPacketConn { + fn write(&mut self, buf: &[u8]) -> io::Result { + let ptr = buf.as_ptr() as *const c_void; + let flags = MSG_NOSIGNAL | MSG_EOR; + let sent = cvt_r!(unsafe { send(self.0.as_raw_fd(), ptr, buf.len(), flags) })?; + Ok(sent as usize) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl ReadVolatile for SeqPacketConn { + fn read_volatile( + &mut self, + buf: &mut vm_memory::VolatileSlice, + ) -> Result { + let fd = self.0.as_raw_fd(); + let guard = buf.ptr_guard_mut(); + + let dst = guard.as_ptr().cast::(); + + // SAFETY: Rust's I/O safety invariants ensure that BorrowedFd contains a valid file descriptor`. + // The memory pointed to by `dst` is valid for writes of length `buf.len() by the invariants + // upheld by the constructor of `VolatileSlice`. + let bytes_read = unsafe { libc::read(fd, dst, buf.len()) }; + + if bytes_read < 0 { + // We don't know if a partial read might have happened, so mark everything as dirty + buf.bitmap().mark_dirty(0, buf.len()); + + Err(VolatileMemoryError::IOError(std::io::Error::last_os_error())) + } else { + let bytes_read = bytes_read.try_into().unwrap(); + buf.bitmap().mark_dirty(0, bytes_read); + Ok(bytes_read) + } + } + // copy the code from the vm memory crate +} + +impl WriteVolatile for SeqPacketConn { + fn write_volatile( + &mut self, + buf: &vm_memory::VolatileSlice, + ) -> Result { + let fd = self.0.as_raw_fd(); + let guard = buf.ptr_guard(); + + let src = guard.as_ptr().cast::(); + + // SAFETY: Rust's I/O safety invariants ensure that BorrowedFd contains a valid file descriptor`. + // The memory pointed to by `src` is valid for reads of length `buf.len() by the invariants + // upheld by the constructor of `VolatileSlice`. + let bytes_written = unsafe { libc::write(fd, src, buf.len()) }; + + if bytes_written < 0 { + Err(VolatileMemoryError::IOError(std::io::Error::last_os_error())) + } else { + Ok(bytes_written.try_into().unwrap()) + } + } +} + +#[derive(Debug)] +pub struct SeqPacketListener(uds::UnixSeqpacketListener); + +impl SeqPacketListener { + pub fn new(uds_listener: uds::UnixSeqpacketListener) -> Self { + SeqPacketListener { 0: uds_listener } + } +} + +impl AsRawFd for SeqPacketListener { + fn as_raw_fd(&self) -> i32 { + self.0.as_raw_fd() + } +} + +pub trait Socket: AsRawFd + std::fmt::Debug { + fn accept(&self) -> Result; +} + +impl Socket for SeqPacketListener { + fn accept(&self) -> Result { + let (sock, _) = self.0.accept_unix_addr()?; + sock.set_nonblocking(true); + Ok(sock.into_raw_fd()) + } +} + +impl Socket for UnixListener { + fn accept(&self) -> Result { + let (conn, _) = self.accept()?; + conn.set_nonblocking(true); + Ok(conn.into_raw_fd()) + } +} diff --git a/src/vmm/src/vmm_config/vsock.rs b/src/vmm/src/vmm_config/vsock.rs index 920e4a4d217..4b6d684cd71 100644 --- a/src/vmm/src/vmm_config/vsock.rs +++ b/src/vmm/src/vmm_config/vsock.rs @@ -19,6 +19,21 @@ pub enum VsockConfigError { CreateVsockDevice(VsockError), } +/// from vsock related requests. +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub enum VsockType { + /// a stream type socket + Stream, + /// a seqpacket type socket + SeqPacket, +} + +impl Default for VsockType { + fn default() -> Self { + VsockType::Stream + } +} + /// This struct represents the strongly typed equivalent of the json body /// from vsock related requests. #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] @@ -32,6 +47,8 @@ pub struct VsockDeviceConfig { pub guest_cid: u32, /// Path to local unix socket. pub uds_path: String, + /// the type of the underlying socket + pub vsock_type: VsockType, } #[derive(Debug)] @@ -47,6 +64,7 @@ impl From<&VsockAndUnixPath> for VsockDeviceConfig { vsock_id: None, guest_cid: u32::try_from(vsock_lock.cid()).unwrap(), uds_path: vsock.uds_path.clone(), + vsock_type: VsockType::default(), } } } @@ -99,7 +117,8 @@ impl VsockBuilder { pub fn create_unixsock_vsock( cfg: VsockDeviceConfig, ) -> Result, VsockConfigError> { - let backend = VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path)?; + let backend = + VsockUnixBackend::new(u64::from(cfg.guest_cid), cfg.uds_path, cfg.vsock_type)?; Vsock::new(u64::from(cfg.guest_cid), backend).map_err(VsockConfigError::CreateVsockDevice) } @@ -122,6 +141,7 @@ pub(crate) mod tests { vsock_id: None, guest_cid: 3, uds_path: tmp_sock_file.as_path().to_str().unwrap().to_string(), + vsock_type: VsockType::default(), } } @@ -171,8 +191,12 @@ pub(crate) mod tests { tmp_sock_file.remove().unwrap(); let vsock = Vsock::new( 0, - VsockUnixBackend::new(1, tmp_sock_file.as_path().to_str().unwrap().to_string()) - .unwrap(), + VsockUnixBackend::new( + 1, + tmp_sock_file.as_path().to_str().unwrap().to_string(), + VsockType::default(), + ) + .unwrap(), ) .unwrap(); diff --git a/tests/framework/utils_vsock.py b/tests/framework/utils_vsock.py index 9561c1c26f2..029b88d5782 100644 --- a/tests/framework/utils_vsock.py +++ b/tests/framework/utils_vsock.py @@ -7,7 +7,7 @@ import re import time from pathlib import Path -from socket import AF_UNIX, SOCK_STREAM, socket +from socket import AF_UNIX, SOCK_SEQPACKET, SOCK_STREAM, socket from subprocess import Popen from threading import Thread @@ -31,14 +31,15 @@ class HostEchoWorker(Thread): contents of `blob_path`. """ - def __init__(self, uds_path, blob_path): + def __init__(self, uds_path, blob_path, type=SOCK_STREAM): """.""" super().__init__() self.uds_path = uds_path self.blob_path = blob_path self.hash = None + self.type = type self.error = None - self.sock = _vsock_connect_to_guest(self.uds_path, ECHO_SERVER_PORT) + self.sock = _vsock_connect_to_guest(self.uds_path, ECHO_SERVER_PORT, self.type) def run(self): """Thread code payload. @@ -202,9 +203,9 @@ def make_host_port_path(uds_path, port): return "{}_{}".format(uds_path, port) -def _vsock_connect_to_guest(uds_path, port): +def _vsock_connect_to_guest(uds_path, port, type): """Return a Unix socket, connected to the guest vsock port `port`.""" - sock = socket(AF_UNIX, SOCK_STREAM) + sock = socket(AF_UNIX, type) sock.connect(uds_path) buf = bytearray("CONNECT {}\n".format(port).encode("utf-8")) diff --git a/tests/integration_tests/functional/test_vsock.py b/tests/integration_tests/functional/test_vsock.py index 8c0d30700c6..8ff11aea5f0 100644 --- a/tests/integration_tests/functional/test_vsock.py +++ b/tests/integration_tests/functional/test_vsock.py @@ -17,6 +17,7 @@ import subprocess import time from pathlib import Path +from socket import SOCK_SEQPACKET from socket import timeout as SocketTimeout from framework.utils_vsock import ( @@ -49,7 +50,12 @@ def test_vsock(uvm_plain_any, bin_vsock_path, test_fc_session_root_path): vm.basic_config() vm.add_net_iface() - vm.api.vsock.put(vsock_id="vsock0", guest_cid=3, uds_path=f"/{VSOCK_UDS_PATH}") + vm.api.vsock.put( + vsock_id="vsock0", + guest_cid=3, + uds_path=f"/{VSOCK_UDS_PATH}", + vsock_type="Stream", + ) vm.start() check_vsock_device(vm, bin_vsock_path, test_fc_session_root_path, vm.ssh) @@ -57,7 +63,7 @@ def test_vsock(uvm_plain_any, bin_vsock_path, test_fc_session_root_path): validate_fc_metrics(metrics) -def negative_test_host_connections(vm, blob_path, blob_hash): +def negative_test_host_connections(vm, blob_path, blob_hash, type): """Negative test for host-initiated connections. This will start a daemonized echo server on the guest VM, and then spawn @@ -69,7 +75,7 @@ def negative_test_host_connections(vm, blob_path, blob_hash): workers = [] for _ in range(NEGATIVE_TEST_CONNECTION_COUNT): - worker = HostEchoWorker(uds_path, blob_path) + worker = HostEchoWorker(uds_path, blob_path, type) workers.append(worker) worker.start() @@ -110,7 +116,12 @@ def test_vsock_epipe(uvm_plain_any, bin_vsock_path, test_fc_session_root_path): vm.spawn() vm.basic_config() vm.add_net_iface() - vm.api.vsock.put(vsock_id="vsock0", guest_cid=3, uds_path=f"/{VSOCK_UDS_PATH}") + vm.api.vsock.put( + vsock_id="vsock0", + guest_cid=3, + uds_path=f"/{VSOCK_UDS_PATH}", + vsock_type="SeqPacket", + ) vm.start() # Generate the random data blob file, 20MB @@ -123,7 +134,7 @@ def test_vsock_epipe(uvm_plain_any, bin_vsock_path, test_fc_session_root_path): # Negative test for host-initiated connections that # are closed with in flight data. - negative_test_host_connections(vm, blob_path, blob_hash) + negative_test_host_connections(vm, blob_path, blob_hash, SOCK_SEQPACKET) metrics = vm.flush_metrics() validate_fc_metrics(metrics) @@ -150,7 +161,12 @@ def test_vsock_transport_reset_h2g( test_vm.spawn() test_vm.basic_config(vcpu_count=2, mem_size_mib=256) test_vm.add_net_iface() - test_vm.api.vsock.put(vsock_id="vsock0", guest_cid=3, uds_path=f"/{VSOCK_UDS_PATH}") + test_vm.api.vsock.put( + vsock_id="vsock0", + guest_cid=3, + uds_path=f"/{VSOCK_UDS_PATH}", + vsock_type="Stream", + ) test_vm.start() # Generate the random data blob file. @@ -189,9 +205,9 @@ def test_vsock_transport_reset_h2g( # it shouldn't receive anything. worker.sock.settimeout(0.25) response = worker.sock.recv(32) - assert ( - response == b"" - ), f"Connection not closed: response received '{response.decode('utf-8')}'" + assert response == b"", ( + f"Connection not closed: response received '{response.decode('utf-8')}'" + ) except (SocketTimeout, ConnectionResetError, BrokenPipeError): pass @@ -223,7 +239,12 @@ def test_vsock_transport_reset_g2h(uvm_plain_any, microvm_factory): test_vm.spawn() test_vm.basic_config(vcpu_count=2, mem_size_mib=256) test_vm.add_net_iface() - test_vm.api.vsock.put(vsock_id="vsock0", guest_cid=3, uds_path=f"/{VSOCK_UDS_PATH}") + test_vm.api.vsock.put( + vsock_id="vsock0", + guest_cid=3, + uds_path=f"/{VSOCK_UDS_PATH}", + vsock_type="Stream", + ) test_vm.start() # Create snapshot and terminate a VM. From 63e08c95c848e2a74f7941e8820715489c5e3417 Mon Sep 17 00:00:00 2001 From: aerosouund Date: Mon, 22 Dec 2025 21:47:13 +0000 Subject: [PATCH 2/2] change to stream type Signed-off-by: aerosouund --- tests/integration_tests/functional/test_vsock.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/functional/test_vsock.py b/tests/integration_tests/functional/test_vsock.py index 8ff11aea5f0..192f348b797 100644 --- a/tests/integration_tests/functional/test_vsock.py +++ b/tests/integration_tests/functional/test_vsock.py @@ -120,7 +120,7 @@ def test_vsock_epipe(uvm_plain_any, bin_vsock_path, test_fc_session_root_path): vsock_id="vsock0", guest_cid=3, uds_path=f"/{VSOCK_UDS_PATH}", - vsock_type="SeqPacket", + vsock_type="Stream", ) vm.start()