pub mod v1 {
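//! Version 1 of the io_uring interface: the shared submission/completion entry
//! layouts, the lock-free `Ring` header with its SPSC sender/receiver halves,
//! and the consumer- and producer-side instance builders.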
use crate::{EAGAIN, EINVAL, ESHUTDOWN, ENOSPC, EWOULDBLOCK};
use crate::{call, flag};
use crate::{Error, Result};
use crate::{IoVec, Map, MapFlags};
use crate::{Event, EventFlags};
use core::convert::TryInto;
use core::marker::PhantomData;
use core::{cmp, mem, ops, ptr, slice};
use either::Either;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(loom)]
use loom::sync::atomic::{AtomicUsize, Ordering};
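/// Aligns its contents to a cache line (128 bytes on x86_64, 64 bytes elsewhere)
/// so that the producer- and consumer-owned ring indices do not share a line and
/// cause false sharing.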
#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
#[derive(Debug)]
pub struct CachePadded<T>(pub T);
impl<T> ops::Deref for CachePadded<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> ops::DerefMut for CachePadded<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[repr(C, align(32))]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SqEntry32 {
pub opcode: u8,
pub flags: u8,
pub priority: u16,
pub syscall_flags: u32,
pub fd: u32,
pub len: u32,
pub user_data: u32,
pub addr: u32,
pub offset: u64,
}
#[repr(C, align(16))]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct CqEntry32 {
pub user_data: u32,
pub status: u32,
pub flags: u32,
pub extra: u32,
}
#[repr(C, align(64))]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SqEntry64 {
pub opcode: u8,
pub flags: u8,
pub priority: u16,
pub syscall_flags: u32,
pub fd: u64,
pub len: u64,
pub user_data: u64,
pub addr: u64,
pub offset: u64,
pub additional1: u64,
pub additional2: u64,
}
#[repr(C, align(32))]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct CqEntry64 {
pub user_data: u64,
pub status: u64,
pub flags: u64,
pub extra: u64,
}
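/// The shared ring header. `head_idx` is advanced by the consumer and `tail_idx`
/// by the producer (both in units of entries), `push_epoch` and `pop_epoch`
/// count successful pushes and pops so that pollers can cheaply detect activity,
/// and `sts` carries the shutdown bit that is set when the sending half is
/// dropped.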
#[repr(C)]
pub struct Ring<T> {
pub base_rel: usize,
pub size: usize,
pub head_idx: CachePadded<AtomicUsize>,
pub tail_idx: CachePadded<AtomicUsize>,
pub sts: CachePadded<AtomicUsize>,
pub push_epoch: CachePadded<AtomicUsize>,
pub pop_epoch: CachePadded<AtomicUsize>,
pub _marker: PhantomData<*mut T>,
}
bitflags::bitflags! {
struct RingStatus: usize {
const DROP = 0x0000_0001;
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum RingSendError<T> {
Shutdown(T),
}
impl<T> From<RingSendError<T>> for Error {
fn from(error: RingSendError<T>) -> Error {
match error {
RingSendError::Shutdown(_) => Error::new(ESHUTDOWN),
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum RingPushError<T> {
Full(T),
Shutdown(T),
}
impl<T> From<RingPushError<T>> for Error {
fn from(error: RingPushError<T>) -> Error {
match error {
RingPushError::Full(_) => Error::new(ENOSPC),
RingPushError::Shutdown(_) => Error::new(ESHUTDOWN),
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum RingRecvError {
Shutdown,
}
impl From<RingRecvError> for Error {
fn from(error: RingRecvError) -> Error {
match error {
RingRecvError::Shutdown => Error::new(ESHUTDOWN),
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum RingPopError {
Empty { pending_push: bool },
Shutdown,
}
impl From<RingPopError> for Error {
fn from(error: RingPopError) -> Error {
match error {
RingPopError::Empty { pending_push: true } => Error::new(EAGAIN),
RingPopError::Empty { pending_push: false } => Error::new(EWOULDBLOCK),
RingPopError::Shutdown => Error::new(ESHUTDOWN),
}
}
}
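/// The sending (producer) half of a single-producer single-consumer ring.
/// Dropping it marks the ring as shut down and unmaps the header and entries.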
pub struct SpscSender<T> {
pub(crate) ring: *const Ring<T>,
pub(crate) entries_base: *mut T,
}
unsafe impl<T: Send> Send for SpscSender<T> {}
impl<T> SpscSender<T> {
pub unsafe fn from_raw(ring: *const Ring<T>, entries_base: *mut T) -> Self {
Self { ring, entries_base }
}
pub fn try_send(&self, item: T) -> Result<(), RingPushError<T>> {
unsafe {
let ring = self.ring_header();
ring.push_back_spsc(self.entries_base, item)
}
}
pub fn spin_on_send(&self, mut item: T) -> Result<(), RingSendError<T>> {
loop {
match self.try_send(item) {
Ok(()) => return Ok(()),
Err(RingPushError::Full(i)) => {
item = i;
core::sync::atomic::spin_loop_hint();
continue;
}
Err(RingPushError::Shutdown(i)) => return Err(RingSendError::Shutdown(i)),
}
}
}
pub fn deallocate(self) -> crate::Result<()> {
unsafe {
// Copy the raw pointers out and skip `Drop`, so that the ring is not
// marked as shut down and the mappings are not unmapped twice.
let ring = self.ring;
let entries_base = self.entries_base;
mem::forget(self);
let ring = &*ring;
crate::funmap(ring as *const _ as usize)?;
crate::funmap(entries_base as usize)?;
Ok(())
}
}
pub unsafe fn ring_header(&self) -> &Ring<T> {
&*self.ring
}
pub fn notify(&self) {
let ring = unsafe { self.ring_header() };
let _ = ring.push_epoch.fetch_add(1, Ordering::Relaxed);
}
}
impl<T> Drop for SpscSender<T> {
fn drop(&mut self) {
unsafe {
let ring = self.ring_header();
ring.sts.fetch_or(RingStatus::DROP.bits(), Ordering::Release);
let _ = crate::funmap(self.ring as *const _ as usize);
let _ = crate::funmap(self.entries_base as usize);
}
}
}
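/// The receiving (consumer) half of a single-producer single-consumer ring.
/// Dropping it unmaps the header and entries but leaves the status bits alone.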
pub struct SpscReceiver<T> {
ring: *const Ring<T>,
entries_base: *const T,
}
unsafe impl<T: Send> Send for SpscReceiver<T> {}
impl<T> SpscReceiver<T> {
pub unsafe fn from_raw(ring: *const Ring<T>, entries_base: *const T) -> Self {
Self { ring, entries_base }
}
pub fn try_recv(&self) -> Result<T, RingPopError> {
unsafe {
let ring = &*self.ring;
ring.pop_front_spsc(self.entries_base)
}
}
pub fn spin_on_recv(&self) -> Result<T, RingRecvError> {
loop {
match self.try_recv() {
Ok(item) => return Ok(item),
Err(RingPopError::Empty { .. }) => {
core::sync::atomic::spin_loop_hint();
continue;
}
Err(RingPopError::Shutdown) => return Err(RingRecvError::Shutdown),
}
}
}
pub fn try_iter(&self) -> impl Iterator<Item = T> + '_ {
core::iter::from_fn(move || self.try_recv().ok())
}
pub fn deallocate(self) -> crate::Result<()> {
unsafe {
// Copy the raw pointers out and skip `Drop`, so that the mappings are
// not unmapped a second time.
let ring = self.ring;
let entries_base = self.entries_base;
mem::forget(self);
let ring = &*ring;
crate::funmap(ring as *const _ as usize)?;
crate::funmap(entries_base as usize)?;
Ok(())
}
}
pub unsafe fn ring_header(&self) -> &Ring<T> {
&*self.ring
}
}
impl<T> Drop for SpscReceiver<T> {
fn drop(&mut self) {
unsafe {
let _ = crate::funmap(self.ring as *const _ as usize);
let _ = crate::funmap(self.entries_base as usize);
}
}
}
impl<T> Ring<T> {
fn addr_bits_for_size(size: usize) -> u8 {
let size_as_poweroftwo = if size.is_power_of_two() {
size
} else {
size.next_power_of_two()
};
let addrmask = size_as_poweroftwo - 1;
let addrmask_ones: u8 = addrmask.count_ones().try_into().unwrap();
addrmask_ones
}
pub fn min_extra_bits() -> u8 {
Self::addr_bits_for_size(mem::size_of::<T>())
}
fn extra_bits_for_count(count: usize) -> u8 {
let pointer_width: u8 = (mem::size_of::<usize>() * 8).try_into().unwrap();
let idx_bits = Self::addr_bits_for_size(count);
pointer_width - idx_bits
}
pub fn extra_bits(&self) -> u8 {
Self::extra_bits_for_count(self.size)
}
pub fn usable_extra_bits(&self) -> u8 {
cmp::max(self.extra_bits(), 1) - 1
}
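/// Pushes `item` onto the ring. The raw head/tail counters run modulo
/// `2 * size`, so an equal position with differing cycle bits distinguishes a
/// full ring from an empty one. In the non-SPSC (`spsc == false`) path the
/// counters are additionally doubled, with the least significant bit used as an
/// "operation in progress" marker while the entry is being written.
///
/// # Safety
///
/// `entries_base` must point to a mapping of at least `size` entries of `T`
/// belonging to this ring header.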
pub unsafe fn push_back_generic(
&self,
entries_base: *mut T,
item: T,
spsc: bool,
) -> Result<(), RingPushError<T>> {
let sts = RingStatus::from_bits_truncate(self.sts.load(Ordering::Relaxed));
if sts.contains(RingStatus::DROP) {
return Err(RingPushError::Shutdown(item));
}
let (tail_index_raw_with_flag, tail_index_raw) = if spsc {
let tail_index_raw = self.tail_idx.load(Ordering::Acquire);
(tail_index_raw, tail_index_raw)
} else {
let tail_index_raw_with_flag = self.tail_idx.fetch_add(1, Ordering::Acquire);
assert_ne!(
tail_index_raw_with_flag % 2,
1,
"the tail index is supposed to be even before an active push"
);
let tail_index_raw = tail_index_raw_with_flag >> 1;
(tail_index_raw_with_flag, tail_index_raw)
};
let head_idx_raw = if spsc {
self.head_idx.load(Ordering::Acquire)
} else {
let head_idx_raw_with_flag = self.head_idx.load(Ordering::Acquire);
head_idx_raw_with_flag >> 1
};
let head_idx = head_idx_raw % self.size;
let head_cycle = (head_idx_raw / self.size) & 1 == 1;
if tail_index_raw >= 2 * self.size {
let new_index = tail_index_raw % (2 * self.size);
let new_index_with_flag = if spsc { new_index } else { 2 * new_index + 1 };
let old_index = if spsc {
tail_index_raw
} else {
tail_index_raw_with_flag + 1
};
let _ = self.tail_idx.compare_exchange_weak(
old_index,
new_index_with_flag,
Ordering::SeqCst,
Ordering::SeqCst,
);
}
let actual_index = tail_index_raw % self.size;
let tail_cycle = (tail_index_raw / self.size) & 1 == 1;
let cycle = head_cycle ^ tail_cycle;
if cycle && actual_index >= head_idx || !cycle && actual_index < head_idx {
if !spsc {
self.tail_idx.fetch_sub(1, Ordering::Release);
}
return Err(RingPushError::Full(item));
}
let pointer = entries_base.add(actual_index);
ptr::write(pointer, item);
let old_tail_idx = self.tail_idx.fetch_add(1, Ordering::Release);
if !spsc {
assert_eq!(
old_tail_idx % (self.size * 4),
(tail_index_raw_with_flag + 1) % (self.size * 4),
"some other thread interfered with this push_back"
);
}
self.push_epoch.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub unsafe fn push_back(
&self,
entries_base: *mut T,
item: T,
) -> Result<(), RingPushError<T>> {
self.push_back_generic(entries_base, item, false)
}
pub unsafe fn push_back_spsc(
&self,
entries_base: *mut T,
item: T,
) -> Result<(), RingPushError<T>> {
self.push_back_generic(entries_base, item, true)
}
pub unsafe fn pop_front_generic(
&self,
entries_base: *const T,
spsc: bool,
) -> Result<T, RingPopError> {
let sts = RingStatus::from_bits_truncate(self.sts.load(Ordering::Relaxed));
if sts.contains(RingStatus::DROP) {
return Err(RingPopError::Shutdown);
}
let (head_index_raw_with_flag, head_index_raw) = if spsc {
let head_index_raw = self.head_idx.load(Ordering::Acquire);
(head_index_raw, head_index_raw)
} else {
let head_index_raw_with_flag = self.head_idx.fetch_add(1, Ordering::Acquire);
assert_ne!(
head_index_raw_with_flag % 2,
1,
"the head index is supposed to be even before an active pop"
);
(head_index_raw_with_flag, head_index_raw_with_flag >> 1)
};
let tail_index_raw_with_flag = self.tail_idx.load(Ordering::Acquire);
let (tail_index_raw, tail_is_modifying) = if spsc {
(tail_index_raw_with_flag, false)
} else {
(
tail_index_raw_with_flag >> 1,
tail_index_raw_with_flag & 1 == 1,
)
};
let tail_index = tail_index_raw % self.size;
if head_index_raw >= (2 * self.size) {
let new_index = head_index_raw % (2 * self.size);
let new_index_with_flag = if spsc { new_index } else { 2 * new_index + 1 };
let old_index = if spsc {
head_index_raw_with_flag
} else {
head_index_raw_with_flag + 1
};
let _ = self.head_idx.compare_exchange_weak(
old_index,
new_index_with_flag,
Ordering::SeqCst,
Ordering::SeqCst,
);
}
let actual_head_index = head_index_raw % self.size;
let head_cycle = (head_index_raw / self.size) & 1 == 1;
let tail_cycle = (tail_index_raw / self.size) & 1 == 1;
let cycle = head_cycle ^ tail_cycle;
if !cycle && actual_head_index >= tail_index || cycle && actual_head_index < tail_index
{
if !spsc {
self.head_idx.fetch_sub(1, Ordering::Release);
}
return Err(RingPopError::Empty {
pending_push: tail_is_modifying,
});
}
Ok({
let pointer = entries_base.add(actual_head_index);
let value = ptr::read(pointer);
let old_head_idx = self.head_idx.fetch_add(1, Ordering::Release);
if !spsc {
assert_eq!(
old_head_idx % (self.size * 4),
(head_index_raw_with_flag + 1) % (self.size * 4),
"some other thread interfered with this pop_front"
);
}
self.pop_epoch.fetch_add(1, Ordering::Relaxed);
value
})
}
pub unsafe fn pop_front(&self, entries_base: *const T) -> Result<T, RingPopError> {
self.pop_front_generic(entries_base, false)
}
pub unsafe fn pop_front_spsc(&self, entries_base: *const T) -> Result<T, RingPopError> {
self.pop_front_generic(entries_base, true)
}
pub fn available_entry_count_generic(&self, spsc: bool) -> usize {
let head_raw = self.head_idx.load(Ordering::Acquire) / if spsc { 1 } else { 2 };
let tail_raw = self.tail_idx.load(Ordering::Acquire) / if spsc { 1 } else { 2 };
let head_cycle = (head_raw / self.size) & 1 == 1;
let tail_cycle = (tail_raw / self.size) & 1 == 1;
let cycle = head_cycle ^ tail_cycle;
let head = head_raw % self.size;
let tail = tail_raw % self.size;
if cycle {
// the tail has wrapped into the next cycle, past the head
self.size - (head - tail)
} else {
tail - head
}
}
pub fn available_entry_count_spsc(&self) -> usize {
self.available_entry_count_generic(true)
}
pub fn available_entry_count(&self) -> usize {
self.available_entry_count_generic(false)
}
pub fn free_entry_count_generic(&self, spsc: bool) -> usize {
self.size - self.available_entry_count_generic(spsc)
}
pub fn free_entry_count_spsc(&self) -> usize {
self.free_entry_count_generic(true)
}
pub fn free_entry_count(&self) -> usize {
self.free_entry_count_generic(false)
}
}
impl PartialOrd for SqEntry32 {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(Ord::cmp(self, other))
}
}
impl Ord for SqEntry32 {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
Ord::cmp(&self.priority, &other.priority)
}
}
impl PartialOrd for SqEntry64 {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(Ord::cmp(self, other))
}
}
impl Ord for SqEntry64 {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
Ord::cmp(&self.priority, &other.priority)
}
}
impl SqEntry64 {
pub fn linked(self) -> Self {
Self {
flags: self.flags | IoUringSqeFlags::CHAIN.bits(),
.. self
}
}
pub fn unlinked(self) -> Self {
Self {
flags: self.flags & !IoUringSqeFlags::CHAIN.bits(),
.. self
}
}
pub fn drain_first(self) -> Self {
Self {
flags: self.flags | IoUringSqeFlags::DRAIN.bits(),
.. self
}
}
pub fn dont_drain_first(self) -> Self {
Self {
flags: self.flags & !IoUringSqeFlags::DRAIN.bits(),
.. self
}
}
pub fn with_priority(self, priority: u16) -> Self {
Self {
priority,
.. self
}
}
pub fn with_user_data(self, user_data: u64) -> Self {
Self {
user_data,
.. self
}
}
pub fn new(flags: IoUringSqeFlags, priority: u16, user_data: u64) -> Self {
Self {
opcode: StandardOpcode::NoOp as u8,
flags: flags.bits(),
priority,
user_data,
.. Default::default()
}
}
pub fn open(self, path: &[u8], open_flags: u64) -> Self {
Self {
opcode: StandardOpcode::Open as u8,
syscall_flags: 0,
fd: 0,
addr: path.as_ptr() as usize as u64,
len: path.len() as u64,
offset: open_flags,
.. self
}
}
pub fn write(self, fd: u64, buf: &[u8]) -> Self {
Self {
opcode: StandardOpcode::Write as u8,
syscall_flags: operation::WriteFlags::CHANGE_OFFSET.bits(),
addr: buf.as_ptr() as usize as u64,
len: buf.len() as u64,
fd,
offset: 0,
.. self
}
}
pub fn pwrite(self, fd: u64, buf: &[u8], offset: u64) -> Self {
Self {
opcode: StandardOpcode::Write as u8,
syscall_flags: 0,
addr: buf.as_ptr() as usize as u64,
len: buf.len() as u64,
fd,
offset,
.. self
}
}
pub fn writev(self, fd: u64, vecs: &[IoVec]) -> Self {
Self {
opcode: StandardOpcode::Write as u8,
syscall_flags: (operation::WriteFlags::VECTORED | operation::WriteFlags::CHANGE_OFFSET).bits(),
addr: vecs.as_ptr() as usize as u64,
len: vecs.len() as u64,
fd,
offset: 0,
.. self
}
}
pub fn pwritev(self, fd: u64, vecs: &[IoVec], offset: u64) -> Self {
Self {
opcode: StandardOpcode::Write as u8,
syscall_flags: operation::WriteFlags::VECTORED.bits(),
addr: vecs.as_ptr() as usize as u64,
len: vecs.len() as u64,
fd,
offset,
.. self
}
}
pub fn read(self, fd: u64, buf: &mut [u8]) -> Self {
Self {
opcode: StandardOpcode::Read as u8,
syscall_flags: operation::ReadFlags::CHANGE_OFFSET.bits(),
addr: buf.as_mut_ptr() as usize as u64,
len: buf.len() as u64,
fd,
offset: 0,
.. self
}
}
pub fn pread(self, fd: u64, buf: &mut [u8], offset: u64) -> Self {
Self {
opcode: StandardOpcode::Read as u8,
syscall_flags: 0,
addr: buf.as_mut_ptr() as usize as u64,
len: buf.len() as u64,
fd,
offset,
.. self
}
}
pub fn readv(self, fd: u64, vecs: &[IoVec]) -> Self {
Self {
opcode: StandardOpcode::Read as u8,
syscall_flags: (operation::ReadFlags::VECTORED | operation::ReadFlags::CHANGE_OFFSET).bits(),
addr: vecs.as_ptr() as usize as u64,
len: vecs.len() as u64,
fd,
offset: 0,
.. self
}
}
pub fn preadv(self, fd: u64, vecs: &[IoVec], offset: u64) -> Self {
Self {
opcode: StandardOpcode::Read as u8,
syscall_flags: operation::ReadFlags::VECTORED.bits(),
addr: vecs.as_ptr() as usize as u64,
len: vecs.len() as u64,
fd,
offset,
.. self
}
}
pub fn file_update(self, fd: u64, on: EventFlags, oneshot: bool) -> Self {
Self {
opcode: StandardOpcode::FilesUpdate as u8,
flags: (self.flags & !IoUringSqeFlags::SUBSCRIBE.bits()) | if oneshot { 0 } else { IoUringSqeFlags::SUBSCRIBE.bits() },
fd,
syscall_flags: operation::FilesUpdateFlags::from(on).bits(),
.. self
}
}
pub fn files_update(self, fds: &[u64], on: EventFlags, oneshot: bool) -> Self {
Self {
addr: fds.as_ptr() as usize as u64,
len: fds.len() as u64,
.. self.file_update( 0, on, oneshot)
}
}
pub fn close_many(self, start_fd: u64, count: u64, flush: bool) -> Self {
Self {
opcode: StandardOpcode::Close as u8,
syscall_flags: (if flush { 0 } else { operation::CloseFlags::NO_FLUSH.bits() }) | operation::CloseFlags::CLOSE_MANY.bits(),
fd: start_fd,
len: count,
.. self
}
}
pub fn close(self, fd: u64, flush: bool) -> Self {
Self {
opcode: StandardOpcode::Close as u8,
syscall_flags: if flush { 0 } else { operation::CloseFlags::NO_FLUSH.bits() },
fd,
.. self
}
}
}
#[cfg(test)]
pub mod tests {
#[cfg(not(loom))]
use core::sync::atomic::AtomicUsize;
#[cfg(loom)]
use loom::sync::atomic::AtomicUsize;
use core::mem;
use super::{
CachePadded, CqEntry32, CqEntry64, Ring,
RingPopError, RingPushError, SpscReceiver, SpscSender, SqEntry32, SqEntry64,
};
fn setup_ring(count: usize) -> (Ring<CqEntry64>, *mut CqEntry64) {
use std::alloc::{alloc, Layout};
let base = unsafe {
alloc(
Layout::from_size_align(
count * mem::size_of::<CqEntry64>(),
mem::align_of::<CqEntry64>(),
)
.unwrap(),
) as *mut CqEntry64
};
(
Ring {
base_rel: 0,
size: count,
push_epoch: CachePadded(AtomicUsize::new(0)),
pop_epoch: CachePadded(AtomicUsize::new(0)),
head_idx: CachePadded(AtomicUsize::new(0)),
tail_idx: CachePadded(AtomicUsize::new(0)),
sts: CachePadded(AtomicUsize::new(0)),
_marker: core::marker::PhantomData,
},
base,
)
}
#[cfg(not(loom))]
#[test]
fn entry_sizes() {
assert_eq!(mem::size_of::<CqEntry32>(), 16);
assert_eq!(mem::size_of::<CqEntry64>(), 32);
assert_eq!(mem::size_of::<SqEntry32>(), 32);
assert_eq!(mem::size_of::<SqEntry64>(), 64);
}
#[cfg(not(loom))]
#[test]
fn usable_extra_bits() {
assert_eq!(Ring::<u8>::min_extra_bits(), 0);
assert_eq!(Ring::<u16>::min_extra_bits(), 1);
assert_eq!(Ring::<u32>::min_extra_bits(), 2);
assert_eq!(Ring::<u64>::min_extra_bits(), 3);
assert_eq!(Ring::<u128>::min_extra_bits(), 4);
assert_eq!(Ring::<CqEntry32>::min_extra_bits(), 4);
assert_eq!(Ring::<CqEntry64>::min_extra_bits(), 5);
assert_eq!(Ring::<SqEntry32>::min_extra_bits(), 5);
assert_eq!(Ring::<SqEntry64>::min_extra_bits(), 6);
#[cfg(target_pointer_width = "64")]
assert_eq!(Ring::<SqEntry64>::extra_bits_for_count(4096), 52);
#[cfg(target_pointer_width = "32")]
assert_eq!(Ring::<SqEntry32>::extra_bits_for_count(4096), 20);
}
#[cfg(not(loom))]
#[test]
fn single_push_pop() {
unsafe {
let (ring, base) = setup_ring(64);
let value = CqEntry64 {
flags: 0x7F454c46,
status: 0x1BADB002,
user_data: 0xDEADBEEF_FEDFACE0,
extra: 0,
};
assert_eq!(ring.push_back(base, value), Ok(()));
let retrieved_value = ring.pop_front(base);
assert_eq!(retrieved_value, Ok(value));
}
}
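// A supplementary sketch (not part of the original suite): the same ring driven
// through the SpscSender/SpscReceiver wrappers instead of the raw Ring methods,
// draining the consumer side with try_iter.
#[cfg(not(loom))]
#[test]
fn spsc_wrapper_roundtrip() {
let (ring, entries_base) = setup_ring(8);
let ring = &ring as *const Ring<CqEntry64>;
let sender = SpscSender { ring, entries_base };
let receiver = SpscReceiver { ring, entries_base };
for i in 0..8u64 {
sender
.try_send(CqEntry64 {
user_data: i,
status: 0,
flags: 0,
extra: 0,
})
.expect("the ring has exactly enough room for eight entries");
}
// try_iter stops at the first Empty error, so it yields exactly the eight
// entries that were pushed, in FIFO order.
let mut expected = 0u64;
for entry in receiver.try_iter() {
assert_eq!(entry.user_data, expected);
expected += 1;
}
assert_eq!(expected, 8);
}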
#[cfg(not(loom))]
#[test]
fn cycle() {
unsafe {
let (ring, base) = setup_ring(64);
for i in 0..64u64 {
assert_eq!(
ring.push_back(
base,
CqEntry64 {
user_data: i,
status: 1337,
flags: 0x8000_2070,
extra: 0,
}
),
Ok(())
);
}
assert_eq!(ring.available_entry_count(), 64);
assert_eq!(ring.free_entry_count(), 0);
assert_eq!(
ring.push_back(
base,
CqEntry64 {
user_data: 42,
status: 1337,
flags: 0xFFFF_FFFF,
extra: 0,
}
),
Err(RingPushError::Full(CqEntry64 {
user_data: 42,
status: 1337,
flags: 0xFFFF_FFFF,
extra: 0,
}))
);
for i in 0..48u64 {
assert_eq!(ring.free_entry_count(), i as usize);
assert_eq!(ring.available_entry_count(), 64 - i as usize);
assert_eq!(
ring.pop_front(base),
Ok(CqEntry64 {
user_data: i,
status: 1337,
flags: 0x8000_2070,
extra: 0,
})
);
}
for i in 0..48u64 {
assert_eq!(
ring.push_back(
base,
CqEntry64 {
user_data: 64 + i,
status: 1337,
flags: 0x8000_2070,
extra: 0,
}
),
Ok(())
);
}
for i in 0..64u64 {
assert_eq!(
ring.pop_front(base),
Ok(CqEntry64 {
user_data: 48 + i,
status: 1337,
flags: 0x8000_2070,
extra: 0,
})
);
}
assert_eq!(
ring.pop_front(base),
Err(RingPopError::Empty {
pending_push: false
})
);
for i in 0..64u64 {
assert_eq!(
ring.push_back(
base,
CqEntry64 {
user_data: 0xFFFF_FFFF + i,
status: 1337,
flags: 0,
extra: 0,
}
),
Ok(())
);
}
assert_eq!(
ring.push_back(
base,
CqEntry64 {
user_data: 0xDEADBEEF,
status: 42,
flags: 0,
extra: 0,
}
),
Err(RingPushError::Full(CqEntry64 {
user_data: 0xDEADBEEF,
status: 42,
flags: 0,
extra: 0,
}))
);
for i in 0..64u64 {
assert_eq!(
ring.pop_front(base),
Ok(CqEntry64 {
user_data: 0xFFFF_FFFF + i,
status: 1337,
flags: 0,
extra: 0,
})
);
}
assert_eq!(
ring.pop_front(base),
Err(RingPopError::Empty {
pending_push: false
})
);
}
}
macro_rules! simple_multithreaded_test(($sender:expr, $receiver:expr) => {{
let second = std::thread::spawn(move || {
let mut i = 0;
'pushing: loop {
if i > 4096 { break 'pushing }
let value = CqEntry64 {
user_data: i,
status: 1337,
flags: 0xDEADBEEF,
extra: 0,
};
'retry: loop {
match $sender.try_send(value) {
Ok(()) => {
i += 1;
continue 'pushing;
}
Err(_) => {
std::thread::yield_now();
continue 'retry;
}
}
}
}
// Return the sender to the main thread so that it is not dropped (which sets
// the ring's shutdown flag) before the receiver has drained every entry.
$sender
});
let mut i = 0;
'popping: loop {
'retry: loop {
if i > 4096 { break 'popping }
match $receiver.try_recv() {
Ok(c) => {
assert_eq!(c, CqEntry64 {
user_data: i,
status: 1337,
flags: 0xDEADBEEF,
extra: 0,
});
i += 1;
continue 'popping;
}
Err(_) => {
std::thread::yield_now();
continue 'retry;
}
}
}
}
second.join().unwrap();
}});
#[cfg(not(loom))]
#[test]
fn multithreaded_spsc() {
let (ring, entries_base) = setup_ring(64);
let ring = &ring as *const Ring<CqEntry64>;
let sender = SpscSender { ring, entries_base };
let receiver = SpscReceiver { ring, entries_base };
simple_multithreaded_test!(sender, receiver);
}
#[cfg(loom)]
#[test]
fn multithreaded_spsc_loom() {
loom::model(move || {
let (ring, entries_base) = setup_ring(64);
let ring = &ring as *const Ring<CqEntry64>;
let sender = SpscSender { ring, entries_base };
let receiver = SpscReceiver { ring, entries_base };
// Only a handful of entries are transferred so that the loom model stays
// tractable.
let second = loom::thread::spawn(move || {
for i in 0..4u64 {
let mut value = CqEntry64 {
user_data: i,
status: 1337,
flags: 0xDEADBEEF,
extra: 0,
};
loop {
match sender.try_send(value) {
Ok(()) => break,
Err(RingPushError::Full(v)) | Err(RingPushError::Shutdown(v)) => {
value = v;
loom::thread::yield_now();
}
}
}
}
// Return the sender so that it is not dropped (and the ring not marked as
// shut down) before the receiver has drained every entry.
sender
});
for i in 0..4u64 {
let value = loop {
match receiver.try_recv() {
Ok(value) => break value,
Err(_) => loom::thread::yield_now(),
}
};
assert_eq!(
value,
CqEntry64 {
user_data: i,
status: 1337,
flags: 0xDEADBEEF,
extra: 0,
}
);
}
second.join().unwrap();
});
}
}
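// Fixed mmap offsets on the io_uring file descriptor: the consumer maps the
// submission/completion ring headers and entry arrays at these offsets before
// attaching (see `InstanceBuilder::map_all`).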
pub const SQ_HEADER_MMAP_OFFSET: usize = 0x0000_0000;
pub const SQ_ENTRIES_MMAP_OFFSET: usize = 0x0020_0000;
pub const CQ_HEADER_MMAP_OFFSET: usize = 0x8000_0000;
pub const CQ_ENTRIES_MMAP_OFFSET: usize = 0x8020_0000;
bitflags::bitflags! {
#[repr(C)]
pub struct IoUringCreateFlags: u32 {
const BITS_32 = 0x0000_0001;
}
}
bitflags::bitflags! {
#[repr(C)]
pub struct IoUringRecvFlags: u32 {
const BITS_32 = 0x0000_0001;
const FROM_KERNEL = 0x8000_0000;
}
}
mod consumer_instance {
use super::super::*;
use super::*;
#[derive(Default)]
struct InstanceBuilderCreateStageInfo {
minor: Option<u8>,
patch: Option<u8>,
flags: Option<IoUringCreateFlags>,
sq_entry_count: Option<usize>,
cq_entry_count: Option<usize>,
}
struct InstanceBuilderMmapStageInfo {
create_info: IoUringCreateInfo,
ringfd: usize,
sr_virtaddr: Option<usize>,
se_virtaddr: Option<usize>,
cr_virtaddr: Option<usize>,
ce_virtaddr: Option<usize>,
}
struct InstanceBuilderAttachStageInfo {
create_info: IoUringCreateInfo,
ringfd: usize,
sr_virtaddr: usize,
se_virtaddr: usize,
cr_virtaddr: usize,
ce_virtaddr: usize,
}
enum InstanceBuilderStage {
Create(InstanceBuilderCreateStageInfo),
Mmap(InstanceBuilderMmapStageInfo),
Attach(InstanceBuilderAttachStageInfo),
}
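/// Builds a consumer-side io_uring instance in three stages: create the kernel
/// ring (`create_instance`), mmap the four ring regions (`map_*`), and finally
/// attach to a scheme or to the kernel.
///
/// A minimal sketch of the intended flow (entry counts and error handling are
/// illustrative only):
///
/// ```ignore
/// let instance = InstanceBuilder::new()
///     .with_recommended_submission_entry_count()
///     .with_recommended_completion_entry_count()
///     .create_instance()?
///     .map_all()?
///     .attach_to_kernel()?;
/// ```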
pub struct InstanceBuilder {
stage: InstanceBuilderStage,
}
impl InstanceBuilder {
pub fn new() -> Self {
Self {
stage: InstanceBuilderStage::Create(InstanceBuilderCreateStageInfo::default()),
}
}
fn as_create_stage(&mut self) -> Option<&mut InstanceBuilderCreateStageInfo> {
if let &mut InstanceBuilderStage::Create(ref mut stage_info) = &mut self.stage {
Some(stage_info)
} else {
None
}
}
fn as_mmap_stage(&mut self) -> Option<&mut InstanceBuilderMmapStageInfo> {
if let &mut InstanceBuilderStage::Mmap(ref mut stage_info) = &mut self.stage {
Some(stage_info)
} else {
None
}
}
fn consume_attach_state(self) -> Option<InstanceBuilderAttachStageInfo> {
if let InstanceBuilderStage::Attach(stage_info) = self.stage {
Some(stage_info)
} else {
None
}
}
pub fn with_minor_version(mut self, minor: u8) -> Self {
self.as_create_stage()
.expect("cannot set minor version after kernel io_uring instance is created")
.minor = Some(minor);
self
}
pub fn with_patch_version(mut self, patch: u8) -> Self {
self.as_create_stage()
.expect("cannot set patch version after kernel io_uring instance is created")
.patch = Some(patch);
self
}
pub fn with_flags(mut self, flags: IoUringCreateFlags) -> Self {
self.as_create_stage()
.expect("cannot set flags after kernel io_uring instance is created")
.flags = Some(flags);
self
}
pub fn with_submission_entry_count(mut self, sq_entry_count: usize) -> Self {
self.as_create_stage()
.expect("cannot set submission entry count after kernel instance is created")
.sq_entry_count = Some(sq_entry_count);
self
}
pub fn with_recommended_submission_entry_count(self) -> Self {
self.with_submission_entry_count(256)
}
pub fn with_recommended_completion_entry_count(self) -> Self {
self.with_completion_entry_count(256)
}
pub fn with_completion_entry_count(mut self, cq_entry_count: usize) -> Self {
self.as_create_stage()
.expect("cannot set completion entry count after kernel instance is created")
.cq_entry_count = Some(cq_entry_count);
self
}
pub fn version(&self) -> IoUringVersion {
match &self.stage {
&InstanceBuilderStage::Create(ref info) => IoUringVersion {
major: 1,
minor: info.minor.unwrap_or(CURRENT_MINOR),
patch: info.patch.unwrap_or(CURRENT_PATCH),
},
&InstanceBuilderStage::Mmap(InstanceBuilderMmapStageInfo {
ref create_info,
..
})
| InstanceBuilderStage::Attach(InstanceBuilderAttachStageInfo {
ref create_info,
..
}) => create_info.version,
}
}
pub fn flags(&self) -> IoUringCreateFlags {
match &self.stage {
&InstanceBuilderStage::Create(ref info) => {
info.flags.unwrap_or(IoUringCreateFlags::empty())
}
&InstanceBuilderStage::Mmap(InstanceBuilderMmapStageInfo {
ref create_info,
..
})
| InstanceBuilderStage::Attach(InstanceBuilderAttachStageInfo {
ref create_info,
..
}) => IoUringCreateFlags::from_bits(create_info.flags)
.expect("invalid io_uring flag bits"),
}
}
pub fn submission_entry_size(&self) -> usize {
if self.flags().contains(IoUringCreateFlags::BITS_32) {
mem::size_of::<SqEntry32>()
} else {
mem::size_of::<SqEntry64>()
}
}
pub fn completion_entry_size(&self) -> usize {
if self.flags().contains(IoUringCreateFlags::BITS_32) {
mem::size_of::<CqEntry32>()
} else {
mem::size_of::<CqEntry64>()
}
}
pub fn submission_entry_count(&self) -> usize {
match &self.stage {
InstanceBuilderStage::Create(ref info) => info
.sq_entry_count
.unwrap_or(4096 / self.submission_entry_size()),
InstanceBuilderStage::Mmap(ref info) => info.create_info.sq_entry_count,
InstanceBuilderStage::Attach(ref info) => info.create_info.sq_entry_count,
}
}
pub fn completion_entry_count(&self) -> usize {
match &self.stage {
InstanceBuilderStage::Create(ref info) => info
.cq_entry_count
.unwrap_or(4096 / self.completion_entry_size()),
InstanceBuilderStage::Mmap(ref info) => info.create_info.cq_entry_count,
InstanceBuilderStage::Attach(ref info) => info.create_info.cq_entry_count,
}
}
pub fn ring_header_size(&self) -> usize {
4096
}
pub fn submission_entries_bytesize(&self) -> usize {
(self.submission_entry_count() * self.submission_entry_size() + 4095) / 4096 * 4096
}
pub fn completion_entries_bytesize(&self) -> usize {
(self.completion_entry_count() * self.completion_entry_size() + 4095) / 4096 * 4096
}
pub fn create_info(&self) -> IoUringCreateInfo {
match &self.stage {
&InstanceBuilderStage::Create(_) => IoUringCreateInfo {
version: self.version(),
_rsvd: 0,
flags: self.flags().bits(),
len: mem::size_of::<IoUringCreateInfo>(),
sq_entry_count: self.submission_entry_count(),
cq_entry_count: self.completion_entry_count(),
},
&InstanceBuilderStage::Mmap(ref info) => info.create_info,
&InstanceBuilderStage::Attach(ref info) => info.create_info,
}
}
pub fn create_instance(mut self) -> Result<Self> {
let ringfd = call::open("io_uring:", flag::O_CREAT | flag::O_CLOEXEC)?;
let create_info = self.create_info();
let len = mem::size_of::<IoUringCreateInfo>();
let bytes_written = call::write(ringfd, unsafe {
slice::from_raw_parts(&create_info as *const _ as *const u8, len)
})?;
if bytes_written != len {
return Err(Error::new(EINVAL));
}
self.stage = InstanceBuilderStage::Mmap(InstanceBuilderMmapStageInfo {
create_info,
ringfd,
sr_virtaddr: None,
se_virtaddr: None,
cr_virtaddr: None,
ce_virtaddr: None,
});
Ok(self)
}
fn mmap(
mut self,
name: &str,
mmap_flags: MapFlags,
mmap_offset: usize,
mmap_size: usize,
f: impl FnOnce(&mut InstanceBuilderMmapStageInfo) -> (bool, &mut Option<usize>),
) -> Result<Self> {
let mmap_stage_info = match self.as_mmap_stage() {
Some(i) => i,
None => panic!("mapping {} when not in the mmap stage", name),
};
let ringfd = mmap_stage_info.ringfd;
let (only_addr_is_uninit, addr) = f(mmap_stage_info);
if addr.is_some() {
panic!("mapping {} again", name)
}
*addr = Some(unsafe {
call::fmap(
ringfd,
&Map {
offset: mmap_offset,
size: mmap_size,
flags: mmap_flags,
},
)?
});
if only_addr_is_uninit {
let mmap_stage_info = if let InstanceBuilderStage::Mmap(info) = self.stage {
info
} else {
unreachable!()
};
self.stage = InstanceBuilderStage::Attach(InstanceBuilderAttachStageInfo {
create_info: mmap_stage_info.create_info,
ringfd: mmap_stage_info.ringfd,
sr_virtaddr: mmap_stage_info.sr_virtaddr.unwrap(),
se_virtaddr: mmap_stage_info.se_virtaddr.unwrap(),
cr_virtaddr: mmap_stage_info.cr_virtaddr.unwrap(),
ce_virtaddr: mmap_stage_info.ce_virtaddr.unwrap(),
});
}
Ok(self)
}
pub fn map_submission_ring_header(self) -> Result<Self> {
let len = self.ring_header_size();
self.mmap(
"the submission ring header",
MapFlags::MAP_SHARED | MapFlags::PROT_READ | MapFlags::PROT_WRITE,
SQ_HEADER_MMAP_OFFSET,
len,
|stage_info| {
(
stage_info.se_virtaddr.is_some()
&& stage_info.cr_virtaddr.is_some()
&& stage_info.ce_virtaddr.is_some(),
&mut stage_info.sr_virtaddr,
)
},
)
}
pub fn map_completion_ring_header(self) -> Result<Self> {
let len = self.ring_header_size();
self.mmap(
"the completion ring header",
MapFlags::MAP_SHARED | MapFlags::PROT_READ | MapFlags::PROT_WRITE,
CQ_HEADER_MMAP_OFFSET,
len,
|stage_info| {
(
stage_info.sr_virtaddr.is_some()
&& stage_info.se_virtaddr.is_some()
&& stage_info.ce_virtaddr.is_some(),
&mut stage_info.cr_virtaddr,
)
},
)
}
pub fn map_submission_entries(self) -> Result<Self> {
let len = self.submission_entries_bytesize();
self.mmap(
"the submission entries",
MapFlags::MAP_SHARED | MapFlags::PROT_WRITE,
SQ_ENTRIES_MMAP_OFFSET,
len,
|stage_info| {
(
stage_info.sr_virtaddr.is_some()
&& stage_info.cr_virtaddr.is_some()
&& stage_info.ce_virtaddr.is_some(),
&mut stage_info.se_virtaddr,
)
},
)
}
pub fn map_completion_entries(self) -> Result<Self> {
let len = self.completion_entries_bytesize();
self.mmap(
"the completion entries",
MapFlags::MAP_SHARED | MapFlags::PROT_READ,
CQ_ENTRIES_MMAP_OFFSET,
len,
|stage_info| {
(
stage_info.sr_virtaddr.is_some()
&& stage_info.se_virtaddr.is_some()
&& stage_info.cr_virtaddr.is_some(),
&mut stage_info.ce_virtaddr,
)
},
)
}
pub fn map_all(self) -> Result<Self> {
self.map_submission_ring_header()?
.map_submission_entries()?
.map_completion_ring_header()?
.map_completion_entries()
}
pub fn attach<N: AsRef<str>>(self, scheme_name: N) -> Result<Instance> {
self.attach_raw(scheme_name.as_ref().as_bytes())
}
pub fn attach_to_kernel(self) -> Result<Instance> {
self.attach_raw(b":")
}
pub fn attach_raw<N: AsRef<[u8]>>(self, scheme_name: N) -> Result<Instance> {
let init_flags = self.flags();
let attach_info = self
.consume_attach_state()
.expect("attaching an io_uring before the builder was in its attach stage");
call::attach_iouring(attach_info.ringfd, scheme_name.as_ref())?;
fn init_sender<S>(info: &InstanceBuilderAttachStageInfo) -> SpscSender<S> {
unsafe {
SpscSender::from_raw(
info.sr_virtaddr as *const Ring<S>,
info.se_virtaddr as *mut S,
)
}
}
fn init_receiver<C>(info: &InstanceBuilderAttachStageInfo) -> SpscReceiver<C> {
unsafe {
SpscReceiver::from_raw(
info.cr_virtaddr as *const Ring<C>,
info.ce_virtaddr as *const C,
)
}
}
Ok(Instance {
ringfd: attach_info.ringfd,
sender: if init_flags.contains(IoUringCreateFlags::BITS_32) {
GenericSender::Bits32(init_sender(&attach_info))
} else {
GenericSender::Bits64(init_sender(&attach_info))
},
receiver: if init_flags.contains(IoUringCreateFlags::BITS_32) {
GenericReceiver::Bits32(init_receiver(&attach_info))
} else {
GenericReceiver::Bits64(init_receiver(&attach_info))
},
})
}
}
pub enum GenericSender {
Bits32(SpscSender<SqEntry32>),
Bits64(SpscSender<SqEntry64>),
}
impl GenericSender {
pub fn is_32(&self) -> bool {
if let Self::Bits32(_) = self { true } else { false }
}
pub fn is_64(&self) -> bool {
if let Self::Bits64(_) = self { true } else { false }
}
pub fn as_32(&self) -> Option<&SpscSender<SqEntry32>> {
match self {
&Self::Bits32(ref s) => Some(s),
_ => None,
}
}
pub fn as_64(&self) -> Option<&SpscSender<SqEntry64>> {
match self {
&Self::Bits64(ref s) => Some(s),
_ => None,
}
}
pub fn as_32_mut(&mut self) -> Option<&mut SpscSender<SqEntry32>> {
match self {
Self::Bits32(ref mut s) => Some(s),
_ => None,
}
}
pub fn as_64_mut(&mut self) -> Option<&mut SpscSender<SqEntry64>> {
match self {
Self::Bits64(ref mut s) => Some(s),
_ => None,
}
}
}
pub enum GenericReceiver {
Bits32(SpscReceiver<CqEntry32>),
Bits64(SpscReceiver<CqEntry64>),
}
impl GenericReceiver {
pub fn is_32(&self) -> bool {
if let Self::Bits32(_) = self { true } else { false }
}
pub fn is_64(&self) -> bool {
if let Self::Bits64(_) = self { true } else { false }
}
pub fn as_32(&self) -> Option<&SpscReceiver<CqEntry32>> {
match self {
&Self::Bits32(ref s) => Some(s),
_ => None,
}
}
pub fn as_64(&self) -> Option<&SpscReceiver<CqEntry64>> {
match self {
&Self::Bits64(ref s) => Some(s),
_ => None,
}
}
pub fn as_32_mut(&mut self) -> Option<&mut SpscReceiver<CqEntry32>> {
match self {
&mut Self::Bits32(ref mut s) => Some(s),
_ => None,
}
}
pub fn as_64_mut(&mut self) -> Option<&mut SpscReceiver<CqEntry64>> {
match self {
&mut Self::Bits64(ref mut s) => Some(s),
_ => None,
}
}
}
pub struct Instance {
ringfd: usize,
sender: GenericSender,
receiver: GenericReceiver,
}
impl Instance {
fn deinit(&mut self) -> Result<()> {
call::close(self.ringfd)?;
Ok(())
}
pub fn sender_mut(&mut self) -> &mut GenericSender {
&mut self.sender
}
pub fn receiver_mut(&mut self) -> &mut GenericReceiver {
&mut self.receiver
}
pub fn sender(&self) -> &GenericSender {
&self.sender
}
pub fn receiver(&self) -> &GenericReceiver {
&self.receiver
}
pub fn close(mut self) -> Result<()> {
self.deinit()?;
mem::forget(self);
Ok(())
}
pub fn ringfd(&self) -> usize {
self.ringfd
}
pub fn wait(&self, min_complete: usize, flags: IoUringEnterFlags) -> Result<usize> {
call::enter_iouring(self.ringfd, min_complete, flags)
}
}
impl Drop for Instance {
fn drop(&mut self) {
let _ = self.deinit();
}
}
}
pub use consumer_instance::{
GenericReceiver as ConsumerGenericReceiver, GenericSender as ConsumerGenericSender, Instance as ConsumerInstance,
InstanceBuilder as ConsumerInstanceBuilder,
};
mod producer_instance {
use super::super::{IoUringRecvFlags, IoUringRecvInfo};
use super::{CqEntry32, CqEntry64, Ring, SpscReceiver, SpscSender, SqEntry32, SqEntry64};
use crate::error::{EINVAL, ENOSYS};
use crate::{Error, Result};
pub enum GenericSender {
Bits32(SpscSender<CqEntry32>),
Bits64(SpscSender<CqEntry64>),
}
pub enum GenericReceiver {
Bits32(SpscReceiver<SqEntry32>),
Bits64(SpscReceiver<SqEntry64>),
}
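/// The producer (scheme or kernel handler) side of an io_uring: it receives
/// submission entries and sends back completion entries, constructed from the
/// `IoUringRecvInfo` describing the already-mapped ring regions.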
pub struct Instance {
sender: GenericSender,
receiver: GenericReceiver,
}
impl Instance {
pub fn new(recv_info: &IoUringRecvInfo) -> Result<Self> {
if recv_info.version.major != 1 {
return Err(Error::new(ENOSYS));
}
let flags =
IoUringRecvFlags::from_bits(recv_info.flags).ok_or(Error::new(EINVAL))?;
fn init_sender<C>(info: &IoUringRecvInfo) -> SpscSender<C> {
unsafe {
SpscSender::from_raw(
info.cr_virtaddr as *const Ring<C>,
info.ce_virtaddr as *mut C,
)
}
}
fn init_receiver<S>(info: &IoUringRecvInfo) -> SpscReceiver<S> {
unsafe {
SpscReceiver::from_raw(
info.sr_virtaddr as *const Ring<S>,
info.se_virtaddr as *mut S,
)
}
}
Ok(Self {
sender: if flags.contains(IoUringRecvFlags::BITS_32) {
GenericSender::Bits32(init_sender(recv_info))
} else {
GenericSender::Bits64(init_sender(recv_info))
},
receiver: if flags.contains(IoUringRecvFlags::BITS_32) {
GenericReceiver::Bits32(init_receiver(recv_info))
} else {
GenericReceiver::Bits64(init_receiver(recv_info))
},
})
}
pub fn sender(&mut self) -> &mut GenericSender {
&mut self.sender
}
pub fn receiver(&mut self) -> &mut GenericReceiver {
&mut self.receiver
}
}
}
pub use producer_instance::{GenericSender as ProducerGenericSender, GenericReceiver as ProducerGenericReceiver, Instance as ProducerInstance};
pub const CURRENT_MINOR: u8 = 0;
pub const CURRENT_PATCH: u8 = 0;
pub mod operation {
use super::EventFlags;
bitflags::bitflags! {
pub struct OpenFlags: u32 {
const OPEN_AT = 0x0000_0001;
}
}
bitflags::bitflags! {
pub struct CloseFlags: u32 {
const CLOSE_MANY = 0x0000_0001;
const NO_FLUSH = 0x0000_0002;
}
}
bitflags::bitflags! {
pub struct ReadFlags: u32 {
const CHANGE_OFFSET = 0x0000_0001;
const VECTORED = 0x0000_0002;
}
}
bitflags::bitflags! {
pub struct WriteFlags: u32 {
const CHANGE_OFFSET = 0x0000_0001;
const VECTORED = 0x0000_0002;
}
}
bitflags::bitflags! {
pub struct FilesUpdateFlags: u32 {
const READ = 0x0000_0001;
const WRITE = 0x0000_0002;
const MULTI = 0x0000_0004;
}
}
impl FilesUpdateFlags {
pub fn from_event_flags(event_flags: EventFlags) -> Self {
let mut this = Self::empty();
if event_flags.contains(EventFlags::EVENT_READ) {
this |= Self::READ;
}
if event_flags.contains(EventFlags::EVENT_WRITE) {
this |= Self::WRITE;
}
this
}
}
impl From<EventFlags> for FilesUpdateFlags {
fn from(flags: EventFlags) -> Self {
Self::from_event_flags(flags)
}
}
}
#[repr(u8)]
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StandardOpcode {
NoOp = 0,
Open = 1,
Close = 2,
Read = 3,
Write = 4,
Seek = 5,
Fstat = 6,
Fstatvfs = 7,
Mmap = 8,
Munmap = 9,
Fsync = 10,
FilesUpdate = 11,
RegularSyscall = 127,
}
macro_rules! switch(
(
$value:expr, {
$($pseudopattern:expr => $body:expr,)*
}
) => {if false {
unreachable!()
}
$(else if $value == $pseudopattern { Some($body) })*
else { None }
}
);
macro_rules! i_am_lazy(
(
$value:expr, {
$($name:ident,)*
}
) => {{
switch!(
$value,
{ $(Self::$name as u8 => Self::$name,)* }
)
}}
);
impl StandardOpcode {
pub const MAX_OPCODE: u8 = 127;
pub fn from_raw(raw: u8) -> Option<Self> {
i_am_lazy!(raw, {
NoOp,
Open,
Close,
Read,
Write,
Seek,
Fstat,
Fstatvfs,
Mmap,
Munmap,
Fsync,
FilesUpdate,
RegularSyscall,
})
}
}
#[repr(u8)]
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum KernelOpcode {
Waitpid = 128,
AllocateIoringMemory = 129,
}
impl KernelOpcode {
pub fn from_raw(raw: u8) -> Option<Self> {
i_am_lazy!(raw, {
Waitpid,
AllocateIoringMemory,
})
}
}
bitflags::bitflags! {
pub struct IoUringSqeFlags: u8 {
// DRAIN needs its own nonzero bit for drain_first()/dont_drain_first() to have
// any effect; the exact value chosen here is an assumption.
const DRAIN = 0x10;
const CHAIN = 0x01;
const SUBSCRIBE = 0x02;
const DIRECT_RECV = 0x04;
const DIRECT_SEND = 0x08;
}
}
bitflags::bitflags! {
pub struct IoUringCqeFlags: u8 {
const DIRECTLY_RECEIVED = 0x01;
const EVENT = 0x40;
const LAST_UPDATE = 0x80;
}
}
pub trait GenericSubmissionEntry {
fn opcode(&self) -> u8;
fn sqe_flags(&self) -> u8;
fn priority(&self) -> u16;
fn syscall_flags(&self) -> u32;
fn len_raw(&self) -> Either<u32, u64>;
fn fd_raw(&self) -> Either<u32, u64>;
fn user_data_raw(&self) -> Either<u32, u64>;
fn addr_raw(&self) -> Either<u32, u64>;
fn offset(&self) -> u64;
}
mod private {
use super::GenericSubmissionEntry;
use either::Either;
pub trait PublicButPrivate {}
pub trait GenericSubmissionEntryExt: GenericSubmissionEntry + PublicButPrivate {
fn len64(&self) -> u64 {
match self.len_raw() {
Either::Left(a) => u64::from(a),
Either::Right(b) => b,
}
}
fn fd64(&self) -> u64 {
match self.fd_raw() {
Either::Left(a) => u64::from(a),
Either::Right(b) => b,
}
}
fn user_data64(&self) -> u64 {
match self.user_data_raw() {
Either::Left(a) => u64::from(a),
Either::Right(b) => b,
}
}
fn addr64(&self) -> u64 {
match self.addr_raw() {
Either::Left(a) => u64::from(a),
Either::Right(b) => b,
}
}
}
impl<T> PublicButPrivate for T
where
T: GenericSubmissionEntry,
{}
impl<T> GenericSubmissionEntryExt for T
where
T: GenericSubmissionEntry + PublicButPrivate,
{}
}
pub use private::GenericSubmissionEntryExt;
impl GenericSubmissionEntry for SqEntry32 {
fn opcode(&self) -> u8 {
self.opcode
}
fn sqe_flags(&self) -> u8 {
self.flags
}
fn priority(&self) -> u16 {
self.priority
}
fn syscall_flags(&self) -> u32 {
self.syscall_flags
}
fn len_raw(&self) -> Either<u32, u64> {
Either::Left(self.len)
}
fn user_data_raw(&self) -> Either<u32, u64> {
Either::Left(self.user_data)
}
fn fd_raw(&self) -> Either<u32, u64> {
Either::Left(self.fd)
}
fn addr_raw(&self) -> Either<u32, u64> {
Either::Left(self.addr)
}
fn offset(&self) -> u64 {
self.offset
}
}
impl GenericSubmissionEntry for SqEntry64 {
fn opcode(&self) -> u8 {
self.opcode
}
fn sqe_flags(&self) -> u8 {
self.flags
}
fn priority(&self) -> u16 {
self.priority
}
fn syscall_flags(&self) -> u32 {
self.syscall_flags
}
fn len_raw(&self) -> Either<u32, u64> {
Either::Right(self.len)
}
fn user_data_raw(&self) -> Either<u32, u64> {
Either::Right(self.user_data)
}
fn fd_raw(&self) -> Either<u32, u64> {
Either::Right(self.fd)
}
fn addr_raw(&self) -> Either<u32, u64> {
Either::Right(self.addr)
}
fn offset(&self) -> u64 {
self.offset
}
}
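/// A completion entry that can be constructed from 64-bit fields. The associated
/// constants describe which fields the concrete layout can represent in full;
/// `construct` returns `None` when a value does not fit.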
pub trait GenericCompletionEntry
where
Self: Sized,
{
const HAS_EXTRA: bool;
const USER_DATA_64: bool;
const STATUS_64: bool;
const FLAGS_64: bool;
const EXTRA_64: bool;
fn construct(user_data: u64, status: u64, flags: u64, extra: Option<u64>) -> Option<Self>;
}
impl GenericCompletionEntry for CqEntry32 {
const HAS_EXTRA: bool = true;
const USER_DATA_64: bool = false;
const STATUS_64: bool = false;
const FLAGS_64: bool = false;
const EXTRA_64: bool = false;
fn construct(user_data: u64, status: u64, flags: u64, extra: Option<u64>) -> Option<Self> {
Some(Self {
user_data: user_data.try_into().ok()?,
status: status.try_into().ok()?,
flags: flags.try_into().ok()?,
extra: extra.unwrap_or(0).try_into().ok()?,
})
}
}
impl GenericCompletionEntry for CqEntry64 {
const HAS_EXTRA: bool = true;
const USER_DATA_64: bool = true;
const STATUS_64: bool = true;
const FLAGS_64: bool = true;
const EXTRA_64: bool = true;
fn construct(user_data: u64, status: u64, flags: u64, extra: Option<u64>) -> Option<Self> {
Some(Self {
user_data,
status,
flags,
extra: extra.unwrap_or(0),
})
}
}
}
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct IoUringVersion {
pub major: u8,
pub minor: u8,
pub patch: u8,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct IoUringCreateInfo {
pub version: IoUringVersion,
pub _rsvd: u8,
pub flags: u32,
pub len: usize,
pub sq_entry_count: usize,
pub cq_entry_count: usize,
}
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct IoUringRecvInfo {
pub version: IoUringVersion,
pub flags: u32,
pub len: usize,
pub sq_entry_count: usize,
pub cq_entry_count: usize,
pub sr_virtaddr: usize,
pub cr_virtaddr: usize,
pub se_virtaddr: usize,
pub ce_virtaddr: usize,
}
pub struct ConsumerInstance;
impl ConsumerInstance {
pub fn new_v1() -> v1::ConsumerInstanceBuilder {
v1::ConsumerInstanceBuilder::new()
}
}
pub struct ProducerInstance;
impl ProducerInstance {
pub fn new_v1(recv_info: &IoUringRecvInfo) -> crate::Result<v1::ProducerInstance> {
v1::ProducerInstance::new(recv_info)
}
}
bitflags::bitflags! {
pub struct IoUringEnterFlags: usize {
const DRAIN = 0x0000_0001;
}
}
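/// Definitions matching Linux's `io_uring_sqe`/`io_uring_cqe` layouts, with the
/// generic submission/completion traits implemented for them so that the same
/// code can target either entry format.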
pub mod linux {
use core::convert::TryFrom;
use super::v1;
use either::Either;
#[allow(non_camel_case_types)]
#[allow(non_snake_case)]
#[repr(C)]
pub union off_addr2 {
pub offset: u64,
pub addr2: u64,
}
#[repr(C)]
pub union cmd_flags {
pub rw_flags: i32,
pub fsync_flags: u32,
pub poll_events: u16,
pub sync_range_flags: u32,
pub msg_flags: u32,
pub timeout_flags: u32,
pub accept_flags: u32,
pub cancel_flags: u32,
}
#[repr(C)]
pub union buf_index {
pub buf_index: u16,
pub pad: [u64; 3],
}
#[repr(C)]
pub struct io_uring_sqe {
pub opcode: u8,
pub flags: u8,
pub ioprio: u16,
pub fd: i32,
pub off_addr2: off_addr2,
pub addr: u64,
pub len: u32,
pub cmd_flags: cmd_flags,
pub user_data: u64,
pub buf_index: buf_index,
}
#[repr(C)]
pub struct io_uring_cqe {
pub user_data: u64,
pub result: i32,
pub flags: u32,
}
impl v1::GenericSubmissionEntry for io_uring_sqe {
fn opcode(&self) -> u8 {
self.opcode
}
fn sqe_flags(&self) -> u8 {
self.flags
}
fn priority(&self) -> u16 {
self.ioprio
}
fn syscall_flags(&self) -> u32 {
unsafe { self.cmd_flags.fsync_flags }
}
fn len_raw(&self) -> Either<u32, u64> {
Either::Left(self.len)
}
fn fd_raw(&self) -> Either<u32, u64> {
Either::Left(self.fd as u32)
}
fn user_data_raw(&self) -> Either<u32, u64> {
Either::Right(self.user_data)
}
fn addr_raw(&self) -> Either<u32, u64> {
Either::Right(self.addr)
}
fn offset(&self) -> u64 {
unsafe { self.off_addr2.offset }
}
}
impl v1::GenericCompletionEntry for io_uring_cqe {
const HAS_EXTRA: bool = false;
const USER_DATA_64: bool = true;
const STATUS_64: bool = false;
const FLAGS_64: bool = false;
const EXTRA_64: bool = false;
fn construct(user_data: u64, status: u64, flags: u64, extra: Option<u64>) -> Option<Self> {
if extra.is_some() {
return None;
}
Some(Self {
user_data,
result: i32::try_from(status as i64).ok()?,
flags: u32::try_from(flags).ok()?,
})
}
}
}
pub use v1::*;