diff --git a/caldav/davclient.py b/caldav/davclient.py index 8f81666..4d5976e 100644 --- a/caldav/davclient.py +++ b/caldav/davclient.py @@ -60,9 +60,7 @@ class DAVResponse: davclient = None huge_tree: bool = False - def __init__( - self, response: Response, davclient: Optional["DAVClient"] = None - ) -> None: + def __init__(self, response: Response, davclient: Optional["DAVClient"] = None) -> None: self.headers = response.headers log.debug("response headers: " + str(self.headers)) log.debug("response status: " + str(self.status)) @@ -75,9 +73,9 @@ def __init__( ## TODO: this if/else/elif could possibly be refactored, or we should ## consider to do streaming into the xmltree library as originally ## intended. It only makes sense for really huge payloads though. - if self.headers.get("Content-Type", "").startswith( - "text/xml" - ) or self.headers.get("Content-Type", "").startswith("application/xml"): + if self.headers.get("Content-Type", "").startswith("text/xml") or self.headers.get( + "Content-Type", "" + ).startswith("application/xml"): try: content_length = int(self.headers["Content-Length"]) except: @@ -94,22 +92,19 @@ def __init__( try: self.tree = etree.XML( self._raw, - parser=etree.XMLParser( - remove_blank_text=True, huge_tree=self.huge_tree - ), + parser=etree.XMLParser(remove_blank_text=True, huge_tree=self.huge_tree), ) except: logging.critical( - "Expected some valid XML from the server, but got this: \n" - + str(self._raw), + "Expected some valid XML from the server, but got this: \n" + str(self._raw), exc_info=True, ) raise if log.level <= logging.DEBUG: log.debug(etree.tostring(self.tree, pretty_print=True)) - elif self.headers.get("Content-Type", "").startswith( - "text/calendar" - ) or self.headers.get("Content-Type", "").startswith("text/plain"): + elif self.headers.get("Content-Type", "").startswith("text/calendar") or self.headers.get( + "Content-Type", "" + ).startswith("text/plain"): ## text/plain is typically for errors, we shouldn't see it on 200/207 responses. ## TODO: may want to log an error if it's text/plain and 200/207. ## Logic here was moved when refactoring @@ -124,9 +119,7 @@ def __init__( try: self.tree = etree.XML( self._raw, - parser=etree.XMLParser( - remove_blank_text=True, huge_tree=self.huge_tree - ), + parser=etree.XMLParser(remove_blank_text=True, huge_tree=self.huge_tree), ) except: pass @@ -193,12 +186,7 @@ def validate_status(self, status: str) -> None: makes sense to me, but I've only seen it from SOGo, and it's not in accordance with the examples in rfc6578. """ - if ( - " 200 " not in status - and " 201 " not in status - and " 207 " not in status - and " 404 " not in status - ): + if " 200 " not in status and " 201 " not in status and " 207 " not in status and " 404 " not in status: raise error.ResponseError(status) def _parse_response(self, response) -> Tuple[str, List[_Element], Optional[Any]]: @@ -280,9 +268,7 @@ def find_objects_and_props(self) -> Dict[str, Dict[str, _Element]]: return self.objects - def _expand_simple_prop( - self, proptag, props_found, multi_value_allowed=False, xpath=None - ): + def _expand_simple_prop(self, proptag, props_found, multi_value_allowed=False, xpath=None): values = [] if proptag in props_found: prop_xml = props_found[proptag] @@ -334,9 +320,7 @@ def expand_simple_props( if prop.tag is None: continue - props_found[prop.tag] = self._expand_simple_prop( - prop.tag, props_found, xpath=xpath - ) + props_found[prop.tag] = self._expand_simple_prop(prop.tag, props_found, xpath=xpath) for prop in multi_value_props: if prop.tag is None: continue @@ -503,9 +487,7 @@ def check_scheduling_support(self) -> bool: support_list = self.check_dav_support() return support_list is not None and "calendar-auto-schedule" in support_list - def propfind( - self, url: Optional[str] = None, props: str = "", depth: int = 0 - ) -> DAVResponse: + def propfind(self, url: Optional[str] = None, props: str = "", depth: int = 0) -> DAVResponse: """ Send a propfind request. @@ -517,9 +499,7 @@ def propfind( Returns * DAVResponse """ - return self.request( - url or str(self.url), "PROPFIND", props, {"Depth": str(depth)} - ) + return self.request(url or str(self.url), "PROPFIND", props, {"Depth": str(depth)}) def proppatch(self, url: str, body: str, dummy: None = None) -> DAVResponse: """ @@ -591,17 +571,13 @@ def mkcalendar(self, url: str, body: str = "", dummy: None = None) -> DAVRespons """ return self.request(url, "MKCALENDAR", body) - def put( - self, url: str, body: str, headers: Mapping[str, str] = None - ) -> DAVResponse: + def put(self, url: str, body: str, headers: Mapping[str, str] = None) -> DAVResponse: """ Send a put request. """ return self.request(url, "PUT", body, headers or {}) - def post( - self, url: str, body: str, headers: Mapping[str, str] = None - ) -> DAVResponse: + def post(self, url: str, body: str, headers: Mapping[str, str] = None) -> DAVResponse: """ Send a POST request. """ @@ -684,12 +660,7 @@ def request( if not r.status_code == 401: raise - if ( - r.status_code == 401 - and "WWW-Authenticate" in r.headers - and not self.auth - and self.username - ): + if r.status_code == 401 and "WWW-Authenticate" in r.headers and not self.auth and self.username: auth_types = self.extract_auth_types(r.headers["WWW-Authenticate"]) if self.password and self.username and "digest" in auth_types: @@ -725,13 +696,9 @@ def request( auth_types = self.extract_auth_types(r.headers["WWW-Authenticate"]) if self.password and self.username and "digest" in auth_types: - self.auth = requests.auth.HTTPDigestAuth( - self.username, self.password.decode() - ) + self.auth = requests.auth.HTTPDigestAuth(self.username, self.password.decode()) elif self.password and self.username and "basic" in auth_types: - self.auth = requests.auth.HTTPBasicAuth( - self.username, self.password.decode() - ) + self.auth = requests.auth.HTTPBasicAuth(self.username, self.password.decode()) elif self.password and "bearer" in auth_types: self.auth = HTTPBearerAuth(self.password.decode()) @@ -740,10 +707,7 @@ def request( return self.request(str(url_obj), method, body, headers) # this is an error condition that should be raised to the application - if ( - response.status == requests.codes.forbidden - or response.status == requests.codes.unauthorized - ): + if response.status == requests.codes.forbidden or response.status == requests.codes.unauthorized: try: reason = response.reason except AttributeError: @@ -759,24 +723,16 @@ def request( commlog.write(f"{datetime.datetime.now():%FT%H:%M:%S}".encode("utf-8")) commlog.write(b"\n====>\n") commlog.write(f"{method} {url}\n".encode("utf-8")) - commlog.write( - b"\n".join(to_wire(f"{x}: {headers[x]}") for x in headers) - ) + commlog.write(b"\n".join(to_wire(f"{x}: {headers[x]}") for x in headers)) commlog.write(b"\n\n") commlog.write(to_wire(body)) commlog.write(b"<====\n") commlog.write(f"{response.status} {response.reason}".encode("utf-8")) - commlog.write( - b"\n".join( - to_wire(f"{x}: {response.headers[x]}") for x in response.headers - ) - ) + commlog.write(b"\n".join(to_wire(f"{x}: {response.headers[x]}") for x in response.headers)) commlog.write(b"\n\n") ct = response.headers.get("Content-Type", "") if response.tree is not None: - commlog.write( - to_wire(etree.tostring(response.tree, pretty_print=True)) - ) + commlog.write(to_wire(etree.tostring(response.tree, pretty_print=True))) else: commlog.write(to_wire(response._raw)) commlog.write(b"\n") diff --git a/caldav/elements/base.py b/caldav/elements/base.py index c2e63a9..bbc4bfb 100644 --- a/caldav/elements/base.py +++ b/caldav/elements/base.py @@ -29,9 +29,7 @@ class BaseElement: attributes: Optional[dict] = None caldav_class = None - def __init__( - self, name: Optional[str] = None, value: Union[str, bytes, None] = None - ) -> None: + def __init__(self, name: Optional[str] = None, value: Union[str, bytes, None] = None) -> None: self.children = [] self.attributes = {} value = to_unicode(value) @@ -41,15 +39,11 @@ def __init__( if value is not None: self.value = value - def __add__( - self, other: Union["BaseElement", Iterable["BaseElement"]] - ) -> "BaseElement": + def __add__(self, other: Union["BaseElement", Iterable["BaseElement"]]) -> "BaseElement": return self.append(other) def __str__(self) -> str: - utf8 = etree.tostring( - self.xmlelement(), encoding="utf-8", xml_declaration=True, pretty_print=True - ) + utf8 = etree.tostring(self.xmlelement(), encoding="utf-8", xml_declaration=True, pretty_print=True) return str(utf8, "utf-8") def xmlelement(self) -> _Element: diff --git a/caldav/elements/cdav.py b/caldav/elements/cdav.py index 6df4926..a28d561 100644 --- a/caldav/elements/cdav.py +++ b/caldav/elements/cdav.py @@ -33,14 +33,10 @@ def _to_utc_date_string(ts): mindate = datetime.min.replace(tzinfo=utc_tz) maxdate = datetime.max.replace(tzinfo=utc_tz) if mindate + ts.tzinfo.utcoffset(ts) > ts: - logging.error( - "Cannot coerce datetime %s to UTC. Changed to min-date.", ts - ) + logging.error("Cannot coerce datetime %s to UTC. Changed to min-date.", ts) ts = mindate elif ts > maxdate - ts.tzinfo.utcoffset(ts): - logging.error( - "Cannot coerce datetime %s to UTC. Changed to max-date.", ts - ) + logging.error("Cannot coerce datetime %s to UTC. Changed to max-date.", ts) ts = maxdate else: ts = ts.astimezone(utc_tz) @@ -108,9 +104,7 @@ def __init__(self, value, collation: str = "i;octet", negate: bool = False) -> N class TimeRange(BaseElement): tag: ClassVar[str] = ns("C", "time-range") - def __init__( - self, start: Optional[datetime] = None, end: Optional[datetime] = None - ) -> None: + def __init__(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> None: ## start and end should be an icalendar "date with UTC time", ## ref https://tools.ietf.org/html/rfc4791#section-9.9 super(TimeRange, self).__init__() @@ -136,9 +130,7 @@ class CalendarData(BaseElement): class Expand(BaseElement): tag: ClassVar[str] = ns("C", "expand") - def __init__( - self, start: Optional[datetime], end: Optional[datetime] = None - ) -> None: + def __init__(self, start: Optional[datetime], end: Optional[datetime] = None) -> None: super(Expand, self).__init__() if self.attributes is None: diff --git a/caldav/lib/error.py b/caldav/lib/error.py index a09e0e9..fc998f8 100644 --- a/caldav/lib/error.py +++ b/caldav/lib/error.py @@ -31,9 +31,7 @@ def assert_(condition: object) -> None: assert condition except AssertionError: if debugmode == "PRODUCTION": - log.error( - "Deviation from expectations found. %s" % ERR_FRAGMENT, exc_info=True - ) + log.error("Deviation from expectations found. %s" % ERR_FRAGMENT, exc_info=True) elif debugmode == "DEBUG_PDB": log.error("Deviation from expectations found. Dropping into debugger") import pdb diff --git a/caldav/lib/url.py b/caldav/lib/url.py index 9feff4a..20661db 100644 --- a/caldav/lib/url.py +++ b/caldav/lib/url.py @@ -132,8 +132,7 @@ def unauth(self) -> "URL": return URL.objectify( ParseResult( self.scheme, - "%s:%s" - % (self.hostname, self.port or {"https": 443, "http": 80}[self.scheme]), + "%s:%s" % (self.hostname, self.port or {"https": 443, "http": 80}[self.scheme]), self.path.replace("//", "/"), self.params, self.query, diff --git a/caldav/lib/vcal.py b/caldav/lib/vcal.py index e196f4c..25d1b86 100644 --- a/caldav/lib/vcal.py +++ b/caldav/lib/vcal.py @@ -65,9 +65,7 @@ def fix(event): ## TODO: add ^ before COMPLETED and CREATED? ## 1) Add an arbitrary time if completed is given as date - fixed = re.sub( - r"COMPLETED(?:;VALUE=DATE)?:(\d+)\s", r"COMPLETED:\g<1>T120000Z", event - ) + fixed = re.sub(r"COMPLETED(?:;VALUE=DATE)?:(\d+)\s", r"COMPLETED:\g<1>T120000Z", event) ## 2) CREATED timestamps prior to epoch does not make sense, ## change from year 0001 to epoch. @@ -81,10 +79,7 @@ def fix(event): ## 5 prepare to remove DURATION or DTEND/DUE if both DURATION and ## DTEND/DUE is set. ## remove duplication of DTSTAMP - fixed2 = ( - "\n".join(filter(LineFilterDiscardingDuplicates(), fixed.strip().split("\n"))) - + "\n" - ) + fixed2 = "\n".join(filter(LineFilterDiscardingDuplicates(), fixed.strip().split("\n"))) + "\n" if fixed2 != event: global fixup_error_loggings @@ -105,13 +100,7 @@ def fix(event): try: import difflib - log( - "\n".join( - difflib.unified_diff( - event.split("\n"), fixed2.split("\n"), lineterm="" - ) - ) - ) + log("\n".join(difflib.unified_diff(event.split("\n"), fixed2.split("\n"), lineterm=""))) except: log("Original: \n" + event) log("Modified: \n" + fixed2) @@ -168,20 +157,12 @@ def create_ical(ical_fragment=None, objtype=None, language="en_DK", **props): ## STATUS should default to NEEDS-ACTION for tasks, if it's not set ## (otherwise we cannot easily add a task to a davical calendar and ## then find it again - ref https://gitlab.com/davical-project/davical/-/issues/281 - if ( - not props.get("status") - and "\nSTATUS:" not in (ical_fragment or "") - and objtype == "VTODO" - ): + if not props.get("status") and "\nSTATUS:" not in (ical_fragment or "") and objtype == "VTODO": props["status"] = "NEEDS-ACTION" else: if not ical_fragment.strip().startswith("BEGIN:VCALENDAR"): - ical_fragment = ( - "BEGIN:VCALENDAR\n" - + to_normal_str(ical_fragment.strip()) - + "\nEND:VCALENDAR\n" - ) + ical_fragment = "BEGIN:VCALENDAR\n" + to_normal_str(ical_fragment.strip()) + "\nEND:VCALENDAR\n" my_instance = icalendar.Calendar.from_ical(ical_fragment) component = my_instance.subcomponents[0] ical_fragment = None diff --git a/caldav/objects.py b/caldav/objects.py index 40ab040..f90479f 100644 --- a/caldav/objects.py +++ b/caldav/objects.py @@ -163,9 +163,7 @@ def children(self, type: Optional[str] = None) -> List[Tuple[URL, Any, Any]]: multiprops = [dav.ResourceType()] props_multiprops = props + multiprops response = self._query_properties(props_multiprops, depth) - properties = response.expand_simple_props( - props=props, multi_value_props=multiprops - ) + properties = response.expand_simple_props(props=props, multi_value_props=multiprops) for path in properties: resource_types = properties[path][dav.ResourceType.tag] @@ -192,9 +190,7 @@ def children(self, type: Optional[str] = None) -> List[Tuple[URL, Any, Any]]: ## the properties we've already fetched return c - def _query_properties( - self, props: Optional[Sequence[BaseElement]] = None, depth: int = 0 - ): + def _query_properties(self, props: Optional[Sequence[BaseElement]] = None, depth: int = 0): """ This is an internal method for doing a propfind query. It's a result of code-refactoring work, attempting to consolidate @@ -237,28 +233,16 @@ def _query( ret = getattr(self.client, query_method)(url, body, depth) if ret.status == 404: raise error.NotFoundError(errmsg(ret)) - if ( - expected_return_value is not None and ret.status != expected_return_value - ) or ret.status >= 400: + if (expected_return_value is not None and ret.status != expected_return_value) or ret.status >= 400: ## COMPATIBILITY HACK - see https://github.com/python-caldav/caldav/issues/309 body = to_wire(body) - if ( - ret.status == 500 - and b"getetag" not in body - and b"" in body - ): - body = body.replace( - b"", b"" - ) - return self._query( - body, depth, query_method, url, expected_return_value - ) + if ret.status == 500 and b"getetag" not in body and b"" in body: + body = body.replace(b"", b"") + return self._query(body, depth, query_method, url, expected_return_value) raise error.exception_by_method[query_method](errmsg(ret)) return ret - def get_property( - self, prop: BaseElement, use_cached: bool = False, **passthrough - ) -> Optional[str]: + def get_property(self, prop: BaseElement, use_cached: bool = False, **passthrough) -> Optional[str]: ## TODO: use_cached should probably be true if use_cached: if prop.tag in self.props: @@ -352,8 +336,7 @@ def get_properties( log.error( "Possibly the server has a path handling problem, possibly the URL configured is wrong.\n" "Path expected: %s, path found: %s %s.\n" - "Continuing, probably everything will be fine" - % (path, str(list(properties)), error.ERR_FRAGMENT) + "Continuing, probably everything will be fine" % (path, str(list(properties)), error.ERR_FRAGMENT) ) rc = list(properties.values())[0] else: @@ -425,9 +408,7 @@ def get_display_name(self): def __str__(self) -> str: try: - return ( - str(self.get_property(dav.DisplayName(), use_cached=True)) or self.url - ) + return str(self.get_property(dav.DisplayName(), use_cached=True)) or self.url except: return str(self.url) @@ -456,9 +437,7 @@ def calendars(self) -> List["Calendar"]: except: log.error(f"Calendar {c_name} has unexpected url {c_url}") cal_id = None - cals.append( - Calendar(self.client, id=cal_id, url=c_url, parent=self, name=c_name) - ) + cals.append(Calendar(self.client, id=cal_id, url=c_url, parent=self, name=c_name)) return cals @@ -490,9 +469,7 @@ def make_calendar( supported_calendar_component_set=supported_calendar_component_set, ).save() - def calendar( - self, name: Optional[str] = None, cal_id: Optional[str] = None - ) -> "Calendar": + def calendar(self, name: Optional[str] = None, cal_id: Optional[str] = None) -> "Calendar": """ The calendar method will return a calendar object. If it gets a cal_id but no name, it will not initiate any communication with the server @@ -510,9 +487,7 @@ def calendar( if display_name == name: return calendar if name and not cal_id: - raise error.NotFoundError( - "No calendar with name %s found under %s" % (name, self.url) - ) + raise error.NotFoundError("No calendar with name %s found under %s" % (name, self.url)) if not cal_id and not name: return self.calendars()[0] @@ -522,13 +497,10 @@ def calendar( if cal_id is None: raise ValueError("Unexpected value None for cal_id") - if str(URL.objectify(cal_id).canonical()).startswith( - str(self.client.url.canonical()) - ): + if str(URL.objectify(cal_id).canonical()).startswith(str(self.client.url.canonical())): url = self.client.url.join(cal_id) elif isinstance(cal_id, URL) or ( - isinstance(cal_id, str) - and (cal_id.startswith("https://") or cal_id.startswith("http://")) + isinstance(cal_id, str) and (cal_id.startswith("https://") or cal_id.startswith("http://")) ): if self.url is None: raise ValueError("Unexpected value None for self.url") @@ -667,10 +639,7 @@ def calendar_home_set(self, url) -> None: ## research. added here as it solves real-world issues, ref ## https://github.com/python-caldav/caldav/pull/56 if sanitized_url is not None: - if ( - sanitized_url.hostname - and sanitized_url.hostname != self.client.url.hostname - ): + if sanitized_url.hostname and sanitized_url.hostname != self.client.url.hostname: # icloud (and others?) having a load balanced system, # where each principal resides on one named host ## TODO: @@ -680,9 +649,7 @@ def calendar_home_set(self, url) -> None: ## is an unacceptable side effect and may be a cause of ## incompatibilities with icloud. Do more research! self.client.url = sanitized_url - self._calendar_home_set = CalendarSet( - self.client, self.client.url.join(sanitized_url) - ) + self._calendar_home_set = CalendarSet(self.client, self.client.url.join(sanitized_url)) def calendars(self) -> List["Calendar"]: """ @@ -719,9 +686,7 @@ def calendar_user_address_set(self) -> List[Optional[str]]: """ defined in RFC6638 """ - _addresses: Optional[_Element] = self.get_property( - cdav.CalendarUserAddressSet(), parse_props=False - ) + _addresses: Optional[_Element] = self.get_property(cdav.CalendarUserAddressSet(), parse_props=False) if _addresses is None: raise ValueError("Unexpected value None for _addresses") @@ -747,9 +712,7 @@ class Calendar(DAVObject): https://tools.ietf.org/html/rfc4791#section-5.3.1 """ - def _create( - self, name=None, id=None, supported_calendar_component_set=None - ) -> None: + def _create(self, name=None, id=None, supported_calendar_component_set=None) -> None: """ Create a new calendar with display name `name` in `parent`. """ @@ -779,9 +742,7 @@ def _create( mkcol = cdav.Mkcalendar() + set - r = self._query( - root=mkcol, query_method="mkcalendar", url=path, expected_return_value=201 - ) + r = self._query(root=mkcol, query_method="mkcalendar", url=path, expected_return_value=201) # COMPATIBILITY ISSUE # name should already be set, but we've seen caldav servers failing @@ -815,9 +776,7 @@ def get_supported_components(self) -> List[Any]: props = [cdav.SupportedCalendarComponentSet()] response = self.get_properties(props, parse_response_xml=False) response_list = response.find_objects_and_props() - prop = response_list[unquote(self.url.path)][ - cdav.SupportedCalendarComponentSet().tag - ] + prop = response_list[unquote(self.url.path)][cdav.SupportedCalendarComponentSet().tag] return [supported.get("name") for supported in prop] def save_with_invites(self, ical: str, attendees, **attendeeoptions) -> None: @@ -837,8 +796,7 @@ def save_with_invites(self, ical: str, attendees, **attendeeoptions) -> None: def _use_or_create_ics(self, ical, objtype, **ical_data): if ical_data or ( - (isinstance(ical, str) or isinstance(ical, bytes)) - and b"BEGIN:VCALENDAR" not in to_wire(ical) + (isinstance(ical, str) or isinstance(ical, bytes)) and b"BEGIN:VCALENDAR" not in to_wire(ical) ): ## TODO: the ical_fragment code is not much tested if ical and "ical_fragment" not in ical_data: @@ -917,9 +875,9 @@ def save_journal( return j def _handle_relations(self, uid, ical_data) -> None: - for reverse_reltype, other_uid in [ - ("parent", x) for x in ical_data.get("child", ()) - ] + [("child", x) for x in ical_data.get("parent", ())]: + for reverse_reltype, other_uid in [("parent", x) for x in ical_data.get("child", ())] + [ + ("child", x) for x in ical_data.get("parent", ()) + ]: other = self.object_by_uid(other_uid) other.set_relation(other=uid, reltype=reverse_reltype, set_reverse=False) @@ -957,11 +915,7 @@ def calendar_multiget(self, event_urls: Iterable[URL]) -> List["Event"]: rv = [] prop = dav.Prop() + cdav.CalendarData() - root = ( - cdav.CalendarMultiGet() - + prop - + [dav.Href(value=u.path) for u in event_urls] - ) + root = cdav.CalendarMultiGet() + prop + [dav.Href(value=u.path) for u in event_urls] response = self._query(root, 1, "report") results = response.expand_simple_props([cdav.CalendarData()]) rv = [ @@ -1007,9 +961,7 @@ def build_date_search_query( else: comp_class = None - return self.build_search_xml_query( - comp_class=comp_class, expand=expand, start=start, end=end - ) + return self.build_search_xml_query(comp_class=comp_class, expand=expand, start=start, end=end) def date_search( self, @@ -1078,9 +1030,7 @@ def date_search( return objects - def _request_report_build_resultlist( - self, xml, comp_class=None, props=None, no_calendardata=False - ): + def _request_report_build_resultlist(self, xml, comp_class=None, props=None, no_calendardata=False): """ Takes some input XML, does a report query on a calendar object and returns the resource objects found. @@ -1207,16 +1157,10 @@ def search( objects.append(item) else: if not xml: - (xml, comp_class) = self.build_search_xml_query( - comp_class=comp_class, todo=todo, props=props, **kwargs - ) + (xml, comp_class) = self.build_search_xml_query(comp_class=comp_class, todo=todo, props=props, **kwargs) elif kwargs: - raise error.ConsistencyError( - "Inconsistent usage parameters: xml together with other search options" - ) - (response, objects) = self._request_report_build_resultlist( - xml, comp_class, props=props - ) + raise error.ConsistencyError("Inconsistent usage parameters: xml together with other search options") + (response, objects) = self._request_report_build_resultlist(xml, comp_class, props=props) for o in objects: ## This would not be needed if the servers would follow the standard ... @@ -1264,14 +1208,10 @@ def sort_key_func(x): ## Usage of strftime is a simple way to ensure there won't be ## problems if comparing dates with timestamps "isnt_overdue": not ( - "due" in comp - and comp["due"].dt.strftime("%F%H%M%S") - < datetime.now().strftime("%F%H%M%S") + "due" in comp and comp["due"].dt.strftime("%F%H%M%S") < datetime.now().strftime("%F%H%M%S") ), "hasnt_started": ( - "dtstart" in comp - and comp["dtstart"].dt.strftime("%F%H%M%S") - > datetime.now().strftime("%F%H%M%S") + "dtstart" in comp and comp["dtstart"].dt.strftime("%F%H%M%S") > datetime.now().strftime("%F%H%M%S") ), } for sort_key in sort_keys: @@ -1380,8 +1320,7 @@ def build_search_xml_query( if todo: if comp_class is not None and comp_class is not Todo: raise error.ConsistencyError( - "inconsistent search parameters - comp_class = %s, todo=%s" - % (comp_class, todo) + "inconsistent search parameters - comp_class = %s, todo=%s" % (comp_class, todo) ) comp_filter = cdav.CompFilter("VTODO") comp_class = Todo @@ -1391,8 +1330,7 @@ def build_search_xml_query( if event: if comp_class is not None and comp_class is not Event: raise error.ConsistencyError( - "inconsistent search parameters - comp_class = %s, event=%s" - % (comp_class, event) + "inconsistent search parameters - comp_class = %s, event=%s" % (comp_class, event) ) comp_filter = cdav.CompFilter("VEVENT") comp_class = Event @@ -1404,9 +1342,7 @@ def build_search_xml_query( elif comp_class is Journal: comp_filter = cdav.CompFilter("VJOURNAL") else: - raise error.ConsistencyError( - "unsupported comp class %s for search" % comp_class - ) + raise error.ConsistencyError("unsupported comp class %s for search" % comp_class) for other in kwargs: find_not_defined = other.startswith("no_") @@ -1506,9 +1442,7 @@ def todos( if sort_key: sort_keys = (sort_key,) - return self.search( - todo=True, include_completed=include_completed, sort_keys=sort_keys - ) + return self.search(todo=True, include_completed=include_completed, sort_keys=sort_keys) def _calendar_comp_class_by_data(self, data): """ @@ -1573,9 +1507,7 @@ def object_by_uid( assert not comp_class if hasattr(comp_filter, "attributes"): if comp_filter.attributes is None: - raise ValueError( - "Unexpected None value for variable comp_filter.attributes" - ) + raise ValueError("Unexpected None value for variable comp_filter.attributes") comp_filter = comp_filter.attributes["name"] if comp_filter == "VTODO": comp_class = Todo @@ -1589,9 +1521,7 @@ def object_by_uid( query = cdav.TextMatch(uid) query = cdav.PropFilter("UID") + query - root, comp_class = self.build_search_xml_query( - comp_class=comp_class, filters=[query] - ) + root, comp_class = self.build_search_xml_query(comp_class=comp_class, filters=[query]) try: items_found: List[Event] = self.search(root) @@ -1607,16 +1537,12 @@ def object_by_uid( items_found = [] for compfilter in ("VTODO", "VEVENT", "VJOURNAL"): try: - items_found.append( - self.object_by_uid(uid, cdav.CompFilter(compfilter)) - ) + items_found.append(self.object_by_uid(uid, cdav.CompFilter(compfilter))) except error.NotFoundError: pass if len(items_found) >= 1: if len(items_found) > 1: - logging.error( - "multiple items found with same UID. Returning the first one" - ) + logging.error("multiple items found with same UID. Returning the first one") return items_found[0] # Ref Lucas Verney, we've actually done a substring search, if the @@ -1685,9 +1611,7 @@ def objects_by_sync_token( level = dav.SyncLevel(value="1") props = dav.Prop() + dav.GetEtag() root = cmd + [level, token, props] - (response, objects) = self._request_report_build_resultlist( - root, props=[dav.GetEtag()], no_calendardata=True - ) + (response, objects) = self._request_report_build_resultlist(root, props=[dav.GetEtag()], no_calendardata=True) ## TODO: look more into this, I think sync_token should be directly available through response object try: sync_token = response.sync_token @@ -1702,9 +1626,7 @@ def objects_by_sync_token( except error.NotFoundError: ## The object was deleted pass - return SynchronizableCalendarObjectCollection( - calendar=self, objects=objects, sync_token=sync_token - ) + return SynchronizableCalendarObjectCollection(calendar=self, objects=objects, sync_token=sync_token) objects = objects_by_sync_token @@ -1771,8 +1693,7 @@ def __init__( # we ignore the type here as this is defined in sub-classes only; require more changes to # properly fix in a future revision raise error.NotFoundError( - "principal has no %s. %s" - % (str(self.findprop()), error.ERR_FRAGMENT) # type: ignore + "principal has no %s. %s" % (str(self.findprop()), error.ERR_FRAGMENT) # type: ignore ) def get_items(self): @@ -1788,20 +1709,14 @@ def get_items(self): "caldav server does not seem to support a sync-token REPORT query on a scheduling mailbox" ) error.assert_("google" in str(self.url)) - self._items = [ - CalendarObjectResource(url=x[0], client=self.client) - for x in self.children() - ] + self._items = [CalendarObjectResource(url=x[0], client=self.client) for x in self.children()] for x in self._items: x.load() else: try: self._items.sync() except: - self._items = [ - CalendarObjectResource(url=x[0], client=self.client) - for x in self.children() - ] + self._items = [CalendarObjectResource(url=x[0], client=self.client) for x in self.children()] for x in self._items: x.load() return self._items @@ -1860,17 +1775,11 @@ def sync(self) -> Tuple[Any, Any]: """ updated_objs = [] deleted_objs = [] - updates = self.calendar.objects_by_sync_token( - self.sync_token, load_objects=False - ) + updates = self.calendar.objects_by_sync_token(self.sync_token, load_objects=False) obu = self.objects_by_url() for obj in updates: obj.url = obj.url.canonical() - if ( - obj.url in obu - and dav.GetEtag.tag in obu[obj.url].props - and dav.GetEtag.tag in obj.props - ): + if obj.url in obu and dav.GetEtag.tag in obu[obj.url].props and dav.GetEtag.tag in obj.props: if obu[obj.url].props[dav.GetEtag.tag] == obj.props[dav.GetEtag.tag]: continue obu[obj.url] = obj @@ -1917,9 +1826,7 @@ def __init__( CalendarObjectResource has an additional parameter for its constructor: * data = "...", vCal data for the event """ - super(CalendarObjectResource, self).__init__( - client=client, url=url, parent=parent, id=id, props=props - ) + super(CalendarObjectResource, self).__init__(client=client, url=url, parent=parent, id=id, props=props) if data is not None: self.data = data if id: @@ -1984,9 +1891,7 @@ def expand_rrule(self, start: datetime, end: datetime) -> None: stripped_event = self.copy(keep_uid=True) if stripped_event.vobject_instance is None: - raise ValueError( - "Unexpected value None for stripped_event.vobject_instance" - ) + raise ValueError("Unexpected value None for stripped_event.vobject_instance") # remove all recurrence properties for component in stripped_event.vobject_instance.components(): # type: ignore @@ -2007,9 +1912,7 @@ def expand_rrule(self, start: datetime, end: datetime) -> None: if component.name not in ("VEVENT", "VTODO", "VTIMEZONE"): calendar.add_component(component) - def set_relation( - self, other, reltype=None, set_reverse=True - ) -> None: ## TODO: logic to find and set siblings? + def set_relation(self, other, reltype=None, set_reverse=True) -> None: ## TODO: logic to find and set siblings? """ Sets a relation between this object and another object (given by uid or object). """ @@ -2029,11 +1932,7 @@ def set_relation( other.set_relation(other=self, reltype=reltype_reverse, set_reverse=False) existing_relation = self.icalendar_component.get("related-to", None) - existing_relations = ( - existing_relation - if isinstance(existing_relation, list) - else [existing_relation] - ) + existing_relations = existing_relation if isinstance(existing_relation, list) else [existing_relation] for rel in existing_relations: if rel == uid: return @@ -2044,9 +1943,7 @@ def set_relation( # see https://github.com/collective/icalendar/issues/557 # workaround should be safe to remove if issue gets fixed uid = str(uid) - self.icalendar_component.add( - "related-to", uid, parameters={"RELTYPE": reltype}, encode=True - ) + self.icalendar_component.add("related-to", uid, parameters={"RELTYPE": reltype}, encode=True) self.save() @@ -2094,9 +1991,7 @@ def get_relatives( raise ValueError("Unexpected value None for self.parent") if not isinstance(self.parent, Calendar): - raise ValueError( - "self.parent expected to be of type Calendar but it is not" - ) + raise ValueError("self.parent expected to be of type Calendar but it is not") for obj in uids: try: @@ -2118,11 +2013,7 @@ def _get_icalendar_component(self, assert_one=False): self.load(only_if_unloaded=True) if not self.icalendar_instance: return None - ret = [ - x - for x in self.icalendar_instance.subcomponents - if not isinstance(x, icalendar.Timezone) - ] + ret = [x for x in self.icalendar_instance.subcomponents if not isinstance(x, icalendar.Timezone)] error.assert_(len(ret) == 1 or not assert_one) for x in ret: for cl in ( @@ -2173,9 +2064,7 @@ def get_due(self): get_dtend = get_due - def add_attendee( - self, attendee, no_default_parameters: bool = False, **parameters - ) -> None: + def add_attendee(self, attendee, no_default_parameters: bool = False, **parameters) -> None: """ For the current (event/todo/journal), add an attendee. @@ -2368,9 +2257,7 @@ def _find_id_path(self, id=None, path=None) -> None: def _put(self, retry_on_failure=True): ## SECURITY TODO: we should probably have a check here to verify that no such object exists already - r = self.client.put( - self.url, self.data, {"Content-Type": 'text/calendar; charset="utf-8"'} - ) + r = self.client.put(self.url, self.data, {"Content-Type": 'text/calendar; charset="utf-8"'}) if r.status == 302: path = [x[1] for x in r.headers if x[0] == "location"][0] elif r.status not in (204, 201): @@ -2415,9 +2302,7 @@ def change_attendee_status(self, attendee: Optional[Any] = None, **kwargs) -> No except error.NotFoundError: pass if not cnt: - raise error.NotFoundError( - "Principal %s is not invited to event" % str(attendee) - ) + raise error.NotFoundError("Principal %s is not invited to event" % str(attendee)) error.assert_(cnt == 1) return @@ -2456,11 +2341,7 @@ def save( * self """ - if ( - self._vobject_instance is None - and self._data is None - and self._icalendar_instance is None - ): + if self._vobject_instance is None and self._data is None and self._icalendar_instance is None: return self path = self.url.path if self.url else None @@ -2501,17 +2382,13 @@ def save( try: existing = method(self.id) if no_overwrite: - raise error.ConsistencyError( - "no_overwrite flag was set, but object already exists" - ) + raise error.ConsistencyError("no_overwrite flag was set, but object already exists") break except error.NotFoundError: pass if no_create and not existing: - raise error.ConsistencyError( - "no_create flag was set, but object does not exists" - ) + raise error.ConsistencyError("no_create flag was set, but object does not exists") if increase_seqno and b"SEQUENCE" in to_wire(self.data): seqno = self.icalendar_component.pop("SEQUENCE", None) @@ -2522,20 +2399,12 @@ def save( return self def is_loaded(self): - return ( - self._data or self._vobject_instance or self._icalendar_instance - ) and self.data.count("BEGIN:") > 1 + return (self._data or self._vobject_instance or self._icalendar_instance) and self.data.count("BEGIN:") > 1 def has_component(self): return ( - self._data - or self._vobject_instance - or (self._icalendar_instance and self.icalendar_component) - ) and self.data.count("BEGIN:VEVENT") + self.data.count( - "BEGIN:VTODO" - ) + self.data.count( - "BEGIN:VJOURNAL" - ) > 0 + self._data or self._vobject_instance or (self._icalendar_instance and self.icalendar_component) + ) and self.data.count("BEGIN:VEVENT") + self.data.count("BEGIN:VTODO") + self.data.count("BEGIN:VJOURNAL") > 0 def __str__(self) -> str: return "%s: %s" % (self.__class__.__name__, self.url) @@ -2581,9 +2450,7 @@ def _get_wire_data(self): return to_wire(self._icalendar_instance.to_ical()) return None - data: Any = property( - _get_data, _set_data, doc="vCal representation of the object as normal string" - ) + data: Any = property(_get_data, _set_data, doc="vCal representation of the object as normal string") wire_data = property( _get_wire_data, _set_data, @@ -2601,9 +2468,7 @@ def _get_vobject_instance(self) -> Optional[vobject.base.Component]: if self._get_data() is None: return None try: - self._set_vobject_instance( - vobject.readOne(to_unicode(self._get_data())) # type: ignore - ) + self._set_vobject_instance(vobject.readOne(to_unicode(self._get_data()))) # type: ignore except: log.critical( "Something went wrong while loading icalendar data into the vobject class. ical url: " @@ -2628,9 +2493,7 @@ def _get_icalendar_instance(self): if not self._icalendar_instance: if not self.data: return None - self.icalendar_instance = icalendar.Calendar.from_ical( - to_unicode(self.data) - ) + self.icalendar_instance = icalendar.Calendar.from_ical(to_unicode(self.data)) return self._icalendar_instance icalendar_instance: Any = property( @@ -2730,9 +2593,7 @@ def __init__( url: Union[str, ParseResult, SplitResult, URL, None] = None, id: Optional[Any] = None, ) -> None: - CalendarObjectResource.__init__( - self, client=parent.client, url=url, data=data, parent=parent, id=id - ) + CalendarObjectResource.__init__(self, client=parent.client, url=url, data=data, parent=parent, id=id) class Todo(CalendarObjectResource): @@ -2824,9 +2685,7 @@ def _next(self, ts=None, i=None, dtstart=None, rrule=None, by=None, no_count=Tru if not rrule: rrule = i["RRULE"] if not dtstart: - if by is True or ( - by is None and any((x for x in rrule if x.startswith("BY"))) - ): + if by is True or (by is None and any((x for x in rrule if x.startswith("BY")))): if "DTSTART" in i: dtstart = i["DTSTART"].dt else: @@ -2917,9 +2776,7 @@ def _complete_recurring_thisandfuture(self, completion_timestamp) -> None: ## We copy the original one just_completed = orig.copy() just_completed.pop("RRULE") - just_completed.add( - "RECURRENCE-ID", orig.get("DTSTART", completion_timestamp) - ) + just_completed.add("RECURRENCE-ID", orig.get("DTSTART", completion_timestamp)) seqno = just_completed.pop("SEQUENCE", 0) just_completed.add("SEQUENCE", seqno + 1) recurrences.append(just_completed) @@ -2956,12 +2813,8 @@ def _complete_recurring_thisandfuture(self, completion_timestamp) -> None: thisandfuture.add("RRULE", rrule2) else: count = rrule.get("COUNT", None) - if count is not None and count[0] <= len( - [x for x in recurrences if not self._is_pending(x)] - ): - self._complete_ical( - recurrences[0], completion_timestamp=completion_timestamp - ) + if count is not None and count[0] <= len([x for x in recurrences if not self._is_pending(x)]): + self._complete_ical(recurrences[0], completion_timestamp=completion_timestamp) self.save(increase_seqno=False) return @@ -3000,9 +2853,7 @@ def complete( completion_timestamp = datetime.utcnow().astimezone(timezone.utc) if "RRULE" in self.icalendar_component and handle_rrule: - return getattr(self, "_complete_recurring_%s" % rrule_mode)( - completion_timestamp - ) + return getattr(self, "_complete_recurring_%s" % rrule_mode)(completion_timestamp) self._complete_ical(completion_timestamp=completion_timestamp) self.save() diff --git a/examples/basic_usage_examples.py b/examples/basic_usage_examples.py index bf404e5..c818184 100644 --- a/examples/basic_usage_examples.py +++ b/examples/basic_usage_examples.py @@ -54,9 +54,7 @@ def run_examples(): ## * server may not support it (it's not mandatory in the CalDAV RFC) ## * principal may not have the permission to create calendars ## * some cloud providers have a global namespace - my_new_calendar = my_principal.make_calendar( - name="Test calendar from caldav examples" - ) + my_new_calendar = my_principal.make_calendar(name="Test calendar from caldav examples") ## Let's add some events to our newly created calendar add_stuff_to_calendar_demo(my_new_calendar) @@ -139,9 +137,7 @@ def read_modify_event_demo(event): uid = event.icalendar_component["uid"] ## Let's correct that typo using the icalendar library. - event.icalendar_component["summary"] = event.icalendar_component["summary"].replace( - "celebratiuns", "celebrations" - ) + event.icalendar_component["summary"] = event.icalendar_component["summary"].replace("celebratiuns", "celebrations") ## timestamps (DTSTAMP, DTSTART, DTEND for events, DUE for tasks, ## etc) can be fetched using the icalendar library like this: @@ -183,10 +179,7 @@ def read_modify_event_demo(event): ## Finally, let's verify that the correct data was saved calendar = event.parent same_event = calendar.event_by_uid(uid) - assert ( - same_event.icalendar_component["summary"] - == "Norwegian national day celebrations" - ) + assert same_event.icalendar_component["summary"] == "Norwegian national day celebrations" def search_calendar_demo(calendar): @@ -279,9 +272,7 @@ def find_delete_calendar_demo(my_principal, calendar_name): ## This will raise a NotFoundError if calendar does not exist demo_calendar = my_principal.calendar(name="Test calendar from caldav examples") assert demo_calendar - print( - f"We found an existing calendar with name {calendar_name}, now deleting it" - ) + print(f"We found an existing calendar with name {calendar_name}, now deleting it") demo_calendar.delete() except caldav.error.NotFoundError: ## Calendar was not found diff --git a/examples/scheduling_examples.py b/examples/scheduling_examples.py index 4b591ad..b4bb059 100644 --- a/examples/scheduling_examples.py +++ b/examples/scheduling_examples.py @@ -41,9 +41,7 @@ def __init__(self, i): calendar_id = "schedulingtestcalendar%i" % i calendar_name = "calendar #%i for scheduling demo" % i self.cleanup(calendar_name) - self.calendar = self.principal.make_calendar( - name=calendar_name, cal_id=calendar_id - ) + self.calendar = self.principal.make_calendar(name=calendar_name, cal_id=calendar_id) def cleanup(self, calendar_name): ## Cleanup from earlier runs @@ -138,12 +136,8 @@ def cleanup(self, calendar_name): print("Sending a calendar invite") organizer.calendar.save_with_invites(caldata, attendees=attendees) -print( - "Storing another calendar event with the same participants, but without sending out emails" -) -organizer.calendar.save_with_invites( - caldata2, attendees=attendees, schedule_agent="NONE" -) +print("Storing another calendar event with the same participants, but without sending out emails") +organizer.calendar.save_with_invites(caldata2, attendees=attendees, schedule_agent="NONE") ## There are some attendee parameters that may be set (TODO: add ## example code), the convenience method above will use sensible diff --git a/pyproject.toml b/pyproject.toml index 68de6c4..25fc405 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,3 +53,6 @@ include-package-data = true [tool.setuptools.packages.find] exclude = ["tests"] namespaces = false + +[tool.black] +line-length = 120 diff --git a/tests/_test_absolute.py b/tests/_test_absolute.py index 39cb83a..fc30b4c 100644 --- a/tests/_test_absolute.py +++ b/tests/_test_absolute.py @@ -11,9 +11,7 @@ class TestRadicale(object): "Standard - GBA", ) ) - DTSTART = set( - (datetime.datetime(2011, 3, 4, 20, 0), datetime.datetime(2011, 1, 15, 20, 0)) - ) + DTSTART = set((datetime.datetime(2011, 3, 4, 20, 0), datetime.datetime(2011, 1, 15, 20, 0))) def setup(self): URL = "http://localhost:8080/nicoe/perso/" diff --git a/tests/conf.py b/tests/conf.py index 01b9754..66fbc6c 100644 --- a/tests/conf.py +++ b/tests/conf.py @@ -139,9 +139,7 @@ ################################################################### # Convenience - get a DAVClient object from the caldav_servers list ################################################################### -CONNKEYS = set( - ("url", "proxy", "username", "password", "ssl_verify_cert", "ssl_cert", "auth") -) +CONNKEYS = set(("url", "proxy", "username", "password", "ssl_verify_cert", "ssl_cert", "auth")) def client(idx=None, **kwargs): diff --git a/tests/proxy.py b/tests/proxy.py index b74b543..9585d2f 100644 --- a/tests/proxy.py +++ b/tests/proxy.py @@ -63,9 +63,7 @@ def _connect_to(self, netloc, soc): host_port = netloc[:i], int(netloc[i + 1 :]) else: host_port = netloc, 80 - self.server.logger.log( - logging.INFO, "connect to %s:%d", host_port[0], host_port[1] - ) + self.server.logger.log(logging.INFO, "connect to %s:%d", host_port[0], host_port[1]) try: soc.connect(host_port) except socket.error as arg: @@ -82,9 +80,7 @@ def do_CONNECT(self): try: if self._connect_to(self.path, soc): self.log_request(200) - self.wfile.write( - self.protocol_version + " 200 Connection established\r\n" - ) + self.wfile.write(self.protocol_version + " 200 Connection established\r\n") self.wfile.write("Proxy-agent: %s\r\n" % self.version_string()) self.wfile.write("\r\n") self._read_write(soc, 300) @@ -178,14 +174,10 @@ def _read_write(self, soc, max_idling=20, local=False): do_PROPFIND = do_GET def log_message(self, format, *args): - self.server.logger.log( - logging.INFO, "%s %s", self.address_string(), format % args - ) + self.server.logger.log(logging.INFO, "%s %s", self.address_string(), format % args) def log_error(self, format, *args): - self.server.logger.log( - logging.ERROR, "%s %s", self.address_string(), format % args - ) + self.server.logger.log(logging.ERROR, "%s %s", self.address_string(), format % args) class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): @@ -212,13 +204,9 @@ def logSetup(filename, log_size, daemon): DEFAULT_LOG_FILENAME, maxBytes=(log_size * (1 << 20)), backupCount=5 ) else: - handler = logging.handlers.RotatingFileHandler( - filename, maxBytes=(log_size * (1 << 20)), backupCount=5 - ) + handler = logging.handlers.RotatingFileHandler(filename, maxBytes=(log_size * (1 << 20)), backupCount=5) fmt = logging.Formatter( - "[%(asctime)-12s.%(msecs)03d] " - "%(levelname)-8s {%(name)s %(threadName)s}" - " %(message)s", + "[%(asctime)-12s.%(msecs)03d] " "%(levelname)-8s {%(name)s %(threadName)s}" " %(message)s", "%Y-%m-%d %H:%M:%S", ) handler.setFormatter(fmt) diff --git a/tests/test_caldav.py b/tests/test_caldav.py index 2c003df..3940db2 100644 --- a/tests/test_caldav.py +++ b/tests/test_caldav.py @@ -345,9 +345,7 @@ ) -@pytest.mark.skipif( - not rfc6638_users, reason="need rfc6638_users to be set in order to run this test" -) +@pytest.mark.skipif(not rfc6638_users, reason="need rfc6638_users to be set in order to run this test") @pytest.mark.skipif( len(rfc6638_users) < 3, reason="need at least three users in rfc6638_users to be set in order to run this test", @@ -410,19 +408,13 @@ def testInviteAndRespond(self): ## out existing stuff from new stuff if len(self.principals) < 2: pytest.skip("need 2 principals to do the invite and respond test") - inbox_items = set( - [x.url for x in self.principals[0].schedule_inbox().get_items()] - ) - inbox_items.update( - set([x.url for x in self.principals[1].schedule_inbox().get_items()]) - ) + inbox_items = set([x.url for x in self.principals[0].schedule_inbox().get_items()]) + inbox_items.update(set([x.url for x in self.principals[1].schedule_inbox().get_items()])) ## self.principal[0] is the organizer, and invites self.principal[1] organizers_calendar = self._getCalendar(0) attendee_calendar = self._getCalendar(1) - organizers_calendar.save_with_invites( - sched, [self.principals[0], self.principals[1].get_vcal_address()] - ) + organizers_calendar.save_with_invites(sched, [self.principals[0], self.principals[1].get_vcal_address()]) assert len(organizers_calendar.events()) == 1 ## no new inbox items expected for principals[0] @@ -512,9 +504,7 @@ def setup_method(self): self.caldav = client(**self.server_params) if False and self.check_compatibility_flag("no-current-user-principal"): - self.principal = Principal( - client=self.caldav, url=self.server_params["principal_url"] - ) + self.principal = Principal(client=self.caldav, url=self.server_params["principal_url"]) else: self.principal = self.caldav.principal() @@ -541,10 +531,7 @@ def _cleanup(self, mode=None): return if self.check_compatibility_flag("read_only"): return ## no cleanup needed - if ( - self.check_compatibility_flag("no_mkcalendar") - or self.cleanup_regime == "thorough" - ): + if self.check_compatibility_flag("no_mkcalendar") or self.cleanup_regime == "thorough": for uid in uids_used: try: obj = self._fixCalendar().object_by_uid(uid) @@ -552,9 +539,7 @@ def _cleanup(self, mode=None): except error.NotFoundError: pass except: - logging.error( - "Something went kaboom while deleting event", exc_info=True - ) + logging.error("Something went kaboom while deleting event", exc_info=True) return for cal in self.calendars_used: cal.delete() @@ -587,9 +572,7 @@ def _fixCalendar(self, **kwargs): should see if there exists a test calendar, if that's not possible, give up and return the primary calendar. """ - if self.check_compatibility_flag( - "no_mkcalendar" - ) or self.check_compatibility_flag("read_only"): + if self.check_compatibility_flag("no_mkcalendar") or self.check_compatibility_flag("read_only"): if not self._default_calendar: calendars = self.principal.calendars() for c in calendars: @@ -606,17 +589,13 @@ def _fixCalendar(self, **kwargs): self._default_calendar = calendars[0] return self._default_calendar else: - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) if self.check_compatibility_flag("no_displayname"): name = None else: name = "Yep" - ret = self.principal.make_calendar( - name=name, cal_id=self.testcal_id, **kwargs - ) + ret = self.principal.make_calendar(name=name, cal_id=self.testcal_id, **kwargs) ## TEMP - checking that the calendar works ret.events() if self.cleanup_regime == "post": @@ -716,17 +695,12 @@ def testGetCalendar(self): def testProxy(self): if self.caldav.url.scheme == "https": - pytest.skip( - "Skipping %s.testProxy as the TinyHTTPProxy " - "implementation doesn't support https" - ) + pytest.skip("Skipping %s.testProxy as the TinyHTTPProxy " "implementation doesn't support https") self.skip_on_compatibility_flag("no_default_calendar") server_address = ("127.0.0.1", 8080) try: - proxy_httpd = NonThreadingHTTPServer( - server_address, ProxyHandler, logging.getLogger("TinyHTTPProxy") - ) + proxy_httpd = NonThreadingHTTPServer(server_address, ProxyHandler, logging.getLogger("TinyHTTPProxy")) except: pytest.skip("Unable to set up proxy server") @@ -779,9 +753,7 @@ def testPrincipal(self): def testCreateDeleteCalendar(self): self.skip_on_compatibility_flag("no_mkcalendar") self.skip_on_compatibility_flag("read_only") - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) c = self.principal.make_calendar(name="Yep", cal_id=self.testcal_id) assert c.url is not None @@ -819,9 +791,7 @@ def testCreateEvent(self): assert len(events2) == len(existing_events) + 1 assert events2[0].url == events[0].url - if not self.check_compatibility_flag( - "no_mkcalendar" - ) and not self.check_compatibility_flag("no_displayname"): + if not self.check_compatibility_flag("no_mkcalendar") and not self.check_compatibility_flag("no_displayname"): # We should be able to access the calender through the name c2 = self.principal.calendar(name="Yep") ## may break if we have multiple calendars with the same name @@ -873,9 +843,9 @@ def testObjectBySyncToken(self): if not self.check_compatibility_flag("no_recurring"): c.save_event(evr) objcnt += 1 - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): c.save_todo(todo) c.save_todo(todo2) c.save_todo(todo3) @@ -914,9 +884,7 @@ def testObjectBySyncToken(self): time.sleep(1) ## The modified object should be returned by the server - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token, load_objects=True - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token, load_objects=True) if self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) >= 1 else: @@ -929,9 +897,7 @@ def testObjectBySyncToken(self): time.sleep(1) ## Re-running objects_by_sync_token, and no objects should be returned - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token) if not self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) == 0 @@ -942,9 +908,7 @@ def testObjectBySyncToken(self): obj3 = c.save_event(ev3) if self.check_compatibility_flag("time_based_sync_tokens"): time.sleep(1) - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token) if not self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) == 1 @@ -952,9 +916,7 @@ def testObjectBySyncToken(self): time.sleep(1) ## Re-running objects_by_sync_token, and no objects should be returned - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token) if not self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) == 0 @@ -966,9 +928,7 @@ def testObjectBySyncToken(self): self.skip_on_compatibility_flag("sync_breaks_on_delete") if self.check_compatibility_flag("time_based_sync_tokens"): time.sleep(1) - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token, load_objects=True - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token, load_objects=True) if not self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) == 1 if self.check_compatibility_flag("time_based_sync_tokens"): @@ -977,9 +937,7 @@ def testObjectBySyncToken(self): assert list(my_changed_objects)[0].data is None ## Re-running objects_by_sync_token, and no objects should be returned - my_changed_objects = c.objects_by_sync_token( - sync_token=my_changed_objects.sync_token - ) + my_changed_objects = c.objects_by_sync_token(sync_token=my_changed_objects.sync_token) if not self.check_compatibility_flag("fragile_sync_tokens"): assert len(list(my_changed_objects)) == 0 @@ -1003,9 +961,9 @@ def testSync(self): if not self.check_compatibility_flag("no_recurring"): c.save_event(evr) objcnt += 1 - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): c.save_todo(todo) c.save_todo(todo2) c.save_todo(todo3) @@ -1089,9 +1047,7 @@ def testSync(self): def testLoadEvent(self): self.skip_on_compatibility_flag("read_only") self.skip_on_compatibility_flag("no_mkcalendar") - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) self._teardownCalendar(cal_id=self.testcal_id2) c1 = self.principal.make_calendar(name="Yep", cal_id=self.testcal_id) @@ -1103,19 +1059,14 @@ def testLoadEvent(self): assert e1.url == e1_.url if not self.check_compatibility_flag("event_by_url_is_broken"): e1.load() - if ( - not self.check_compatibility_flag("unique_calendar_ids") - and self.cleanup_regime == "post" - ): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime == "post": self._teardownCalendar(cal_id=self.testcal_id) self._teardownCalendar(cal_id=self.testcal_id2) def testCopyEvent(self): self.skip_on_compatibility_flag("read_only") self.skip_on_compatibility_flag("no_mkcalendar") - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) self._teardownCalendar(cal_id=self.testcal_id2) @@ -1131,14 +1082,10 @@ def testCopyEvent(self): e1_dup.save() assert len(c1.events()) == 2 - if not self.check_compatibility_flag( - "duplicate_in_other_calendar_with_same_uid_breaks" - ): + if not self.check_compatibility_flag("duplicate_in_other_calendar_with_same_uid_breaks"): e1_in_c2 = e1.copy(new_parent=c2, keep_uid=True) e1_in_c2.save() - if not self.check_compatibility_flag( - "duplicate_in_other_calendar_with_same_uid_is_lost" - ): + if not self.check_compatibility_flag("duplicate_in_other_calendar_with_same_uid_is_lost"): assert len(c2.events()) == 1 ## what will happen with the event in c1 if we modify the event in c2, @@ -1162,10 +1109,7 @@ def testCopyEvent(self): else: assert len(c1.events()) == 2 - if ( - not self.check_compatibility_flag("unique_calendar_ids") - and self.cleanup_regime == "post" - ): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime == "post": self._teardownCalendar(cal_id=self.testcal_id) self._teardownCalendar(cal_id=self.testcal_id2) @@ -1230,9 +1174,7 @@ def testSearchEvent(self): ## Search for misc text fields ## UID is a special case, supported by almost all servers - some_events = c.search( - comp_class=Event, uid="19970901T130000Z-123403@example.com" - ) + some_events = c.search(comp_class=Event, uid="19970901T130000Z-123403@example.com") if not self.check_compatibility_flag("text_search_not_working"): assert len(some_events) == 1 @@ -1277,9 +1219,7 @@ def testSearchEvent(self): ## This is not a very useful search, and it's sort of a client side bug that we allow it at all. ## It will not match if categories field is set to "PERSONAL,ANNIVERSARY,SPECIAL OCCASION" ## It may not match since the above is to be considered equivalent to the raw data entered. - some_events = c.search( - comp_class=Event, category="ANNIVERSARY,PERSONAL,SPECIAL OCCASION" - ) + some_events = c.search(comp_class=Event, category="ANNIVERSARY,PERSONAL,SPECIAL OCCASION") assert len(some_events) in (0, 1) ## TODO: This is actually a bug. We need to do client side filtering some_events = c.search(comp_class=Event, category="PERSON") @@ -1371,9 +1311,7 @@ def testSearchSortTodo(self): ) def check_order(tasks, order): - assert [str(x.icalendar_component["uid"]) for x in tasks] == [ - "tsst" + str(x) for x in order - ] + assert [str(x.icalendar_component["uid"]) for x in tasks] == ["tsst" + str(x) for x in order] all_tasks = c.search(todo=True, sort_keys=("uid",)) check_order(all_tasks, (1, 2, 3, 4, 6)) @@ -1381,9 +1319,7 @@ def check_order(tasks, order): all_tasks = c.search(sort_keys=("summary",)) check_order(all_tasks, (1, 2, 3, 4, 5, 6)) - all_tasks = c.search( - sort_keys=("isnt_overdue", "categories", "dtstart", "priority", "status") - ) + all_tasks = c.search(sort_keys=("isnt_overdue", "categories", "dtstart", "priority", "status")) ## This is difficult ... ## * 1 is the only one that is overdue, and False sorts before True, so 1 comes first ## * categories, empty string sorts before a non-empty string, so 6 is at the end of the list @@ -1441,14 +1377,14 @@ def testSearchTodos(self): ## Too much copying of the examples ... some_todos = c.search(comp_class=Todo, category="FINANCE") - if not self.check_compatibility_flag( - "category_search_yields_nothing" - ) and not self.check_compatibility_flag("text_search_not_working"): + if not self.check_compatibility_flag("category_search_yields_nothing") and not self.check_compatibility_flag( + "text_search_not_working" + ): assert len(some_todos) == 6 some_todos = c.search(comp_class=Todo, category="finance") - if not self.check_compatibility_flag( - "category_search_yields_nothing" - ) and not self.check_compatibility_flag("text_search_not_working"): + if not self.check_compatibility_flag("category_search_yields_nothing") and not self.check_compatibility_flag( + "text_search_not_working" + ): if self.check_compatibility_flag("text_search_is_case_insensitive"): assert len(some_todos) == 6 else: @@ -1467,9 +1403,9 @@ def testSearchTodos(self): assert len(some_todos) in (0, 6) elif self.check_compatibility_flag("text_search_is_exact_match_only"): assert len(some_todos) == 0 - elif not self.check_compatibility_flag( - "category_search_yields_nothing" - ) and not self.check_compatibility_flag("text_search_not_working"): + elif not self.check_compatibility_flag("category_search_yields_nothing") and not self.check_compatibility_flag( + "text_search_not_working" + ): ## This is the correct thing, according to the letter of the RFC assert len(some_todos) == 6 @@ -1491,13 +1427,9 @@ def testWrongPassword(self): or not self.server_params["password"] or self.server_params["password"] == "any-password-seems-to-work" ): - pytest.skip( - "Testing with wrong password skipped as calendar server does not require a password" - ) + pytest.skip("Testing with wrong password skipped as calendar server does not require a password") server_params = self.server_params.copy() - server_params["password"] = ( - codecs.encode(server_params["password"], "rot13") + "!" - ) + server_params["password"] = codecs.encode(server_params["password"], "rot13") + "!" with pytest.raises(error.AuthorizationError): client(**server_params).principal() @@ -1539,9 +1471,7 @@ def testCreateChildParent(self): rt = parent_.icalendar_component["RELATED-TO"] assert len(rt) == 2 assert set([str(rt[0]), str(rt[1])]) == set([grandparent.id, child.id]) - assert set([rt[0].params["RELTYPE"], rt[1].params["RELTYPE"]]) == set( - ["CHILD", "PARENT"] - ) + assert set([rt[0].params["RELTYPE"], rt[1].params["RELTYPE"]]) == set(["CHILD", "PARENT"]) rt = child_.icalendar_component["RELATED-TO"] if isinstance(rt, list): assert len(rt) == 1 @@ -1579,21 +1509,13 @@ def testSetDue(self): ## setting the due should ... set the due (surprise, surprise) some_todo.set_due(datetime(2022, 12, 26, 20, 10, tzinfo=utc)) - assert some_todo.icalendar_component["DUE"].dt == datetime( - 2022, 12, 26, 20, 10, tzinfo=utc - ) - assert some_todo.icalendar_component["DTSTART"].dt == datetime( - 2022, 12, 26, 19, 15, tzinfo=utc - ) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 10, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime(2022, 12, 26, 19, 15, tzinfo=utc) ## move_dtstart causes the duration to be unchanged some_todo.set_due(datetime(2022, 12, 26, 20, 20, tzinfo=utc), move_dtstart=True) - assert some_todo.icalendar_component["DUE"].dt == datetime( - 2022, 12, 26, 20, 20, tzinfo=utc - ) - assert some_todo.icalendar_component["DTSTART"].dt == datetime( - 2022, 12, 26, 19, 25, tzinfo=utc - ) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 20, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime(2022, 12, 26, 19, 25, tzinfo=utc) ## This task has duration set rather than due. Due should be implied to be 19:30. some_other_todo = c.save_todo( @@ -1602,15 +1524,9 @@ def testSetDue(self): summary="Some other task", uid="ctuid2", ) - some_other_todo.set_due( - datetime(2022, 12, 26, 19, 45, tzinfo=utc), move_dtstart=True - ) - assert some_other_todo.icalendar_component["DUE"].dt == datetime( - 2022, 12, 26, 19, 45, tzinfo=utc - ) - assert some_other_todo.icalendar_component["DTSTART"].dt == datetime( - 2022, 12, 26, 19, 30, tzinfo=utc - ) + some_other_todo.set_due(datetime(2022, 12, 26, 19, 45, tzinfo=utc), move_dtstart=True) + assert some_other_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 19, 45, tzinfo=utc) + assert some_other_todo.icalendar_component["DTSTART"].dt == datetime(2022, 12, 26, 19, 30, tzinfo=utc) some_todo.save() @@ -1633,12 +1549,8 @@ def testSetDue(self): move_dtstart=True, check_dependent=True, ) - assert some_todo.icalendar_component["DUE"].dt == datetime( - 2022, 12, 26, 20, 30, tzinfo=utc - ) - assert some_todo.icalendar_component["DTSTART"].dt == datetime( - 2022, 12, 26, 19, 35, tzinfo=utc - ) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 30, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime(2022, 12, 26, 19, 35, tzinfo=utc) ## This should not work out (set the children due to some time before the parents due) with pytest.raises(error.ConsistencyError): @@ -1663,12 +1575,8 @@ def testSetDue(self): move_dtstart=True, check_dependent=True, ) - assert some_todo.icalendar_component["DUE"].dt == datetime( - 2022, 12, 26, 20, 31, tzinfo=utc - ) - assert some_todo.icalendar_component["DTSTART"].dt == datetime( - 2022, 12, 26, 19, 36, tzinfo=utc - ) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 31, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime(2022, 12, 26, 19, 36, tzinfo=utc) def testCreateJournalListAndJournalEntry(self): """ @@ -1733,9 +1641,7 @@ def testCreateTaskListAndTodo(self): assert len(todos) == 1 assert len(todos2) == 1 - t3 = c.save_todo( - summary="mop the floor", categories=["housework"], priority=4, uid="ctuid1" - ) + t3 = c.save_todo(summary="mop the floor", categories=["housework"], priority=4, uid="ctuid1") assert len(c.todos()) == 2 # adding a todo without a UID, it should also work (library will add the missing UID) @@ -1780,11 +1686,7 @@ def uids(lst): todos2 = c.todos(sort_key="priority") def pri(lst): - return [ - x.instance.vtodo.priority.value - for x in lst - if hasattr(x.instance.vtodo, "priority") - ] + return [x.instance.vtodo.priority.value for x in lst if hasattr(x.instance.vtodo, "priority")] assert pri(todos) == pri([t4, t2]) assert pri(todos2) == pri([t4, t2]) @@ -1869,13 +1771,9 @@ def testTodoDatesearch(self): # Hence a compliant server should chuck out all the todos except t5. # Not all servers perform according to (my interpretation of) the RFC. foo = 5 - if self.check_compatibility_flag( - "no_recurring" - ) or self.check_compatibility_flag("no_recurring_todo"): + if self.check_compatibility_flag("no_recurring") or self.check_compatibility_flag("no_recurring_todo"): foo -= 1 ## t6 will not be returned - if self.check_compatibility_flag( - "vtodo_datesearch_nodtstart_task_is_skipped" - ) or self.check_compatibility_flag( + if self.check_compatibility_flag("vtodo_datesearch_nodtstart_task_is_skipped") or self.check_compatibility_flag( "vtodo_datesearch_nodtstart_task_is_skipped_in_closed_date_range" ): foo -= 2 ## t1 and t4 not returned @@ -1895,9 +1793,7 @@ def testTodoDatesearch(self): ## exercise the default for expand (maybe -> False for open-ended search) todos1 = c.date_search(start=datetime(2025, 4, 14), compfilter="VTODO") - todos2 = c.search( - start=datetime(2025, 4, 14), todo=True, include_completed=True - ) + todos2 = c.search(start=datetime(2025, 4, 14), todo=True, include_completed=True) todos3 = c.search(start=datetime(2025, 4, 14), todo=True) assert isinstance(todos1[0], Todo) @@ -1913,16 +1809,11 @@ def testTodoDatesearch(self): urls_found = [x.url for x in todos1] urls_found2 = [x.url for x in todos1] assert urls_found == urls_found2 - if not ( - self.check_compatibility_flag("no_recurring") - or self.check_compatibility_flag("no_recurring_todo") - ): + if not (self.check_compatibility_flag("no_recurring") or self.check_compatibility_flag("no_recurring_todo")): urls_found.remove(t6.url) if not self.check_compatibility_flag( "vtodo_datesearch_nodtstart_task_is_skipped" - ) and not self.check_compatibility_flag( - "vtodo_datesearch_notime_task_is_skipped" - ): + ) and not self.check_compatibility_flag("vtodo_datesearch_notime_task_is_skipped"): urls_found.remove(t4.url) if self.check_compatibility_flag("vtodo_no_due_infinite_duration"): urls_found.remove(t1.url) @@ -2040,17 +1931,13 @@ def testUtf8Event(self): # TODO: split up in creating a calendar with non-ascii name # and an event with non-ascii description self.skip_on_compatibility_flag("no_mkcalendar") - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) c = self.principal.make_calendar(name="Yølp", cal_id=self.testcal_id) # add event - e1 = c.save_event( - ev1.replace("Bastille Day Party", "Bringebærsyltetøyfestival") - ) + e1 = c.save_event(ev1.replace("Bastille Day Party", "Bringebærsyltetøyfestival")) # fetch it back events = c.events() @@ -2064,25 +1951,18 @@ def testUtf8Event(self): if "zimbra" not in str(c.url): assert len(events) == 1 - if ( - not self.check_compatibility_flag("unique_calendar_ids") - and self.cleanup_regime == "post" - ): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime == "post": self._teardownCalendar(cal_id=self.testcal_id) def testUnicodeEvent(self): self.skip_on_compatibility_flag("read_only") self.skip_on_compatibility_flag("no_mkcalendar") - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id) c = self.principal.make_calendar(name="Yølp", cal_id=self.testcal_id) # add event - e1 = c.save_event( - to_str(ev1.replace("Bastille Day Party", "Bringebærsyltetøyfestival")) - ) + e1 = c.save_event(to_str(ev1.replace("Bastille Day Party", "Bringebærsyltetøyfestival"))) # c.events() should give a full list of events events = c.events() @@ -2111,9 +1991,7 @@ def testSetCalendarProperties(self): # Creating a new calendar with different ID but with existing name # TODO: why do we do this? - if not self.check_compatibility_flag( - "unique_calendar_ids" - ) and self.cleanup_regime in ("light", "pre"): + if not self.check_compatibility_flag("unique_calendar_ids") and self.cleanup_regime in ("light", "pre"): self._teardownCalendar(cal_id=self.testcal_id2) cc = self.principal.make_calendar("Yep", self.testcal_id2) cc.delete() @@ -2225,14 +2103,14 @@ def testCreateOverwriteDeleteEvent(self): # add event e1 = c.save_event(ev1) - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): t1 = c.save_todo(todo) assert e1.url is not None - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): assert t1.url is not None if not self.check_compatibility_flag("event_by_url_is_broken"): assert c.event_by_url(e1.url).url == e1.url @@ -2242,25 +2120,25 @@ def testCreateOverwriteDeleteEvent(self): ## (but some calendars may throw a "409 Conflict") if not self.check_compatibility_flag("no_overwrite"): e2 = c.save_event(ev1) - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): t2 = c.save_todo(todo) ## add same event with "no_create". Should work like a charm. e2 = c.save_event(ev1, no_create=True) - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): t2 = c.save_todo(todo, no_create=True) ## this should also work. e2.instance.vevent.summary.value = e2.instance.vevent.summary.value + "!" e2.save(no_create=True) - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): t2.instance.vtodo.summary.value = t2.instance.vtodo.summary.value + "!" t2.save(no_create=True) @@ -2271,17 +2149,17 @@ def testCreateOverwriteDeleteEvent(self): ## "no_overwrite" should throw a ConsistencyError with pytest.raises(error.ConsistencyError): c.save_event(ev1, no_overwrite=True) - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): with pytest.raises(error.ConsistencyError): c.save_todo(todo, no_overwrite=True) # delete event e1.delete() - if not self.check_compatibility_flag( - "no_todo" - ) and not self.check_compatibility_flag("no_todo_on_standard_calendar"): + if not self.check_compatibility_flag("no_todo") and not self.check_compatibility_flag( + "no_todo_on_standard_calendar" + ): t1.delete if self.check_compatibility_flag("non_existing_raises_other"): @@ -2374,9 +2252,7 @@ def testDateSearchAndFreeBusy(self): # Lets try a freebusy request as well self.skip_on_compatibility_flag("no_freebusy_rfc4791") - freebusy = c.freebusy_request( - datetime(2007, 7, 13, 17, 00, 00), datetime(2007, 7, 15, 17, 00, 00) - ) + freebusy = c.freebusy_request(datetime(2007, 7, 13, 17, 00, 00), datetime(2007, 7, 15, 17, 00, 00)) # TODO: assert something more complex on the return object assert isinstance(freebusy, FreeBusy) assert freebusy.instance.vfreebusy @@ -2500,9 +2376,7 @@ def testObjects(self): for _caldav_server in caldav_servers: # create a unique identifier out of the server domain name _parsed_url = urlparse(_caldav_server["url"]) - _servername = _parsed_url.hostname.replace(".", "_").replace("-", "_") + str( - _parsed_url.port or "" - ) + _servername = _parsed_url.hostname.replace(".", "_").replace("-", "_") + str(_parsed_url.port or "") while _servername in _servernames: _servername = _servername + "_" _servernames.add(_servername) @@ -2529,18 +2403,14 @@ def setup_method(self): self.serverdir = tempfile.TemporaryDirectory() self.serverdir.__enter__() self.configuration = radicale.config.load("") - self.configuration.update( - {"storage": {"filesystem_folder": self.serverdir.name}} - ) + self.configuration.update({"storage": {"filesystem_folder": self.serverdir.name}}) self.server = radicale.server self.server_params = { "url": "http://%s:%i/" % (radicale_host, radicale_port), "username": "user1", "password": "any-password-seems-to-work", } - self.server_params["backwards_compatibility_url"] = ( - self.server_params["url"] + "user1" - ) + self.server_params["backwards_compatibility_url"] = self.server_params["url"] + "user1" self.server_params["incompatibilities"] = compatibility_issues.radicale self.shutdown_socket, self.shutdown_socket_out = socket.socketpair() self.radicale_thread = threading.Thread( @@ -2594,9 +2464,7 @@ def setup_method(self): self.backend = XandikosBackend(path=self.serverdir.name) self.backend._mark_as_principal("/sometestuser/") self.backend.create_principal("/sometestuser/", create_defaults=True) - mainapp = XandikosApp( - self.backend, current_user_principal="sometestuser", strict=True - ) + mainapp = XandikosApp(self.backend, current_user_principal="sometestuser", strict=True) async def xandikos_handler(request): return await mainapp.aiohttp_handler(request, "/") @@ -2608,9 +2476,7 @@ async def xandikos_handler(request): self.xapp_runner = aiohttp.web.AppRunner(self.xapp) asyncio.set_event_loop(self.xapp_loop) self.xapp_loop.run_until_complete(self.xapp_runner.setup()) - self.xapp_site = aiohttp.web.TCPSite( - self.xapp_runner, host=xandikos_host, port=xandikos_port - ) + self.xapp_site = aiohttp.web.TCPSite(self.xapp_runner, host=xandikos_host, port=xandikos_port) self.xapp_loop.run_until_complete(self.xapp_site.start()) def aiohttp_server(): @@ -2619,9 +2485,7 @@ def aiohttp_server(): self.xandikos_thread = threading.Thread(target=aiohttp_server) self.xandikos_thread.start() self.server_params = {"url": "http://%s:%i/" % (xandikos_host, xandikos_port)} - self.server_params["backwards_compatibility_url"] = ( - self.server_params["url"] + "sometestuser" - ) + self.server_params["backwards_compatibility_url"] = self.server_params["url"] + "sometestuser" self.server_params["incompatibilities"] = compatibility_issues.xandikos RepeatedFunctionalTestsBaseClass.setup_method(self) diff --git a/tests/test_caldav_unit.py b/tests/test_caldav_unit.py index 7e1cbff..e78e1c8 100644 --- a/tests/test_caldav_unit.py +++ b/tests/test_caldav_unit.py @@ -195,18 +195,14 @@ def testZero(self): assert len(self.yearly.icalendar_instance.subcomponents) == 0 def testOne(self): - self.yearly.expand_rrule( - start=datetime(1998, 10, 10), end=datetime(1998, 12, 12) - ) + self.yearly.expand_rrule(start=datetime(1998, 10, 10), end=datetime(1998, 12, 12)) assert len(self.yearly.icalendar_instance.subcomponents) == 1 assert not "RRULE" in self.yearly.icalendar_component assert "UID" in self.yearly.icalendar_component assert "RECURRENCE-ID" in self.yearly.icalendar_component def testThree(self): - self.yearly.expand_rrule( - start=datetime(1996, 10, 10), end=datetime(1999, 12, 12) - ) + self.yearly.expand_rrule(start=datetime(1996, 10, 10), end=datetime(1999, 12, 12)) assert len(self.yearly.icalendar_instance.subcomponents) == 3 data1 = self.yearly.icalendar_instance.subcomponents[0].to_ical() data2 = self.yearly.icalendar_instance.subcomponents[1].to_ical() @@ -220,16 +216,11 @@ def testThreeTodo(self): assert data1.replace(b"19970", b"19980") == data2 def testSplit(self): - self.yearly.expand_rrule( - start=datetime(1996, 10, 10), end=datetime(1999, 12, 12) - ) + self.yearly.expand_rrule(start=datetime(1996, 10, 10), end=datetime(1999, 12, 12)) events = self.yearly.split_expanded() assert len(events) == 3 assert len(events[0].icalendar_instance.subcomponents) == 1 - assert ( - events[1].icalendar_component["UID"] - == "19970901T130000Z-123403@example.com" - ) + assert events[1].icalendar_component["UID"] == "19970901T130000Z-123403@example.com" def test241(self): """ @@ -238,9 +229,7 @@ def test241(self): This seems like sort of a duplicate of testThreeTodo, but the ftests actually started failing """ assert len(self.todo.data) > 128 - self.todo.expand_rrule( - start=datetime(1997, 4, 14, 0, 0), end=datetime(2015, 5, 14, 0, 0) - ) + self.todo.expand_rrule(start=datetime(1997, 4, 14, 0, 0), end=datetime(2015, 5, 14, 0, 0)) assert len(self.todo.data) > 128 @@ -359,9 +348,7 @@ def testAbsoluteURL(self): mocked_davresponse = DAVResponse(mocked_response) client.propfind = mock.MagicMock(return_value=mocked_davresponse) bernards_calendars = principal.calendar_home_set - assert bernards_calendars.url == URL( - "http://cal.example.com/home/bernard/calendars/" - ) + assert bernards_calendars.url == URL("http://cal.example.com/home/bernard/calendars/") def _load(self, only_if_unloaded=True): self.data = todo6 @@ -417,12 +404,8 @@ def testDateSearch(self): """ client = MockedDAVClient(xml) - calendar = Calendar( - client, url="/principals/calendar/home@petroski.example.com/963/" - ) - results = calendar.date_search( - datetime(2021, 2, 1), datetime(2021, 2, 7), expand=False - ) + calendar = Calendar(client, url="/principals/calendar/home@petroski.example.com/963/") + results = calendar.date_search(datetime(2021, 2, 1), datetime(2021, 2, 7), expand=False) assert len(results) == 3 def testCalendar(self): @@ -605,16 +588,9 @@ def test_xml_parsing(self): """ - expected_result = { - "/": {"{DAV:}current-user-principal": "/17149682/principal/"} - } + expected_result = {"/": {"{DAV:}current-user-principal": "/17149682/principal/"}} - assert ( - MockedDAVResponse(xml).expand_simple_props( - props=[dav.CurrentUserPrincipal()] - ) - == expected_result - ) + assert MockedDAVResponse(xml).expand_simple_props(props=[dav.CurrentUserPrincipal()]) == expected_result ## This duplicated response is observed in the real world - ## see https://github.com/python-caldav/caldav/issues/136 @@ -645,17 +621,8 @@ def test_xml_parsing(self): """ - expected_result = { - "/principals/users/frank/": { - "{DAV:}current-user-principal": "/principals/users/frank/" - } - } - assert ( - MockedDAVResponse(xml).expand_simple_props( - props=[dav.CurrentUserPrincipal()] - ) - == expected_result - ) + expected_result = {"/principals/users/frank/": {"{DAV:}current-user-principal": "/principals/users/frank/"}} + assert MockedDAVResponse(xml).expand_simple_props(props=[dav.CurrentUserPrincipal()]) == expected_result xml = """ @@ -676,10 +643,7 @@ def test_xml_parsing(self): "{urn:ietf:params:xml:ns:caldav}calendar-home-set": "https://p62-caldav.icloud.com:443/17149682/calendars/" } } - assert ( - MockedDAVResponse(xml).expand_simple_props(props=[cdav.CalendarHomeSet()]) - == expected_result - ) + assert MockedDAVResponse(xml).expand_simple_props(props=[cdav.CalendarHomeSet()]) == expected_result xml = """ @@ -695,15 +659,8 @@ def test_xml_parsing(self): """ - expected_result = { - "/": {"{DAV:}current-user-principal": "/17149682/principal/"} - } - assert ( - MockedDAVResponse(xml).expand_simple_props( - props=[dav.CurrentUserPrincipal()] - ) - == expected_result - ) + expected_result = {"/": {"{DAV:}current-user-principal": "/17149682/principal/"}} + assert MockedDAVResponse(xml).expand_simple_props(props=[dav.CurrentUserPrincipal()]) == expected_result xml = """ @@ -751,10 +708,7 @@ def test_xml_parsing(self): "{urn:ietf:params:xml:ns:caldav}calendar-data": "BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:-//Example Corp.//CalDAV Client//EN\nBEGIN:VEVENT\nUID:20010712T182145Z-123401@example.com\nDTSTAMP:20060712T182145Z\nDTSTART:20060714T170000Z\nDTEND:20060715T040000Z\nSUMMARY:Bastille Day Party\nEND:VEVENT\nEND:VCALENDAR\n" }, } - assert ( - MockedDAVResponse(xml).expand_simple_props(props=[cdav.CalendarData()]) - == expected_result - ) + assert MockedDAVResponse(xml).expand_simple_props(props=[cdav.CalendarData()]) == expected_result xml = """ @@ -942,10 +896,7 @@ def testHugeTreeParam(self): ATTACH;VALUE=BINARY;ENCODING=BASE64;FMTTYPE=image/jpeg; X-FILENAME=image001.jpg;X-ORACLE-FILENAME=image001.jpg: """ - xml += ( - "gIyIoLTkwKCo2KyIjM4444449QEBAJjBGS0U+Sjk/QD3/2wBDAQsLCw8NDx0QEB09KSMpPT09\n" - * 153490 - ) + xml += "gIyIoLTkwKCo2KyIjM4444449QEBAJjBGS0U+Sjk/QD3/2wBDAQsLCw8NDx0QEB09KSMpPT09\n" * 153490 xml += """ /Z DTSTART;TZID="Europe/Paris":20230310T140000 @@ -1053,9 +1004,7 @@ def testInstance(self): assert my_event.vobject_instance.vevent.summary.value == "yet another summary" ## Now the data has been converted from string to vobject to string to icalendar to string to vobject and ... will the string still match the original? lines_now = my_event.data.strip().split("\n") - lines_orig = ( - ev1.replace("Bastille Day Party", "yet another summary").strip().split("\n") - ) + lines_orig = ev1.replace("Bastille Day Party", "yet another summary").strip().split("\n") lines_now.sort() lines_orig.sort() assert lines_now == lines_orig @@ -1069,19 +1018,14 @@ def testComponent(self): assert my_event.vobject_instance.vevent.summary.value == "yet another summary" ## will the string still match the original? lines_now = my_event.data.strip().split("\n") - lines_orig = ( - ev1.replace("Bastille Day Party", "yet another summary").strip().split("\n") - ) + lines_orig = ev1.replace("Bastille Day Party", "yet another summary").strip().split("\n") lines_now.sort() lines_orig.sort() assert lines_now == lines_orig ## Can we replace the component? (One shouldn't do things like this in normal circumstances though ... both because the uid changes and because the component type changes - we're putting a vtodo into an Event class ...) icalendar_component = icalendar.Todo.from_ical(todo).subcomponents[0] my_event.icalendar_component = icalendar_component - assert ( - my_event.vobject_instance.vtodo.summary.value - == "Submit Quebec Income Tax Return for 2006" - ) + assert my_event.vobject_instance.vtodo.summary.value == "Submit Quebec Income Tax Return for 2006" def testTodoDuration(self): cal_url = "http://me:hunter2@calendar.example:80/" @@ -1102,24 +1046,18 @@ def testTodoDuration(self): ## set_due has "only" one if, so two code paths, one where dtstart is actually moved and one where it isn't my_todo2.set_due(some_date, move_dtstart=True) - assert my_todo2.icalendar_instance.subcomponents[0][ - "DTSTART" - ].dt == some_date - timedelta(days=6) + assert my_todo2.icalendar_instance.subcomponents[0]["DTSTART"].dt == some_date - timedelta(days=6) ## set_duration at the other hand has 5 code paths ... ## 1) DUE and DTSTART set, DTSTART as the movable component my_todo1.set_duration(timedelta(1)) assert my_todo1.get_due() == some_date - assert my_todo1.icalendar_instance.subcomponents[0][ - "DTSTART" - ].dt == some_date - timedelta(1) + assert my_todo1.icalendar_instance.subcomponents[0]["DTSTART"].dt == some_date - timedelta(1) ## 2) DUE and DTSTART set, DUE as the movable component my_todo1.set_duration(timedelta(2), movable_attr="DUE") assert my_todo1.get_due() == some_date + timedelta(days=1) - assert my_todo1.icalendar_instance.subcomponents[0][ - "DTSTART" - ].dt == some_date - timedelta(1) + assert my_todo1.icalendar_instance.subcomponents[0]["DTSTART"].dt == some_date - timedelta(1) ## 3) DUE set, DTSTART not set dtstart = my_todo1.icalendar_instance.subcomponents[0].pop("DTSTART").dt @@ -1178,9 +1116,7 @@ def testURL(self): assert url7 == "http://foo:bar@www.example.com:8080/bar" assert url8 == url1 assert url9 == url7 - assert ( - urlA == "http://foo:bar@www.example.com:8080/caldav.php/someuser/calendar" - ) + assert urlA == "http://foo:bar@www.example.com:8080/caldav.php/someuser/calendar" assert urlB == url1 with pytest.raises(ValueError): url1.join("http://www.google.com") @@ -1201,9 +1137,7 @@ def testURL(self): assert url7 == "http://foo:bar@www.example.com:8080/bar" assert url8 == url1 assert url9 == url7 - assert ( - urlA == "http://foo:bar@www.example.com:8080/caldav.php/someuser/calendar" - ) + assert urlA == "http://foo:bar@www.example.com:8080/caldav.php/someuser/calendar" assert urlB == url1 with pytest.raises(ValueError): url1.join("http://www.google.com") @@ -1224,19 +1158,14 @@ def testURL(self): assert url7.unauth() == "http://www.example.com:8080/bar" # 8) strip_trailing_slash - assert URL("http://www.example.com:8080/bar/").strip_trailing_slash() == URL( - "http://www.example.com:8080/bar" - ) + assert URL("http://www.example.com:8080/bar/").strip_trailing_slash() == URL("http://www.example.com:8080/bar") assert ( URL("http://www.example.com:8080/bar/").strip_trailing_slash() == URL("http://www.example.com:8080/bar").strip_trailing_slash() ) # 9) canonical - assert ( - URL("https://www.example.com:443/b%61r/").canonical() - == URL("//www.example.com/bar/").canonical() - ) + assert URL("https://www.example.com:443/b%61r/").canonical() == URL("//www.example.com/bar/").canonical() # 10) pickle assert pickle.loads(pickle.dumps(url1)) == url1 @@ -1244,11 +1173,7 @@ def testURL(self): def testFilters(self): filter = cdav.Filter().append( cdav.CompFilter("VCALENDAR").append( - cdav.CompFilter("VEVENT").append( - cdav.PropFilter("UID").append( - [cdav.TextMatch("pouet", negate=True)] - ) - ) + cdav.CompFilter("VEVENT").append(cdav.PropFilter("UID").append([cdav.TextMatch("pouet", negate=True)])) ) ) # print(filter) @@ -1273,12 +1198,7 @@ def test_calendar_comp_class_by_data(self): ): ## TODO: freebusy, time zone assert calendar._calendar_comp_class_by_data(ical) == class_ if ical != "random rantings" and ical: - assert ( - calendar._calendar_comp_class_by_data( - icalendar.Calendar.from_ical(ical) - ) - == class_ - ) + assert calendar._calendar_comp_class_by_data(icalendar.Calendar.from_ical(ical)) == class_ def testContextManager(self): """ @@ -1296,9 +1216,7 @@ def testExtractAuth(self): with DAVClient(url=cal_url) as client: assert client.extract_auth_types("Basic\n") == {"basic"} assert client.extract_auth_types("Basic") == {"basic"} - assert client.extract_auth_types('Basic Realm=foo;charset="UTF-8"') == { - "basic" - } + assert client.extract_auth_types('Basic Realm=foo;charset="UTF-8"') == {"basic"} assert client.extract_auth_types("Basic,dIGEST Realm=foo") == { "basic", "digest", diff --git a/tests/test_cdav.py b/tests/test_cdav.py index e9ba39a..18da668 100644 --- a/tests/test_cdav.py +++ b/tests/test_cdav.py @@ -49,9 +49,9 @@ def test_to_utc_date_string_dt_with_local_tz(): res = _to_utc_date_string(input.astimezone()) except: res = _to_utc_date_string(tzlocal.get_localzone()) - exp_dt = datetime.datetime( - 2019, 5, 14, 21, 10, 23, 23, tzinfo=tzlocal.get_localzone() - ).astimezone(datetime.timezone.utc) + exp_dt = datetime.datetime(2019, 5, 14, 21, 10, 23, 23, tzinfo=tzlocal.get_localzone()).astimezone( + datetime.timezone.utc + ) exp = exp_dt.strftime("%Y%m%dT%H%M%SZ") assert res == exp @@ -59,9 +59,9 @@ def test_to_utc_date_string_dt_with_local_tz(): def test_to_utc_date_string_naive_dt(): input = datetime.datetime(2019, 5, 14, 21, 10, 23, 23) res = _to_utc_date_string(input) - exp_dt = datetime.datetime( - 2019, 5, 14, 21, 10, 23, 23, tzinfo=tzlocal.get_localzone() - ).astimezone(datetime.timezone.utc) + exp_dt = datetime.datetime(2019, 5, 14, 21, 10, 23, 23, tzinfo=tzlocal.get_localzone()).astimezone( + datetime.timezone.utc + ) exp = exp_dt.strftime("%Y%m%dT%H%M%SZ") assert res == exp diff --git a/tests/test_vcal.py b/tests/test_vcal.py index 597d1eb..65fd127 100644 --- a/tests/test_vcal.py +++ b/tests/test_vcal.py @@ -75,9 +75,7 @@ def create_and_validate(**args): self.assertSameICal(create_and_validate(ical_fragment=ev), ev) ## One may add stuff to a fully valid ical_fragment - self.assertSameICal( - create_and_validate(ical_fragment=ev, priority=3), ev + "\nPRIORITY:3\n" - ) + self.assertSameICal(create_and_validate(ical_fragment=ev, priority=3), ev + "\nPRIORITY:3\n") ## binary string or unicode string ... shouldn't matter self.assertSameICal(