
Data & Web Projects

Working with data analysis and web technologies

Building Data & Web Projects with Python

Master real-world Python applications with free flashcards and hands-on coding practice. This lesson covers web scraping techniques, REST API development, database integration, and data visualization: essential skills for building professional data-driven web applications. Whether you're automating data collection, building backend services, or creating interactive dashboards, these practical projects will prepare you for real-world development challenges.

Welcome to Real-World Python Projects 🚀

Python's versatility shines brightest when building practical applications that solve real problems. In this lesson, you'll learn to:

  • Scrape and process web data using BeautifulSoup and requests
  • Build REST APIs with Flask or FastAPI frameworks
  • Connect to databases using SQLite, PostgreSQL, and ORMs
  • Visualize data with Matplotlib, Plotly, and interactive dashboards
  • Deploy applications to production environments

These aren't toy examples: they're the same techniques used by professional developers at organizations like Spotify, Instagram, and NASA. By the end, you'll have portfolio-worthy projects demonstrating your ability to build complete, functional applications.

💡 Pro Tip: The best way to learn is by building. Follow along with the code examples, experiment with modifications, and build your own variations!


Core Concept 1: Web Scraping for Data Collection 🕸️

Web scraping extracts data from websites programmatically. It's invaluable when APIs aren't available or when you need to aggregate data from multiple sources.

The Essential Tools

  • requests - Fetches web pages (HTTP client)
  • BeautifulSoup4 - Parses HTML and extracts data
  • lxml - Fast HTML/XML parser (optional but recommended)

Basic Scraping Workflow

┌──────────────────────────────┐
│    WEB SCRAPING PIPELINE     │
└──────────────────────────────┘

  📍 Target Website
        │
        ↓
  🌐 HTTP Request (requests.get)
        │
        ↓
  📄 HTML Response
        │
        ↓
  🔍 Parse HTML (BeautifulSoup)
        │
        ↓
  🎯 Select Elements (CSS selectors)
        │
        ↓
  💾 Extract & Store Data
        │
        ↓
  📊 Process/Analyze

Practical Example: News Article Scraper

import requests
from bs4 import BeautifulSoup
import csv
from datetime import datetime

def scrape_news_articles(url):
    """
    Scrapes article headlines and links from a news site.
    Returns list of dictionaries with article data.
    """
    # Send GET request with proper headers
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    response = requests.get(url, headers=headers)
    
    # Check if request was successful
    if response.status_code != 200:
        print(f"Error: Status code {response.status_code}")
        return []
    
    # Parse HTML content
    soup = BeautifulSoup(response.content, 'lxml')
    
    # Find all article containers (inspect page to find correct selector)
    articles = soup.find_all('article', class_='post')
    
    scraped_data = []
    for article in articles:
        # Extract title
        title_elem = article.find('h2', class_='entry-title')
        title = title_elem.get_text(strip=True) if title_elem else "N/A"
        
        # Extract link
        link_elem = title_elem.find('a') if title_elem else None
        link = link_elem['href'] if link_elem else "N/A"
        
        # Extract publication date
        date_elem = article.find('time', class_='entry-date')
        pub_date = date_elem['datetime'] if date_elem else "N/A"
        
        scraped_data.append({
            'title': title,
            'url': link,
            'date': pub_date,
            'scraped_at': datetime.now().isoformat()
        })
    
    return scraped_data

def save_to_csv(data, filename='articles.csv'):
    """Save scraped data to CSV file."""
    if not data:
        print("No data to save")
        return
    
    keys = data[0].keys()
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        writer.writerows(data)
    print(f"Saved {len(data)} articles to {filename}")

# Usage
if __name__ == '__main__':
    url = 'https://example-news-site.com'
    articles = scrape_news_articles(url)
    save_to_csv(articles)

Key Scraping Techniques

| Method | Use Case | Example |
|--------|----------|---------|
| find() | Get first matching element | soup.find('div', id='main') |
| find_all() | Get all matching elements | soup.find_all('a', class_='link') |
| select() | CSS selector (more flexible) | soup.select('div.post > h2') |
| get_text() | Extract text content | elem.get_text(strip=True) |
| ['attr'] | Access HTML attributes | link['href'] |
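
To see these methods in action, here's a small self-contained demo on an inline HTML snippet (the markup is made up for illustration):

from bs4 import BeautifulSoup

html = """
<div id="main">
  <div class="post"><h2><a class="link" href="/a">First</a></h2></div>
  <div class="post"><h2><a class="link" href="/b">Second</a></h2></div>
</div>
"""
soup = BeautifulSoup(html, 'lxml')

print(soup.find('div', id='main') is not None)                            # True
print(len(soup.find_all('a', class_='link')))                             # 2
print([h2.get_text(strip=True) for h2 in soup.select('div.post > h2')])   # ['First', 'Second']
print(soup.find('a', class_='link')['href'])                              # /a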

πŸ›‘οΈ Ethical Scraping Best Practices

  1. Check robots.txt: Visit site.com/robots.txt to see scraping rules (a programmatic check is sketched below)
  2. Respect rate limits: Add delays between requests (time.sleep(1))
  3. Use APIs when available: Always prefer official APIs over scraping
  4. Identify yourself: Set a proper User-Agent header
  5. Handle errors gracefully: Don't crash on failed requests
import time

# Polite scraping with delays
for url in urls:
    data = scrape_page(url)
    time.sleep(2)  # 2-second delay between requests
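
You can also check permissions programmatically with the standard library's urllib.robotparser; the site URL and user agent below are placeholders:

import urllib.robotparser

rp = urllib.robotparser.RobotFileParser()
rp.set_url('https://example-news-site.com/robots.txt')
rp.read()

# can_fetch() returns True if the given user agent may crawl the URL
if rp.can_fetch('MyScraperBot/1.0', 'https://example-news-site.com/articles'):
    print("Allowed to scrape this path")
else:
    print("Disallowed by robots.txt")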

💡 Pro Tip: Use requests.Session() to persist cookies and connections across multiple requests, improving performance:

session = requests.Session()
response1 = session.get(url1)
response2 = session.get(url2)  # Reuses connection

Core Concept 2: Building REST APIs 🔌

REST (Representational State Transfer) APIs allow applications to communicate over HTTP. They're the backbone of modern web services: when your phone app checks the weather or posts to social media, it's using REST APIs.

Why Build APIs?

  • Separation of concerns: Frontend and backend can be developed independently
  • Multiple clients: One API serves web, mobile, and desktop apps
  • Integration: Other developers can build on your service
  • Scalability: Easy to load-balance and scale horizontally

Flask vs FastAPI: Choosing Your Framework

| Feature | Flask | FastAPI |
|---------|-------|---------|
| Learning curve | ⭐⭐⭐⭐⭐ Easy | ⭐⭐⭐⭐ Moderate |
| Performance | ⭐⭐⭐ Good | ⭐⭐⭐⭐⭐ Excellent |
| Auto documentation | ❌ Manual | ✅ Automatic (Swagger) |
| Type hints | ❌ Optional | ✅ Required |
| Async support | ⭐⭐ Limited | ⭐⭐⭐⭐⭐ Native |
| Best for | Simple APIs, learning | Production, high performance |

REST API Example: Task Manager with Flask

from flask import Flask, jsonify, request
from datetime import datetime
import uuid

app = Flask(__name__)

# In-memory storage (use database in production)
tasks = {}

@app.route('/api/tasks', methods=['GET'])
def get_tasks():
    """Retrieve all tasks."""
    return jsonify({
        'tasks': list(tasks.values()),
        'count': len(tasks)
    }), 200

@app.route('/api/tasks/<task_id>', methods=['GET'])
def get_task(task_id):
    """Retrieve a specific task by ID."""
    task = tasks.get(task_id)
    if not task:
        return jsonify({'error': 'Task not found'}), 404
    return jsonify(task), 200

@app.route('/api/tasks', methods=['POST'])
def create_task():
    """Create a new task."""
    data = request.get_json()
    
    # Validate input
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    task_id = str(uuid.uuid4())
    task = {
        'id': task_id,
        'title': data['title'],
        'description': data.get('description', ''),
        'completed': False,
        'created_at': datetime.now().isoformat(),
        'updated_at': datetime.now().isoformat()
    }
    
    tasks[task_id] = task
    return jsonify(task), 201

@app.route('/api/tasks/<task_id>', methods=['PUT'])
def update_task(task_id):
    """Update an existing task."""
    task = tasks.get(task_id)
    if not task:
        return jsonify({'error': 'Task not found'}), 404
    
    data = request.get_json()
    task['title'] = data.get('title', task['title'])
    task['description'] = data.get('description', task['description'])
    task['completed'] = data.get('completed', task['completed'])
    task['updated_at'] = datetime.now().isoformat()
    
    return jsonify(task), 200

@app.route('/api/tasks/<task_id>', methods=['DELETE'])
def delete_task(task_id):
    """Delete a task."""
    if task_id not in tasks:
        return jsonify({'error': 'Task not found'}), 404
    
    del tasks[task_id]
    return '', 204

if __name__ == '__main__':
    app.run(debug=True, port=5000)

HTTP Status Codes (Essential Reference)

📋 REST API Status Codes

| Code | Meaning | When to Use |
|------|---------|-------------|
| 200 | OK | Successful GET, PUT |
| 201 | Created | Successful POST (resource created) |
| 204 | No Content | Successful DELETE |
| 400 | Bad Request | Invalid input data |
| 401 | Unauthorized | Missing/invalid authentication |
| 404 | Not Found | Resource doesn't exist |
| 500 | Server Error | Unexpected server failure |
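
To see these codes in practice, here's a short client-side session against the Flask task API above (it assumes the server is running locally on port 5000):

import requests

base = 'http://localhost:5000/api/tasks'

# Successful POST returns 201 Created
resp = requests.post(base, json={'title': 'Write docs'})
print(resp.status_code)  # 201
task_id = resp.json()['id']

# Successful GET returns 200 OK
print(requests.get(f'{base}/{task_id}').status_code)  # 200

# Successful DELETE returns 204 No Content
print(requests.delete(f'{base}/{task_id}').status_code)  # 204

# Missing resource returns 404 Not Found
print(requests.get(f'{base}/{task_id}').status_code)  # 404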

FastAPI Example: Same API, Modern Approach

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
import uuid

app = FastAPI(title="Task Manager API", version="1.0")

# Pydantic models for request/response validation
class TaskCreate(BaseModel):
    title: str
    description: Optional[str] = ""

class TaskUpdate(BaseModel):
    title: Optional[str] = None
    description: Optional[str] = None
    completed: Optional[bool] = None

class Task(BaseModel):
    id: str
    title: str
    description: str
    completed: bool
    created_at: str
    updated_at: str

tasks = {}

@app.get("/api/tasks", response_model=dict)
def get_tasks():
    """Retrieve all tasks with automatic JSON serialization."""
    return {"tasks": list(tasks.values()), "count": len(tasks)}

@app.get("/api/tasks/{task_id}", response_model=Task)
def get_task(task_id: str):
    """Type-safe task retrieval."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    return tasks[task_id]

@app.post("/api/tasks", response_model=Task, status_code=201)
def create_task(task_data: TaskCreate):
    """Create task with automatic validation."""
    task_id = str(uuid.uuid4())
    now = datetime.now().isoformat()
    
    task = Task(
        id=task_id,
        title=task_data.title,
        description=task_data.description,
        completed=False,
        created_at=now,
        updated_at=now
    )
    
    tasks[task_id] = task.dict()
    return task

@app.put("/api/tasks/{task_id}", response_model=Task)
def update_task(task_id: str, task_data: TaskUpdate):
    """Partial update with type safety."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    
    task = tasks[task_id]
    update_data = task_data.dict(exclude_unset=True)
    
    for field, value in update_data.items():
        task[field] = value
    
    task['updated_at'] = datetime.now().isoformat()
    return task

@app.delete("/api/tasks/{task_id}", status_code=204)
def delete_task(task_id: str):
    """Delete with proper error handling."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    del tasks[task_id]

🎯 FastAPI Advantages: Run uvicorn main:app --reload and visit /docs for automatic interactive API documentation!


Core Concept 3: Database Integration 💾

Databases persist data beyond your application's runtime. Python supports everything from simple SQLite files to enterprise PostgreSQL clusters.

Database Options for Python Projects

┌──────────────────────────────┐
│    DATABASE SELECTION GUIDE  │
└──────────────────────────────┘

    Small Project          Medium Project        Large Scale
    (< 10K records)       (10K-1M records)      (1M+ records)
         │                     │                     │
         ↓                     ↓                     ↓
    📄 SQLite            🐘 PostgreSQL         🐘 PostgreSQL
    No setup             Full features         + Replication
    File-based           ACID compliant        High availability
    Built-in Python      Best for web apps     Enterprise grade
         │                     │                     │
    Alternative:          Alternative:          Alternative:
    📊 JSON files         🍃 MongoDB            🔥 Cassandra
    (very simple)         (document store)      (distributed)

SQLite: Quick Start for Prototypes

import sqlite3
from datetime import datetime

class TaskDatabase:
    def __init__(self, db_path='tasks.db'):
        """Initialize database connection and create tables."""
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row  # Access columns by name
        self.create_tables()
    
    def create_tables(self):
        """Create tasks table if it doesn't exist."""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                description TEXT,
                completed INTEGER DEFAULT 0,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        ''')
        self.conn.commit()
    
    def add_task(self, task_id, title, description=''):
        """Insert a new task."""
        now = datetime.now().isoformat()
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (id, title, description, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        ''', (task_id, title, description, now, now))
        self.conn.commit()
        return task_id
    
    def get_all_tasks(self):
        """Retrieve all tasks as dictionaries."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM tasks ORDER BY created_at DESC')
        return [dict(row) for row in cursor.fetchall()]
    
    def update_task(self, task_id, **kwargs):
        """Update task fields dynamically."""
        fields = []
        values = []
        
        for key, value in kwargs.items():
            fields.append(f"{key} = ?")
            values.append(value)
        
        if not fields:
            return False
        
        fields.append("updated_at = ?")
        values.append(datetime.now().isoformat())
        values.append(task_id)
        
        query = f"UPDATE tasks SET {', '.join(fields)} WHERE id = ?"
        cursor = self.conn.cursor()
        cursor.execute(query, values)
        self.conn.commit()
        return cursor.rowcount > 0
    
    def delete_task(self, task_id):
        """Delete a task by ID."""
        cursor = self.conn.cursor()
        cursor.execute('DELETE FROM tasks WHERE id = ?', (task_id,))
        self.conn.commit()
        return cursor.rowcount > 0
    
    def close(self):
        """Close database connection."""
        self.conn.close()

# Usage example
db = TaskDatabase()
db.add_task('task-1', 'Learn Python', 'Complete web scraping tutorial')
tasks = db.get_all_tasks()
db.close()

PostgreSQL with SQLAlchemy ORM

ORMs (Object-Relational Mapping) let you work with databases using Python classes instead of raw SQL. SQLAlchemy is the industry standard.

from sqlalchemy import create_engine, Column, String, Boolean, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
import uuid

# Database connection
engine = create_engine('postgresql://user:password@localhost/taskdb')
Base = declarative_base()
Session = sessionmaker(bind=engine)

# Define model (maps to database table)
class Task(Base):
    __tablename__ = 'tasks'
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    title = Column(String, nullable=False)
    description = Column(String, default='')
    completed = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    
    def to_dict(self):
        """Convert to dictionary for JSON serialization."""
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'completed': self.completed,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }

# Create tables
Base.metadata.create_all(engine)

# Database operations
def create_task(title, description=''):
    session = Session()
    task = Task(title=title, description=description)
    session.add(task)
    session.commit()
    task_dict = task.to_dict()
    session.close()
    return task_dict

def get_tasks(completed=None):
    session = Session()
    query = session.query(Task)
    
    if completed is not None:
        query = query.filter(Task.completed == completed)
    
    tasks = [task.to_dict() for task in query.all()]
    session.close()
    return tasks

def update_task(task_id, **kwargs):
    session = Session()
    task = session.query(Task).filter(Task.id == task_id).first()
    
    if not task:
        session.close()
        return None
    
    for key, value in kwargs.items():
        if hasattr(task, key):
            setattr(task, key, value)
    
    session.commit()
    task_dict = task.to_dict()
    session.close()
    return task_dict

def delete_task(task_id):
    session = Session()
    task = session.query(Task).filter(Task.id == task_id).first()
    
    if task:
        session.delete(task)
        session.commit()
        session.close()
        return True
    
    session.close()
    return False

🧠 SQL vs ORM: When to Use Each

Use raw SQL when:

  • Writing complex analytical queries
  • Performance is critical (ORMs add overhead)
  • You need database-specific features
  • Bulk operations on millions of rows

Use ORM when:

  • Building CRUD applications
  • You want type safety and IDE autocomplete
  • Database portability matters (switch SQLite → PostgreSQL)
  • Working with relationships between tables
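
When you do need raw SQL from Python, SQLAlchemy's text() construct keeps parameters safely bound. Here's a minimal sketch against the tasks table defined earlier (the SQLite URL is an assumption):

from sqlalchemy import create_engine, text

engine = create_engine('sqlite:///tasks.db')

with engine.connect() as conn:
    # Named parameters are bound by the driver, never string-formatted
    result = conn.execute(
        text("SELECT title, completed FROM tasks WHERE completed = :done"),
        {"done": 0},
    )
    for row in result:
        print(row.title, row.completed)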

Core Concept 4: Data Visualization 📊

Numbers tell stories: data visualization makes them visible. Python offers powerful libraries for everything from static charts to interactive dashboards.

Visualization Library Comparison

| Library | Best For | Complexity | Output |
|---------|----------|------------|--------|
| Matplotlib | Scientific plots, customization | ⭐⭐⭐⭐ | Static images |
| Seaborn | Statistical graphics | ⭐⭐ | Static (prettier) |
| Plotly | Interactive charts, dashboards | ⭐⭐⭐ | HTML/Interactive |
| Dash | Full web dashboards | ⭐⭐⭐⭐⭐ | Web applications |
| Bokeh | Large datasets, streaming | ⭐⭐⭐⭐ | Interactive |

Matplotlib: Foundation for Data Viz

import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta

# Example: Website traffic over time
def plot_traffic_analysis():
    # Generate sample data
    dates = [datetime.now() - timedelta(days=x) for x in range(30, 0, -1)]
    page_views = np.random.poisson(1000, 30) + np.linspace(800, 1200, 30)
    unique_visitors = page_views * 0.6 + np.random.normal(0, 50, 30)
    
    # Create figure with subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    fig.suptitle('Website Analytics Dashboard', fontsize=16, fontweight='bold')
    
    # Plot 1: Page views over time
    ax1.plot(dates, page_views, marker='o', linewidth=2, 
             color='#3498db', label='Page Views')
    ax1.fill_between(dates, page_views, alpha=0.3, color='#3498db')
    ax1.set_ylabel('Page Views', fontsize=12)
    ax1.set_title('Daily Page Views', fontsize=14)
    ax1.grid(True, alpha=0.3)
    ax1.legend()
    
    # Plot 2: Unique visitors comparison
    ax2.bar(dates, unique_visitors, color='#2ecc71', alpha=0.7, 
            label='Unique Visitors')
    ax2.set_xlabel('Date', fontsize=12)
    ax2.set_ylabel('Unique Visitors', fontsize=12)
    ax2.set_title('Daily Unique Visitors', fontsize=14)
    ax2.grid(True, alpha=0.3, axis='y')
    ax2.legend()
    
    # Format x-axis dates
    for ax in [ax1, ax2]:
        ax.tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.savefig('traffic_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

# Create pie chart for traffic sources
def plot_traffic_sources():
    sources = ['Direct', 'Search', 'Social', 'Referral', 'Email']
    percentages = [35, 30, 15, 12, 8]
    colors = ['#3498db', '#2ecc71', '#e74c3c', '#f39c12', '#9b59b6']
    explode = (0.1, 0, 0, 0, 0)  # Emphasize Direct traffic
    
    plt.figure(figsize=(10, 7))
    plt.pie(percentages, labels=sources, autopct='%1.1f%%',
            colors=colors, explode=explode, shadow=True, startangle=90)
    plt.title('Traffic Sources Distribution', fontsize=16, fontweight='bold')
    plt.axis('equal')
    plt.savefig('traffic_sources.png', dpi=300, bbox_inches='tight')
    plt.show()

plot_traffic_analysis()
plot_traffic_sources()

Plotly: Interactive Visualizations

import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np

# Create sample dataset
data = {
    'date': pd.date_range('2024-01-01', periods=90, freq='D'),
    'sales': np.random.poisson(50, 90) + np.linspace(40, 80, 90),
    'revenue': np.random.normal(2500, 300, 90),
    'category': np.random.choice(['Electronics', 'Clothing', 'Food'], 90)
}
df = pd.DataFrame(data)

# Interactive line chart with range slider
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=df['date'],
    y=df['sales'],
    mode='lines+markers',
    name='Daily Sales',
    line=dict(color='#3498db', width=2),
    marker=dict(size=6),
    hovertemplate='<b>Date</b>: %{x}<br><b>Sales</b>: %{y}<extra></extra>'
))

fig.update_layout(
    title='Interactive Sales Dashboard',
    xaxis_title='Date',
    yaxis_title='Number of Sales',
    hovermode='x unified',
    template='plotly_white',
    xaxis=dict(
        rangeselector=dict(
            buttons=list([
                dict(count=7, label="1w", step="day", stepmode="backward"),
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(step="all")
            ])
        ),
        rangeslider=dict(visible=True),
        type="date"
    )
)

fig.write_html('sales_dashboard.html')
fig.show()

# Grouped bar chart by category
fig2 = px.bar(df, x='date', y='revenue', color='category',
              title='Revenue by Category',
              labels={'revenue': 'Revenue ($)', 'date': 'Date'},
              template='plotly_white')

fig2.show()

Building a Complete Dashboard with Dash

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.express as px
import pandas as pd

# Initialize Dash app
app = dash.Dash(__name__)

# Sample data
df = pd.DataFrame({
    'month': ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
    'sales': [4500, 5200, 4800, 6100, 5900, 7200],
    'expenses': [3200, 3400, 3300, 3800, 3700, 4100],
    'profit': [1300, 1800, 1500, 2300, 2200, 3100]
})

# Layout
app.layout = html.Div([
    html.H1('Business Analytics Dashboard', 
            style={'textAlign': 'center', 'color': '#2c3e50'}),
    
    html.Div([
        html.Label('Select Metric:'),
        dcc.Dropdown(
            id='metric-dropdown',
            options=[
                {'label': 'Sales', 'value': 'sales'},
                {'label': 'Expenses', 'value': 'expenses'},
                {'label': 'Profit', 'value': 'profit'}
            ],
            value='sales',
            style={'width': '50%'}
        )
    ], style={'padding': '20px'}),
    
    dcc.Graph(id='main-chart'),
    
    html.Div([
        html.H3('Key Metrics'),
        html.Div(id='metrics-summary')
    ], style={'padding': '20px', 'backgroundColor': '#ecf0f1'})
])

# Callbacks for interactivity
@app.callback(
    Output('main-chart', 'figure'),
    Input('metric-dropdown', 'value')
)
def update_chart(selected_metric):
    fig = px.line(df, x='month', y=selected_metric,
                  title=f'{selected_metric.capitalize()} Over Time',
                  markers=True)
    fig.update_layout(template='plotly_white')
    return fig

@app.callback(
    Output('metrics-summary', 'children'),
    Input('metric-dropdown', 'value')
)
def update_metrics(selected_metric):
    total = df[selected_metric].sum()
    avg = df[selected_metric].mean()
    
    return html.Div([
        html.P(f'Total {selected_metric.capitalize()}: ${total:,.0f}'),
        html.P(f'Average {selected_metric.capitalize()}: ${avg:,.0f}')
    ])

if __name__ == '__main__':
    app.run(debug=True, port=8050)  # on older Dash versions: app.run_server(...)

💡 Deployment Tip: Deploy Dash apps to platforms like Heroku, AWS, or Render.com to share public dashboards!
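
Those platforms typically serve the app through a WSGI server such as gunicorn. The usual pattern is to expose Dash's underlying Flask server; the module name app.py below is an assumption:

# app.py
import dash

app = dash.Dash(__name__)
server = app.server  # the underlying Flask instance that WSGI servers need

# ... layout and callbacks as shown above ...

Then start it with gunicorn app:server.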


Practical Examples

Example 1: Cryptocurrency Price Tracker 💰

Project Goal: Scrape crypto prices, store in database, visualize trends

import requests
import sqlite3
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd
import time

class CryptoTracker:
    def __init__(self, db_path='crypto_prices.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_table()
    
    def create_table(self):
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                price REAL NOT NULL,
                timestamp TEXT NOT NULL
            )
        ''')
        self.conn.commit()
    
    def fetch_price(self, symbol='BTC'):
        """Fetch current price from CoinGecko API."""
        url = 'https://api.coingecko.com/api/v3/simple/price'
        params = {
            'ids': {'BTC': 'bitcoin', 'ETH': 'ethereum'}.get(symbol, 'bitcoin'),
            'vs_currencies': 'usd'
        }
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            coin_id = params['ids']
            price = data[coin_id]['usd']
            return price
        except Exception as e:
            print(f"Error fetching price: {e}")
            return None
    
    def save_price(self, symbol, price):
        """Store price in database."""
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?)',
            (symbol, price, datetime.now().isoformat())
        )
        self.conn.commit()
    
    def get_price_history(self, symbol, hours=24):
        """Retrieve recent price history."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT timestamp, price FROM prices
            WHERE symbol = ?
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (symbol, hours * 60))  # Assuming minute-level data
        
        rows = cursor.fetchall()
        df = pd.DataFrame(rows, columns=['timestamp', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        return df.sort_values('timestamp')  # query returns newest-first; plot oldest-first
    
    def plot_trends(self, symbol='BTC'):
        """Visualize price trends."""
        df = self.get_price_history(symbol)
        
        if df.empty:
            print("No data available")
            return
        
        plt.figure(figsize=(12, 6))
        plt.plot(df['timestamp'], df['price'], linewidth=2, color='#f39c12')
        plt.fill_between(df['timestamp'], df['price'], alpha=0.3, color='#f39c12')
        
        plt.title(f'{symbol} Price History', fontsize=16, fontweight='bold')
        plt.xlabel('Time', fontsize=12)
        plt.ylabel('Price (USD)', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(f'{symbol}_price_chart.png', dpi=300)
        plt.show()
    
    def monitor(self, symbols=['BTC', 'ETH'], interval=300):
        """Continuously monitor and store prices."""
        print(f"Starting price monitoring (interval: {interval}s)...")
        
        try:
            while True:
                for symbol in symbols:
                    price = self.fetch_price(symbol)
                    if price:
                        self.save_price(symbol, price)
                        print(f"{symbol}: ${price:,.2f} at {datetime.now().strftime('%H:%M:%S')}")
                
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")
            self.conn.close()

# Usage
tracker = CryptoTracker()
# tracker.monitor(['BTC', 'ETH'], interval=300)  # Run for continuous monitoring
tracker.plot_trends('BTC')

Key Techniques Used:

  • ✅ External API integration
  • ✅ SQLite for time-series data storage
  • ✅ Pandas for data manipulation
  • ✅ Matplotlib for visualization
  • ✅ Error handling for network requests

Example 2: Automated Job Scraper with Email Alerts 📧

Project Goal: Scrape job listings, filter by keywords, send email notifications

import requests
from bs4 import BeautifulSoup
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import time
from datetime import datetime

class JobScraper:
    def __init__(self, keywords, location='remote'):
        self.keywords = [k.lower() for k in keywords]
        self.location = location
        self.found_jobs = []
    
    def scrape_jobs(self, url_template):
        """Scrape job listings from a job board."""
        # Example for a generic job board structure
        url = url_template.format(location=self.location)
        
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'lxml')
            
            # Find job postings (adjust selectors for actual site)
            job_listings = soup.find_all('div', class_='job-card')
            
            for job in job_listings:
                title_elem = job.find('h3', class_='job-title')
                company_elem = job.find('span', class_='company-name')
                link_elem = job.find('a', class_='job-link')
                
                if not all([title_elem, company_elem, link_elem]):
                    continue
                
                title = title_elem.get_text(strip=True)
                company = company_elem.get_text(strip=True)
                link = link_elem['href']
                
                # Filter by keywords
                if any(keyword in title.lower() for keyword in self.keywords):
                    self.found_jobs.append({
                        'title': title,
                        'company': company,
                        'url': link,
                        'found_at': datetime.now().isoformat()
                    })
            
            return len(self.found_jobs)
        
        except Exception as e:
            print(f"Scraping error: {e}")
            return 0
    
    def send_email_alert(self, recipient_email, sender_email, password):
        """Send email notification with job matches."""
        if not self.found_jobs:
            print("No jobs to send")
            return False
        
        # Create email content
        subject = f"Job Alert: {len(self.found_jobs)} Matching Positions Found"
        
        body = "<html><body>"
        body += "<h2>New Job Opportunities</h2>"
        body += f"<p>Found {len(self.found_jobs)} jobs matching your criteria:</p>"
        body += "<ul>"
        
        for job in self.found_jobs:
            body += f"<li><strong>{job['title']}</strong> at {job['company']}<br>"
            body += f"<a href='{job['url']}'>View Job</a></li>"
        
        body += "</ul></body></html>"
        
        # Setup email
        message = MIMEMultipart('alternative')
        message['Subject'] = subject
        message['From'] = sender_email
        message['To'] = recipient_email
        
        html_part = MIMEText(body, 'html')
        message.attach(html_part)
        
        # Send via SMTP (Gmail example)
        try:
            with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
                server.login(sender_email, password)
                server.send_message(message)
            
            print(f"Email sent to {recipient_email}")
            return True
        
        except Exception as e:
            print(f"Email error: {e}")
            return False
    
    def run_monitor(self, url_template, email_config, check_interval=3600):
        """Monitor jobs and send alerts periodically."""
        print(f"Monitoring jobs every {check_interval/60} minutes...")
        
        while True:
            self.found_jobs = []  # Reset
            count = self.scrape_jobs(url_template)
            
            if count > 0:
                print(f"Found {count} matching jobs")
                self.send_email_alert(
                    recipient_email=email_config['recipient'],
                    sender_email=email_config['sender'],
                    password=email_config['password']
                )
            else:
                print("No new matching jobs")
            
            time.sleep(check_interval)

# Usage
scraper = JobScraper(keywords=['Python', 'Django', 'FastAPI'], location='remote')

email_config = {
    'recipient': 'your-email@example.com',
    'sender': 'sender-email@gmail.com',
    'password': 'your-app-password'  # Use app password, not regular password
}

# Run once (the config keys don't match the parameter names, so unpack explicitly)
scraper.scrape_jobs('https://example-jobs.com/search?location={location}')
if scraper.found_jobs:
    scraper.send_email_alert(
        recipient_email=email_config['recipient'],
        sender_email=email_config['sender'],
        password=email_config['password']
    )

Key Techniques Used:

  • ✅ Web scraping with keyword filtering
  • ✅ Email automation with SMTP
  • ✅ Continuous monitoring with intervals
  • ✅ HTML email formatting

Example 3: Weather Data Dashboard 🌦️

Project Goal: Fetch weather data via API, store historical records, create interactive dashboard

import requests
from datetime import datetime, timedelta
import sqlite3
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class WeatherDashboard:
    def __init__(self, api_key, db_path='weather.db'):
        self.api_key = api_key
        self.conn = sqlite3.connect(db_path)
        self.create_table()
    
    def create_table(self):
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS weather (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                city TEXT NOT NULL,
                temperature REAL,
                humidity INTEGER,
                description TEXT,
                wind_speed REAL,
                timestamp TEXT NOT NULL
            )
        ''')
        self.conn.commit()
    
    def fetch_current_weather(self, city):
        """Fetch weather from OpenWeatherMap API."""
        url = 'https://api.openweathermap.org/data/2.5/weather'
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric'
        }
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            
            weather = {
                'city': city,
                'temperature': data['main']['temp'],
                'humidity': data['main']['humidity'],
                'description': data['weather'][0]['description'],
                'wind_speed': data['wind']['speed'],
                'timestamp': datetime.now().isoformat()
            }
            
            return weather
        
        except Exception as e:
            print(f"API error: {e}")
            return None
    
    def save_weather(self, weather_data):
        """Store weather data in database."""
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO weather (city, temperature, humidity, description, wind_speed, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            weather_data['city'],
            weather_data['temperature'],
            weather_data['humidity'],
            weather_data['description'],
            weather_data['wind_speed'],
            weather_data['timestamp']
        ))
        self.conn.commit()
    
    def get_history(self, city, days=7):
        """Retrieve weather history for a city."""
        cursor = self.conn.cursor()
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        
        cursor.execute('''
            SELECT timestamp, temperature, humidity, wind_speed
            FROM weather
            WHERE city = ? AND timestamp > ?
            ORDER BY timestamp
        ''', (city, cutoff_date))
        
        return cursor.fetchall()
    
    def create_dashboard(self, city):
        """Generate interactive Plotly dashboard."""
        history = self.get_history(city, days=7)
        
        if not history:
            print("No historical data available")
            return
        
        timestamps = [datetime.fromisoformat(row[0]) for row in history]
        temps = [row[1] for row in history]
        humidity = [row[2] for row in history]
        wind = [row[3] for row in history]
        
        # Create subplots
        fig = make_subplots(
            rows=3, cols=1,
            subplot_titles=('Temperature (°C)', 'Humidity (%)', 'Wind Speed (m/s)'),
            vertical_spacing=0.1
        )
        
        # Temperature line
        fig.add_trace(
            go.Scatter(x=timestamps, y=temps, mode='lines+markers',
                      name='Temperature', line=dict(color='#e74c3c', width=2)),
            row=1, col=1
        )
        
        # Humidity area chart
        fig.add_trace(
            go.Scatter(x=timestamps, y=humidity, fill='tozeroy',
                      name='Humidity', line=dict(color='#3498db')),
            row=2, col=1
        )
        
        # Wind speed bar chart
        fig.add_trace(
            go.Bar(x=timestamps, y=wind, name='Wind Speed',
                  marker=dict(color='#2ecc71')),
            row=3, col=1
        )
        
        # Update layout
        fig.update_layout(
            title_text=f'Weather Dashboard - {city}',
            showlegend=False,
            height=900,
            template='plotly_white'
        )
        
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
        
        fig.write_html(f'{city}_weather_dashboard.html')
        fig.show()
        print(f"Dashboard saved to {city}_weather_dashboard.html")

# Usage
# Get a free API key from openweathermap.org
dashboard = WeatherDashboard(api_key='YOUR_API_KEY')

# Collect current readings for several cities (run on a schedule to build history)
cities = ['London', 'New York', 'Tokyo']
for city in cities:
    weather = dashboard.fetch_current_weather(city)
    if weather:
        dashboard.save_weather(weather)
        print(f"Saved weather for {city}")

# Generate dashboard
dashboard.create_dashboard('London')

Key Techniques Used:

  • ✅ External API integration (OpenWeatherMap)
  • ✅ Time-series data storage
  • ✅ Multi-panel interactive dashboards
  • ✅ Data aggregation and filtering

Common Mistakes and How to Avoid Them ⚠️

Mistake 1: Not Handling API Rate Limits

❌ Wrong:

for i in range(1000):
    response = requests.get(api_url)  # May get blocked!
    data = response.json()

✅ Right:

import time

for i in range(1000):
    response = requests.get(api_url)
    if response.status_code == 429:  # Too Many Requests
        print("Rate limited, waiting...")
        time.sleep(60)
        continue
    data = response.json()
    time.sleep(0.5)  # Polite delay

Mistake 2: SQL Injection Vulnerabilities

❌ Wrong:

# NEVER concatenate user input into SQL!
user_input = "admin' OR '1'='1"
query = f"SELECT * FROM users WHERE username = '{user_input}'"
cursor.execute(query)  # DANGEROUS!

✅ Right:

# Always use parameterized queries
user_input = "admin' OR '1'='1"
query = "SELECT * FROM users WHERE username = ?"
cursor.execute(query, (user_input,))  # Safe

Mistake 3: Not Closing Database Connections

❌ Wrong:

def get_data():
    conn = sqlite3.connect('data.db')
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM tasks')
    return cursor.fetchall()
    # Connection never closed!

✅ Right:

def get_data():
    conn = sqlite3.connect('data.db')
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM tasks')
        return cursor.fetchall()
    finally:
        conn.close()  # Always close

# Or use a context manager. Note that sqlite3's own context manager
# only manages transactions (commit/rollback), not closing, so wrap
# the connection in contextlib.closing:
from contextlib import closing

def get_data():
    with closing(sqlite3.connect('data.db')) as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM tasks')
        return cursor.fetchall()

Mistake 4: Hardcoding Sensitive Credentials

❌ Wrong:

# Never commit passwords to version control!
api_key = "sk-1234567890abcdef"
db_password = "MyPassword123"

✅ Right:

import os
from dotenv import load_dotenv

# Store in .env file (add to .gitignore)
load_dotenv()
api_key = os.getenv('API_KEY')
db_password = os.getenv('DB_PASSWORD')
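
The matching .env file (values here are obviously placeholders) would simply contain:

API_KEY=your-api-key-here
DB_PASSWORD=your-db-password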

Mistake 5: Not Validating User Input in APIs

❌ Wrong:

@app.route('/api/user/<user_id>')
def get_user(user_id):
    # No validation!
    user = db.query(User).filter(User.id == user_id).first()
    return jsonify(user.to_dict())

✅ Right:

from flask import abort

@app.route('/api/user/<user_id>')
def get_user(user_id):
    # Validate input
    try:
        user_id = int(user_id)
    except ValueError:
        abort(400, 'Invalid user ID format')
    
    if user_id < 1:
        abort(400, 'User ID must be positive')
    
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        abort(404, 'User not found')
    
    return jsonify(user.to_dict())

Mistake 6: Inefficient Data Loading

❌ Wrong:

# Loading entire dataset into memory
df = pd.read_csv('huge_file.csv')  # May crash with large files
filtered = df[df['value'] > 100]

✅ Right:

# Process in chunks
chunk_size = 10000
for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
    filtered_chunk = chunk[chunk['value'] > 100]
    # Process chunk
    filtered_chunk.to_csv('output.csv', mode='a', header=False)

Key Takeaways 🎯

📋 Quick Reference Card

Web Scraping Essentials

  • Use requests + BeautifulSoup for HTML parsing
  • Always set User-Agent headers
  • Add delays between requests (be polite!)
  • Check robots.txt before scraping
  • Handle errors gracefully

Building REST APIs

  • Flask: Simple, beginner-friendly
  • FastAPI: Modern, fast, auto-documentation
  • Use proper HTTP status codes (200, 201, 400, 404, 500)
  • Validate all input data
  • Return consistent JSON responses

Database Integration

  • SQLite: Built-in, perfect for prototypes
  • PostgreSQL: Production-ready, scalable
  • SQLAlchemy: ORM for type-safe database operations
  • Always use parameterized queries (prevent SQL injection)
  • Close connections properly (use context managers)

Data Visualization

  • Matplotlib: Static plots, scientific visualization
  • Plotly: Interactive charts, dashboards
  • Dash: Full web applications with callbacks
  • Choose library based on audience and interactivity needs

Security Best Practices

  • Never hardcode credentials (use environment variables)
  • Validate and sanitize all user input
  • Use HTTPS for API requests
  • Implement rate limiting (a minimal sketch follows this list)
  • Log errors, not sensitive data
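
Here's a minimal in-process, fixed-window rate limiter for a Flask app; it's a sketch (per-IP, in-memory), not production code, where you'd typically reach for a dedicated library or an API gateway instead:

import time
from collections import defaultdict
from flask import Flask, request, jsonify

app = Flask(__name__)

WINDOW_SECONDS = 60
MAX_REQUESTS = 30  # per client IP per window
hits = defaultdict(list)

@app.before_request
def rate_limit():
    now = time.time()
    ip = request.remote_addr
    # Keep only timestamps inside the current window
    hits[ip] = [t for t in hits[ip] if now - t < WINDOW_SECONDS]
    if len(hits[ip]) >= MAX_REQUESTS:
        # Returning a response from before_request short-circuits the request
        return jsonify({'error': 'Too many requests'}), 429
    hits[ip].append(now)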

Performance Tips

  • Process large files in chunks
  • Use database indexes for queries
  • Cache frequently accessed data
  • Use async operations for I/O-bound tasks (see the sketch below)
  • Profile code to find bottlenecks
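
For example, downloads can run concurrently with asyncio and the third-party aiohttp library (assumed installed; the URLs are placeholders):

import asyncio
import aiohttp

async def fetch(session, url):
    # Await the response without blocking the other downloads
    async with session.get(url) as resp:
        return await resp.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(fetch(session, url) for url in urls))
        print([len(page) for page in pages])

asyncio.run(main(['https://example.com'] * 3))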

🧠 Memory Device: The Data Pipeline Acronym

C.R.U.D. + V.D.A.

  • Collect (Web scraping)
  • Retrieve (Database queries)
  • Update (Modify records)
  • Delete (Remove data)
  • Visualize (Create charts)
  • Deploy (Host application)
  • Automate (Schedule tasks)

🔧 Project Ideas to Practice

  1. News Aggregator: Scrape multiple news sites, store in database, categorize with ML
  2. Personal Finance Tracker: API for expenses, dashboard with spending trends
  3. Social Media Analytics: Track follower growth, engagement rates, sentiment analysis
  4. Weather Comparison Tool: Multi-city weather tracking with historical trends
  5. Job Market Analyzer: Scrape job listings, analyze salary trends by location/skill
  6. Inventory Management System: REST API + dashboard for stock tracking
  7. Recipe Finder: Scrape recipe sites, search by ingredients, nutrition visualization

📚 Further Study

Video Courses:

  • Corey Schafer's Flask/Django tutorials (YouTube) - Excellent free content
  • Sentdex's Data Analysis series - Practical real-world projects

Books:

  • "Flask Web Development" by Miguel Grinberg
  • "Python for Data Analysis" by Wes McKinney (Pandas creator)

🚀 Next Steps: Pick one project idea, break it into small tasks, and start building! The best learning happens through hands-on practice.


💡 Remember: Every professional application you use, from Instagram's feed to Spotify's recommendations to Amazon's product search, is built using these exact techniques. You now have the tools to build the next great application!