880: Add UDP socket support r=Dirbaio a=arturkow2000



Co-authored-by: Artur Kowalski <artur.kowalski@3mdeb.com>
Co-authored-by: Artur Kowalski <arturkow2000@gmail.com>
This commit is contained in:
bors[bot] 2022-08-11 14:17:11 +00:00 committed by GitHub
commit 6ffca81a38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 273 additions and 1 deletions

View file

@ -18,6 +18,7 @@ std = []
defmt = ["dep:defmt", "smoltcp/defmt"]
udp = ["smoltcp/socket-udp"]
tcp = ["smoltcp/socket-tcp"]
dns = ["smoltcp/socket-dns"]
dhcpv4 = ["medium-ethernet", "smoltcp/socket-dhcpv4"]

View file

@ -16,6 +16,9 @@ pub use stack::{Config, ConfigStrategy, Stack, StackResources};
#[cfg(feature = "tcp")]
pub mod tcp;
#[cfg(feature = "udp")]
pub mod udp;
// smoltcp reexports
pub use smoltcp::phy::{DeviceCapabilities, Medium};
pub use smoltcp::time::{Duration as SmolDuration, Instant as SmolInstant};
@ -24,3 +27,5 @@ pub use smoltcp::wire::{EthernetAddress, HardwareAddress};
pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr};
#[cfg(feature = "proto-ipv6")]
pub use smoltcp::wire::{Ipv6Address, Ipv6Cidr};
#[cfg(feature = "udp")]
pub use smoltcp::{socket::udp::PacketMetadata, wire::IpListenEndpoint};

157
embassy-net/src/udp.rs Normal file
View file

@ -0,0 +1,157 @@
use core::cell::UnsafeCell;
use core::mem;
use core::task::Poll;
use futures::future::poll_fn;
use smoltcp::iface::{Interface, SocketHandle};
use smoltcp::socket::udp::{self, PacketMetadata};
use smoltcp::wire::{IpEndpoint, IpListenEndpoint};
use super::stack::SocketStack;
use crate::{Device, Stack};
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum BindError {
/// The socket was already open.
InvalidState,
/// No route to host.
NoRoute,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
/// No route to host.
NoRoute,
}
pub struct UdpSocket<'a> {
stack: &'a UnsafeCell<SocketStack>,
handle: SocketHandle,
}
impl<'a> UdpSocket<'a> {
pub fn new<D: Device>(
stack: &'a Stack<D>,
rx_meta: &'a mut [PacketMetadata],
rx_buffer: &'a mut [u8],
tx_meta: &'a mut [PacketMetadata],
tx_buffer: &'a mut [u8],
) -> Self {
// safety: not accessed reentrantly.
let s = unsafe { &mut *stack.socket.get() };
let rx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(rx_meta) };
let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) };
let tx_meta: &'static mut [PacketMetadata] = unsafe { mem::transmute(tx_meta) };
let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) };
let handle = s.sockets.add(udp::Socket::new(
udp::PacketBuffer::new(rx_meta, rx_buffer),
udp::PacketBuffer::new(tx_meta, tx_buffer),
));
Self {
stack: &stack.socket,
handle,
}
}
pub fn bind<T>(&mut self, endpoint: T) -> Result<(), BindError>
where
T: Into<IpListenEndpoint>,
{
let mut endpoint = endpoint.into();
// safety: not accessed reentrantly.
if endpoint.port == 0 {
// If user didn't specify port allocate a dynamic port.
endpoint.port = unsafe { &mut *self.stack.get() }.get_local_port();
}
// safety: not accessed reentrantly.
match unsafe { self.with_mut(|s, _| s.bind(endpoint)) } {
Ok(()) => Ok(()),
Err(udp::BindError::InvalidState) => Err(BindError::InvalidState),
Err(udp::BindError::Unaddressable) => Err(BindError::NoRoute),
}
}
/// SAFETY: must not call reentrantly.
unsafe fn with<R>(&self, f: impl FnOnce(&udp::Socket, &Interface) -> R) -> R {
let s = &*self.stack.get();
let socket = s.sockets.get::<udp::Socket>(self.handle);
f(socket, &s.iface)
}
/// SAFETY: must not call reentrantly.
unsafe fn with_mut<R>(&self, f: impl FnOnce(&mut udp::Socket, &mut Interface) -> R) -> R {
let s = &mut *self.stack.get();
let socket = s.sockets.get_mut::<udp::Socket>(self.handle);
let res = f(socket, &mut s.iface);
s.waker.wake();
res
}
pub async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, IpEndpoint), Error> {
poll_fn(move |cx| unsafe {
self.with_mut(|s, _| match s.recv_slice(buf) {
Ok(x) => Poll::Ready(Ok(x)),
// No data ready
Err(udp::RecvError::Exhausted) => {
//s.register_recv_waker(cx.waker());
cx.waker().wake_by_ref();
Poll::Pending
}
})
})
.await
}
pub async fn send_to<T>(&self, buf: &[u8], remote_endpoint: T) -> Result<(), Error>
where
T: Into<IpEndpoint>,
{
let remote_endpoint = remote_endpoint.into();
poll_fn(move |cx| unsafe {
self.with_mut(|s, _| match s.send_slice(buf, remote_endpoint) {
// Entire datagram has been sent
Ok(()) => Poll::Ready(Ok(())),
Err(udp::SendError::BufferFull) => {
s.register_send_waker(cx.waker());
Poll::Pending
}
Err(udp::SendError::Unaddressable) => Poll::Ready(Err(Error::NoRoute)),
})
})
.await
}
pub fn endpoint(&self) -> IpListenEndpoint {
unsafe { self.with(|s, _| s.endpoint()) }
}
pub fn is_open(&self) -> bool {
unsafe { self.with(|s, _| s.is_open()) }
}
pub fn close(&mut self) {
unsafe { self.with_mut(|s, _| s.close()) }
}
pub fn may_send(&self) -> bool {
unsafe { self.with(|s, _| s.can_send()) }
}
pub fn may_recv(&self) -> bool {
unsafe { self.with(|s, _| s.can_recv()) }
}
}
impl Drop for UdpSocket<'_> {
fn drop(&mut self) {
// safety: not accessed reentrantly.
let s = unsafe { &mut *self.stack.get() };
s.sockets.remove(self.handle);
}
}

