use core::{cmp, fmt, hash, mem, ptr, slice, usize};
use core::iter::{FromIterator};
use core::ops::{Deref, RangeBounds};
use alloc::{vec::Vec, string::String, boxed::Box, borrow::Borrow};
use crate::Buf;
use crate::buf::IntoIter;
use crate::loom::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
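/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
/// `Bytes` keeps a pointer/length pair into reference-counted storage, so
/// `clone`, `slice`, `split_off`, and `split_to` are all `O(1)` and share the
/// underlying memory instead of copying it. The storage strategy (static
/// slice, promotable `Vec`, or shared allocation) is selected through the
/// `vtable` field.
///
/// # Examples
///
/// A short usage sketch (assuming this crate is published as `bytes`):
///
/// ```
/// use bytes::Bytes;
///
/// let mut mem = Bytes::from(&b"Hello world"[..]);
/// let a = mem.slice(0..5);
///
/// assert_eq!(&a[..], b"Hello");
///
/// let b = mem.split_to(6);
///
/// assert_eq!(&mem[..], b"world");
/// assert_eq!(&b[..], b"Hello ");
/// ```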
pub struct Bytes {
    ptr: *const u8,
    len: usize,
    // Inlined "trait object": `data` is interpreted by the functions in `vtable`.
    data: AtomicPtr<()>,
    vtable: &'static Vtable,
}
pub(crate) struct Vtable {
    /// Clone the backing storage, returning a new `Bytes` over the same `ptr`/`len`.
    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
    /// Release this handle's reference to the backing storage.
    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}
impl Bytes {
    /// Creates a new empty `Bytes`.
    ///
    /// This will not allocate and the returned `Bytes` handle will be empty.
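    ///
    /// # Examples
    ///
    /// A minimal doc-test sketch (assuming this crate is published as `bytes`):
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::new();
    /// assert_eq!(&b[..], b"");
    /// ```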
    #[inline]
    #[cfg(not(all(loom, test)))]
    pub const fn new() -> Bytes {
        // Make it a named const to work around
        // "unsizing casts are not allowed in const fn".
        const EMPTY: &[u8] = &[];
        Bytes::from_static(EMPTY)
    }
    #[cfg(all(loom, test))]
    pub fn new() -> Bytes {
        const EMPTY: &[u8] = &[];
        Bytes::from_static(EMPTY)
    }
    /// Creates a new `Bytes` from a static slice.
    ///
    /// The returned `Bytes` points directly at the static slice; no
    /// allocation or copying is performed.
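    ///
    /// # Examples
    ///
    /// A short usage sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::from_static(b"hello");
    /// assert_eq!(&b[..], b"hello");
    /// ```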
    #[inline]
    #[cfg(not(all(loom, test)))]
    pub const fn from_static(bytes: &'static [u8]) -> Bytes {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }
    #[cfg(all(loom, test))]
    pub fn from_static(bytes: &'static [u8]) -> Bytes {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }
    /// Returns the number of bytes contained in this `Bytes`.
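    ///
    /// # Examples
    ///
    /// A quick doc-test sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::from(&b"hello"[..]);
    /// assert_eq!(b.len(), 5);
    /// ```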
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }
    /// Returns true if the `Bytes` has a length of 0.
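    ///
    /// # Examples
    ///
    /// For instance:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::new();
    /// assert!(b.is_empty());
    /// ```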
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
    /// Creates a `Bytes` instance from the given slice by copying it.
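    ///
    /// # Examples
    ///
    /// A short sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::copy_from_slice(b"hello");
    /// assert_eq!(&b[..], b"hello");
    /// ```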
    pub fn copy_from_slice(data: &[u8]) -> Self {
        data.to_vec().into()
    }
    /// Returns a slice of self for the provided range.
    ///
    /// This will increment the reference count for the underlying memory and
    /// return a new `Bytes` handle set to the slice.
    ///
    /// This operation is `O(1)`.
    ///
    /// # Panics
    ///
    /// Requires that `begin <= end` and `end <= self.len()`, otherwise
    /// slicing will panic.
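    ///
    /// # Examples
    ///
    /// A short usage sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let a = Bytes::from(&b"hello world"[..]);
    /// let b = a.slice(2..5);
    ///
    /// assert_eq!(&b[..], b"llo");
    /// ```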
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Bytes {
        use core::ops::Bound;
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n + 1,
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n + 1,
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };
        assert!(
            begin <= end,
            "range start must not be greater than end: {:?} <= {:?}",
            begin,
            end,
        );
        assert!(
            end <= len,
            "range end out of bounds: {:?} <= {:?}",
            end,
            len,
        );
        if end == begin {
            return Bytes::new();
        }
        let mut ret = self.clone();
        ret.len = end - begin;
        ret.ptr = unsafe { ret.ptr.offset(begin as isize) };
        ret
    }
    /// Returns a slice of self that is equivalent to the given `subset`.
    ///
    /// When processing a `Bytes` buffer with other tools, one often gets a
    /// `&[u8]` which is in fact a slice of the `Bytes`, i.e. a subset of it.
    /// This function turns that `&[u8]` into another `Bytes`, as if one had
    /// called `self.slice()` with the offsets that correspond to `subset`.
    ///
    /// This operation is `O(1)`.
    ///
    /// # Panics
    ///
    /// Requires that the given `subset` slice is in fact contained within the
    /// `Bytes` buffer; otherwise this function will panic.
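    ///
    /// # Examples
    ///
    /// A short sketch of slicing back into the owning buffer:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let bytes = Bytes::from(&b"012345678"[..]);
    /// let as_slice: &[u8] = bytes.as_ref();
    /// let subset = &as_slice[2..6];
    /// let subslice = bytes.slice_ref(subset);
    /// assert_eq!(&subslice[..], b"2345");
    /// ```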
    pub fn slice_ref(&self, subset: &[u8]) -> Bytes {
        // Empty slices and empty `Bytes` may have their pointers reset, so
        // explicitly allow an empty slice to be a subslice of any slice.
        if subset.is_empty() {
            return Bytes::new();
        }
        let bytes_p = self.as_ptr() as usize;
        let bytes_len = self.len();
        let sub_p = subset.as_ptr() as usize;
        let sub_len = subset.len();
        assert!(
            sub_p >= bytes_p,
            "subset pointer ({:p}) is smaller than self pointer ({:p})",
            sub_p as *const u8,
            bytes_p as *const u8,
        );
        assert!(
            sub_p + sub_len <= bytes_p + bytes_len,
            "subset is out of bounds: self = ({:p}, {}), subset = ({:p}, {})",
            bytes_p as *const u8,
            bytes_len,
            sub_p as *const u8,
            sub_len,
        );
        let sub_offset = sub_p - bytes_p;
        self.slice(sub_offset..(sub_offset + sub_len))
    }
    /// Splits the bytes into two at the given index.
    ///
    /// Afterwards `self` contains elements `[0, at)`, and the returned
    /// `Bytes` contains elements `[at, len)`.
    ///
    /// This is an `O(1)` operation that just increases the reference count
    /// and sets a few indices.
    ///
    /// # Panics
    ///
    /// Panics if `at > len`.
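    ///
    /// # Examples
    ///
    /// A short usage sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut a = Bytes::from(&b"hello world"[..]);
    /// let b = a.split_off(5);
    ///
    /// assert_eq!(&a[..], b"hello");
    /// assert_eq!(&b[..], b" world");
    /// ```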
    #[must_use = "consider Bytes::truncate if you don't need the other half"]
    pub fn split_off(&mut self, at: usize) -> Bytes {
        assert!(
            at <= self.len(),
            "split_off out of bounds: {:?} <= {:?}",
            at,
            self.len(),
        );
        if at == self.len() {
            return Bytes::new();
        }
        if at == 0 {
            return mem::replace(self, Bytes::new());
        }
        let mut ret = self.clone();
        self.len = at;
        unsafe { ret.inc_start(at) };
        ret
    }
    /// Splits the bytes into two at the given index.
    ///
    /// Afterwards `self` contains elements `[at, len)`, and the returned
    /// `Bytes` contains elements `[0, at)`.
    ///
    /// This is an `O(1)` operation that just increases the reference count
    /// and sets a few indices.
    ///
    /// # Panics
    ///
    /// Panics if `at > len`.
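    ///
    /// # Examples
    ///
    /// For instance:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut a = Bytes::from(&b"hello world"[..]);
    /// let b = a.split_to(5);
    ///
    /// assert_eq!(&a[..], b" world");
    /// assert_eq!(&b[..], b"hello");
    /// ```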
    #[must_use = "consider Bytes::advance if you don't need the other half"]
    pub fn split_to(&mut self, at: usize) -> Bytes {
        assert!(
            at <= self.len(),
            "split_to out of bounds: {:?} <= {:?}",
            at,
            self.len(),
        );
        if at == self.len() {
            return mem::replace(self, Bytes::new());
        }
        if at == 0 {
            return Bytes::new();
        }
        let mut ret = self.clone();
        unsafe { self.inc_start(at) };
        ret.len = at;
        ret
    }
    /// Shortens the buffer, keeping the first `len` bytes and dropping the
    /// rest.
    ///
    /// If `len` is greater than the buffer's current length, this has no
    /// effect.
    ///
    /// The [`split_off`] method can emulate `truncate`, but this causes the
    /// excess bytes to be returned instead of dropped.
    ///
    /// [`split_off`]: #method.split_off
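    ///
    /// # Examples
    ///
    /// A quick sketch:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut buf = Bytes::from(&b"hello world"[..]);
    /// buf.truncate(5);
    /// assert_eq!(&buf[..], b"hello");
    /// ```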
    #[inline]
    pub fn truncate(&mut self, len: usize) {
        if len < self.len {
            // The `Vec` "promotable" vtables do not store the capacity, so we
            // cannot truncate while using this repr. We *have* to promote
            // using `split_off` so the capacity can be stored.
            if self.vtable as *const Vtable == &PROMOTABLE_EVEN_VTABLE ||
                self.vtable as *const Vtable == &PROMOTABLE_ODD_VTABLE {
                drop(self.split_off(len));
            } else {
                self.len = len;
            }
        }
    }
    /// Clears the buffer, removing all data.
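    ///
    /// # Examples
    ///
    /// For example:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut buf = Bytes::from(&b"hello"[..]);
    /// buf.clear();
    /// assert!(buf.is_empty());
    /// ```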
    #[inline]
    pub fn clear(&mut self) {
        self.truncate(0);
    }
    #[inline]
    pub(crate) unsafe fn with_vtable(ptr: *const u8, len: usize, data: AtomicPtr<()>, vtable: &'static Vtable) -> Bytes {
        Bytes {
            ptr,
            len,
            data,
            vtable,
        }
    }
    
    #[inline]
    fn as_slice(&self) -> &[u8] {
        unsafe {
            slice::from_raw_parts(self.ptr, self.len)
        }
    }
    #[inline]
    unsafe fn inc_start(&mut self, by: usize) {
        // Callers are expected to have bounds-checked already; debug assert for tests.
        debug_assert!(self.len >= by, "internal: inc_start out of bounds");
        self.len -= by;
        self.ptr = self.ptr.offset(by as isize);
    }
}
unsafe impl Send for Bytes {}
unsafe impl Sync for Bytes {}
impl Drop for Bytes {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            (self.vtable.drop)(&mut self.data, self.ptr, self.len)
        }
    }
}
impl Clone for Bytes {
    #[inline]
    fn clone(&self) -> Bytes {
        unsafe {
            (self.vtable.clone)(&self.data, self.ptr, self.len)
        }
    }
}
impl Buf for Bytes {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }
    #[inline]
    fn bytes(&self) -> &[u8] {
        self.as_slice()
    }
    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(
            cnt <= self.len(),
            "cannot advance past `remaining`: {:?} <= {:?}",
            cnt,
            self.len(),
        );
        unsafe {
            self.inc_start(cnt);
        }
    }
    fn to_bytes(&mut self) -> crate::Bytes {
        core::mem::replace(self, Bytes::new())
    }
}
impl Deref for Bytes {
    type Target = [u8];
    #[inline]
    fn deref(&self) -> &[u8] {
        self.as_slice()
    }
}
impl AsRef<[u8]> for Bytes {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
impl hash::Hash for Bytes {
    fn hash<H>(&self, state: &mut H) where H: hash::Hasher {
        self.as_slice().hash(state);
    }
}
impl Borrow<[u8]> for Bytes {
    fn borrow(&self) -> &[u8] {
        self.as_slice()
    }
}
impl IntoIterator for Bytes {
    type Item = u8;
    type IntoIter = IntoIter<Bytes>;
    fn into_iter(self) -> Self::IntoIter {
        IntoIter::new(self)
    }
}
impl<'a> IntoIterator for &'a Bytes {
    type Item = &'a u8;
    type IntoIter = core::slice::Iter<'a, u8>;
    fn into_iter(self) -> Self::IntoIter {
        self.as_slice().into_iter()
    }
}
impl FromIterator<u8> for Bytes {
    fn from_iter<T: IntoIterator<Item = u8>>(into_iter: T) -> Self {
        Vec::from_iter(into_iter).into()
    }
}
impl PartialEq for Bytes {
    fn eq(&self, other: &Bytes) -> bool {
        self.as_slice() == other.as_slice()
    }
}
impl PartialOrd for Bytes {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_slice())
    }
}
impl Ord for Bytes {
    fn cmp(&self, other: &Bytes) -> cmp::Ordering {
        self.as_slice().cmp(other.as_slice())
    }
}
impl Eq for Bytes {}
impl PartialEq<[u8]> for Bytes {
    fn eq(&self, other: &[u8]) -> bool {
        self.as_slice() == other
    }
}
impl PartialOrd<[u8]> for Bytes {
    fn partial_cmp(&self, other: &[u8]) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other)
    }
}
impl PartialEq<Bytes> for [u8] {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for [u8] {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}
impl PartialEq<str> for Bytes {
    fn eq(&self, other: &str) -> bool {
        self.as_slice() == other.as_bytes()
    }
}
impl PartialOrd<str> for Bytes {
    fn partial_cmp(&self, other: &str) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_bytes())
    }
}
impl PartialEq<Bytes> for str {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for str {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}
impl PartialEq<Vec<u8>> for Bytes {
    fn eq(&self, other: &Vec<u8>) -> bool {
        *self == &other[..]
    }
}
impl PartialOrd<Vec<u8>> for Bytes {
    fn partial_cmp(&self, other: &Vec<u8>) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(&other[..])
    }
}
impl PartialEq<Bytes> for Vec<u8> {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for Vec<u8> {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}
impl PartialEq<String> for Bytes {
    fn eq(&self, other: &String) -> bool {
        *self == &other[..]
    }
}
impl PartialOrd<String> for Bytes {
    fn partial_cmp(&self, other: &String) -> Option<cmp::Ordering> {
        self.as_slice().partial_cmp(other.as_bytes())
    }
}
impl PartialEq<Bytes> for String {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for String {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}
impl PartialEq<Bytes> for &[u8] {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for &[u8] {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
    }
}
impl PartialEq<Bytes> for &str {
    fn eq(&self, other: &Bytes) -> bool {
        *other == *self
    }
}
impl PartialOrd<Bytes> for &str {
    fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
        <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
    }
}
impl<'a, T: ?Sized> PartialEq<&'a T> for Bytes
    where Bytes: PartialEq<T>
{
    fn eq(&self, other: &&'a T) -> bool {
        *self == **other
    }
}
impl<'a, T: ?Sized> PartialOrd<&'a T> for Bytes
    where Bytes: PartialOrd<T>
{
    fn partial_cmp(&self, other: &&'a T) -> Option<cmp::Ordering> {
        self.partial_cmp(&**other)
    }
}
impl Default for Bytes {
    #[inline]
    fn default() -> Bytes {
        Bytes::new()
    }
}
impl From<&'static [u8]> for Bytes {
    fn from(slice: &'static [u8]) -> Bytes {
        Bytes::from_static(slice)
    }
}
impl From<&'static str> for Bytes {
    fn from(slice: &'static str) -> Bytes {
        Bytes::from_static(slice.as_bytes())
    }
}
impl From<Vec<u8>> for Bytes {
    fn from(vec: Vec<u8>) -> Bytes {
        // `into_boxed_slice` doesn't return a heap allocation for empty
        // vectors, so the pointer isn't aligned enough for the KIND_VEC
        // stashing to work.
        if vec.is_empty() {
            return Bytes::new();
        }
        let slice = vec.into_boxed_slice();
        let len = slice.len();
        let ptr = slice.as_ptr();
        drop(Box::into_raw(slice));
        if ptr as usize & 0x1 == 0 {
            let data = ptr as usize | KIND_VEC;
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(data as *mut _),
                vtable: &PROMOTABLE_EVEN_VTABLE,
            }
        } else {
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(ptr as *mut _),
                vtable: &PROMOTABLE_ODD_VTABLE,
            }
        }
    }
}
impl From<String> for Bytes {
    fn from(s: String) -> Bytes {
        Bytes::from(s.into_bytes())
    }
}
impl fmt::Debug for Vtable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Vtable")
            .field("clone", &(self.clone as *const ()))
            .field("drop", &(self.drop as *const ()))
            .finish()
    }
}
// Vtable for `Bytes` created from a `&'static [u8]`; clone and drop are trivial.
const STATIC_VTABLE: Vtable = Vtable {
    clone: static_clone,
    drop: static_drop,
};
unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let slice = slice::from_raw_parts(ptr, len);
    Bytes::from_static(slice)
}
unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
    // Nothing to drop for a `&'static [u8]`.
}
// Vtables for `Bytes` backed by a `Vec<u8>` that has not yet been promoted to
// `Shared`. "Even"/"odd" refers to the low bit of the buffer pointer, which
// decides whether the pointer itself can carry the KIND_VEC tag.
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
    clone: promotable_even_clone,
    drop: promotable_even_drop,
};
static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
    clone: promotable_odd_clone,
    drop: promotable_odd_drop,
};
unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;
    if kind == KIND_ARC {
        shallow_clone_arc(shared as _, ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        let buf = (shared as usize & !KIND_MASK) as *mut u8;
        shallow_clone_vec(data, shared, buf, ptr, len)
    }
}
unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
    let shared = *data.get_mut();
    let kind = shared as usize & KIND_MASK;
    if kind == KIND_ARC {
        release_shared(shared as *mut Shared);
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        let buf = (shared as usize & !KIND_MASK) as *mut u8;
        drop(rebuild_boxed_slice(buf, ptr, len));
    }
}
unsafe fn promotable_odd_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;
    if kind == KIND_ARC {
        shallow_clone_arc(shared as _, ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        shallow_clone_vec(data, shared, shared as *mut u8, ptr, len)
    }
}
unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
    let shared = *data.get_mut();
    let kind = shared as usize & KIND_MASK;
    if kind == KIND_ARC {
        release_shared(shared as *mut Shared);
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        drop(rebuild_boxed_slice(shared as *mut u8, ptr, len));
    }
}
unsafe fn rebuild_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) -> Box<[u8]> {
    let cap = (offset as usize - buf as usize) + len;
    Box::from_raw(slice::from_raw_parts_mut(buf, cap))
}
struct Shared {
    // Holds the vec so it can be dropped later; it is not otherwise accessed.
    _vec: Vec<u8>,
    ref_cnt: AtomicUsize,
}
// Assert that the alignment of `Shared` is divisible by 2, so the low bit of a
// `*mut Shared` is free to tag the kind.
const _: [(); 0 - mem::align_of::<Shared>() % 2] = [];
// Vtable for `Bytes` backed by a reference-counted `Shared` allocation.
static SHARED_VTABLE: Vtable = Vtable {
    clone: shared_clone,
    drop: shared_drop,
};
// The low bit of the `data` pointer tags which representation is in use.
const KIND_ARC: usize = 0b0;
const KIND_VEC: usize = 0b1;
const KIND_MASK: usize = 0b1;
unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    shallow_clone_arc(shared as _, ptr, len)
}
unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
    let shared = *data.get_mut();
    release_shared(shared as *mut Shared);
}
unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes {
    let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed);
    if old_size > usize::MAX >> 1 {
        crate::abort();
    }
    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(shared as _),
        vtable: &SHARED_VTABLE,
    }
}
#[cold]
unsafe fn shallow_clone_vec(atom: &AtomicPtr<()>, ptr: *const (), buf: *mut u8, offset: *const u8, len: usize) -> Bytes {
    // The buffer is still tracked in a `Vec<u8>`; it is time to promote it to
    // `Shared`. This may be called concurrently from several clones, so some
    // care must be taken.

    // First, allocate a new `Shared` instance containing the `Vec` fields.
    // `ptr`, `len`, and `cap` cannot be mutated without `&mut self`, so they
    // will not be concurrently updated, and since the buffer has not yet been
    // promoted to shared storage, those fields are still the components of
    // the vector.
    let vec = rebuild_boxed_slice(buf, offset, len).into_vec();
    let shared = Box::new(Shared {
        _vec: vec,
        // Initialize the refcount to 2: one for this handle, and one for the
        // new clone that will be returned from `shallow_clone_vec`.
        ref_cnt: AtomicUsize::new(2),
    });
    let shared = Box::into_raw(shared);
    // `Shared` is aligned to at least 2 bytes (asserted above), so the low
    // bit of the boxed pointer is clear and it reads as KIND_ARC.
    debug_assert!(
        0 == (shared as usize & KIND_MASK),
        "internal: Box<Shared> should have an aligned pointer",
    );
    // Try compare-and-swapping the `Shared` pointer into the atomic `data`
    // field. `Release` is used to synchronize with other threads that load
    // the field.
    //
    // If the `compare_and_swap` fails, this thread lost the race to promote
    // the buffer to shared storage. The `Acquire` ordering synchronizes with
    // the store made by the winning thread, so the `Shared` pointed to by
    // `actual` will be visible here.
    let actual = atom.compare_and_swap(ptr as _, shared as _, Ordering::AcqRel);
    if actual as usize == ptr as usize {
        // The promotion was successful; the new handle can be returned.
        return Bytes {
            ptr: offset,
            len,
            data: AtomicPtr::new(shared as _),
            vtable: &SHARED_VTABLE,
        };
    }
    // The promotion failed: a concurrent clone won the race. Free the
    // `Shared` box allocated by this thread, but forget its contents so the
    // `Vec` backing the buffer is not dropped.
    let shared = Box::from_raw(shared);
    mem::forget(*shared);
    // The buffer is already promoted to shared storage, so just bump the
    // reference count on the winner's `Shared`.
    shallow_clone_arc(actual as _, offset, len)
}
unsafe fn release_shared(ptr: *mut Shared) {
    // Follow the drop protocol used by `Arc`: decrement the count with
    // `Release`; only the thread dropping the last reference continues.
    if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
        return;
    }
    // This fence is needed to prevent reordering of uses of the data with the
    // deletion of the data. Because the decrement above is `Release`, it
    // synchronizes with this `Acquire` fence: every use of the data happens
    // before the reference count is decreased, which happens before this
    // fence, which happens before the data is deleted.
    //
    // In other words, any access to the buffer through an existing reference
    // must *happen before* the buffer is freed in another thread. The
    // `Release` decrement publishes those accesses, and the `Acquire` fence
    // here observes them before the allocation is dropped. This is the same
    // scheme used by `std::sync::Arc`.
    atomic::fence(Ordering::Acquire);
    // Drop the data.
    Box::from_raw(ptr);
}
// Placeholders for compile-fail doctests covering the `#[must_use]`
// attributes on `split_to` and `split_off`.
fn _split_to_must_use() {}

fn _split_off_must_use() {}
#[cfg(all(test, loom))]
mod fuzz {
    use std::sync::Arc;
    use loom::thread;
    use super::Bytes;
    #[test]
    fn bytes_cloning_vec() {
        loom::model(|| {
            let a = Bytes::from(b"abcdefgh".to_vec());
            let addr = a.as_ptr() as usize;
            // Share one handle through an `Arc` so both threads clone the same `Bytes`.
            let a1 = Arc::new(a);
            let a2 = a1.clone();
            let t1 = thread::spawn(move || {
                let b: Bytes = (*a1).clone();
                assert_eq!(b.as_ptr() as usize, addr);
            });
            let t2 = thread::spawn(move || {
                let b: Bytes = (*a2).clone();
                assert_eq!(b.as_ptr() as usize, addr);
            });
            t1.join().unwrap();
            t2.join().unwrap();
        });
    }
}