diff --git a/thread-safety-locks/README.md b/thread-safety-locks/README.md new file mode 100644 index 0000000000..44107cec62 --- /dev/null +++ b/thread-safety-locks/README.md @@ -0,0 +1,11 @@ +# Python Thread Safety: Using a Lock and Other Techniques + +This folder contains code examples from the Real Python tutorial [Python Thread Safety: Using a Lock and Other Techniques](https://realpython.com/python-thread-lock/). + +## About the Author + +Adarsh Divakaran - Website: https://adarshd.dev/ + +## License + +Distributed under the MIT license. See ``LICENSE`` for more information. \ No newline at end of file diff --git a/thread-safety-locks/bank_barrier.py b/thread-safety-locks/bank_barrier.py new file mode 100644 index 0000000000..6fe6dcc52d --- /dev/null +++ b/thread-safety-locks/bank_barrier.py @@ -0,0 +1,27 @@ +import random +import threading +import time +from concurrent.futures import ThreadPoolExecutor + +teller_barrier = threading.Barrier(3) + + +def prepare_for_work(name): + print(f"{int(time.time())}: {name} is preparing their counter.") + + # Simulate the delay to prepare the counter + time.sleep(random.randint(1, 3)) + print(f"{int(time.time())}: {name} has finished preparing.") + + # Wait for all tellers to finish preparing + teller_barrier.wait() + print(f"{int(time.time())}: {name} is now ready to serve customers.") + + +tellers = ["Teller 1", "Teller 2", "Teller 3"] + +with ThreadPoolExecutor(max_workers=3) as executor: + for teller_name in tellers: + executor.submit(prepare_for_work, teller_name) + +print(f"{int(time.time())}: All tellers are ready to serve customers.") diff --git a/thread-safety-locks/bank_condition.py b/thread-safety-locks/bank_condition.py new file mode 100644 index 0000000000..0b64fa872e --- /dev/null +++ b/thread-safety-locks/bank_condition.py @@ -0,0 +1,53 @@ +import random +import threading +import time +from concurrent.futures import ThreadPoolExecutor + +customer_available_condition = threading.Condition() + +# Customers waiting to be served by the Teller +customer_queue = [] + + +def serve_customers(): + while True: + with customer_available_condition: + # Wait for a customer to arrive + while not customer_queue: + print(f"{int(time.time())}: Teller is waiting for a customer.") + customer_available_condition.wait() + + # Serve the customer + customer = customer_queue.pop(0) + print(f"{int(time.time())}: Teller is serving {customer}.") + + # Simulate the time taken to serve the customer + time.sleep(random.randint(1, 3)) + print(f"{int(time.time())}: Teller has finished serving {customer}.") + + +def add_customer_to_queue(name): + with customer_available_condition: + print(f"{int(time.time())}: {name} has arrived at the bank.") + customer_queue.append(name) + + customer_available_condition.notify() + + +customer_names = [ + "Customer 1", + "Customer 2", + "Customer 3", + "Customer 4", + "Customer 5", +] + +with ThreadPoolExecutor(max_workers=6) as executor: + + teller_thread = executor.submit(serve_customers) + + for name in customer_names: + # Simulate customers arriving at random intervals + time.sleep(random.randint(2, 5)) + + executor.submit(add_customer_to_queue, name) diff --git a/thread-safety-locks/bank_deadlock.py b/thread-safety-locks/bank_deadlock.py new file mode 100644 index 0000000000..b307c7702a --- /dev/null +++ b/thread-safety-locks/bank_deadlock.py @@ -0,0 +1,51 @@ +import threading +import time +from concurrent.futures import ThreadPoolExecutor + + +class BankAccount: + def __init__(self): + self.balance = 0 + self.lock = threading.Lock() + + def deposit(self, amount): + print( + f"Thread {threading.current_thread().name} waiting " + f"to acquire lock for deposit()" + ) + with self.lock: + print( + f"Thread {threading.current_thread().name} " + "acquired lock for deposit()" + ) + time.sleep(0.1) + self._update_balance(amount) + + def _update_balance(self, amount): + print( + f"Thread {threading.current_thread().name} waiting to acquire " + f"lock for _update_balance()" + ) + with self.lock: # This will cause a deadlock + print( + f"Thread {threading.current_thread().name} " + "acquired lock for _update_balance()" + ) + self.balance += amount + + +account = BankAccount() + + +def make_deposit(): + account.deposit(100) + + +with ThreadPoolExecutor( + max_workers=3, thread_name_prefix="Worker" +) as executor: + for _ in range(3): + executor.submit(make_deposit) + + +print(f"Final balance: {account.balance}") diff --git a/thread-safety-locks/bank_event.py b/thread-safety-locks/bank_event.py new file mode 100644 index 0000000000..973b8344fa --- /dev/null +++ b/thread-safety-locks/bank_event.py @@ -0,0 +1,50 @@ +import threading +import time +from concurrent.futures import ThreadPoolExecutor + +bank_open = threading.Event() +transactions_open = threading.Event() + + +def serve_customer(customer_data): + print(f"{customer_data['name']} is waiting for the bank to open.") + + bank_open.wait() + print(f"{customer_data['name']} entered the bank") + if customer_data["type"] == "WITHDRAW_MONEY": + print(f"{customer_data['name']} is waiting for transactions to open.") + transactions_open.wait() + print(f"{customer_data['name']} is starting their transaction.") + + # Simulate the time taken for performing the transaction + time.sleep(2) + + print(f"{customer_data['name']} completed transaction and exited bank") + else: + # Simulate the time taken for banking + time.sleep(2) + print(f"{customer_data['name']} has exited bank") + + +customers = [ + {"name": "Customer 1", "type": "WITHDRAW_MONEY"}, + {"name": "Customer 2", "type": "CHECK_BALANCE"}, + {"name": "Customer 3", "type": "WITHDRAW_MONEY"}, + {"name": "Customer 4", "type": "WITHDRAW_MONEY"}, +] + +with ThreadPoolExecutor(max_workers=4) as executor: + for customer_data in customers: + executor.submit(serve_customer, customer_data) + + print("Bank manager is preparing to open the bank.") + time.sleep(2) + print("Bank is now open!") + bank_open.set() # Signal that the bank is open + + time.sleep(3) + print("Transactions are now open!") + transactions_open.set() + + +print("All customers have completed their transactions.") diff --git a/thread-safety-locks/bank_multithreaded_withdrawal.py b/thread-safety-locks/bank_multithreaded_withdrawal.py new file mode 100644 index 0000000000..a3fe1d5721 --- /dev/null +++ b/thread-safety-locks/bank_multithreaded_withdrawal.py @@ -0,0 +1,24 @@ +import time +from concurrent.futures import ThreadPoolExecutor + + +class BankAccount: + def __init__(self): + self.balance = 1000 + + def withdraw(self, amount): + if self.balance >= amount: + new_balance = self.balance - amount + time.sleep(0.1) # Simulate a delay + self.balance = new_balance + else: + raise Exception("Insufficient balance") + + +account = BankAccount() + +with ThreadPoolExecutor(max_workers=2) as executor: + executor.submit(account.withdraw, 500) + executor.submit(account.withdraw, 700) + +print(f"Final account balance: {account.balance}") diff --git a/thread-safety-locks/bank_rlock.py b/thread-safety-locks/bank_rlock.py new file mode 100644 index 0000000000..47d3939446 --- /dev/null +++ b/thread-safety-locks/bank_rlock.py @@ -0,0 +1,51 @@ +import threading +import time +from concurrent.futures import ThreadPoolExecutor + + +class BankAccount: + def __init__(self): + self.balance = 0 + self.lock = threading.RLock() + + def deposit(self, amount): + print( + f"Thread {threading.current_thread().name} " + "waiting to acquire lock for .deposit()" + ) + with self.lock: + print( + f"Thread {threading.current_thread().name} " + "acquired lock for .deposit()" + ) + time.sleep(0.1) + self._update_balance(amount) + + def _update_balance(self, amount): + print( + f"Thread {threading.current_thread().name} " + "waiting to acquire lock for ._update_balance()" + ) + with self.lock: + print( + f"Thread {threading.current_thread().name} " + "acquired lock for ._update_balance()" + ) + self.balance += amount + + +account = BankAccount() + + +def make_deposit(): + account.deposit(100) + + +with ThreadPoolExecutor( + max_workers=3, thread_name_prefix="Worker" +) as executor: + for _ in range(3): + executor.submit(make_deposit) + + +print(f"Final balance: {account.balance}") diff --git a/thread-safety-locks/bank_semaphore.py b/thread-safety-locks/bank_semaphore.py new file mode 100644 index 0000000000..91f97eaf78 --- /dev/null +++ b/thread-safety-locks/bank_semaphore.py @@ -0,0 +1,32 @@ +import random +import threading +import time +from concurrent.futures import ThreadPoolExecutor + +# Semaphore with a maximum of 2 resources (tellers) +teller_semaphore = threading.Semaphore(2) + + +def serve_customer(name): + print(f"{int(time.time())}: {name} is waiting for a teller.") + with teller_semaphore: + print(f"{int(time.time())}: {name} is being served by a teller.") + # Simulate the time taken for the teller to serve the customer + time.sleep(random.randint(1, 3)) + print(f"{int(time.time())}: {name} is done being served.") + + +customers = [ + "Customer 1", + "Customer 2", + "Customer 3", + "Customer 4", + "Customer 5", +] + +with ThreadPoolExecutor(max_workers=5) as executor: + for customer_name in customers: + thread = executor.submit(serve_customer, customer_name) + + +print("All customers have been served.") diff --git a/thread-safety-locks/bank_thread_safe.py b/thread-safety-locks/bank_thread_safe.py new file mode 100644 index 0000000000..3d1db1a16f --- /dev/null +++ b/thread-safety-locks/bank_thread_safe.py @@ -0,0 +1,38 @@ +import threading +import time +from concurrent.futures import ThreadPoolExecutor + + +class BankAccount: + def __init__(self, balance=0): + self.balance = balance + self.account_lock = threading.Lock() + + def withdraw(self, amount): + with self.account_lock: + if self.balance >= amount: + new_balance = self.balance - amount + print(f"Withdrawing {amount}...") + time.sleep(0.1) # Simulate a delay + self.balance = new_balance + else: + raise Exception("Insufficient balance") + + def deposit(self, amount): + with self.account_lock: + new_balance = self.balance + amount + print(f"Depositing {amount}...") + time.sleep(0.1) # Simulate a delay + self.balance = new_balance + + +account = BankAccount(1000) + +with ThreadPoolExecutor(max_workers=3) as executor: + + executor.submit(account.withdraw, 700) + executor.submit(account.deposit, 1000) + executor.submit(account.withdraw, 300) + + +print(f"Final account balance: {account.balance}") diff --git a/thread-safety-locks/threading_example.py b/thread-safety-locks/threading_example.py new file mode 100644 index 0000000000..f4edf393c6 --- /dev/null +++ b/thread-safety-locks/threading_example.py @@ -0,0 +1,16 @@ +import threading +import time +from concurrent.futures import ThreadPoolExecutor + + +def threaded_function(): + for number in range(3): + print(f"Printing from {threading.current_thread().name}. {number=}") + time.sleep(0.1) + + +with ThreadPoolExecutor( + max_workers=4, thread_name_prefix="Worker" +) as executor: + for _ in range(4): + executor.submit(threaded_function)