View file

@ -6,7 +6,7 @@ version = "0.1.0"
[dependencies]
embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["log"] }
embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["log", "std", "time", "nightly"] }
embassy-net = { version = "0.1.0", path = "../../embassy-net", features=[ "std", "log", "medium-ethernet", "tcp", "dhcpv4", "pool-16"] }
embassy-net = { version = "0.1.0", path = "../../embassy-net", features=[ "std", "log", "medium-ethernet", "tcp", "udp", "dhcpv4", "pool-16"] }
embedded-io = { version = "0.3.0", features = ["async", "std", "futures"] }
async-io = "1.6.0"

View file

@ -0,0 +1,109 @@
#![feature(type_alias_impl_trait)]
use clap::Parser;
use embassy_executor::executor::{Executor, Spawner};
use embassy_net::udp::UdpSocket;
use embassy_net::{ConfigStrategy, Ipv4Address, Ipv4Cidr, PacketMetadata, Stack, StackResources};
use embassy_util::Forever;
use heapless::Vec;
use log::*;
use rand_core::{OsRng, RngCore};
#[path = "../tuntap.rs"]
mod tuntap;
use crate::tuntap::TunTapDevice;
macro_rules! forever {
($val:expr) => {{
type T = impl Sized;
static FOREVER: Forever<T> = Forever::new();
FOREVER.put_with(move || $val)
}};
}
#[derive(Parser)]
#[clap(version = "1.0")]
struct Opts {
/// TAP device name
#[clap(long, default_value = "tap0")]
tap: String,
/// use a static IP instead of DHCP
#[clap(long)]
static_ip: bool,
}
#[embassy_executor::task]
async fn net_task(stack: &'static Stack<TunTapDevice>) -> ! {
stack.run().await
}
#[embassy_executor::task]
async fn main_task(spawner: Spawner) {
let opts: Opts = Opts::parse();
// Init network device
let device = TunTapDevice::new(&opts.tap).unwrap();
// Choose between dhcp or static ip
let config = if opts.static_ip {
ConfigStrategy::Static(embassy_net::Config {
address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 69, 2), 24),
dns_servers: Vec::new(),
gateway: Some(Ipv4Address::new(192, 168, 69, 1)),
})
} else {
ConfigStrategy::Dhcp
};
// Generate random seed
let mut seed = [0; 8];
OsRng.fill_bytes(&mut seed);
let seed = u64::from_le_bytes(seed);
// Init network stack
let stack = &*forever!(Stack::new(
device,
config,
forever!(StackResources::<1, 2, 8>::new()),
seed
));
// Launch network task
spawner.spawn(net_task(stack)).unwrap();
// Then we can use it!
let mut rx_meta = [PacketMetadata::EMPTY; 16];
let mut rx_buffer = [0; 4096];
let mut tx_meta = [PacketMetadata::EMPTY; 16];
let mut tx_buffer = [0; 4096];
let mut buf = [0; 4096];
let mut socket = UdpSocket::new(stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer);
socket.bind(9400).unwrap();
loop {
let (n, ep) = socket.recv_from(&mut buf).await.unwrap();
if let Ok(s) = core::str::from_utf8(&buf[..n]) {
info!("ECHO (to {}): {}", ep, s);
} else {
info!("ECHO (to {}): bytearray len {}", ep, n);
}
socket.send_to(&buf[..n], ep).await.unwrap();
}
}
static EXECUTOR: Forever<Executor> = Forever::new();
fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.filter_module("async_io", log::LevelFilter::Info)
.format_timestamp_nanos()
.init();
let executor = EXECUTOR.put(Executor::new());
executor.run(|spawner| {
spawner.spawn(main_task(spawner)).unwrap();
});
}