Skip to content

API reference

Core

predict(text: str) -> Prediction

Synchronously predict appeal classification

Parameters:

  • text (str) –

    Appeal text

Returns:

  • Prediction

    Predicted classification

Source code in src/promobot_appeal_processing/main.py
 9
10
11
12
13
14
15
16
17
18
19
def predict(text: str) -> Prediction:
    """
    Classify a single appeal text with the underlying model (blocking call).

    Args:
        text: Appeal text

    Returns:
        Predicted classification
    """
    # The model is invoked with a one-element list — presumably a batch
    # API; confirm that it returns a single Prediction for a 1-item batch.
    batch = [text]
    return model.predict(batch)

predict_async(text: str) -> Prediction async

Asynchronous wrapper around the predict function

Example
from fastapi import FastAPI, Body
from promobot_appeal_processing import predict_async

app = FastAPI()

@app.post("/api")
async def api(text: str = Body(embed=True)):
    return await predict_async(text)

Parameters:

  • text (str) –

    Appeal text

Returns:

  • Prediction

    Predicted classification

Source code in src/promobot_appeal_processing/main.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async def predict_async(text: str) -> Prediction:
    """
    Awaitable counterpart of ``predict``: offloads the blocking model call
    to a worker thread so the event loop stays responsive.

    Example:
        ```py hl_lines="8"
        from fastapi import FastAPI, Body
        from promobot_appeal_processing import predict_async

        app = FastAPI()

        @app.post("/api")
        async def api(text: str = Body(embed=True)):
            return await predict_async(text)
        ```

    Args:
        text: Appeal text

    Returns:
        Predicted classification
    """
    # asyncio.to_thread runs the synchronous predict() off the event loop.
    result = await asyncio.to_thread(predict, text)
    return result

predict_many(texts: Iterable[str], max_workers: int) -> tuple[Prediction] async

Wrapper around the predict function that executes the prediction tasks in parallel using max_workers processes

Example
import aiofiles
import aiocsv
import csv
import io
from charset_normalizer import from_bytes
from dataclasses import asdict, fields
from fastapi import FastAPI, UploadFile
from fastapi.responses import FileResponse
from promobot_appeal_processing import predict_many, Prediction

app = FastAPI()

@app.post("/file")
async def file(file: UploadFile):
    contents = str(from_bytes(await file.read()).best()).strip() # (1)!
    csv_reader = csv.DictReader(io.StringIO(contents), delimiter=";") # (2)!
    rows = tuple(csv_reader)
    texts = map(lambda row: row["text"], rows)

    data = map(
        lambda row, prediction: row | asdict(prediction),
        rows,
        await predict_many(texts, max_workers=4)
    )

    async with aiofiles.open(path := "temp.csv", "w", encoding="utf-8") as out_file:
        csv_writer = aiocsv.AsyncDictWriter(
            out_file,
            fieldnames=[
                *csv_reader.fieldnames,
                *map(lambda field: field.name, fields(Prediction)) # (3)!
            ]
        )
        await csv_writer.writeheader()

        for row in data:
            await csv_writer.writerow(row)

    return FileResponse(
        path,
        media_type="text/csv",
        filename="predictions.csv"
    )
  1. Trying to properly decode binary UploadFile
  2. European versions of Microsoft Excel export .csv files with semicolon delimiter by default. If your .csv file has comma as a delimiter you can remove this parameter (comma is the default parameter value).
  3. Adding predicted columns to the output .csv file

Parameters:

  • texts (Iterable[str]) –

    Appeal texts

  • max_workers (int) –

    Maximum number of parallel processes (passed to concurrent.futures.ProcessPoolExecutor)

Returns:

  • tuple[Prediction]

    Predicted classifications for all appeal texts (in respective order)

