
Tests written for this project

conftest

Actions performed before every run of the whole test suite.

test_actions_before_all_tests()

Remove the backup and tmp folders and everything inside them, then create the tmp folder (it is needed for tests only).

Source code in tests/conftest.py
def test_actions_before_all_tests():
    """Remove `backup` and `tmp` folders and everything inside them.
    Then, create `tmp` folder (it's needed for tests only).
    """
    backup_parent = BACKUP.parent
    if backup_parent and backup_parent.exists():
        rmtree(backup_parent)
    tmp = TMP
    if tmp.exists():
        rmtree(tmp)
    tmp.mkdir()
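
The snippet relies on two module-level paths, BACKUP and TMP, imported from the project's settings. A minimal sketch of what they could look like (the folder layout and file name below are assumptions, not taken from the project):

from pathlib import Path

# Hypothetical test-settings constants: a scratch folder used only by tests
# and the backup file written by the scheduler.
TMP = Path(__file__).parent / 'tmp'
BACKUP = Path(__file__).parent / 'backup' / 'backup.json'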

scenarios.py

run_and_wait(scheduler)

Simplest scenario

Source code in tests/scenarios.py
def run_and_wait(scheduler):
    """Simplest scenario"""
    scheduler.run()
    sleep(TICK)
    scheduler.join()

run_stop_and_restart(scheduler)

Run the scheduler, stop it, and restart it from a backup

Source code in tests/scenarios.py
def run_stop_and_restart(scheduler):
    """Run scheduler, stop it and restart scheduler from backup"""
    scheduler.run()
    sleep(TICK)
    scheduler.stop()
    sleep(2 * TICK)
    del scheduler
    scheduler_new = Scheduler()
    scheduler_new.restart()
    scheduler_new.join()

schedule_jobs_sequentially(targets)

Do jobs one after another. This is one of the basic scenarios used by several tests.

Source code in tests/scenarios.py
def schedule_jobs_sequentially(targets: list[partial]):
    """Do jobs one after another.
    This is one of basic scenarios for some tests.
    """
    scheduler = Scheduler()
    ids: list[str] = []
    for target in targets:
        job = Job(
            [target],
            dependencies=(*ids,),
        )

        scheduler.schedule(job)
        ids.append(job.get_id())
    return scheduler

test_fs

This file contains the tests required by the initial statement of work (SoW):

  • working with the file system: creating, deleting, and modifying directories and files
  • working with files: creating, reading, and writing

job_fs_create_folders(names)

Create folders

Source code in tests/test_fs.py
def job_fs_create_folders(names: list):
    """Create folders"""
    dir_ = TMP
    dir_.mkdir(parents=False, exist_ok=True)
    for name in names:
        dir_ /= name
        dir_.mkdir(parents=False, exist_ok=True)
    return f"A chain of folders is created '{names[0]}' ... '{names[-1]}'"

job_fs_delete_folders(names)

Delete a chain of folders one by one, from the last to the first

Source code in tests/test_fs.py
def job_fs_delete_folders(names: list):
    """Delete a chain of folders one-by-one from the end to the beginning"""
    dir_ = TMP
    for name in names:
        dir_ /= name
    while dir_ != TMP:
        dir_.rmdir()
        dir_ = dir_.parent
    return f"A chain of folders is deleted '{names[0]}' ... '{names[-1]}'"

job_fs_modify_folders(names, names_new)

Rename folders

Source code in tests/test_fs.py
def job_fs_modify_folders(names: list, names_new: list):
    """Rename folders"""
    dir_ = TMP
    for i, _ in enumerate(names):
        dir_old = dir_ / names[i]
        dir_new = dir_ / names_new[i]

        dir_ = dir_old.rename(dir_new)
    return f"A chain of folders is renamed '{names_new[0]}' ... '{names_new[-1]}'"

names_of_principles()

Return a few Zen of Python principles. They are used later in the tests as an alternative to 'Lorem Ipsum'.

Source code in tests/test_fs.py
@fixture
def names_of_principles():
    """Return few Zen of Python principles.
    They are used later in tests as an alternative to 'Lorem Ipsum'.
    """
    zen_of_python = get_zen_of_python()
    lst1 = []  # original principles
    for line in zen_of_python.split('\n')[2:8]:
        symbols = ",.*!'"
        for symbol in symbols:
            line = line.replace(symbol, '')
        lst1.append(line)

    lst2 = []  # reverted (modified) principles
    for line in lst1:
        words = line.split(' ')
        line_new = ' '.join([words[-1], *words[1:-1], words[0]])
        line_new = line_new.replace('better', 'worse').capitalize()
        lst2.append(line_new)
    return lst1, lst2
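
The fixture calls a helper get_zen_of_python() that is not documented on this page. A hedged sketch of one way such a helper could be implemented (decoding the standard `this` module is an assumption about the real code):

import codecs
import io
from contextlib import redirect_stdout

def get_zen_of_python() -> str:
    """Return the Zen of Python as plain text."""
    with redirect_stdout(io.StringIO()):  # silence the printout on first import
        import this
    return codecs.decode(this.s, 'rot13')  # `this.s` stores the ROT13-encoded text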

test_fs_directories(names_of_principles)

Create directories, modify their names and delete them.

Source code in tests/test_fs.py
def test_fs_directories(names_of_principles):
    """Create directories, modify their names and delete them."""
    target_create = partial(job_fs_create_folders, names_of_principles[0])
    target_modify = partial(job_fs_modify_folders, *names_of_principles)
    target_delete = partial(job_fs_delete_folders, names_of_principles[1])

    scheduler = schedule_jobs_sequentially([target_create, target_modify, target_delete])
    run_and_wait(scheduler)

test_fs_files(names_of_principles)

Create files, write text to them, modify the text, and finally read them.

Source code in tests/test_fs.py
def test_fs_files(names_of_principles):
    """Create files, write text in them, modify text inside and, the last, read them."""
    target_create = partial(job_fs_create_files, names_of_principles[0], names_of_principles[0])
    target_modify = partial(job_fs_modify_files, names_of_principles[0], names_of_principles[1])
    target_read = partial(job_fs_read_files, names_of_principles[0])

    scheduler = schedule_jobs_sequentially((target_create, target_modify, target_read,))
    run_stop_and_restart(scheduler)
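
The file-level jobs (job_fs_create_files, job_fs_modify_files, job_fs_read_files) are not shown on this page. Purely as an illustration, a hypothetical job_fs_read_files could look like the sketch below; the file-naming convention is an assumption:

def job_fs_read_files(names: list):
    """Read the files created by the previous jobs and return their contents."""
    contents = []
    for name in names:
        contents.append((TMP / name).read_text())  # assumes the files live directly in TMP
    return contents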

test_web

This file contains the tests required by the initial statement of work (SoW):

  • working with the network: handling links (GET requests) and analyzing the received result

job_request_weather(city)

This could be a fixture (a function which returns an inner function), but in that case we get the error "Can't pickle local object 'request_weather.<locals>._request_weather'"

Source code in tests/test_web.py
def job_request_weather(city):  # web_job
    """This could be a fixture (a function which returns an inner function),
    but in this case we get an error
    "Can't pickle local object 'request_weather.<locals>._request_weather'"
    """
    url = get_url_by_city_name(city)

    try:
        response = YandexWeatherAPI.get_forecasting(url)
        if 'forecasts' not in response:
            raise Exception("Response does not have the forecast data")
    except Exception as e:
        warning = f"city {city} is skipped because of exception: {e}"
        raise Exception(warning)
    else:
        if not TMP.exists():
            TMP.mkdir(parents=True)

        raw_json_path = TMP / f'{city}.json'
        with open(raw_json_path, 'w+') as raw_json:
            json_dump(response, raw_json)
        result = f"Request is written in the file {raw_json_path.stem}."
        logger.debug(result)

        return result
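
The docstring points at a pickling constraint: the scheduler apparently pickles its targets (for example, to run them in separate processes or to write the backup), so they must be module-level functions rather than closures returned by a fixture. A standalone illustration of that constraint, using hypothetical names that are not part of the project:

import pickle
from functools import partial

def make_request_weather(city):
    def _request_weather():  # nested function, lives in '<locals>'
        return city
    return _request_weather

def request_weather(city):  # module-level function, picklable by reference
    return city

pickle.dumps(partial(request_weather, 'MOSCOW'))  # works
try:
    pickle.dumps(make_request_weather('MOSCOW'))  # "Can't pickle local object ..."
except (AttributeError, pickle.PicklingError) as error:
    print(error)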

test_web_job(cities_fixture)

Test a few web jobs

Source code in tests/test_web.py
def test_web_job(cities_fixture):
    """Test few web jobs"""
    cities = cities_fixture
    scheduler = Scheduler(pool_size=12)
    for city in cities:
        job = Job(
            [partial(job_request_weather, city)],
        )
        scheduler.schedule(job)
    scheduler.run()
    scheduler.join()

test_web_job1()

Test one web job

Source code in tests/test_web.py
def test_web_job1():
    """Test one web job"""
    job = Job(
        [partial(job_request_weather, "SPETERSBURG")],
    )
    scheduler = Scheduler(pool_size=6)
    scheduler.schedule(job)
    scheduler.run()
    scheduler.join()

test_multi

This file contains the tests required by the initial statement of work (SoW):

  • describe a pipeline for the main task, consisting of at least 3 jobs that depend on one another and run sequentially, one after another

test_multi_job()

Test a conveyor (pipeline) of 3+ jobs.

Source code in tests/test_multi.py
def test_multi_job():
    """Test a conveyor (pipeline) of 3+ jobs."""
    target_create_dirs = partial(job_create_dirs, )
    target_create_files = partial(job_create_files, )
    target_write_in_files = partial(job_write_in_files, )
    scheduler = schedule_jobs_sequentially((target_create_dirs,
                                            target_create_files,
                                            target_write_in_files,
                                            ))
    run_and_wait(scheduler)

test_multi

Very simple tests of the Job class.

test_integration

This file contains pure calculation tests. They require neither the internet nor the file system.

test_calculations(fixture_for_power)

Test "three simple jobs for a scheduler"

Parameters:

    fixture_for_power : fixture (required)
        a fixture with variables for a power function a**b
Source code in tests/test_calculations.py
def test_calculations(fixture_for_power: tuple) -> None:
    """
    Test "three simple jobs for a scheduler"

    Parameters
    ----------
    fixture_for_power : fixture
        a fixture with variables for a power function a**b
    """
    tuples = fixture_for_power

    jobs = [Job([partial(power, *args_), ]) for args_ in tuples]
    scheduler = Scheduler()

    for job in jobs:
        assert isinstance(job, Job)
        scheduler.schedule(job)

    scheduler.run()
    scheduler.join()
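
The fixture_for_power fixture itself is not documented here. A hypothetical version that matches how it is used above, i.e. it supplies (a, b) pairs for the power target (the concrete values are made up):

from pytest import fixture

@fixture
def fixture_for_power() -> tuple:
    """Three (a, b) pairs for the `power` target, computed as a**b."""
    return ((2, 10), (3, 5), (7, 2))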

test_calculations_with_a_stop(fixture_for_power)

Test "three jobs for a scheduler with stop and rerun"

Parameters:

    fixture_for_power : fixture (required)
        a fixture with variables for a power function a**b
Source code in tests/test_calculations.py
def test_calculations_with_a_stop(fixture_for_power: tuple) -> None:
    """
    Test "three jobs for a scheduler with stop and rerun"

    Parameters
    ----------
    fixture_for_power : fixture
        a fixture with variables for a power function a**b
    """
    tuples = fixture_for_power

    job1 = Job([partial(power, *tuples[0])])
    scheduler = Scheduler()
    scheduler.schedule(job1)

    id1 = job1.get_id()
    job2 = Job(
        targets=[partial(power, *tuples[1])],
        start_at=str(datetime.now() + timedelta(seconds=8)),
        max_working_time=16,
        tries=6,
        dependencies=(id1,)
    )
    scheduler.schedule(job2)

    id2 = job2.get_id()
    job3 = Job(
        targets=[partial(power, *tuples[2])],
        start_at=str(datetime.now() + timedelta(seconds=16)),
        max_working_time=8,
        tries=3,
        dependencies=(id1, id2)
    )
    scheduler.schedule(job3)

    scheduler.run()
    sleep(4 * TICK)
    scheduler.stop()
    sleep(8 * TICK)
    del scheduler
    scheduler_new = Scheduler()
    scheduler_new.restart()
    sleep(TICK)
    scheduler_new.join()