# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
from typing import Any, Dict
import pytest
from playwright.sync_api import (
Browser,
BrowserContext,
Dialog,
Error,
Page,
TimeoutError,
sync_playwright,
)
from tests.server import Server
def test_sync_query_selector(page: Page) -> None:
    """Look up one element by id and by tag name and compare the inner text."""
    # The markup needs an <h1 id="foo"> so that both selectors below resolve
    # to an element; with plain text only, query_selector would return None.
    page.set_content(
        """
    <h1 id="foo">Bar</h1>
    """
    )
    e1 = page.query_selector("#foo")
    assert e1
    e2 = page.query_selector("h1")
    assert e2
    assert e1.inner_text() == e2.inner_text()
def test_page_repr(page: Page, server: Server) -> None:
    """Check that ``repr(page)`` embeds the page URL."""
    page.goto(server.EMPTY_PAGE)
    # NOTE(review): expected format restored as "<Page url=...>"; an empty
    # expected string could never match. Confirm against Page.__repr__.
    assert repr(page) == f"<Page url={page.url!r}>"
def test_frame_repr(page: Page, server: Server) -> None:
    """Check that ``repr(frame)`` embeds the frame name and URL."""
    page.goto(server.EMPTY_PAGE)
    frame = page.main_frame
    # NOTE(review): expected format restored as "<Frame name=... url=...>";
    # confirm against Frame.__repr__.
    assert repr(frame) == f"<Frame name={frame.name} url={frame.url!r}>"
def test_browser_context_repr(context: BrowserContext) -> None:
    """Check that ``repr(context)`` embeds the owning browser."""
    # NOTE(review): expected format restored as "<BrowserContext browser=...>";
    # confirm against BrowserContext.__repr__.
    assert repr(context) == f"<BrowserContext browser={context.browser}>"
def test_browser_repr(browser: Browser) -> None:
    """Check that ``repr(browser)`` embeds the browser type and version."""
    # NOTE(review): expected format restored as "<Browser type=... version=...>";
    # confirm against Browser.__repr__.
    assert (
        repr(browser)
        == f"<Browser type={browser._impl_obj._browser_type} version={browser.version}>"
    )
def test_browser_type_repr(browser: Browser) -> None:
    """Check that ``repr(browser_type)`` embeds its name and executable path."""
    # The browser type is reached through the implementation object because
    # this test only receives the ``browser`` fixture.
    browser_type = browser._impl_obj._browser_type
    # NOTE(review): expected format restored as
    # "<BrowserType name=... executable_path=...>"; confirm against
    # BrowserType.__repr__.
    assert (
        repr(browser_type)
        == f"<BrowserType name={browser_type.name} executable_path={browser_type.executable_path}>"
    )
def test_dialog_repr(page: Page) -> None:
    """Check that ``repr(dialog)`` embeds type, message and default value."""

    def on_dialog(dialog: Dialog) -> None:
        # Accept the dialog first, then inspect its repr.
        dialog.accept()
        # NOTE(review): expected format restored as
        # "<Dialog type=... message=... default_value=...>"; confirm against
        # Dialog.__repr__.
        assert (
            repr(dialog)
            == f"<Dialog type={dialog.type} message={dialog.message} default_value={dialog.default_value}>"
        )

    page.on("dialog", on_dialog)
    page.evaluate("alert('yo')")
def test_console_repr(page: Page) -> None:
    """Check that ``repr(console_message)`` embeds its type and text."""
    messages = []
    page.on("console", lambda m: messages.append(m))
    page.evaluate('() => console.log("Hello world")')
    message = messages[0]
    # NOTE(review): expected format restored as
    # "<ConsoleMessage type=... text=...>"; confirm against
    # ConsoleMessage.__repr__.
    assert repr(message) == f"<ConsoleMessage type={message.type} text={message.text}>"
def test_sync_click(page: Page) -> None:
    """Click a button located by its text and verify its handler ran."""
    # The button's onclick handler sets ``window.clicked``, which is the flag
    # the final assertion reads; without it the flag would stay undefined.
    page.set_content(
        """
    <button onclick="window.clicked=true">Bar</button>
    """
    )
    page.click("text=Bar")
    assert page.evaluate("()=>window.clicked")
def test_sync_nested_query_selector(page: Page) -> None:
    """Chain ``query_selector`` calls from an element handle down to a label."""
    # Three nested levels (#one > .two > label) so each scoped lookup below
    # has an element to find.
    page.set_content(
        """
    <div id="one">
        <span class="two">
            <label>MyValue</label>
        </span>
    </div>
    """
    )
    e1 = page.query_selector("#one")
    assert e1
    e2 = e1.query_selector(".two")
    assert e2
    e3 = e2.query_selector("label")
    assert e3
    assert e3.inner_text() == "MyValue"
