Developing asynchronous or scheduled tasks¶
The ucam-faas Python
library allows us to write functions triggered by PubSub messages or on schedules. It can
automatically parse and deserialise JSON or Protobuf content carried in the triggering
PubSub message.
It is anticipated that the majority of task functions will be triggered either on a schedule or by a PubSub topic, but not both.
This reference document assumes a tasks collection repository has been set up following the howto
instructions, with a tasks module in the
root of the repository called my_tasks.
Scheduled tasks¶
Scheduled tasks are the simplest form. Tasks running on a schedule do not expect to read message contents.
The ucam_faas.raw_event handler can be used for these cases:
# my_tasks/main.py
import ucam_faas


@ucam_faas.raw_event
def my_task(event: bytes) -> None:
    # Do work
    pass
PubSub triggered tasks¶
For tasks triggered by messages on PubSub topics, there are several options for reading the message contents and making use of them within the function.
Some tasks may not need to read the message contents at all; in these cases the raw_event handler
(as above) can be used.
For tasks that do need to read the message content, use one of the following options instead.
JSON message content¶
To parse JSON message content, you will first need to define a
pydantic model that corresponds to the expected JSON
structure.
This can be done in a separate library, or in the tasks module directly:
# my_tasks/events.py
import pydantic


class MyEvent(pydantic.BaseModel):
    informative_number: int
    process_user: str
The above model can be serialised to/deserialised from JSON matching the structure:
{
    "informative_number": 42,
    "process_user": "my user"
}
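As a quick sanity check (this sketch assumes pydantic v2, and restates the MyEvent model from above so it runs standalone), the model can be round-tripped through JSON of exactly that shape:

```python
import pydantic


# Restated here for a self-contained example; normally imported from my_tasks/events.py.
class MyEvent(pydantic.BaseModel):
    informative_number: int
    process_user: str


# Deserialise from the JSON structure shown above...
event = MyEvent.model_validate_json(
    '{"informative_number": 42, "process_user": "my user"}'
)
print(event.informative_number)  # 42
print(event.process_user)  # my user

# ...and serialise back to JSON.
print(event.model_dump_json())
```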
If the PubSub topic will have messages written to it in this structure, we can deserialise those
messages and pass them to our task function by using the ucam_faas.message_handler decorator:
# my_tasks/main.py
import ucam_faas

from .events import MyEvent


@ucam_faas.message_handler(message_type=MyEvent)
def my_task(event: MyEvent) -> None:
    print(event.informative_number)
    print(event.process_user)
Protobuf message content¶
To parse protobuf messages, a very similar process can be undertaken. If you are using protobuf messages in your PubSub topics, it is expected that the message schemas are defined in a separate repository and that compatible Python code is automatically generated for use, following the instructions in this howto guide.
You will need to add your message schema library as a dependency of the tasks collection, then you can import the models and use them in the same manner as the pydantic models.
If you had a protobuf schema reading:
syntax = "proto3";

package v1;

message MyEvent {
    int32 informative_number = 1;
    string process_user = 2;
}
which was generated into a client Python library called my_messages, this could then be used:
# my_tasks/main.py
import ucam_faas

import my_messages.v1


@ucam_faas.message_handler(message_type=my_messages.v1.MyEvent)
def my_task(event: my_messages.v1.MyEvent) -> None:
    print(event.informative_number)
    print(event.process_user)
Other message processing¶
If necessary, you can also supply your own parser function to process the PubSub message payload and pass the result to your task function as an argument. This is the most flexible (but most involved) option and is suitable if you have a non-standard schema or little control over the message contents you are receiving.
For example, if you know the PubSub message payload will contain only an ISO format timestamp, and you wish to process it as a datetime object, you could:
# my_tasks/main.py
from datetime import datetime

import ucam_faas


def parse_iso_format_from_bytes(raw_message: bytes) -> datetime:
    return datetime.fromisoformat(raw_message.decode("utf-8"))


@ucam_faas.message_handler(message_type=parse_iso_format_from_bytes)
def my_task(received_datetime: datetime) -> None:
    print(received_datetime.day)
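The parser itself is plain Python, so it can be exercised (and unit tested) independently of the framework. A minimal sketch, using a hypothetical payload value:

```python
from datetime import datetime


def parse_iso_format_from_bytes(raw_message: bytes) -> datetime:
    return datetime.fromisoformat(raw_message.decode("utf-8"))


# A hypothetical PubSub payload containing only an ISO format timestamp.
received_datetime = parse_iso_format_from_bytes(b"2023-09-01T12:30:00")
print(received_datetime.day)  # 1
```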
Execution info¶
The ucam-faas library provides a standard way to communicate the execution status of a task. This
is done by returning an ExecutionInfo object from the task function.
ExecutionInfo objects can be subclassed to provide additional information on the status of a task:
# my_tasks/main.py
import ucam_faas

from .events import MyEvent


class MyTaskMessageHandlerExecutionInfo(ucam_faas.MessageHandlerExecutionInfo):
    message: str


@ucam_faas.message_handler(message_type=MyEvent)
def my_task(event: MyEvent) -> MyTaskMessageHandlerExecutionInfo:
    print(event.informative_number)
    print(event.process_user)
    return MyTaskMessageHandlerExecutionInfo(
        message=f"Gave {event.process_user} {event.informative_number} new tasks :)"
    )
Communicating errors¶
If something goes wrong while a task function is running, any exception raised in the function will cause the task to be marked as a failure. Failed tasks are retried a set number of times (this is configured in the infrastructure environment - see the terraform module); if the same error persists, the message will be sent to the dead letter queue.
Any raised exception will cause this behaviour; however, a special UCAMFAASException exception
class is also provided to communicate when an error is "more expected" than normal. This would
typically be raised if, during processing, something unexpected happened that means the function
needs to stop, but the function is not otherwise at fault.
Note
There is no explicit need to raise errors only as UCAMFAASException sub-classes; all exceptions
are treated as errors and will be handled in the same way.