What's the best practice for Python logging configuration in applications?
I'm working on a Python application and running into an issue with Python debugging. Here's the problematic code:
# Current implementation
import threading
import time
def worker():
global counter
for _ in range(100000):
counter += 1 # Race condition here
counter = 0
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
The error message I'm getting is: "ValueError: invalid literal for int() with base 10: 'abc'"
What I've tried so far:
- Used pdb debugger to step through the code
- Added logging statements to trace execution
- Checked Python documentation and PEPs
- Tested with different Python versions
- Reviewed similar issues on GitHub and Stack Overflow
Environment information:
- Python version: 3.11.0
- Operating system: macOS Ventura
- Virtual environment: venv (activated)
- Relevant packages: django, djangorestframework, celery, redis
Any insights or alternative approaches would be very helpful. Thanks!
Comments
james_ml: I'm getting a similar error but with PostgreSQL instead of SQLite. Any differences in the solution? 1 week, 4 days ago
sarah_tech: This Python memory optimization technique reduced my application's RAM usage by 60%. Brilliant! 1 week, 4 days ago
5 Answers
The choice between Django signals and overriding save() depends on your use case:
Use save() method when:
- The logic is directly related to the model
- You need to modify the instance before saving
- The operation is essential for data integrity
class Article(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(unique=True)
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.title)
super().save(*args, **kwargs)
Use signals when:
- You need decoupled logic
- Multiple models need the same behavior
- You're working with third-party models
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
UserProfile.objects.create(user=instance)
The choice between Django signals and overriding save() depends on your use case:
Use save() method when:
- The logic is directly related to the model
- You need to modify the instance before saving
- The operation is essential for data integrity
class Article(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(unique=True)
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.title)
super().save(*args, **kwargs)
Use signals when:
- You need decoupled logic
- Multiple models need the same behavior
- You're working with third-party models
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
UserProfile.objects.create(user=instance)
Comments
emma_programmer: I'm getting a similar error but with PostgreSQL instead of SQLite. Any differences in the solution? 1 week, 4 days ago
joseph: Could you elaborate on the select_related vs prefetch_related usage? When should I use each? 1 week, 4 days ago
Here's how to optimize Python code performance using profiling tools:
1. Use cProfile for function-level profiling:
import cProfile
import pstats
# Profile your code
cProfile.run('your_function()', 'profile_output.prof')
# Analyze results
stats = pstats.Stats('profile_output.prof')
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
2. Use line_profiler for line-by-line analysis:
# Install: pip install line_profiler
# Add @profile decorator to functions
@profile
def slow_function():
# Your code here
pass
# Run: kernprof -l -v script.py
3. Memory profiling with memory_profiler:
# Install: pip install memory_profiler
from memory_profiler import profile
@profile
def memory_intensive_function():
# Your code here
pass
# Run: python -m memory_profiler script.py
4. Use timeit for micro-benchmarks:
import timeit
# Compare different approaches
time1 = timeit.timeit('sum([1,2,3,4,5])', number=100000)
time2 = timeit.timeit('sum((1,2,3,4,5))', number=100000)
print(f'List: {time1}, Tuple: {time2}')
Python decorators with arguments require a three-level nested function. Here's the proper implementation:
import functools
# Decorator with arguments
def retry(max_attempts=3, delay=1):
def decorator(func):
@functools.wraps(func) # Preserves function metadata
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise e
time.sleep(delay)
return wrapper
return decorator
# Usage
@retry(max_attempts=5, delay=2)
def unreliable_function():
# Function that might fail
pass
Class-based decorator (alternative approach):
class Retry:
def __init__(self, max_attempts=3, delay=1):
self.max_attempts = max_attempts
self.delay = delay
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(self.max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == self.max_attempts - 1:
raise e
time.sleep(self.delay)
return wrapper
# Usage
@Retry(max_attempts=5, delay=2)
def another_function():
pass
The difference between threading and multiprocessing in Python is crucial for performance:
Threading (shared memory, GIL limitation):
import threading
import time
def io_bound_task(name):
print(f'Starting {name}')
time.sleep(2) # Simulates I/O operation
print(f'Finished {name}')
# Good for I/O-bound tasks
threads = []
for i in range(3):
t = threading.Thread(target=io_bound_task, args=(f'Task-{i}',))
threads.append(t)
t.start()
for t in threads:
t.join()
Multiprocessing (separate memory, no GIL):
import multiprocessing
import time
def cpu_bound_task(name):
# CPU-intensive calculation
result = sum(i * i for i in range(1000000))
return f'{name}: {result}'
# Good for CPU-bound tasks
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
tasks = [f'Process-{i}' for i in range(4)]
results = pool.map(cpu_bound_task, tasks)
print(results)
Concurrent.futures (unified interface):
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# For I/O-bound tasks
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(io_bound_task, f'Task-{i}') for i in range(4)]
results = [future.result() for future in futures]
# For CPU-bound tasks
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cpu_bound_task, f'Process-{i}') for i in range(4)]
results = [future.result() for future in futures]
Comments
alex_dev: Could you elaborate on the select_related vs prefetch_related usage? When should I use each? 1 week, 4 days ago
michael_code: How would you modify this approach for a high-traffic production environment? 1 week, 4 days ago
Your Answer
You need to be logged in to answer questions.
Log In to Answer