NumPy for Infrastructure Metrics
Overview
NumPy is essential for efficient numerical operations in DevOps/SRE contexts, particularly when dealing with large-scale metrics, performance data, and system monitoring. It provides the foundation for analyzing infrastructure patterns and anomalies.
Core Concepts with DevOps Applications
1. Arrays for Metrics Collection
DevOps Context: Storing and processing time-series metrics
import numpy as np
from datetime import datetime, timedelta
# One CPU utilisation sample (percent) per minute, covering the last hour.
cpu_metrics = np.array(
    [
        45.2, 48.3, 52.1, 67.8, 72.3, 68.9, 71.2, 75.6, 82.3, 88.1,
        92.5, 94.2, 91.8, 87.3, 83.2, 79.5, 76.8, 73.2, 69.8, 66.4,
        62.1, 58.9, 55.3, 52.8, 49.2, 46.8, 44.3, 42.1, 40.8, 39.2,
        38.5, 37.9, 38.2, 39.8, 42.3, 45.6, 48.9, 52.3, 56.7, 61.2,
        65.8, 70.3, 74.8, 78.2, 81.5, 84.2, 86.3, 87.8, 88.9, 89.5,
        90.1, 89.8, 88.3, 86.2, 83.8, 81.2, 78.5, 75.8, 73.2, 70.8,
    ]
)

# Summary statistics, computed with the array's own reduction methods.
print(f"Average CPU: {cpu_metrics.mean():.2f}%")
print(f"Peak CPU: {cpu_metrics.max():.2f}%")
print(f"Min CPU: {cpu_metrics.min():.2f}%")
print(f"Std Dev: {cpu_metrics.std():.2f}")

# Minute offsets (0-based positions in cpu_metrics) where load exceeded 80%.
high_load_indices = np.flatnonzero(cpu_metrics > 80)
print(f"High load periods (minutes): {high_load_indices}")
2. Multi-dimensional Arrays for Server Farms
DevOps Context: Managing metrics across multiple servers
# Synthetic utilisation figures for 10 servers, sampled hourly over 24 hours.
# Axis 0 = server, axis 1 = hour, axis 2 = metric type
# (0 = CPU, 1 = Memory, 2 = Disk I/O). Uniform random draws scaled by 100.
server_metrics = 100 * np.random.rand(10, 24, 3)

# Display names, index-aligned with axis 0 of server_metrics.
server_names = ["web-{:02d}".format(n) for n in range(10)]
def analyze_server_health(metrics, server_names, cpu_threshold=75,
                          mem_threshold=85, top_n=5):
    """Analyze health metrics across a server farm and print the worst servers.

    Args:
        metrics: Array-like indexed as [server, sample, metric_type]. Metric
            columns 0, 1 and 2 are read as CPU, memory and disk I/O; any
            further columns only contribute to the health score.
        server_names: Labels index-aligned with the first axis of ``metrics``.
        cpu_threshold: A server is listed under "high_cpu" when its average
            CPU is strictly above this value.
        mem_threshold: A server is listed under "high_memory" when its
            average memory is strictly above this value.
        top_n: How many of the highest-scoring (worst) servers to print.

    Returns:
        dict with keys "high_cpu" and "high_memory" (lists of server names)
        and "health_scores" (server name -> score, lower is better).
    """
    metrics = np.asarray(metrics, dtype=float)

    # Collapse the sample axis: one row per server, one column per metric.
    avg_per_server = metrics.mean(axis=1)

    high_cpu_servers = np.where(avg_per_server[:, 0] > cpu_threshold)[0]
    high_mem_servers = np.where(avg_per_server[:, 1] > mem_threshold)[0]

    # Health score = mean over however many metric types are present, so the
    # score stays on the same scale as the inputs even with extra columns.
    health_scores = avg_per_server.mean(axis=1)

    # Highest score first, i.e. most problematic servers at the front.
    ranked_indices = np.argsort(health_scores)[::-1]

    print("Server Health Analysis:")
    print("-" * 40)
    for idx in ranked_indices[:top_n]:
        print(f"{server_names[idx]}: Score={health_scores[idx]:.1f} "
              f"CPU={avg_per_server[idx, 0]:.1f}% "
              f"Mem={avg_per_server[idx, 1]:.1f}% "
              f"IO={avg_per_server[idx, 2]:.1f}%")

    return {
        "high_cpu": [server_names[i] for i in high_cpu_servers],
        "high_memory": [server_names[i] for i in high_mem_servers],
        "health_scores": dict(zip(server_names, health_scores)),
    }
