t.me/atinfo_chat Telegram группа по автоматизации тестирования

Как тестировать websocket с помощью pytest

websocket
pytest
api
python
Теги: #<Tag:0x00007f9e44dc25a0> #<Tag:0x00007f9e44dc2460> #<Tag:0x00007f9e44dc22f8> #<Tag:0x00007f9e44dc21b8>

(Serhii Qa) #1

Такой вопрос:
Нужно для автотестов(на pytest) забирать данные из вебсокета, но проблема в однопоточности пайтона, я не могу одновременно открыть конект к сокету и делать действия с апи действия.
Пример нужного теста:

  1. Открыть коннект к соккету:
  2. Дернуть метод АПИ, который в сокет отправит данные
  3. забрать из сокета данные
  4. закрыть конект сокету

Для того что бы слушать сокет я использую https://github.com/websocket-client/websocket-client

Кто то может подсказать направление или как такое реализовать? Если кто то поделится кодом то буду очень благодарен.


(Sergei) #2

А можно подробнее, что это за проблема такая?) Это же не джаваскрипт, у которого действительно только один поток. И питон как раз-таки использует настоящие сишные потоки. Другое дело - ограничения GIL, но на I/O это не оказывает влияния, писать и читать можно хоть в десятки тысяч потоков. А если GIL мешает, то тогда multiprocessing в помощь.


(Sergei) #3

Да и многопоточность здесь необязательна, можно сделать и через корутины. Н-р код, который в главном (наверное, фз что у asyncio под капотом, вроде должно быть генератор-бэйзд) потоке одновременно запускает и сервер и клиент:

import asyncio
import websockets

async def hello(websocket, path):
    name = await websocket.recv()
    print(f"< {name}")

    greeting = f"Hello {name}!"

    await websocket.send(greeting)
    print(f"> {greeting}")

start_server = websockets.serve(hello, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")

        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")

asyncio.get_event_loop().run_until_complete(hello())
$ python ws.py
What's your name? Sergei
> Sergei
< Sergei
> Hello Sergei!
< Hello Sergei!