def test_sync_handle_multiple_pages(context: BrowserContext) -> None:
    """Open two pages in one context, close them in turn, then use them after close."""
    first = context.new_page()
    second = context.new_page()
    assert len(context.pages) == 2

    first.set_content("one")
    second.set_content("two")
    assert "one" in first.content()
    assert "two" in second.content()

    # The context's page list shrinks as each page is closed.
    first.close()
    assert len(context.pages) == 1
    second.close()
    assert len(context.pages) == 0

    # Reading content from a closed page is expected to raise.
    for closed_page in (first, second):
        with pytest.raises(Error):
            closed_page.content()
def test_sync_wait_for_event(page: Page, server: Server) -> None:
    """Wait for the ``popup`` event triggered by ``window.open``."""
    with page.expect_event("popup", timeout=10000) as popup_info:
        page.evaluate("(url) => window.open(url)", server.EMPTY_PAGE)
    assert popup_info.value
def test_sync_wait_for_event_raise(page: Page) -> None:
    """Expect ``Error`` from an ``expect_event("popup")`` block whose body fails."""
    with pytest.raises(Error):
        with page.expect_event("popup", timeout=500) as popup_info:
            # The body fails on purpose and never opens a popup.
            assert False
        assert popup_info.value is None
def test_sync_make_existing_page_sync(page: Page) -> None:
    """Evaluate, set content and wait for a selector on the fixture page."""
    assert page.evaluate("() => ({'playwright': true})") == {"playwright": True}
    page.set_content("myElement ")
    page.wait_for_selector("text=myElement")
def test_sync_network_events(page: Page, server: Server) -> None:
    """Record ``request``/``response`` events fired by a ``fetch`` from the page."""
    # Serve a plain-text body on /hello-world.
    server.set_route(
        "/hello-world",
        lambda request: (
            request.setHeader("Content-Type", "text/plain"),
            request.write(b"Hello world"),
            request.finish(),
        ),
    )
    page.goto(server.EMPTY_PAGE)

    # Listeners are attached after the navigation, so only the fetch below
    # is recorded.
    events = []
    page.on("request", lambda req: events.append(f">>{req.method}{req.url}"))
    page.on("response", lambda res: events.append(f"<<{res.status}{res.url}"))

    body = page.evaluate("""async ()=> (await fetch("/hello-world")).text()""")
    assert body == "Hello world"

    target = f"{server.PREFIX}/hello-world"
    assert events == [f">>GET{target}", f"<<200{target}"]
def test_console_should_work(page: Page, browser_name: str) -> None:
    """Capture one ``console.log`` call and inspect its text, type and arguments."""
    captured = []
    page.once("console", lambda m: captured.append(m))
    page.evaluate('() => console.log("hello", 5, {foo: "bar"})')
    assert len(captured) == 1
    message = captured[0]

    # The expected rendering of the object argument differs for Firefox.
    if browser_name == "firefox":
        expected_text = "hello 5 JSHandle@object"
    else:
        expected_text = "hello 5 {foo: bar}"
    assert message.text == expected_text
    assert str(message) == expected_text

    assert message.type == "log"
    first_arg, second_arg, third_arg = (message.args[i] for i in range(3))
    assert first_arg.json_value() == "hello"
    assert second_arg.json_value() == 5
    assert third_arg.json_value() == {"foo": "bar"}
def test_sync_download(browser: Browser, server: Server) -> None:
    """Click a download link and verify the suggested filename and saved content."""
    # Serve an attachment named file.txt on /downloadWithFilename.
    server.set_route(
        "/downloadWithFilename",
        lambda request: (
            request.setHeader("Content-Type", "application/octet-stream"),
            request.setHeader("Content-Disposition", "attachment; filename=file.txt"),
            request.write(b"Hello world"),
            request.finish(),
        ),
    )
    page = browser.new_page(accept_downloads=True)
    try:
        # The anchor must point at the route above; click("a") needs an <a>
        # element to exist for the download event to fire.
        page.set_content(
            f'<a href="{server.PREFIX}/downloadWithFilename">download</a>'
        )
        with page.expect_event("download") as download:
            page.click("a")
        assert download.value
        assert download.value.suggested_filename == "file.txt"
        path = download.value.path()
        assert os.path.isfile(path)
        with open(path, "r") as fd:
            assert fd.read() == "Hello world"
    finally:
        # Close the page even when an assertion fails, since it was created
        # here rather than by a fixture.
        page.close()
