use bstr::ByteSlice; use crossbeam_utils::atomic::AtomicCell; use rustpython_common::{ borrow::{BorrowedValue, BorrowedValueMut}, lock::OnceCell, }; use std::mem::size_of; use std::ops::Deref; use super::objint::PyIntRef; use super::objiter; use super::objstr::PyStrRef; use super::objtype::PyTypeRef; use crate::bytesinner::{ bytes_decode, ByteInnerFindOptions, ByteInnerNewOptions, ByteInnerPaddingOptions, ByteInnerSplitOptions, ByteInnerTranslateOptions, DecodeArgs, PyBytesInner, }; use crate::common::hash::PyHash; use crate::function::{OptionalArg, OptionalOption}; use crate::obj::objtuple::PyTupleRef; use crate::pyobject::{ BorrowValue, Either, IntoPyObject, PyClassImpl, PyComparisonValue, PyContext, PyIterable, PyObjectRef, PyRef, PyResult, PyValue, TryFromObject, }; use crate::slots::{BufferProtocol, Comparable, Hashable, PyComparisonOp}; use crate::vm::VirtualMachine; use crate::{ anystr::{self, AnyStr}, byteslike::PyBytesLike, }; use crate::obj::objmemory::{Buffer, BufferOptions}; /// "bytes(iterable_of_ints) -> bytes\n\ /// bytes(string, encoding[, errors]) -> bytes\n\ /// bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer\n\ /// bytes(int) -> bytes object of size given by the parameter initialized with null bytes\n\ /// bytes() -> empty bytes object\n\nConstruct an immutable array of bytes from:\n \ /// - an iterable yielding integers in range(256)\n \ /// - a text string encoded using the specified encoding\n \ /// - any object implementing the buffer API.\n \ /// - an integer"; #[pyclass(module = false, name = "bytes")] #[derive(Clone, Debug)] pub struct PyBytes { inner: PyBytesInner, buffer_options: OnceCell>, } pub type PyBytesRef = PyRef; impl<'a> BorrowValue<'a> for PyBytes { type Borrowed = &'a [u8]; fn borrow_value(&'a self) -> Self::Borrowed { &self.inner.elements } } impl From> for PyBytes { fn from(elements: Vec) -> Self { Self { inner: PyBytesInner { elements }, buffer_options: OnceCell::new(), } } } impl From for PyBytes { fn from(inner: PyBytesInner) -> Self { Self { inner, buffer_options: OnceCell::new(), } } } impl IntoPyObject for Vec { fn into_pyobject(self, vm: &VirtualMachine) -> PyObjectRef { vm.ctx.new_bytes(self) } } impl Deref for PyBytes { type Target = [u8]; fn deref(&self) -> &[u8] { &self.inner.elements } } impl PyValue for PyBytes { fn class(vm: &VirtualMachine) -> PyTypeRef { vm.ctx.types.bytes_type.clone() } } pub(crate) fn init(context: &PyContext) { PyBytes::extend_class(context, &context.types.bytes_type); let bytes_type = &context.types.bytes_type; extend_class!(context, bytes_type, { "maketrans" => context.new_method(PyBytesInner::maketrans), }); PyBytesIterator::extend_class(context, &context.types.bytes_iterator_type); } #[pyimpl(flags(BASETYPE), with(Hashable, Comparable, BufferProtocol))] impl PyBytes { #[pyslot] fn tp_new( cls: PyTypeRef, options: ByteInnerNewOptions, vm: &VirtualMachine, ) -> PyResult { PyBytes { inner: options.get_value(vm)?, buffer_options: OnceCell::new(), } .into_ref_with_type(vm, cls) } #[pymethod(name = "__repr__")] pub(crate) fn repr(&self) -> String { format!("b'{}'", self.inner.repr()) } #[pymethod(name = "__len__")] pub(crate) fn len(&self) -> usize { self.inner.len() } #[pymethod(name = "__iter__")] fn iter(zelf: PyRef) -> PyBytesIterator { PyBytesIterator { position: AtomicCell::new(0), bytes: zelf, } } #[pymethod(name = "__sizeof__")] fn sizeof(&self) -> PyResult { Ok(size_of::() + self.inner.elements.len() * size_of::()) } #[pymethod(name = "__add__")] fn add(&self, other: PyBytesLike, vm: &VirtualMachine) -> PyObjectRef { vm.ctx.new_bytes(self.inner.add(&*other.borrow_value())) } #[pymethod(name = "__contains__")] fn contains( &self, needle: Either, vm: &VirtualMachine, ) -> PyResult { self.inner.contains(needle, vm) } #[pymethod(name = "__getitem__")] fn getitem(&self, needle: PyObjectRef, vm: &VirtualMachine) -> PyResult { self.inner.getitem("byte", needle, vm) } #[pymethod(name = "isalnum")] fn isalnum(&self) -> bool { self.inner.isalnum() } #[pymethod(name = "isalpha")] fn isalpha(&self) -> bool { self.inner.isalpha() } #[pymethod(name = "isascii")] fn isascii(&self) -> bool { self.inner.isascii() } #[pymethod(name = "isdigit")] fn isdigit(&self) -> bool { self.inner.isdigit() } #[pymethod(name = "islower")] fn islower(&self) -> bool { self.inner.islower() } #[pymethod(name = "isspace")] fn isspace(&self) -> bool { self.inner.isspace() } #[pymethod(name = "isupper")] fn isupper(&self) -> bool { self.inner.isupper() } #[pymethod(name = "istitle")] fn istitle(&self) -> bool { self.inner.istitle() } #[pymethod(name = "lower")] fn lower(&self) -> Self { self.inner.lower().into() } #[pymethod(name = "upper")] fn upper(&self) -> Self { self.inner.upper().into() } #[pymethod(name = "capitalize")] fn capitalize(&self) -> Self { self.inner.capitalize().into() } #[pymethod(name = "swapcase")] fn swapcase(&self) -> Self { self.inner.swapcase().into() } // TODO: Changed in version 3.8: bytes.hex() now supports optional sep and // bytes_per_sep parameters to insert separators between bytes in the hex output. #[pymethod(name = "hex")] pub(crate) fn hex(&self) -> String { self.inner.hex() } #[pymethod] fn fromhex(string: PyStrRef, vm: &VirtualMachine) -> PyResult { Ok(PyBytesInner::fromhex(string.borrow_value(), vm)?.into()) } #[pymethod(name = "center")] fn center(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult { Ok(self.inner.center(options, vm)?.into()) } #[pymethod(name = "ljust")] fn ljust(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult { Ok(self.inner.ljust(options, vm)?.into()) } #[pymethod(name = "rjust")] fn rjust(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult { Ok(self.inner.rjust(options, vm)?.into()) } #[pymethod(name = "count")] fn count(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { self.inner.count(options, vm) } #[pymethod(name = "join")] fn join(&self, iter: PyIterable, vm: &VirtualMachine) -> PyResult { Ok(self.inner.join(iter, vm)?.into()) } #[pymethod(name = "endswith")] fn endswith(&self, options: anystr::StartsEndsWithArgs, vm: &VirtualMachine) -> PyResult { self.inner.elements[..].py_startsendswith( options, "endswith", "bytes", |s, x: &PyBytesInner| s.ends_with(&x.elements[..]), vm, ) } #[pymethod(name = "startswith")] fn startswith( &self, options: anystr::StartsEndsWithArgs, vm: &VirtualMachine, ) -> PyResult { self.inner.elements[..].py_startsendswith( options, "startswith", "bytes", |s, x: &PyBytesInner| s.starts_with(&x.elements[..]), vm, ) } #[pymethod(name = "find")] fn find(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { let index = self.inner.find(options, |h, n| h.find(n), vm)?; Ok(index.map_or(-1, |v| v as isize)) } #[pymethod(name = "index")] fn index(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { let index = self.inner.find(options, |h, n| h.find(n), vm)?; index.ok_or_else(|| vm.new_value_error("substring not found".to_owned())) } #[pymethod(name = "rfind")] fn rfind(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { let index = self.inner.find(options, |h, n| h.rfind(n), vm)?; Ok(index.map_or(-1, |v| v as isize)) } #[pymethod(name = "rindex")] fn rindex(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { let index = self.inner.find(options, |h, n| h.rfind(n), vm)?; index.ok_or_else(|| vm.new_value_error("substring not found".to_owned())) } #[pymethod(name = "translate")] fn translate( &self, options: ByteInnerTranslateOptions, vm: &VirtualMachine, ) -> PyResult { Ok(self.inner.translate(options, vm)?.into()) } #[pymethod(name = "strip")] fn strip(&self, chars: OptionalOption) -> Self { self.inner.strip(chars).into() } #[pymethod(name = "lstrip")] fn lstrip(&self, chars: OptionalOption) -> Self { self.inner.lstrip(chars).into() } #[pymethod(name = "rstrip")] fn rstrip(&self, chars: OptionalOption) -> Self { self.inner.rstrip(chars).into() } /// removeprefix($self, prefix, /) /// /// /// Return a bytes object with the given prefix string removed if present. /// /// If the bytes starts with the prefix string, return string[len(prefix):] /// Otherwise, return a copy of the original bytes. #[pymethod(name = "removeprefix")] fn removeprefix(&self, prefix: PyBytesInner) -> Self { self.inner.removeprefix(prefix).into() } /// removesuffix(self, prefix, /) /// /// /// Return a bytes object with the given suffix string removed if present. /// /// If the bytes ends with the suffix string, return string[:len(suffix)] /// Otherwise, return a copy of the original bytes. #[pymethod(name = "removesuffix")] fn removesuffix(&self, suffix: PyBytesInner) -> Self { self.inner.removesuffix(suffix).into() } #[pymethod(name = "split")] fn split(&self, options: ByteInnerSplitOptions, vm: &VirtualMachine) -> PyResult { self.inner .split(options, |s, vm| vm.ctx.new_bytes(s.to_vec()), vm) } #[pymethod(name = "rsplit")] fn rsplit(&self, options: ByteInnerSplitOptions, vm: &VirtualMachine) -> PyResult { self.inner .rsplit(options, |s, vm| vm.ctx.new_bytes(s.to_vec()), vm) } #[pymethod(name = "partition")] fn partition(&self, sep: PyObjectRef, vm: &VirtualMachine) -> PyResult { let sub = PyBytesInner::try_from_object(vm, sep.clone())?; let (front, has_mid, back) = self.inner.partition(&sub, vm)?; Ok(vm.ctx.new_tuple(vec![ vm.ctx.new_bytes(front), if has_mid { sep } else { vm.ctx.new_bytes(Vec::new()) }, vm.ctx.new_bytes(back), ])) } #[pymethod(name = "rpartition")] fn rpartition(&self, sep: PyObjectRef, vm: &VirtualMachine) -> PyResult { let sub = PyBytesInner::try_from_object(vm, sep.clone())?; let (back, has_mid, front) = self.inner.rpartition(&sub, vm)?; Ok(vm.ctx.new_tuple(vec![ vm.ctx.new_bytes(front), if has_mid { sep } else { vm.ctx.new_bytes(Vec::new()) }, vm.ctx.new_bytes(back), ])) } #[pymethod(name = "expandtabs")] fn expandtabs(&self, options: anystr::ExpandTabsArgs) -> Self { self.inner.expandtabs(options).into() } #[pymethod(name = "splitlines")] fn splitlines(&self, options: anystr::SplitLinesArgs, vm: &VirtualMachine) -> PyResult { let lines = self .inner .splitlines(options, |x| vm.ctx.new_bytes(x.to_vec())); Ok(vm.ctx.new_list(lines)) } #[pymethod(name = "zfill")] fn zfill(&self, width: isize) -> Self { self.inner.zfill(width).into() } #[pymethod(name = "replace")] fn replace( &self, old: PyBytesInner, new: PyBytesInner, count: OptionalArg, vm: &VirtualMachine, ) -> PyResult { Ok(self.inner.replace(old, new, count, vm)?.into()) } #[pymethod(name = "title")] fn title(&self) -> Self { self.inner.title().into() } #[pymethod(name = "__mul__")] #[pymethod(name = "__rmul__")] fn mul(&self, value: isize, vm: &VirtualMachine) -> PyResult { if value > 0 && self.inner.len() as isize > std::isize::MAX / value { return Err(vm.new_overflow_error("repeated bytes are too long".to_owned())); } Ok(self.inner.repeat(value).into()) } #[pymethod(name = "__mod__")] fn modulo(&self, values: PyObjectRef, vm: &VirtualMachine) -> PyResult { let formatted = self.inner.cformat(values, vm)?; Ok(vm.ctx.new_bytes(formatted.into_bytes())) } #[pymethod(name = "__rmod__")] fn rmod(&self, _values: PyObjectRef, vm: &VirtualMachine) -> PyObjectRef { vm.ctx.not_implemented() } /// Return a string decoded from the given bytes. /// Default encoding is 'utf-8'. /// Default errors is 'strict', meaning that encoding errors raise a UnicodeError. /// Other possible values are 'ignore', 'replace' /// For a list of possible encodings, /// see https://docs.python.org/3/library/codecs.html#standard-encodings /// currently, only 'utf-8' and 'ascii' emplemented #[pymethod] fn decode(zelf: PyRef, args: DecodeArgs, vm: &VirtualMachine) -> PyResult { bytes_decode(zelf.into_object(), args, vm) } #[pymethod(magic)] fn getnewargs(&self, vm: &VirtualMachine) -> PyTupleRef { let param: Vec = self .inner .elements .iter() .map(|x| x.into_pyobject(vm)) .collect(); PyTupleRef::with_elements(param, &vm.ctx) } } impl BufferProtocol for PyBytes { fn get_buffer(zelf: &PyRef, _vm: &VirtualMachine) -> PyResult> { Ok(Box::new(zelf.clone())) } } impl Buffer for PyBytesRef { fn obj_bytes(&self) -> BorrowedValue<[u8]> { self.inner.elements.as_slice().into() } fn obj_bytes_mut(&self) -> BorrowedValueMut<[u8]> { unreachable!("bytes is not mutable") } fn is_resizable(&self) -> bool { false } fn release(&self) {} fn get_options(&self) -> BorrowedValue { self.buffer_options .get_or_init(|| { Box::new(BufferOptions { len: self.len(), ..Default::default() }) }) .as_ref() .into() } } impl Hashable for PyBytes { fn hash(zelf: &PyRef, vm: &VirtualMachine) -> PyResult { Ok(zelf.inner.hash(vm)) } } impl Comparable for PyBytes { fn cmp( zelf: &PyRef, other: &PyObjectRef, op: PyComparisonOp, vm: &VirtualMachine, ) -> PyResult { Ok(if let Some(res) = op.identical_optimization(zelf, other) { res.into() } else { zelf.inner.cmp(other, op, vm) }) } } #[pyclass(module = false, name = "bytes_iterator")] #[derive(Debug)] pub struct PyBytesIterator { position: AtomicCell, bytes: PyBytesRef, } impl PyValue for PyBytesIterator { fn class(vm: &VirtualMachine) -> PyTypeRef { vm.ctx.types.bytes_iterator_type.clone() } } #[pyimpl] impl PyBytesIterator { #[pymethod(name = "__next__")] fn next(&self, vm: &VirtualMachine) -> PyResult { let pos = self.position.fetch_add(1); if let Some(&ret) = self.bytes.borrow_value().get(pos) { Ok(ret) } else { Err(objiter::new_stop_iteration(vm)) } } #[pymethod(name = "__iter__")] fn iter(zelf: PyRef) -> PyRef { zelf } } impl TryFromObject for PyBytes { fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult { PyBytesInner::try_from_object(vm, obj).map(|x| x.into()) } }