Pandas for Log Analysis
Overview
Pandas is the go-to library for analyzing structured data in DevOps and SRE work. It excels at processing logs, analyzing incidents, tracking deployments, and generating reports from infrastructure data sources.
Core Concepts with DevOps Applications
1. DataFrames for Log Analysis
DevOps Context: Parsing and analyzing application logs
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import re
# Sample log data
log_data = """
2024-01-15 10:23:45 INFO [api-gateway] Request from 192.168.1.100 - GET /api/users - 200 - 145ms
2024-01-15 10:23:46 ERROR [auth-service] Authentication failed for user john.doe - 401 - 23ms
2024-01-15 10:23:47 INFO [api-gateway] Request from 192.168.1.101 - POST /api/orders - 201 - 523ms
2024-01-15 10:23:48 WARN [database] Query timeout on orders table - 1245ms
2024-01-15 10:23:49 INFO [api-gateway] Request from 192.168.1.100 - GET /api/products - 200 - 89ms
2024-01-15 10:23:50 ERROR [payment-service] Payment processing failed - Transaction ID: TX123456 - 500 - 2341ms
2024-01-15 10:23:51 INFO [api-gateway] Request from 192.168.1.102 - GET /api/users/123 - 404 - 12ms
2024-01-15 10:23:52 WARN [cache-service] Cache miss rate high: 45% - Performance degradation expected
2024-01-15 10:23:53 INFO [api-gateway] Request from 192.168.1.100 - DELETE /api/sessions - 204 - 34ms
2024-01-15 10:23:54 ERROR [api-gateway] Request from 192.168.1.103 - GET /api/reports - 503 - Service Unavailable
"""
def parse_logs_to_dataframe(log_text):
"""Parse log text into a structured DataFrame"""
lines = log_text.strip().split('\n')
parsed_logs = []
for line in lines:
# Parse log line using regex
pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) \[([^\]]+)\] (.+)'
match = re.match(pattern, line)
if match:
timestamp_str, level, service, message = match.groups()
# Extract response time if present
time_match = re.search(r'(\d+)ms', message)
response_time = int(time_match.group(1)) if time_match else None
# Extract status code if present
status_match = re.search(r'\b(\d{3})\b', message)
status_code = int(status_match.group(1)) if status_match else None
parsed_logs.append({
'timestamp': pd.to_datetime(timestamp_str),
'level': level,
'service': service,
'message': message,
'response_time_ms': response_time,
'status_code': status_code
})
return pd.DataFrame(parsed_logs)
# Parse logs into DataFrame
df_logs = parse_logs_to_dataframe(log_data)
# Analyze log data
print("Log Analysis Summary:")
print("-" * 50)
print(f"Total log entries: {len(df_logs)}")
print(f"\nLog levels distribution:")
print(df_logs['level'].value_counts())
print(f"\nServices with errors:")
error_services = df_logs[df_logs['level'] == 'ERROR']['service'].value_counts()
print(error_services)
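# Illustrative extension: flag unusually slow requests. The 500 ms cut-off is an
# arbitrary example threshold, not a universal SLO.
slow_requests = df_logs[df_logs['response_time_ms'] > 500]
print(f"\nSlow requests (>500ms): {len(slow_requests)}")
print(slow_requests[['timestamp', 'service', 'response_time_ms']])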
print(f"\nAverage response time by service:")
response_times = df_logs.groupby('service')['response_time_ms'].mean().dropna()
print(response_times.round(2))
2. Time Series Analysis for Metrics
DevOps Context: Analyzing infrastructure metrics over time
# Generate sample infrastructure metrics
def generate_infrastructure_metrics():
"""Generate realistic infrastructure metrics data"""
# Create time index
date_range = pd.date_range(
start='2024-01-01 00:00:00',
end='2024-01-07 23:59:59',
freq='1H'
)
# Generate metrics with daily and weekly patterns
n_points = len(date_range)
# CPU usage with daily pattern (higher during business hours)
base_cpu = 30
daily_pattern = np.sin(np.arange(n_points) * 2 * np.pi / 24) * 20
weekly_pattern = np.where(
pd.Series(date_range).dt.dayofweek.isin([5, 6]), # Weekend
-10, 0
)
cpu_usage = base_cpu + daily_pattern + weekly_pattern + np.random.normal(0, 5, n_points)
cpu_usage = np.clip(cpu_usage, 5, 95)
# Memory usage (gradual increase with occasional drops)
memory_usage = 40 + np.cumsum(np.random.normal(0.1, 2, n_points)) / 50
memory_usage = np.clip(memory_usage, 20, 90)
# Request count (business hours pattern)
hour_of_day = pd.Series(date_range).dt.hour
is_business_hours = (hour_of_day >= 8) & (hour_of_day <= 18)
is_weekday = ~pd.Series(date_range).dt.dayofweek.isin([5, 6])
request_count = np.where(
is_business_hours & is_weekday,
np.random.poisson(1000, n_points),
np.random.poisson(200, n_points)
)
# Error rate (increases with load)
error_rate = (request_count / 1000) * np.random.uniform(0.5, 2, n_points)
df = pd.DataFrame({
'timestamp': date_range,
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'request_count': request_count,
'error_rate': error_rate,
'response_time_p99': 100 + (cpu_usage * 2) + np.random.normal(0, 20, n_points)
})
return df
# Create metrics DataFrame
df_metrics = generate_infrastructure_metrics()
# Set timestamp as index for time series operations
df_metrics.set_index('timestamp', inplace=True)
# Resample to different time windows
hourly_avg = df_metrics.resample('1H').mean()
daily_avg = df_metrics.resample('1D').mean()
daily_max = df_metrics.resample('1D').max()
print("Infrastructure Metrics Analysis:")
print("-" * 50)
print("\nDaily Average Metrics:")
print(daily_avg[['cpu_usage', 'memory_usage', 'error_rate']].round(2))
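# A 24-hour rolling mean smooths hour-to-hour noise in the raw series; the window
# width is a tuning choice, and 24 samples matches this hourly data.
rolling_cpu = df_metrics['cpu_usage'].rolling(window=24, min_periods=1).mean()
print("\nSmoothed CPU usage (last 5 hours):")
print(rolling_cpu.tail().round(2))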
# Identify peak usage times
peak_hours = df_metrics.groupby(df_metrics.index.hour)['request_count'].mean().sort_values(ascending=False)
print(f"\nTop 5 Peak Hours (by average request count):")
print(peak_hours.head().round(0))
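# Simple anomaly check: flag hours where CPU deviates more than three standard
# deviations from its 24-hour rolling mean. The 3-sigma cut-off is a common but
# arbitrary choice.
cpu = df_metrics['cpu_usage']
rolling_mean = cpu.rolling(window=24, min_periods=24).mean()
rolling_std = cpu.rolling(window=24, min_periods=24).std()
cpu_anomalies = df_metrics[(cpu - rolling_mean).abs() > 3 * rolling_std]
print(f"\nCPU anomalies (3-sigma from rolling mean): {len(cpu_anomalies)} hours")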
# Correlation analysis
print("\nMetric Correlations:")
correlations = df_metrics[['cpu_usage', 'memory_usage', 'request_count', 'error_rate', 'response_time_p99']].corr()
print(correlations.round(3))
3. Incident Analysis and Reporting
DevOps Context: Tracking and analyzing production incidents
# Create incident tracking DataFrame
incidents_data = {
'incident_id': ['INC001', 'INC002', 'INC003', 'INC004', 'INC005', 'INC006', 'INC007', 'INC008'],
'timestamp': pd.to_datetime([
'2024-01-01 03:45:00', '2024-01-02 14:30:00', '2024-01-03 09:15:00',
'2024-01-04 22:00:00', '2024-01-05 11:20:00', '2024-01-06 16:45:00',
'2024-01-07 08:30:00', '2024-01-08 20:15:00'
]),
'service': ['auth-service', 'database', 'api-gateway', 'payment-service',
'cache-service', 'database', 'auth-service', 'api-gateway'],
'severity': ['P1', 'P2', 'P3', 'P1', 'P2', 'P1', 'P3', 'P2'],
'duration_minutes': [45, 120, 15, 180, 60, 240, 30, 90],
'root_cause': ['Memory leak', 'Slow queries', 'Network timeout', 'Third-party API down',
'Redis connection pool', 'Disk space', 'Certificate expired', 'DDoS attack'],
'affected_users': [5000, 2000, 500, 10000, 3000, 15000, 1000, 8000],
'resolved_by': ['John', 'Sarah', 'Mike', 'John', 'Sarah', 'Mike', 'John', 'Sarah']
}
df_incidents = pd.DataFrame(incidents_data)
# Add calculated fields
df_incidents['mttr_hours'] = df_incidents['duration_minutes'] / 60
df_incidents['impact_score'] = (
df_incidents['affected_users'] *
df_incidents['severity'].map({'P1': 3, 'P2': 2, 'P3': 1}) *
df_incidents['duration_minutes'] / 60
)
# Incident analysis
print("Incident Analysis Report:")
print("-" * 50)
# MTTR by severity
mttr_by_severity = df_incidents.groupby('severity')['mttr_hours'].agg(['mean', 'median', 'max'])
print("\nMTTR by Severity (hours):")
print(mttr_by_severity.round(2))
# Most problematic services
service_incidents = df_incidents.groupby('service').agg({
'incident_id': 'count',
'duration_minutes': 'sum',
'affected_users': 'sum',
'impact_score': 'sum'
}).rename(columns={'incident_id': 'incident_count', 'duration_minutes': 'total_downtime_min'})
service_incidents = service_incidents.sort_values('impact_score', ascending=False)
print("\nService Reliability Report:")
print(service_incidents)
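# Rough availability estimate per service over the roughly eight-day window the
# sample incidents cover; assumes downtime windows never overlap.
window_minutes = 8 * 24 * 60
availability = (1 - service_incidents['total_downtime_min'] / window_minutes) * 100
print("\nEstimated Availability over the Period (%):")
print(availability.round(3))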
# Root cause analysis
root_cause_freq = df_incidents['root_cause'].value_counts()
print("\nTop Root Causes:")
print(root_cause_freq)
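# A service-by-severity crosstab gives a quick view of where the worst incidents cluster
severity_by_service = pd.crosstab(df_incidents['service'], df_incidents['severity'])
print("\nIncident Counts by Service and Severity:")
print(severity_by_service)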
# On-call performance
oncall_stats = df_incidents.groupby('resolved_by').agg({
'incident_id': 'count',
'mttr_hours': 'mean',
'severity': lambda x: (x == 'P1').sum()
}).rename(columns={'incident_id': 'incidents_resolved', 'severity': 'p1_incidents'})
print("\nOn-Call Engineer Performance:")
print(oncall_stats.round(2))
4. Deployment Tracking and Analysis
DevOps Context: Monitoring deployment success rates and rollback patterns
# Create deployment tracking data
deployments = {
'deployment_id': [f'DEP{i:04d}' for i in range(1, 51)],
'timestamp': pd.date_range(start='2024-01-01', periods=50, freq='4H'),
'service': np.random.choice(['api', 'auth', 'database', 'frontend', 'backend'], 50),
'environment': np.random.choice(['dev', 'staging', 'production'], 50, p=[0.4, 0.3, 0.3]),
'version': [f'v1.{i//10}.{i%10}' for i in range(50)],
'deployment_method': np.random.choice(['blue-green', 'canary', 'rolling'], 50),
'duration_seconds': np.random.normal(300, 100, 50).clip(60, 600),
'status': np.random.choice(['success', 'failed', 'rolled_back'], 50, p=[0.8, 0.1, 0.1]),
'deployed_by': np.random.choice(['CI/CD', 'Manual'], 50, p=[0.7, 0.3])
}
df_deployments = pd.DataFrame(deployments)
# Add calculated fields
df_deployments['date'] = df_deployments['timestamp'].dt.date
df_deployments['hour'] = df_deployments['timestamp'].dt.hour
df_deployments['weekday'] = df_deployments['timestamp'].dt.day_name()
df_deployments['is_business_hours'] = df_deployments['hour'].between(9, 17)
# Deployment success analysis
print("Deployment Analysis Report:")
print("-" * 50)
# Success rate by environment
success_by_env = pd.crosstab(
df_deployments['environment'],
df_deployments['status'],
normalize='index'
) * 100
print("\nSuccess Rate by Environment (%):")
print(success_by_env.round(1))
# Deployment frequency and patterns
deploy_by_day = df_deployments.groupby('weekday')['deployment_id'].count()
deploy_by_day = deploy_by_day.reindex(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'])
print("\nDeployments by Day of Week:")
print(deploy_by_day)
# Best practices compliance
df_deployments['follows_best_practice'] = (
(df_deployments['environment'] != 'production') |
(df_deployments['is_business_hours'] & (df_deployments['weekday'].isin(['Monday', 'Tuesday', 'Wednesday', 'Thursday'])))
)
compliance_rate = df_deployments['follows_best_practice'].mean() * 100
print(f"\nBest Practice Compliance Rate: {compliance_rate:.1f}%")
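# A related cut, not shown above: compare automated vs. manual deployments.
# With only 50 synthetic records the numbers are noisy, but the same crosstab
# works unchanged on real deployment history.
success_by_trigger = pd.crosstab(
    df_deployments['deployed_by'],
    df_deployments['status'],
    normalize='index'
) * 100
print("\nOutcome by Deployment Trigger (%):")
print(success_by_trigger.round(1))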
# Service deployment reliability
service_reliability = df_deployments.groupby('service').agg({
'status': lambda x: (x == 'success').mean() * 100,
'duration_seconds': 'mean',
'deployment_id': 'count'
}).rename(columns={
'status': 'success_rate',
'duration_seconds': 'avg_duration',
'deployment_id': 'total_deployments'
})
print("\nService Deployment Reliability:")
print(service_reliability.round(1))
5. Cost Analysis and Optimization
DevOps Context: Analyzing cloud infrastructure costs
# Generate cloud cost data
def generate_cloud_costs():
dates = pd.date_range(start='2024-01-01', end='2024-01-31', freq='D')
services = []
for date in dates:
# Compute costs (higher on weekdays)
is_weekday = date.weekday() < 5
compute_base = 500 if is_weekday else 300
services.append({
'date': date,
'service': 'EC2',
'cost': compute_base + np.random.normal(0, 50),
'usage_hours': (24 * 20) if is_weekday else (24 * 10),
'region': 'us-east-1'
})
# Storage costs (gradually increasing)
day_num = (date - dates[0]).days
services.append({
'date': date,
'service': 'S3',
'cost': 100 + day_num * 2 + np.random.normal(0, 10),
'usage_gb': 5000 + day_num * 100,
'region': 'us-east-1'
})
# Database costs
services.append({
'date': date,
'service': 'RDS',
'cost': 200 + np.random.normal(0, 20),
'usage_hours': 24,
'region': 'us-east-1'
})
# Network costs
services.append({
'date': date,
'service': 'CloudFront',
'cost': 50 + np.random.exponential(30),
'usage_gb': np.random.exponential(1000),
'region': 'global'
})
return pd.DataFrame(services)
df_costs = generate_cloud_costs()
# Cost analysis
print("Cloud Cost Analysis:")
print("-" * 50)
# Total costs by service
total_by_service = df_costs.groupby('service')['cost'].sum().sort_values(ascending=False)
print("\nTotal Costs by Service (January):")
print(total_by_service.round(2))
# Daily cost trends
daily_costs = df_costs.groupby('date')['cost'].sum()
print(f"\nDaily Cost Statistics:")
print(f" Average: ${daily_costs.mean():.2f}")
print(f" Minimum: ${daily_costs.min():.2f}")
print(f" Maximum: ${daily_costs.max():.2f}")
print(f" Total: ${daily_costs.sum():.2f}")
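# Naive run-rate projection from a partial month: extrapolate the first ten days'
# average daily spend across all 31 days (ignores the upward trend in storage costs).
run_rate = daily_costs.iloc[:10].mean() * 31
print(f"  Projected January total from first 10 days (run rate): ${run_rate:.2f}")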
# Cost efficiency metrics
# usage_hours and usage_gb are mutually exclusive per service, and the missing one
# is NaN (not an absent key), so fall back with fillna rather than row.get()
df_costs['usage_units'] = df_costs['usage_hours'].fillna(df_costs['usage_gb']).fillna(1)
df_costs['cost_per_unit'] = df_costs['cost'] / df_costs['usage_units']
efficiency = df_costs.groupby('service')['cost_per_unit'].mean()
print("\nCost Efficiency (per unit):")
print(efficiency.round(4))
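# Pivoting to one column per service makes trends easy to eyeball; comparing the
# last week's average to the first week's shows which services are growing fastest.
daily_by_service = df_costs.pivot_table(index='date', columns='service', values='cost', aggfunc='sum')
weekly_growth = (daily_by_service.tail(7).mean() / daily_by_service.head(7).mean() - 1) * 100
print("\nCost growth, last week vs first week (%):")
print(weekly_growth.round(1))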
# Identify cost anomalies
service_daily_avg = df_costs.groupby(['service', 'date'])['cost'].sum().reset_index()
for service in df_costs['service'].unique():
service_data = service_daily_avg[service_daily_avg['service'] == service]['cost']
mean_cost = service_data.mean()
std_cost = service_data.std()
anomalies = service_data[service_data > mean_cost + 2 * std_cost]
if len(anomalies) > 0:
print(f"\nCost anomalies detected for {service}:")
print(f" Normal range: ${mean_cost - 2*std_cost:.2f} - ${mean_cost + 2*std_cost:.2f}")
print(f"  Anomalies found: {len(anomalies)} days")
Real-World DevOps Examples
Example 1: API Performance Dashboard Data Preparation
class APIPerformanceAnalyzer:
"""Analyze API performance metrics from logs"""
def __init__(self):
self.df = None
def load_api_logs(self, log_file=None):
"""Load and parse API access logs"""
# Simulate API log data
n_records = 10000
endpoints = ['/api/users', '/api/orders', '/api/products', '/api/auth/login', '/api/health']
methods = ['GET', 'POST', 'PUT', 'DELETE']
data = {
'timestamp': pd.date_range(start='2024-01-01', periods=n_records, freq='1min'),
'endpoint': np.random.choice(endpoints, n_records, p=[0.3, 0.25, 0.25, 0.15, 0.05]),
'method': np.random.choice(methods, n_records, p=[0.5, 0.3, 0.15, 0.05]),
'status_code': np.random.choice([200, 201, 400, 401, 404, 500, 503], n_records,
p=[0.7, 0.1, 0.05, 0.03, 0.05, 0.05, 0.02]),
'response_time_ms': np.random.lognormal(4, 1, n_records),
'user_id': np.random.choice(range(1, 1001), n_records),
'ip_address': [f"192.168.1.{np.random.randint(1, 255)}" for _ in range(n_records)]
}
self.df = pd.DataFrame(data)
self.df['success'] = self.df['status_code'] < 400
self.df['hour'] = self.df['timestamp'].dt.hour
def calculate_sli_metrics(self, time_window='1H'):
"""Calculate Service Level Indicators"""
# Resample data by time window
resampled = self.df.set_index('timestamp').resample(time_window)
sli_metrics = pd.DataFrame({
'availability': resampled['success'].mean() * 100,
'p50_latency': resampled['response_time_ms'].quantile(0.5),
'p95_latency': resampled['response_time_ms'].quantile(0.95),
'p99_latency': resampled['response_time_ms'].quantile(0.99),
'error_rate': (1 - resampled['success'].mean()) * 100,
'request_rate': resampled.size() / (pd.Timedelta(time_window).total_seconds() / 60)  # requests per minute; total_seconds() also handles windows of a day or more
})
return sli_metrics.round(2)
def endpoint_analysis(self):
"""Analyze performance by endpoint"""
endpoint_stats = self.df.groupby('endpoint').agg({
'response_time_ms': ['mean', 'median', lambda x: x.quantile(0.95)],
'success': lambda x: x.mean() * 100,
'status_code': 'count'
}).round(2)
endpoint_stats.columns = ['avg_response', 'median_response', 'p95_response', 'success_rate', 'request_count']
# Add error breakdown
error_breakdown = self.df[~self.df['success']].groupby(['endpoint', 'status_code']).size().unstack(fill_value=0)
return endpoint_stats, error_breakdown
def identify_problematic_users(self, threshold_error_rate=20):
"""Identify users with high error rates"""
user_stats = self.df.groupby('user_id').agg({
'success': lambda x: (1 - x.mean()) * 100, # error rate
'response_time_ms': 'mean',
'status_code': 'count'
}).rename(columns={'success': 'error_rate', 'status_code': 'request_count'})
problematic_users = user_stats[user_stats['error_rate'] > threshold_error_rate]
return problematic_users.sort_values('error_rate', ascending=False)
# Use the analyzer
analyzer = APIPerformanceAnalyzer()
analyzer.load_api_logs()
# Get SLI metrics
sli_metrics = analyzer.calculate_sli_metrics('1H')
print("Hourly SLI Metrics (last 5 hours):")
print(sli_metrics.tail())
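# Error-budget view against an assumed 99.9% availability SLO (an example target,
# not a given). The synthetic data has a deliberately high error rate, so expect
# this to be well over 100%.
slo_target = 99.9
total_requests = len(analyzer.df)
failed_requests = (~analyzer.df['success']).sum()
allowed_failures = total_requests * (1 - slo_target / 100)
print(f"\nError budget used against a {slo_target}% SLO: {failed_requests / allowed_failures * 100:.1f}%")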
# Analyze endpoints
endpoint_stats, error_breakdown = analyzer.endpoint_analysis()
print("\nEndpoint Performance Summary:")
print(endpoint_stats)
# Find problematic users
problematic_users = analyzer.identify_problematic_users()
if len(problematic_users) > 0:
print(f"\nUsers with >20% error rate: {len(problematic_users)}")
print(problematic_users.head())
Example 2: Kubernetes Pod Resource Analysis
def analyze_k8s_metrics():
"""Analyze Kubernetes pod metrics"""
# Simulate K8s pod metrics
pods = []
namespaces = ['default', 'production', 'staging', 'monitoring']
for i in range(100):
namespace = np.random.choice(namespaces)
pods.append({
'pod_name': f"{namespace}-app-{i:03d}",
'namespace': namespace,
'node': f"node-{np.random.randint(1, 11):02d}",
'cpu_request_m': np.random.choice([100, 250, 500, 1000]),
'cpu_usage_m': np.random.exponential(200),
'memory_request_mi': np.random.choice([128, 256, 512, 1024]),
'memory_usage_mi': np.random.normal(300, 100),
'restart_count': np.random.poisson(0.5),
'age_days': np.random.exponential(30),
'status': np.random.choice(['Running', 'Pending', 'CrashLoopBackOff', 'Error'],
p=[0.9, 0.05, 0.03, 0.02])
})
df_pods = pd.DataFrame(pods)
# Calculate resource efficiency
df_pods['cpu_efficiency'] = (df_pods['cpu_usage_m'] / df_pods['cpu_request_m']) * 100
df_pods['memory_efficiency'] = (df_pods['memory_usage_mi'] / df_pods['memory_request_mi']) * 100
# Identify resource issues
df_pods['over_cpu'] = df_pods['cpu_efficiency'] > 90
df_pods['under_cpu'] = df_pods['cpu_efficiency'] < 10
df_pods['over_memory'] = df_pods['memory_efficiency'] > 90
df_pods['under_memory'] = df_pods['memory_efficiency'] < 10
print("Kubernetes Resource Analysis:")
print("-" * 50)
# Namespace summary
namespace_summary = df_pods.groupby('namespace').agg({
'pod_name': 'count',
'cpu_usage_m': 'sum',
'memory_usage_mi': 'sum',
'restart_count': 'sum',
'status': lambda x: (x != 'Running').sum()
}).rename(columns={
'pod_name': 'pod_count',
'cpu_usage_m': 'total_cpu_m',
'memory_usage_mi': 'total_memory_mi',
'restart_count': 'total_restarts',
'status': 'unhealthy_pods'
})
print("\nNamespace Resource Usage:")
print(namespace_summary)
# Node distribution
node_load = df_pods.groupby('node').agg({
'pod_name': 'count',
'cpu_usage_m': 'sum',
'memory_usage_mi': 'sum'
}).rename(columns={'pod_name': 'pod_count'})
print("\nNode Load Distribution:")
print(node_load.sort_values('cpu_usage_m', ascending=False).head())
# Resource optimization opportunities
print("\nResource Optimization Opportunities:")
print(f" Pods with <10% CPU usage: {df_pods['under_cpu'].sum()}")
print(f" Pods with >90% CPU usage: {df_pods['over_cpu'].sum()}")
print(f" Pods with <10% Memory usage: {df_pods['under_memory'].sum()}")
print(f" Pods with >90% Memory usage: {df_pods['over_memory'].sum()}")
# Problem pods
problem_pods = df_pods[
(df_pods['status'] != 'Running') |
(df_pods['restart_count'] > 3) |
(df_pods['over_cpu']) |
(df_pods['over_memory'])
][['pod_name', 'namespace', 'status', 'restart_count', 'cpu_efficiency', 'memory_efficiency']]
if len(problem_pods) > 0:
print(f"\nProblem Pods ({len(problem_pods)} found):")
print(problem_pods.head(10))
return df_pods
# Run the analysis
k8s_metrics = analyze_k8s_metrics()
Practice Exercises
Log Aggregation Pipeline: Build a pipeline that ingests logs from multiple sources, normalizes them, and creates a unified dashboard dataset.
Capacity Forecasting: Use historical data to predict when resources will need scaling based on growth trends (a minimal starter sketch follows this list).
Incident Correlation: Analyze incident data to find patterns and correlations between different types of failures.
Cost Optimization Report: Create a comprehensive cost analysis report that identifies optimization opportunities.
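As a starting point for the capacity forecasting exercise, here is a minimal sketch. It assumes the df_metrics DataFrame from the time series section is still in scope, and both the linear trend and the 80% memory threshold are simplifying assumptions:
# Fit a linear trend to daily average memory usage and estimate when it crosses 80%
daily_mem = df_metrics['memory_usage'].resample('1D').mean()
x = np.arange(len(daily_mem))
slope, intercept = np.polyfit(x, daily_mem.values, 1)
if slope > 0:
    days_to_threshold = (80 - intercept) / slope
    print(f"Projected to reach 80% memory about {days_to_threshold:.0f} days after the start of the data")
else:
    print("No upward memory trend detected; no scaling action projected")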