# Analyze the server farm
results = analyze_server_health(server_metrics, server_names)
3. Statistical Operations for Anomaly Detection
DevOps Context: Detecting unusual patterns in system behavior
def detect_anomalies(metrics, window_size=10, threshold=3):
    """
    Detect anomalies by comparing each sample with its trailing window.

    For every sample that has ``window_size`` predecessors, the mean and
    population standard deviation of those predecessors form the baseline.
    The sample is reported when it lies more than ``threshold`` standard
    deviations away from the baseline mean.

    The sample under test is deliberately excluded from its own baseline.
    If it were included, one point in a window of n values could never sit
    more than sqrt(n - 1) standard deviations from the window mean (exactly
    3 for a window of 10), so the default threshold could never trigger.

    Args:
        metrics: 1-D sequence of numeric samples in time order.
        window_size: Number of preceding samples used as the baseline.
        threshold: Number of baseline standard deviations a sample must
            exceed to be reported.

    Returns:
        List of dicts, in index order, each with keys 'index' (position in
        ``metrics``), 'value', 'expected' (baseline mean) and 'deviation'
        (distance from the baseline mean in standard deviations; ``inf``
        when the baseline is perfectly flat). Empty when there are not more
        than ``window_size`` samples.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    values = np.asarray(metrics, dtype=float)
    if values.size <= window_size:
        # No sample has a full window of predecessors.
        return []

    # Row i holds values[i:i + window_size]; the last row is dropped because
    # no sample follows it.
    baselines = np.lib.stride_tricks.sliding_window_view(values, window_size)[:-1]
    expected = baselines.mean(axis=1)
    spread = baselines.std(axis=1)

    # candidates[i] is the sample immediately after baseline row i.
    candidates = values[window_size:]
    distance = np.abs(candidates - expected)

    anomalies = []
    for offset in np.flatnonzero(distance > threshold * spread):
        sigma = spread[offset]
        anomalies.append({
            'index': int(offset) + window_size,
            'value': candidates[offset],
            'expected': expected[offset],
            # A flat baseline has zero spread; report the jump as infinite
            # instead of dividing by zero.
            'deviation': distance[offset] / sigma if sigma > 0 else float('inf'),
        })
    return anomalies
# Example: synthetic response times drawn from a normal distribution
# (mean 100 ms, standard deviation 15 ms, 1000 samples).
response_times = np.random.normal(100, 15, 1000)

# Plant three known outliers in one assignment: two spikes and one drop.
response_times[[150, 500, 750]] = [400, 350, 10]

anomalies = detect_anomalies(response_times)
print(f"Found {len(anomalies)} anomalies in response times")
for anomaly in anomalies[:5]:
print(f" Time index {anomaly['index']}: {anomaly['value']:.1f}ms "
f"(expected: {anomaly['expected']:.1f}ms, "
f"{anomaly['deviation']:.1f} std devs)")
4. Array Operations for Capacity Planning
DevOps Context: Predicting resource needs based on historical data
def capacity_planning(historical_usage, growth_rate=0.1, forecast_days=30,
                      current_capacity=1000):
    """
    Predict future capacity needs based on historical usage patterns.

    A straight line is fitted to the history (one sample per day), extended
    ``forecast_days`` past the last sample, and scaled by ``1 + growth_rate``.

    Args:
        historical_usage: 1-D sequence of daily usage figures, oldest first.
        growth_rate: Extra fractional growth applied on top of the fitted
            trend (0.1 = +10%).
        forecast_days: Number of days to project beyond the last sample.
        current_capacity: Installed capacity, in the same unit as the usage
            figures (the examples use GB).

    Returns:
        dict with 'current_avg', 'projected_avg', 'peak_projected',
        'required_capacity' (projected peak plus a 20% buffer) and
        'days_until_capacity_exceeded': the number of days after the last
        historical sample at which projected usage first exceeds
        ``current_capacity`` (1 = the very next day), or None if that does
        not happen within the forecast.

    Raises:
        ValueError: If ``forecast_days`` is smaller than 1.
    """
    if forecast_days < 1:
        raise ValueError("forecast_days must be at least 1")

    usage = np.asarray(historical_usage, dtype=float)

    # Linear trend: usage ~= slope * day + intercept.
    days = np.arange(usage.size)
    slope, intercept = np.polyfit(days, usage, 1)

    # Project the trend forward, then apply the growth factor.
    future_days = np.arange(usage.size, usage.size + forecast_days)
    projected_usage = (slope * future_days + intercept) * (1 + growth_rate)

    # Required capacity keeps a 20% buffer above the projected peak.
    peak_projected = np.max(projected_usage)
    required_capacity = peak_projected * 1.2

    # Forecast position 0 is one day after the last historical sample, so
    # the day count is the position plus one. This also keeps the result
    # truthy whenever capacity is exceeded.
    over_capacity = np.flatnonzero(projected_usage > current_capacity)
    days_until = int(over_capacity[0]) + 1 if over_capacity.size else None

    return {
        'current_avg': np.mean(usage),
        'projected_avg': np.mean(projected_usage),
        'peak_projected': peak_projected,
        'required_capacity': required_capacity,
        'days_until_capacity_exceeded': days_until,
    }
# Historical storage usage (GB) over 90 days: a linear ramp from 600 to 850
# with normally distributed noise (standard deviation 20) added.
storage_usage = np.random.normal(0, 20, 90) + np.linspace(600, 850, 90)

planning = capacity_planning(storage_usage)

# Emit the report in a single call; sep="\n" puts each field on its own line.
print(
    "Capacity Planning Report:",
    f" Current Average: {planning['current_avg']:.1f} GB",
    f" 30-day Projected Average: {planning['projected_avg']:.1f} GB",
    f" Required Capacity: {planning['required_capacity']:.1f} GB",
    sep="\n",
)
if planning['days_until_capacity_exceeded']:
print(f" WARNING: Will exceed capacity in {planning['days_until_capacity_exceeded']} days!")
5. Performance Optimization with NumPy
DevOps Context: Efficient processing of large-scale metrics
import time
def process_metrics_python(metrics):
    """Scale each metric in pure Python: x1.1 when above 50, otherwise x0.9.

    Returns a new list with one scaled value per input element.
    """
    return [value * 1.1 if value > 50 else value * 0.9 for value in metrics]
def process_metrics_numpy(metrics):
    """Scale each metric with NumPy: x1.1 when above 50, otherwise x0.9.

    Both scaled variants are computed for the whole array and the mask
    selects between them element by element.
    """
    above_half = metrics > 50
    boosted = metrics * 1.1
    reduced = metrics * 0.9
    return np.where(above_half, boosted, reduced)
# Compare performance on one million synthetic samples (uniform draws x 100).
large_metrics = np.random.rand(1000000) * 100

# Convert to a plain list up front so the conversion cost is not charged to
# the pure-Python implementation.
metrics_list = large_metrics.tolist()

# Python approach. time.perf_counter() is a monotonic, high-resolution clock
# meant for measuring intervals; time.time() can be coarse enough to report a
# zero elapsed time for the fast NumPy run, which would break the speed-up
# ratio computed afterwards.
start = time.perf_counter()
python_result = process_metrics_python(metrics_list)
python_time = time.perf_counter() - start

# NumPy approach
start = time.perf_counter()
numpy_result = process_metrics_numpy(large_metrics)
numpy_time = time.perf_counter() - start

print(f"Python processing time: {python_time:.4f} seconds")
print(f"NumPy processing time: {numpy_time:.4f} seconds")
print(f"Speed improvement: {python_time/numpy_time:.1f}x faster")
Real-World DevOps Examples
Example 1: Network Traffic Analysis
import numpy as np
class NetworkTrafficAnalyzer:
    """Collect per-interval traffic counters and derive bandwidth statistics."""

    def __init__(self, sampling_rate=60):  # seconds
        self.sampling_rate = sampling_rate
        self.traffic_data = []

    def add_sample(self, bytes_in, bytes_out, packets_in, packets_out):
        """Record one interval's byte and packet counters."""
        row = [bytes_in, bytes_out, packets_in, packets_out]
        self.traffic_data.append(row)

    def analyze_patterns(self):
        """Summarise the recorded samples.

        Returns None when fewer than two samples have been recorded.
        Otherwise returns a dict with average/peak bandwidth in Mbps, the
        50th/95th/99th bandwidth percentiles, the mean bytes-per-packet, and
        the count and positions of samples more than two standard deviations
        above the mean bandwidth.
        """
        if len(self.traffic_data) < 2:
            return None

        samples = np.array(self.traffic_data)

        # Columns 0/1 are bytes in/out; convert bytes per interval to Mbps.
        total_bytes = samples[:, 0] + samples[:, 1]
        total_bits = total_bytes * 8
        bandwidth = total_bits / (self.sampling_rate * 1_000_000)

        # Columns 2/3 are packets in/out; clamp to 1 to avoid dividing by 0.
        total_packets = samples[:, 2] + samples[:, 3]
        bytes_per_packet = total_bytes / np.maximum(total_packets, 1)

        # Only upward excursions count as spikes.
        average = np.mean(bandwidth)
        spread = np.std(bandwidth)
        spike_cutoff = average + 2 * spread
        spike_positions = np.where(bandwidth > spike_cutoff)[0]

        p50, p95, p99 = np.percentile(bandwidth, [50, 95, 99])

        summary = {}
        summary['avg_bandwidth_mbps'] = average
        summary['peak_bandwidth_mbps'] = np.max(bandwidth)
        summary['p50_bandwidth'] = p50
        summary['p95_bandwidth'] = p95
        summary['p99_bandwidth'] = p99
        summary['avg_packet_size'] = np.mean(bytes_per_packet)
        summary['traffic_spikes'] = len(spike_positions)
        summary['spike_indices'] = spike_positions.tolist()
        return summary

    def predict_peak_hours(self, hourly_data):
        """Return the three busiest hours of day and their average traffic.

        ``hourly_data`` is reshaped into rows of 24 values and averaged per
        column. The result lists hours in ascending order of average, so the
        busiest hour comes last.
        """
        by_day = hourly_data.reshape(-1, 24)
        hourly_avg = by_day.mean(axis=0)
        peak_hours = np.argsort(hourly_avg)[-3:]
        return peak_hours, hourly_avg[peak_hours]
