Python SDK: Diving into Workers and Workflows

In our previous post, we announced that we have our Python SDK in GA and shared a little bit of history around why we built what we did and some prerequisites you needed to get going. Now, let’s dive in a bit deeper to get you past up and running. For …


This content originally appeared on DEV Community and was authored by Rain Leander

In our previous post, we announced that we have our Python SDK in GA and shared a little bit of history around why we built what we did and some prerequisites you needed to get going. Now, let’s dive in a bit deeper to get you past up and running. For the purposes of this post, we are going to break down the single file shown in our hello_activity.py example and split it into multiple files so we more closely match what your application might look like!

The Workflow Code: workflow.py

We want to create the workflow first, because the worker class is going to call this. If we don’t do this first, then your code won’t run, and that isn’t any fun.

The top of our file, like any good python file, has the imports that we need to make this thing go. We are also defining a python dataclass which will be the object we use to pass data to the workflow. By passing a python dataclass instead of multiple parameters to the workflow, we can add or remove fields from the dataclass instead of modifying how the workflow is called. This allows some flexibility to make future changes without needing to version the workflow.

In this case, ComposeGreetingInput allows us to pass a greeting string and a name string into the workflow.

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

# Temporal strongly encourages using a single dataclass so 
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

Just below ComposeGreetingInput we are going to add two new pieces of code, first is an activity.

The Temporal Python SDK comes with decorators that do some work for you. In the case of an activity, it registers this method as a valid activity that can run by the Worker (see below). An activity takes in some data, processes it, and can return values. (in effect, it’s just like any other method). The one thing that we should call out, is that if you’re going to do anything that is non-deterministic, like generating a UUID, you want to do that in your activities or the methods that your activities call out to.

compose_greeting takes in the ComposeGreetingInput dataclass object that we defined above, logs some information to the terminal, and then returns concatenated strings from the object passed in. We could just as easily have pushed data to a database, done some computation, or even made a call to a third-party API in this method.

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

Now that we have an activity, we are going to add in our Workflow class. This code is the backbone for your Temporal script. Everything that Temporal will run for you in this example is inside a Workflow class.
We decorate the class with workflow.defn to register this properly, so Temporal Worker (see below) knows that this is a valid Workflow Class.
The first method in this example class is run, but there are a couple things to call out:
The run method is decorated with @workflow.run, which tells the Worker that this is the method to run when we start a workflow. It can be used to set up the workflow variables, call out to one or more activities, and so many more things. To learn more, check out:

The method itself is an async method, because Temporal’s python library uses asyncio under the hood and the methods called need to be able to be run asynchronously.
The workflow run method expects a string name to be passed when it is called by the Worker

Within the method, we are calling Temporals workflow.execute_activity method, which takes:

  • An activity method reference, in this case compose_greeting
  • Arguments, which in this case we are using a feature of dataclasses to pass in the greeting “Hello” and the name name to the ComposeGreetingInput dataclass we created earlier.
  • A timeout. In this case, we’re providing start_to_close_timeout, which tells Temporal Server to time out this activity 10 seconds from whenever the activity starts.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

When you’re all done with this file, it will look like this:

workflow.py

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

Now that we have the Workflow code, we’ll tie this all together with the Worker, which is the code that Temporal uses to execute your Workflow code asynchronously as tasks show up on the queue. In this case, our example is very simple so the script will add the task to the queue right after creating the worker. In our samples repo, you will find many more in depth examples of how to build applications where different scripts outside of these two can make the workflow do other tasks asynchronously. You can even ask the workflow about itself or for the data that it’s holding for you, using a query.

Execution Code: worker.py

Let’s set up the top of our script, so that we have all the right imports and environment. This will be the skeleton of our application:

worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
    print("Hello, world!")

if __name__ == "__main__":
   asyncio.run(main())

This example will run, in the state above, but all it will do is print “Hello, world!” to your terminal at this point.

In order to take advantage of Temporal, we need our code to talk to the Temporal Server. We connect to the server via a Client. Our code will send activities, signals, and queries to the Temporal task queue inside the Temporal Server via this connection. When these tasks on the queue are executed, they are committed to the workflow history and this allows Temporal to know exactly what code has been run, what code is left to run, and what the state of your application is at any given moment in time!

Create a Client

Replace the main() function above with the client connection code, using the same default localhost url path and port for Temporal Server:

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