def test_sync_workers_page_workers(page: Page, server: Server) -> None:
    """Check the worker list after loading a worker page and after navigating away."""
    worker_page_url = server.PREFIX + "/worker/worker.html"
    with page.expect_event("worker") as worker_info:
        page.goto(worker_page_url)
    assert worker_info.value

    first_worker = page.workers[0]
    assert "worker.js" in first_worker.url
    result = first_worker.evaluate('() => self["workerFunction"]()')
    assert result == "worker function result"

    # After navigating to the empty page, no workers should remain listed.
    page.goto(server.EMPTY_PAGE)
    assert len(page.workers) == 0
def test_sync_playwright_multiple_times() -> None:
    """Expect ``Error`` with the asyncio-loop hint when entering ``sync_playwright()`` here."""
    expected_hint = (
        "It looks like you are using Playwright Sync API inside the asyncio loop."
    )
    with pytest.raises(Error) as raised:
        with sync_playwright() as pw:
            assert pw.chromium
    assert expected_hint in raised.value.message
def test_sync_set_default_timeout(page: Page) -> None:
    """Set a 1 ms default timeout and expect ``wait_for_function`` to time out."""
    page.set_default_timeout(1)
    with pytest.raises(TimeoutError) as raised:
        # "false" never becomes truthy, so this wait can only end by timeout.
        page.wait_for_function("false")
    assert "Timeout 1ms exceeded." in raised.value.message
def test_close_should_reject_all_promises(context: BrowserContext) -> None:
    """Expect ``Target closed`` when a page closes while an evaluate is pending."""
    target_page = context.new_page()
    with pytest.raises(Error) as raised:
        # Run a never-resolving evaluate together with the page close.
        target_page._gather(
            lambda: target_page.evaluate("() => new Promise(r => {})"),
            lambda: target_page.close(),
        )
    assert "Target closed" in raised.value.message
def test_expect_response_should_work(page: Page, server: Server) -> None:
    """Capture the navigation response with ``expect_response`` and check its fields."""
    with page.expect_response("**/*") as response_info:
        page.goto(server.EMPTY_PAGE)
    response = response_info.value
    assert response
    assert response.url == server.EMPTY_PAGE
    assert response.status == 200
    assert response.ok
    assert response.request
def test_expect_response_should_use_context_timeout(
    page: Page, context: BrowserContext, server: Server
) -> None:
    """Expect ``expect_response`` to time out after the context's 1000 ms default."""
    page.goto(server.EMPTY_PAGE)
    context.set_default_timeout(1000)
    with pytest.raises(Error) as raised:
        # Nothing in the body requests this URL, so no matching response arrives.
        with page.expect_response("https://playwright.dev"):
            pass
    assert raised.type is TimeoutError
    assert "Timeout 1000ms exceeded" in raised.value.message
def _test_sync_playwright_stop_multiple_times() -> None:
    """Start Playwright and call ``stop()`` twice; used as a subprocess target."""
    pw = sync_playwright().start()
    for _ in range(2):
        pw.stop()
def test_sync_playwright_stop_multiple_times() -> None:
    """Run the double-``stop()`` scenario in a child process and expect exit code 0."""
    worker = multiprocessing.Process(
        target=_test_sync_playwright_stop_multiple_times
    )
    worker.start()
    worker.join()
    assert worker.exitcode == 0
def _test_call_sync_method_after_playwright_close_with_own_loop(
    browser_name: str,
    launch_arguments: Dict[str, Any],
    empty_page: str,
) -> None:
    """Use a page after ``playwright.stop()`` and expect the closed-loop error.

    Used as a subprocess target. It starts its own Playwright instance,
    launches the browser named by ``browser_name`` with ``launch_arguments``,
    and navigates to ``empty_page`` before stopping Playwright.
    """
    pw = sync_playwright().start()
    launched = pw[browser_name].launch(**launch_arguments)
    ctx = launched.new_context()
    tab = ctx.new_page()
    tab.goto(empty_page)
    pw.stop()

    expected = "Event loop is closed! Is Playwright already stopped?"
    with pytest.raises(Error) as raised:
        tab.evaluate("1+1")
    assert expected in str(raised.value)
def test_call_sync_method_after_playwright_close_with_own_loop(
    server: Server, browser_name: str, launch_arguments: Dict[str, Any]
) -> None:
    """Run the use-after-stop scenario in a child process and expect exit code 0."""
    child_args = [browser_name, launch_arguments, server.EMPTY_PAGE]
    worker = multiprocessing.Process(
        target=_test_call_sync_method_after_playwright_close_with_own_loop,
        args=child_args,
    )
    worker.start()
    worker.join()
    assert worker.exitcode == 0