# Example usage: feed a day of simulated per-minute samples to the analyzer.
analyzer = NetworkTrafficAnalyzer()

for _ in range(24 * 60):  # 24 hours of minute-by-minute data
    # Exponentially distributed byte counts; inbound is drawn before
    # outbound on every iteration.
    bytes_in = np.random.exponential(1_000_000)
    bytes_out = np.random.exponential(500_000)
    # Packet counts assume roughly 1500 bytes per packet.
    packets_in = int(bytes_in / 1500)
    packets_out = int(bytes_out / 1500)
    analyzer.add_sample(bytes_in, bytes_out, packets_in, packets_out)

results = analyzer.analyze_patterns()
print("Network Traffic Analysis:")
for key, value in results.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
Example 2: Load Balancer Distribution Analysis
def analyze_load_distribution(request_counts, server_names):
    """
    Report how evenly requests are spread across servers.

    Prints a summary (totals, standard deviation, coefficient of variation,
    chi-square statistic against a uniform split) followed by any servers
    more than 20% above or below the even share.

    Returns a dict with 'distribution' (each server's share in percent),
    'cv' (coefficient of variation in percent), 'chi_square', and 'balanced'
    (True when cv is below 10).
    """
    total_requests = np.sum(request_counts)
    server_count = len(request_counts)
    expected_per_server = total_requests / server_count

    # Share of traffic per server and overall dispersion.
    distribution = request_counts / total_requests * 100
    std_dev = np.std(request_counts)
    cv = (std_dev / np.mean(request_counts)) * 100

    # Distance of every server from the even share; reused below.
    offsets = request_counts - expected_per_server
    chi_square = np.sum(offsets ** 2 / expected_per_server)
    deviation_pct = (offsets / expected_per_server) * 100

    overloaded = np.where(deviation_pct > 20)[0]
    underutilized = np.where(deviation_pct < -20)[0]

    summary_lines = (
        "Load Balancer Analysis:",
        f" Total Requests: {total_requests:,.0f}",
        f" Expected per server: {expected_per_server:,.0f}",
        f" Standard Deviation: {std_dev:,.0f}",
        f" Coefficient of Variation: {cv:.1f}%",
        f" Chi-square statistic: {chi_square:.2f}",
    )
    for line in summary_lines:
        print(line)

    # Same listing format for both groups; empty groups print nothing.
    flagged_groups = (
        ("\n Overloaded servers (>20% above expected):", overloaded),
        ("\n Underutilized servers (>20% below expected):", underutilized),
    )
    for heading, members in flagged_groups:
        if len(members) == 0:
            continue
        print(heading)
        for idx in members:
            print(f" {server_names[idx]}: {request_counts[idx]:,.0f} "
                  f"({deviation_pct[idx]:+.1f}%)")

    return {
        'distribution': distribution,
        'cv': cv,
        'chi_square': chi_square,
        'balanced': cv < 10,  # Consider balanced if CV < 10%
    }
