Parallelism is at the core of modern programming. The reasons for running different pieces of code at the same time are endless: scalability, efficiency, you name it. No matter the reason, multithreading is a simple way to achieve parallelism in Python. This Python threading tutorial will cover everything you need to implement multithreading, including the use of queues for inter-thread communication.
Python Threading Tutorial Theory
Before we can dive into parallelism, we should cover some theory. In fact, this section will explain why you need parallelism. After that, it will cover the basic jargon you need to know.
Why do we need parallelism?
Parallelism is a simple concept. It means that your program runs some of its parts at the same time. They might be different parts or even multiple instances of the same part, but each runs independently of the others.
Parallelism means running multiple parts of your program at the same time.
This, of course, adds a little bit of complexity, so why would we do that? Couldn’t we just write a monolithic program that does everything in sequence? We could, but the performance would be far lower. In fact, parallelism has two main benefits.
- Horizontal scalability: you can distribute tasks among cores on your PC, or even among different computers. This means you can access more power, and you can get more just by adding computers to the cluster. This is the opposite of vertical scalability, where you need to upgrade the hardware of the same computer.
- Efficiency: imagine that parts of your script need to wait for something, but some parts do not. You can have only the parts that really need to wait do the waiting, while the others move forward.
We have different ways of implementing parallelism in Python. Since this is a python threading tutorial, we are going to cover multithreading. However, you should know that other options exist.
Multithreading and Multiprocessing
When it comes to parallelism, we have two main ways to go: multithreading and multiprocessing. The two can achieve similar results in two different ways.
A process is a program running on a computer. The operating system dedicates some RAM to that process and keeps some meta-information it needs to manage it. You can think of a process as a whole program in execution. A thread, instead, is a lightweight version of a process: it is not a program in execution on its own, but part of one. It belongs to a process and shares memory with the other threads in the same process. On top of that, the operating system doesn’t have to allocate the per-process meta-information for each thread.
A process is a program in execution. A thread is only part of a process.
For a simple task, like running a function, working with multiple threads is the best way to go. In fact, this python threading tutorial will cover how to achieve parallelism with threads. You could apply the same concepts to multiprocessing as well.
Multithreading Jargon
This section of this Python threading tutorial is important: if you get the jargon right, the whole article will be a lot easier to follow. Although multithreading is a complex topic, its jargon is not complex at all.
Sync and Async
Asynchronous is the buzzword here. A simple program is synchronous, which means instructions are executed in order. The program runs each instruction after another, in the order you wrote them. However, with threading, you can run parts of the program asynchronously. This means some parts of the code may run before or after others, as their ordering is no longer tied together.
This, of course, is the behavior we want, but sometimes we need one thread to wait for the execution of another. In that case, we need to sync the threads. No worries, we will explain how to do that.
Running threads
When working with threads, but also with processes, we need to know some common operations.
The first thing you need to do is to define the thread. You basically prepare it for execution and tell it what it will have to do. However, you are not executing the thread just yet. When you want the thread to start working, you simply start it. Then, the thread will run asynchronously from your main program. However, you may want your program to wait for the thread to finish. In that case, you join the thread. This tells your program to wait for its thread to terminate, and you can join multiple threads as well.
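To make this concrete, here is a minimal sketch of these three steps. It is just an illustration, not part of the tutorial’s script, and the wait_a_bit() function is a made-up placeholder.
import threading
import time

def wait_a_bit():
    # Placeholder workload: pretend to do something for 2 seconds
    time.sleep(2)
    print("Thread finished")

# Define the thread: nothing runs yet
thread = threading.Thread(target=wait_a_bit)
# Start it: from now on, it runs asynchronously
thread.start()
# The parent keeps running its own code in the meantime
print("Parent still running")
# Join it: wait for the thread to terminate before moving on
thread.join()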
Looking at the above example, we create the thread and launch it. Then, we run some code in the parent and after that, we wait for the thread to finish.
Hands-On on this Python Threading Tutorial
Python threading library
Python has several ways to implement multithreading. The modern way to do it is with the threading library, which contains the Thread class. As we will see, working with this library is extremely intuitive. We also want to use the time library to experiment with threads, but it is not strictly needed in production. Thus, at the very beginning of your script, add the following lines.
import threading
import time
Now we can start working with threads!
Define a test function to run in threads
First thing, we need to define a function we want to run in a thread. Our function will do almost nothing, but we will use the sleep() function to emulate a huge workload. The sleep() function makes your program (or thread) stop and wait for a given number of seconds.
When you have a function that you want to run in a thread, it is best practice to use a thread identifier as the first parameter. This way, you know which thread you are running from inside the function as well. However, you also need to pass this value when creating the thread, but we will get to that. For now, just create this function.
def do_something(id, message=''):
    time.sleep(4)
    print("Thread #" + str(id) + " finished with message: " + message)
This function simply waits 4 seconds and then prints the given message together with the thread ID.
Running it synchronously “the standard way”
At this point, we can create a global list of messages that we want to print. Here is an example.
messages = [
    'This is my first multithreading script',
    'Here is a message',
    'It works!',
    'Python threading tutorial'
]
If we were to run the do_something() function with all the messages synchronously, we would need roughly 16 seconds (4 seconds per message). In fact, we can do a simple test using the time.time() function. This returns the epoch time in seconds, and we can call it twice to see how much time elapsed from the beginning to the end of the script.
start = time.time()
for i, msg in enumerate(messages):
    do_something(i, msg)
print("It took " + str(time.time()-start) + " seconds")
And here is the output. Of course, the time elapsed between one printed line and the next was about 4 seconds.
C:\Users\aless\Desktop>python threads.py
Thread #0 finished with message: This is my first multithreading script
Thread #1 finished with message: Here is a message
Thread #2 finished with message: It works!
Thread #3 finished with message: Python threading tutorial
It took 16.003358125686646 seconds
C:\Users\aless\Desktop>
Running it with threads
Now we can dive into the real Python threading tutorial. We can rewrite this part of the script to work with threads and distribute the load among them. Here we need to work with three different operations.
Creating the thread
To create a thread, you need to instantiate a threading.Thread object. The constructor wants to know a target function: the function that you want to run within the thread. It also wants to know the list of parameters to pass to the function, if any. You provide the function name as target, and the parameters as a tuple in the args parameter. Below is a sample snippet that creates a thread.
thread = threading.Thread(target=function_name, args=(arg1, arg2, arg3))
From now on, you can perform operations on the thread object you just created.
Starting and joining the thread
Once you have a thread object, you can start it with the start() method and join it with the join() method, as simple as that. The code is pretty straightforward, as you can see below.
thread.start()
thread.join()
In order to call join(), you need to call start() first. However, you don’t need to call the two one right after the other. In fact, you might want to run some code after starting the thread and before joining it. You may even decide not to join a thread at all.
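For example, a minimal sketch of this pattern, reusing the do_something() function defined above, could look like the following.
thread = threading.Thread(target=do_something, args=(0, 'Hello from a thread'))
thread.start()
# The parent is free to do other work while the thread sleeps
print("Doing something else in the parent...")
# Only now do we wait for the thread to terminate
thread.join()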
The whole script
Combining the commands above, we can create a way more efficient snippet that leverages threads. Here it is.
start = time.time()
threads = []
for i, msg in enumerate(messages):
    threads.append(threading.Thread(target=do_something, args=(i, msg,)))
    threads[i].start()
for thread in threads:
    thread.join()
print("It took " + str(time.time()-start) + " seconds")
As you can see, we first create and start all the threads. Then, with another loop, we join all of them. On purpose, we didn’t join each thread right after starting it: if we did, the script would have waited for the first thread to finish before starting the second, and of course we don’t want that. If you run this script, you won’t see any output for about 4 seconds, then all four lines of output will appear together. The whole run takes a little more than 4 seconds, take a look.
C:\Users\aless\Desktop>python threads.py
Thread #1 finished with message: Here is a message
Thread #0 finished with message: This is my first multithreading script
Thread #3 finished with message: Python threading tutorial
Thread #2 finished with message: It works!
It took 4.001831293106079 seconds
C:\Users\aless\Desktop>
Inter-thread communication with Queues
Running threads is awesome. You can truly implement parallelism and make your script more efficient. However, so far we have only seen how to pass arguments to a thread. A stub function that executes some code but never returns anything has limited applications, and we know it. We need our threads to return something, but this comes with some challenges.
The challenges of communication between threads
Why can’t a thread return something when it finishes, just like a function does? Many people ask this question when first approaching parallel programming. The reason is the asynchronous nature of threads. When should the return happen? What should the parent script do until it gets the return value? If it just waits for the whole thread to finish, you could use a plain synchronous function as well.
Threads are simply not designed to be alternatives to functions. They serve a whole different purpose, and they do not strictly return something. Instead, they can communicate with the script that spawned them, or with other sibling threads, both for input and output.
Here we have another challenge: concurrency. What happens if two threads try to work on the same variable at the same time? You risk running the same code twice or creating other unexpected behavior. A common example is a bank balance script: to allow a withdrawal, the script first checks if there is enough money in the account. What if the balance changes between the check and the actual withdrawal? You end up withdrawing money you don’t have. Python provides some libraries to avoid this kind of problem, and today we are going to work with the most versatile: queue.
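To make the risk concrete, here is a minimal sketch of that check-then-withdraw race. The balance variable and the withdraw() function are hypothetical, and the sleep() call only widens the window between the check and the withdrawal.
import threading
import time

balance = 100

def withdraw(amount):
    global balance
    if balance >= amount:   # check: is there enough money?
        time.sleep(0.1)     # another thread can slip in right here
        balance -= amount   # withdraw, even if someone else already did

t1 = threading.Thread(target=withdraw, args=(100,))
t2 = threading.Thread(target=withdraw, args=(100,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)  # typically -100: both checks passed before either withdrawal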
Introducing the Queue
The queue.Queue object is a special item that we can use to handle concurrency. It can act as a shared variable between threads because it manages concurrency issues properly. You can think of a queue like a list, where you can put items and later fetch them. In a common setup, some part of the code populates the queue while another part reads from it and processes its data. Think of it as an enhanced list.
The function, or part of the code, that adds items to the queue is the producer. The function that processes (and thus removes) items from the queue is the consumer. In order to use Queue, we need to import its library. Thus, we can add the following line of code at the beginning of our script.
import queue
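Before writing the consumer, here is a minimal sketch of the put-and-fetch behavior, outside of any thread: get() always returns the oldest item still in the queue.
q = queue.Queue()
q.put('first')
q.put('second')
print(q.get())    # prints 'first', the oldest item
print(q.get())    # prints 'second'
print(q.empty())  # True: the queue is now empty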
The simplest consumer
We first need to rewrite our do_something() function. It is no longer a function that runs once in a thread: it is a whole consumer, a function that tries to process an entire queue. We can then run it as a separate thread and, to speed things up, we can instantiate several do_something threads to process the queue faster. Here is the code.
def do_something(id, message_queue):
    while not message_queue.empty():
        message = message_queue.get()
        time.sleep(4)
        print("#" + str(id) + ": " + message)
    print("\t Thread #" + str(id) + " finished.")
As you can see, it no longer takes a message, but a Queue. Then, as long as the queue is not empty, it fetches a message (message_queue.get()) and prints it. As soon as the queue is empty, the thread tells you it has finished and exits. Remember: on a queue, the get() method returns the oldest object inserted in the queue and removes it. So, the first call returns the object added first, the second call the object added second, and so on.
It is very important, at least for now, that you call get() right after the check with empty(). If you sleep in between, you risk that a thread tries to fetch from a queue that is already empty (because another thread fetched an item after our thread checked). Keeping get() right after the check only makes this less likely; it doesn’t solve the problem. To really solve it, read on.
To see how this works, we should increase the size of our messages list. We can do that simply by multiplying it by, say, 10.
messages = [
    'This is my first multithreading script',
    'Here is a message',
    'It works!',
    'Python threading tutorial'
] * 10
Now we have a list with 40 items we can work on.
Populating the Queue from the parent
Before going deeper into the concept of producer, we should move in small steps. Thus, the next thing to do in this Python threading tutorial is to populate the queue from the parent script. In other words, we have a queue that already exists, and we start several threads to process it. The script is now a little more complex, but nothing we can’t tackle.
start = time.time()
q = queue.Queue()
threads = []
num_of_threads = 2
for msg in messages:
    q.put(msg)
for thid in range(num_of_threads):
    threads.append(threading.Thread(target=do_something, args=(thid, q,)))
    threads[thid].start()
for thread in threads:
    thread.join()
print("It took " + str(time.time()-start) + " seconds")
First, we create the Queue q, and then we populate it with the put() function. This is like append() on lists: it adds an item at the end. After that, we have our major change. We decide how many threads to instantiate according to the num_of_threads variable. Each thread receives its unique identifier and the same q. We launch them and then join them. This script uses only 2 threads to process the queue, and these threads terminate as soon as the queue is empty. With two threads, the script should complete in about 80 seconds: 40 messages times 4 seconds each, divided by 2 threads.
So, the output is something like this.
C:\Users\aless\Desktop>python threads.py
#1: Here is a message
#0: This is my first multithreading script
#0: Python threading tutorial
#1: It works!
< omitted output >
#1: This is my first multithreading script
#0: Here is a message
#1: It works!
Thread #1 finished.
#0: Python threading tutorial
Thread #0 finished.
It took 80.01445055007935 seconds
Scaling it up
With this approach of consumers, you can easily run more threads to make your script run faster. All you need to do is increase the value of the num_of_threads variable. By setting it to 20, the output is the one below.
C:\Users\aless\Desktop>python threads.py
#0: This is my first multithreading script
#3: Python threading tutorial
#1: Here is a message
#2: It works!
#7: Python threading tutorial
#5: Here is a message
#6: It works!
#4: This is my first multithreading script
#11: Python threading tutorial
#8: This is my first multithreading script
#9: Here is a message
#10: It works!
#12: This is my first multithreading script
#14: It works!
#13: Here is a message
#15: Python threading tutorial
#19: Python threading tutorial
#18: It works!
#17: Here is a message
#16: This is my first multithreading script
#0: This is my first multithreading script
Thread #0 finished.
#6: It works!
#7: This is my first multithreading script
#5: Here is a message
Thread #5 finished.
#3: Here is a message
Thread #3 finished.
#2: Python threading tutorial
#1: It works!
Thread #7 finished.
#4: Python threading tutorial
#9: It works!
Thread #6 finished.
#11: This is my first multithreading script
Thread #4 finished.
#15: Python threading tutorial
#10: Python threading tutorial
#16: Python threading tutorial
#14: Here is a message
#19: This is my first multithreading script
Thread #1 finished.
Thread #11 finished.
#12: This is my first multithreading script
Thread #2 finished.
#17: It works!
#13: It works!
Thread #9 finished.
#8: Here is a message
Thread #15 finished.
Thread #10 finished.
Thread #17 finished.
Thread #13 finished.
#18: Here is a message
Thread #12 finished.
Thread #14 finished.
Thread #8 finished.
Thread #16 finished.
Thread #18 finished.
Thread #19 finished.
It took 8.015052080154419 seconds
A sentinel watching our threads
When we introduced the empty() method on a queue, we explained that we need to get data from the queue as soon as we verify that it is not empty. If we don’t, some other thread may take our data first, leaving our thread stuck waiting. We simply can’t rely on being the fastest thread out there, and we can’t trust the script to behave the way we want every single time. Thus, we need to find another way.
First, checking whether the queue is empty() has a major problem: it only works if someone filled the queue beforehand. In a real-world scenario, you have a consumer reading from a queue and a producer populating it. Sometimes the consumer may be faster than the producer and empty the queue. This doesn’t mean the producer has finished, and thus the consumer shouldn’t shut down yet. We can solve this with a sentinel.
A sentinel is a value of our choice that the producer puts in the queue to tell all the consumers it has finished producing. Then, in the consumer, as soon as you find the sentinel, you stop execution. A common value for a sentinel is None, as long as you don’t need to queue objects that may legitimately be None.
A sentinel-based consumer
Now, we can rewrite our do_something() function once again to work with sentinels. Our sentinel here is going to be None.
def do_something(id, message_queue):
    while True:
        message = message_queue.get()
        if message is None:
            break
        time.sleep(4)
        print("#" + str(id) + ": " + message)
    print("\t Thread #" + str(id) + " finished.")
Simply put, if the message happens to be None, the consumer terminates. This solves the problem of a producer adding items to the queue more slowly than the consumer can process them. However, it does not solve the concurrent-consumer problem, at least not by itself. When one consumer fetches the None item, all the other consumers will be waiting for a None of their own; if the producer doesn’t add any other sentinel, they will be stuck forever.
We can solve that by adding a None item to the queue each time we add a consumer thread, like below.
start = time.time()
q = queue.Queue()
threads = []
num_of_threads = 20
for msg in messages:
    q.put(msg)
for thid in range(num_of_threads):
    threads.append(threading.Thread(target=do_something, args=(thid, q,)))
    threads[thid].start()
    q.put(None)
for thread in threads:
    thread.join()
print("It took " + str(time.time()-start) + " seconds")
This script simply won’t get stuck, no matter how the threads interleave.
A consumer-producer script
The whole code
import threading
import time
import queue
import random
import string

def consumer(id, size, in_q):
    count = 0
    while True:
        log_msg = in_q.get()
        if log_msg is None:
            break
        print("{0}%\t{1}".format(round(count/size*100, 2), log_msg))
        count += 1
    print("Consumer #{0} shutting down".format(id))

def producer(id, in_q, out_q):
    while True:
        item = in_q.get()
        if item is None:
            out_q.put(None)
            break
        out_q.put("#{0}\tThis string is long {1} characters".format(id, len(item)))
    print("Producer #{0} shutting down".format(id))

string_q = queue.Queue()
log_q = queue.Queue()
n_of_strings = 200000
for i in range(n_of_strings):
    string_q.put(''.join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(0, 150))))
start = time.time()
producers = []
n_of_producers = 2
logger = threading.Thread(target=consumer, args=(0, n_of_strings, log_q,))
logger.start()
for thid in range(n_of_producers):
    producers.append(threading.Thread(target=producer, args=(thid, string_q, log_q)))
    producers[thid].start()
    string_q.put(None)
for thread in producers:
    thread.join()
logger.join()
print("It took " + str(time.time()-start) + " seconds")
A quick explanation
In this script, the producer function consumes a queue containing strings and finds the length of each. It then puts a log message with each result into another queue, which the consumer reads. Since the number of strings is fixed, the consumer can calculate the percentage of progress over the total.
We used the random and string libraries to generate the random strings that populate string_q. However, this is not important for our Python threading tutorial. Instead, notice that when a producer finishes, it also tells the consumer it has finished by adding a None item (the sentinel) to the output queue.
This script may require some clarification. It performs computation after computation and doesn’t spend its time waiting like the scripts we used before. Since CPython’s Global Interpreter Lock lets only one thread execute Python code at a time, adding more threads won’t increase performance here: the limiting factor is the CPU itself, not some waiting the script must do.
Wrapping it up
In this Python threading tutorial we explained multithreading in simple terms, using the modern threading and queue libraries. Along the way, remember these key concepts.
- Create a thread with threading.Thread(). Provide the function to execute as the target parameter, and its arguments as a tuple in the args parameter.
- You can then work on the thread object: launch its execution with start(), and have the parent wait for it to finish with join().
- You can use a queue.Queue() object like a list to allow communication between threads. One thread may add items to the queue with put(), and another may read them with get().
- Typically, you create a consumer function to run in a thread, with an infinite loop that tries to get items from a queue. You can check whether the queue is empty with the empty() method, but this may cause concurrency issues. Instead, when you are sure you won’t put any more items in the queue, add an arbitrary value (the sentinel) to it. On the other side, as soon as a consumer finds this value, it exits the infinite loop. To avoid concurrency issues, add as many sentinels to the queue as it has consumers.
And this is it! I hope you can now create programs that scale horizontally with ease, using threads. No more boring waiting for connections, synchronization, and input/output. How do you plan to use this knowledge? Do you have any projects that will benefit from these features of Python? Let me know in the comments, and keep reading ICTShore.com to see how we are going to use threading to implement an awesome SDN software.