Note: we are not using http://localhost:7233, but instead localhost:7233!

Next, we want to add what we call a worker, which is the piece of code that actually calls our workflow code for the activities on the queue.

Creating the worker

On the line just below the client, inside of main(), add the following:

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

Running this code tells Temporal Server that there is a worker ready to process tasks by passing the following information:

  • client - Allows the worker to reach out and say “I’m here, Temporal Server, give me work!”
  • task_queue - Tells Temporal Server, “I am only set up to process tasks from this queue”
  • workflows - An list of python class references, called Workflows (see below), written specifically to handle running activities that process the tasks on the requested task_queue
  • activities - An array of the python function references that can process tasks on the task queue

At this point, your code will look like this. All this code is doing is connecting to the Temporal Server and allowing the Worker code to run your print statement. We are not realizing the full potential of Temporal quite yet. This code will print Still saying ‘hello’ to you, world! to your terminal. After you run this code, if you go to the Web UI url (default is 127.0.0.1:8233), you won’t actually see anything in your workflow list, yet.

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

if __name__ == "__main__":
   asyncio.run(main())

To run our GreetingWorkflow, we ask the Client to execute the workflow. Then, when we run our script, Temporal Server will be keeping track of what the code has done so far, and you will be able to see all the work that Temporal did for you in the Web UI.

In the code above, replace the print statement with the following:

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           "World",
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

This code will allow the worker to tell the client to:

  • Asynchronously call execute_workflow (i.e. run the workflow run method)
  • Pass the input “World” to GreetingWorkflow.run
  • Give the workflow the identifier hello-activity-workflow-id
  • Put the tasks that the Workflow generates onto the hello-activity-task-queue (which you may recognize from such far flung places as a couple lines above when we created the Worker)

Running it all

Run this in your terminal: python worker.py

Once the Worker executes the code, it sets the string returned from compose_greeting to result and then prints it out to the terminal. You should end up seeing Hello, World! in your terminal.

In the Web UI, you will see something like this, with one workflow row for each time your code executed:

Recent Workflows

Clicking the workflow ID for one of the rows, you will see everything that happened with your code and Temporal server when your worker executed the workflow code:

Workflow Detail

Real talk: Temporal is built for applications that do more than print text to your terminal, but we wanted to give an extremely simple way to show you the different pieces. In a follow up blog post, we will show you an actual application that uses Temporal to keep state within the workflow.

For funsies, if you want to do something also simple but that gives you a little more functionality, you can take in an input from the console with this version of worker.py:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("What's your name?")
       name = input()

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           name,
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

if __name__ == "__main__":
   asyncio.run(main())

In my case, the output looked like this:

Hello, Matt

This code also results in a slightly different WorkflowExecutionStarted event in the workflow’s event history in the Web UI. We can see the input “Matt” was passed to the activity rather than “World”.

Click “Workflow Execution Started” on your workflow to see the difference!

Workflow Execution Started

Success!

There you have it, workflows and workers broken down into some areas for you to work on within your application. Hopefully this helps you get going with your Python application and a better understanding of how you can get going with Temporal.

Want to take it a step further? Check out our developer advocate’s post about how they built a poker application using some of the basics of Temporal. For a full tutorial, check out Build a Temporal Application from scratch in Python.


This content originally appeared on DEV Community and was authored by Rain Leander


Print Share Comment Cite Upload Translate Updates
APA

Rain Leander | Sciencx (2023-03-09T19:09:24+00:00) Python SDK: Diving into Workers and Workflows. Retrieved from https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/

MLA
" » Python SDK: Diving into Workers and Workflows." Rain Leander | Sciencx - Thursday March 9, 2023, https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/
HARVARD
Rain Leander | Sciencx Thursday March 9, 2023 » Python SDK: Diving into Workers and Workflows., viewed ,<https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/>
VANCOUVER
Rain Leander | Sciencx - » Python SDK: Diving into Workers and Workflows. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/
CHICAGO
" » Python SDK: Diving into Workers and Workflows." Rain Leander | Sciencx - Accessed . https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/
IEEE
" » Python SDK: Diving into Workers and Workflows." Rain Leander | Sciencx [Online]. Available: https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/. [Accessed: ]
rf:citation
» Python SDK: Diving into Workers and Workflows | Rain Leander | Sciencx | https://www.scien.cx/2023/03/09/python-sdk-diving-into-workers-and-workflows/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.