# Example input: eight application servers and the requests each one served.
server_names = ["app-{:02d}".format(n) for n in range(8)]
request_counts = np.array(
    [
        98500, 102300, 99800, 121000,
        95600, 97200, 103400, 98200,
    ]
)
analysis = analyze_load_distribution(request_counts, server_names)
Example 3: SLA Compliance Monitoring
def calculate_sla_metrics(response_times, sla_target=200):
    """
    Calculate SLA compliance metrics for response times.

    All figures are computed over every sample. Outliers are deliberately
    not removed: tail percentiles such as P99 and P99.9 exist to describe
    the slowest requests, so trimming them first would under-report latency
    and disagree with the compliance rate and Apdex score.

    Args:
        response_times: 1-D sequence of response times (the report labels
            them as milliseconds).
        sla_target: Response-time target, in the same unit.

    Returns:
        dict with 'compliance_rate' (percent of samples at or below the
        target), 'apdex' (samples at or below the target count fully, those
        up to four times the target count half), and 'percentiles' (keys
        'p50', 'p90', 'p95', 'p99', 'p99.9').

    Raises:
        ValueError: If ``response_times`` is empty.
    """
    response_times = np.asarray(response_times, dtype=float)
    sample_count = response_times.size
    if sample_count == 0:
        raise ValueError("response_times must contain at least one sample")

    percentiles = np.percentile(response_times, [50, 90, 95, 99, 99.9])

    # Apdex buckets: satisfied (<= T) and tolerating (T < t <= 4T).
    satisfied = np.sum(response_times <= sla_target)
    tolerating = np.sum((response_times > sla_target) &
                        (response_times <= sla_target * 4))

    compliance_rate = (satisfied / sample_count) * 100
    apdex = (satisfied + tolerating * 0.5) / sample_count

    print("SLA Compliance Report:")
    print(f" Target SLA: {sla_target}ms")
    print(f" Compliance Rate: {compliance_rate:.2f}%")
    print(f" Apdex Score: {apdex:.3f}")
    print(f" Median (P50): {percentiles[0]:.1f}ms")
    print(f" P90: {percentiles[1]:.1f}ms")
    print(f" P95: {percentiles[2]:.1f}ms")
    print(f" P99: {percentiles[3]:.1f}ms")
    print(f" P99.9: {percentiles[4]:.1f}ms")

    return {
        'compliance_rate': compliance_rate,
        'apdex': apdex,
        'percentiles': dict(zip(['p50', 'p90', 'p95', 'p99', 'p99.9'], percentiles)),
    }
# Sample response times: 9500 draws around 150 (std 30) mixed with 500 slow
# draws around 400 (std 100), then shuffled in place.
normal_responses = np.random.normal(loc=150, scale=30, size=9500)
slow_responses = np.random.normal(loc=400, scale=100, size=500)
response_times = np.concatenate((normal_responses, slow_responses))
np.random.shuffle(response_times)
sla_metrics = calculate_sla_metrics(response_times)
Practice Exercises
Metric Aggregation: Write a function that aggregates metrics from multiple data centers and calculates weighted averages based on traffic volume.
Trend Detection: Implement a function that detects upward or downward trends in system metrics using linear regression.
Resource Correlation: Create a script that finds correlations between different metrics (CPU, memory, network) to identify resource bottlenecks.
Percentile Monitoring: Build a monitoring system that tracks percentile-based SLIs (Service Level Indicators) over time.