pub(crate) use re::make_module; #[pymodule] mod re { /* * Regular expressions. * * This module fits the python re interface onto the rust regular expression * system. */ use crate::vm::{ PyObjectRef, PyPayload, PyResult, VirtualMachine, builtins::{PyInt, PyIntRef, PyStr, PyStrRef}, convert::{ToPyObject, TryFromObject}, function::{OptionalArg, PosArgs}, match_class, }; use num_traits::Signed; use regex::bytes::{Captures, Regex, RegexBuilder}; use std::fmt; use std::ops::Range; #[pyattr] #[pyclass(module = "re", name = "Pattern")] #[derive(Debug, PyPayload)] struct PyPattern { regex: Regex, pattern: String, } #[pyattr] const IGNORECASE: usize = 2; #[pyattr] const LOCALE: usize = 4; #[pyattr] const MULTILINE: usize = 8; #[pyattr] const DOTALL: usize = 16; #[pyattr] const UNICODE: usize = 32; #[pyattr] const VERBOSE: usize = 64; #[pyattr] const DEBUG: usize = 128; #[pyattr] const ASCII: usize = 256; #[derive(Default)] struct PyRegexFlags { ignorecase: bool, #[allow(unused)] locale: bool, multiline: bool, dotall: bool, unicode: bool, verbose: bool, #[allow(unused)] debug: bool, ascii: bool, } impl PyRegexFlags { fn from_int(bits: usize) -> Self { // TODO: detect unknown flag bits. PyRegexFlags { ignorecase: (bits & IGNORECASE) != 0, locale: (bits & LOCALE) != 0, multiline: (bits & MULTILINE) != 0, dotall: (bits & DOTALL) != 0, unicode: (bits & UNICODE) != 0, verbose: (bits & VERBOSE) != 0, debug: (bits & DEBUG) != 0, ascii: (bits & ASCII) != 0, } } } /// Inner data for a match object. #[pyattr] #[pyclass(module = "re", name = "Match", traverse)] #[derive(PyPayload, Traverse)] struct PyMatch { haystack: PyStrRef, #[pytraverse(skip)] captures: Vec>>, } impl fmt::Debug for PyMatch { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Match()") } } // type PyPatternRef = PyRef; // type PyMatchRef = PyRef; #[pyfunction(name = "match")] fn match_( pattern: PyStrRef, string: PyStrRef, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult> { let flags = extract_flags(flags); let regex = make_regex(vm, pattern.as_str(), flags)?; Ok(do_match(®ex, string)) } #[pyfunction] fn search( pattern: PyStrRef, string: PyStrRef, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult> { let flags = extract_flags(flags); let regex = make_regex(vm, pattern.as_str(), flags)?; Ok(do_search(®ex, string)) } #[pyfunction] fn sub( pattern: PyStrRef, repl: PyStrRef, string: PyStrRef, count: OptionalArg, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let flags = extract_flags(flags); let regex = make_regex(vm, pattern.as_str(), flags)?; let limit = count.unwrap_or(0); Ok(do_sub(®ex, repl, string, limit)) } #[pyfunction] fn findall( pattern: PyStrRef, string: PyStrRef, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let flags = extract_flags(flags); let regex = make_regex(vm, pattern.as_str(), flags)?; do_findall(vm, ®ex, string) } #[pyfunction] fn split( pattern: PyStrRef, string: PyStrRef, maxsplit: OptionalArg, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let flags = extract_flags(flags); let regex = make_regex(vm, pattern.as_str(), flags)?; do_split(vm, ®ex, string, maxsplit.into_option()) } fn do_sub(pattern: &PyPattern, repl: PyStrRef, search_text: PyStrRef, limit: usize) -> String { let out = pattern .regex .replacen(search_text.as_bytes(), limit, repl.as_bytes()); String::from_utf8_lossy(&out).into_owned() } fn do_match(pattern: &PyPattern, search_text: PyStrRef) -> Option { // I really wish there was a better way to do this; I don't think there is let mut regex_text = r"\A".to_owned(); regex_text.push_str(pattern.regex.as_str()); let regex = Regex::new(®ex_text).unwrap(); regex .captures(search_text.as_bytes()) .map(|captures| create_match(search_text.clone(), captures)) } fn do_search(regex: &PyPattern, search_text: PyStrRef) -> Option { regex .regex .captures(search_text.as_bytes()) .map(|captures| create_match(search_text.clone(), captures)) } fn do_findall(vm: &VirtualMachine, pattern: &PyPattern, search_text: PyStrRef) -> PyResult { let out = pattern .regex .captures_iter(search_text.as_bytes()) .map(|captures| match captures.len() { 1 => { let full = captures.get(0).unwrap().as_bytes(); let full = String::from_utf8_lossy(full).into_owned(); vm.ctx.new_str(full).into() } 2 => { let capture = captures.get(1).unwrap().as_bytes(); let capture = String::from_utf8_lossy(capture).into_owned(); vm.ctx.new_str(capture).into() } _ => { let out = captures .iter() .skip(1) .map(|m| { let s = m .map(|m| String::from_utf8_lossy(m.as_bytes()).into_owned()) .unwrap_or_default(); vm.ctx.new_str(s).into() }) .collect(); vm.ctx.new_tuple(out).into() } }) .collect(); Ok(vm.ctx.new_list(out).into()) } fn do_split( vm: &VirtualMachine, pattern: &PyPattern, search_text: PyStrRef, maxsplit: Option, ) -> PyResult { if maxsplit .as_ref() .map_or(false, |i| i.as_bigint().is_negative()) { return Ok(vm.ctx.new_list(vec![search_text.into()]).into()); } let maxsplit = maxsplit .map(|i| i.try_to_primitive::(vm)) .transpose()? .unwrap_or(0); let text = search_text.as_bytes(); // essentially Regex::split, but it outputs captures as well let mut output = Vec::new(); let mut last = 0; for (n, captures) in pattern.regex.captures_iter(text).enumerate() { let full = captures.get(0).unwrap(); let matched = &text[last..full.start()]; last = full.end(); output.push(Some(matched)); for m in captures.iter().skip(1) { output.push(m.map(|m| m.as_bytes())); } if maxsplit != 0 && n >= maxsplit { break; } } if last < text.len() { output.push(Some(&text[last..])); } let split = output .into_iter() .map(|v| { vm.unwrap_or_none(v.map(|v| { vm.ctx .new_str(String::from_utf8_lossy(v).into_owned()) .into() })) }) .collect(); Ok(vm.ctx.new_list(split).into()) } fn make_regex(vm: &VirtualMachine, pattern: &str, flags: PyRegexFlags) -> PyResult { let unicode = if flags.unicode && flags.ascii { return Err(vm.new_value_error("ASCII and UNICODE flags are incompatible")); } else { !flags.ascii }; let r = RegexBuilder::new(pattern) .case_insensitive(flags.ignorecase) .multi_line(flags.multiline) .dot_matches_new_line(flags.dotall) .ignore_whitespace(flags.verbose) .unicode(unicode) .build() .map_err(|err| match err { regex::Error::Syntax(s) => vm.new_value_error(format!("Error in regex: {}", s)), err => vm.new_value_error(format!("Error in regex: {:?}", err)), })?; Ok(PyPattern { regex: r, pattern: pattern.to_owned(), }) } /// Take a found regular expression and convert it to proper match object. fn create_match(haystack: PyStrRef, captures: Captures) -> PyMatch { let captures = captures .iter() .map(|opt| opt.map(|m| m.start()..m.end())) .collect(); PyMatch { haystack, captures } } fn extract_flags(flags: OptionalArg) -> PyRegexFlags { flags.map_or_else(Default::default, PyRegexFlags::from_int) } #[pyfunction] fn compile( pattern: PyStrRef, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let flags = extract_flags(flags); make_regex(vm, pattern.as_str(), flags) } #[pyfunction] fn escape(pattern: PyStrRef) -> String { regex::escape(pattern.as_str()) } #[pyfunction] fn purge(_vm: &VirtualMachine) {} #[pyclass] impl PyPattern { #[pymethod(name = "match")] fn match_(&self, text: PyStrRef) -> Option { do_match(self, text) } #[pymethod] fn search(&self, text: PyStrRef) -> Option { do_search(self, text) } #[pymethod] fn sub(&self, repl: PyStrRef, text: PyStrRef, vm: &VirtualMachine) -> PyResult { let replaced_text = self.regex.replace_all(text.as_bytes(), repl.as_bytes()); let replaced_text = String::from_utf8_lossy(&replaced_text).into_owned(); Ok(vm.ctx.new_str(replaced_text)) } #[pymethod] fn subn(&self, repl: PyStrRef, text: PyStrRef, vm: &VirtualMachine) -> PyResult { self.sub(repl, text, vm) } #[pygetset] fn pattern(&self, vm: &VirtualMachine) -> PyResult { Ok(vm.ctx.new_str(self.pattern.clone())) } #[pymethod] fn split( &self, search_text: PyStrRef, maxsplit: OptionalArg, vm: &VirtualMachine, ) -> PyResult { do_split(vm, self, search_text, maxsplit.into_option()) } #[pymethod] fn findall(&self, search_text: PyStrRef, vm: &VirtualMachine) -> PyResult { do_findall(vm, self, search_text) } } #[pyclass] impl PyMatch { #[pymethod] fn start(&self, group: OptionalArg, vm: &VirtualMachine) -> PyResult { let group = group.unwrap_or_else(|| vm.ctx.new_int(0).into()); let start = self.get_bounds(group, vm)?.map_or_else( || vm.ctx.new_int(-1).into(), |r| vm.ctx.new_int(r.start).into(), ); Ok(start) } #[pymethod] fn end(&self, group: OptionalArg, vm: &VirtualMachine) -> PyResult { let group = group.unwrap_or_else(|| vm.ctx.new_int(0).into()); let end = self.get_bounds(group, vm)?.map_or_else( || vm.ctx.new_int(-1).into(), |r| vm.ctx.new_int(r.end).into(), ); Ok(end) } fn subgroup(&self, bounds: Range) -> String { self.haystack.as_str()[bounds].to_owned() } fn get_bounds( &self, id: PyObjectRef, vm: &VirtualMachine, ) -> PyResult>> { match_class!(match id { i @ PyInt => { let i = usize::try_from_object(vm, i.into())?; let capture = self .captures .get(i) .ok_or_else(|| vm.new_index_error("No such group".to_owned()))?; Ok(capture.clone()) } _s @ PyStr => unimplemented!(), _ => Err(vm.new_index_error("No such group".to_owned())), }) } fn get_group(&self, id: PyObjectRef, vm: &VirtualMachine) -> PyResult> { let bounds = self.get_bounds(id, vm)?; Ok(bounds.map(|b| self.subgroup(b))) } #[pymethod] fn group(&self, groups: PosArgs, vm: &VirtualMachine) -> PyResult { let mut groups = groups.into_vec(); match groups.len() { 0 => Ok(self .subgroup(self.captures[0].clone().unwrap()) .to_pyobject(vm)), 1 => self .get_group(groups.pop().unwrap(), vm) .map(|g| g.to_pyobject(vm)), _ => { let output: Result, _> = groups .into_iter() .map(|id| self.get_group(id, vm).map(|g| g.to_pyobject(vm))) .collect(); Ok(vm.ctx.new_tuple(output?)).into() } } } #[pymethod] fn groups(&self, default: OptionalArg, vm: &VirtualMachine) -> PyTupleRef { let default = default.into_option(); let groups = self .captures .iter() .map(|capture| { vm.unwrap_or_none( capture .as_ref() .map(|bounds| self.subgroup(bounds.clone()).to_pyobject(vm)) .or_else(|| default.clone()), ) }) .collect(); vm.ctx.new_tuple(groups) } } }