Source code in src/promobot_appeal_processing/main.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def predict_many(texts: Iterable[str], max_workers: int) -> tuple[Prediction, ...]:
    """
    Wrapper around the ``predict`` function that executes the prediction tasks in parallel
    using ``max_workers`` processes

    Example:
        ```py hl_lines="23"
        import aiofiles
        import aiocsv
        import csv
        import io
        from charset_normalizer import from_bytes
        from dataclasses import asdict, fields
        from fastapi import FastAPI, UploadFile
        from fastapi.responses import FileResponse
        from promobot_appeal_processing import predict_many, Prediction

        app = FastAPI()

        @app.post("/file")
        async def file(file: UploadFile):
            contents = str(from_bytes(await file.read()).best()).strip() # (1)!
            csv_reader = csv.DictReader(io.StringIO(contents), delimiter=";") # (2)!
            rows = tuple(csv_reader)
            texts = map(lambda row: row["text"], rows)

            data = map(
                lambda row, prediction: row | asdict(prediction),
                rows,
                await predict_many(texts, max_workers=4)
            )

            async with aiofiles.open(path := "temp.csv", "w", encoding="utf-8") as out_file:
                csv_writer = aiocsv.AsyncDictWriter(
                    out_file,
                    fieldnames=[
                        *csv_reader.fieldnames,
                        *map(lambda field: field.name, fields(Prediction)) # (3)!
                    ]
                )
                await csv_writer.writeheader()

                for row in data:
                    await csv_writer.writerow(row)

            return FileResponse(
                path,
                media_type="text/csv",
                filename="predictions.csv"
            )
        ```

        1.  Trying to properly decode binary ``UploadFile``
        2.  European versions of :simple-microsoftexcel:{ .excel } **Microsoft Excel** export ``.csv`` files
            with **semicolon** delimiter by default. If your ``.csv`` file has **comma** as a delimiter
            you can remove this parameter (**comma** is the default parameter value).
        3.  Adding predicted columns to the output ``.csv`` file

    Args:
        texts: Appeal texts
        max_workers: Maximum number of parallel processes (passed to ``concurrent.futures.ProcessPoolExecutor``)

    Returns:
        Predicted classifications for all appeal texts (in respective order)
    """
    loop = asyncio.get_running_loop()

    # Context-manage the pool so its worker processes are shut down once
    # all predictions are gathered; without this the executor is never
    # released and worker processes leak on every call. All futures have
    # completed before the `with` block exits, so shutdown(wait=True) at
    # exit only reaps the idle workers and blocks the loop briefly.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        predictions = await asyncio.gather(
            *(loop.run_in_executor(executor, predict, text) for text in texts)
        )

    return tuple(predictions)

Prediction(topic: str, topic_group: str, executor: str) dataclass

topic: str instance-attribute

topic_group: str instance-attribute

executor: str instance-attribute

Geography submodule

predict_geography(text: str) -> tuple[str]

Extract geographical object names from the given text

Parameters:

  • text (str) –

    Appeal text

Returns:

  • tuple[str]

    Geographical names

Source code in src/promobot_appeal_processing/geography/main.py
11
12
13
14
15
16
17
18
19
20
21
def predict_geography(text: str) -> tuple[str, ...]:
    """
    Extract geographical object names from the given ``text`` (blocking call).

    Args:
        text: Appeal text

    Returns:
        Geographical names (zero or more, hence the variadic ``tuple[str, ...]``
        annotation — ``tuple[str]`` would denote exactly one element)
    """
    # Delegates to the geography submodule's model; assumed to return a
    # tuple of extracted place names — confirm against geography.predict.
    return geography.predict(text)

predict_geography_async(text: str) -> tuple[str] async

Asynchronous wrapper around the predict_geography function

Parameters:

  • text (str) –

    Appeal text

Returns:

  • tuple[str]

    Geographical names

Source code in src/promobot_appeal_processing/geography/main.py
24
25
26
27
28
29
30
31
32
33
34
async def predict_geography_async(text: str) -> tuple[str]:
    """
    Awaitable counterpart of ``predict_geography``: runs the blocking
    extraction in a worker thread so the event loop is not stalled.

    Args:
        text: Appeal text

    Returns:
        Geographical names
    """
    # Offload the synchronous extraction via asyncio.to_thread.
    names = await asyncio.to_thread(predict_geography, text)
    return names

predict_geography_many(texts: Iterable[str], max_workers: int) -> tuple[tuple[str]] async

Wrapper around the predict_geography function that executes the geography name extraction tasks in parallel using max_workers processes

Parameters:

  • texts (Iterable[str]) –

    Appeal texts

  • max_workers (int) –

    Maximum number of parallel processes (passed to concurrent.futures.ProcessPoolExecutor)

Returns:

  • tuple[tuple[str]]

    Geographical names from all appeal texts (in respective order)

Source code in src/promobot_appeal_processing/geography/main.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def predict_geography_many(texts: Iterable[str], max_workers: int) -> tuple[tuple[str, ...], ...]:
    """
    Wrapper around the ``predict_geography`` function that executes the geography name extraction tasks
    in parallel using ``max_workers`` processes

    Args:
        texts: Appeal texts
        max_workers: Maximum number of parallel processes (passed to ``concurrent.futures.ProcessPoolExecutor``)

    Returns:
        Geographical names from all appeal texts (in respective order)
    """
    loop = asyncio.get_running_loop()

    # Context-manage the pool so its worker processes are shut down once
    # all extractions are gathered; without this the executor is never
    # released and worker processes leak on every call. All futures have
    # completed before the `with` block exits, so shutdown at exit only
    # reaps the idle workers.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = await asyncio.gather(
            *(loop.run_in_executor(executor, predict_geography, text) for text in texts)
        )

    return tuple(results)