Python教程(二十九):JSON数据处理

今日目标

o 理解JSON格式的特点和用途

o 掌握Python中JSON的序列化和反序列化

o 学会处理复杂的JSON数据结构

o 了解JSON在Web API中的应用

JSON简介

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,具有以下特点:

o 人类可读:格式简洁,易于理解

o 语言无关:几乎所有编程语言都支持

o 结构化:支持嵌套的数据结构

o 标准化:被广泛用于Web API和数据交换

JSON数据类型

# JSON支持的数据类型
{
    "string": "Hello World",
    "number": 42,
    "float": 3.14,
    "boolean": true,
    "null": null,
    "array": [1, 2, 3, "four"],
    "object": {
        "name": "Python",
        "version": 3.9
    }
}

Python中的JSON操作

1. 导入JSON模块

import json

2. JSON序列化(Python对象 → JSON字符串)

# 基本数据类型
data = {
    "name": "张三",
    "age": 25,
    "is_student": True,
    "hobbies": ["编程", "读书", "运动"],
    "address": {
        "city": "北京",
        "district": "朝阳区"
    }
}

# 转换为JSON字符串
json_string = json.dumps(data, ensure_ascii=False, indent=2)
print(json_string)

输出:

{
  "name": "张三",
  "age": 25,
  "is_student": true,
  "hobbies": ["编程", "读书", "运动"],
  "address": {
    "city": "北京",
    "district": "朝阳区"
  }
}

3. JSON反序列化(JSON字符串 → Python对象)

# 从JSON字符串解析数据
json_data = '''
{
    "name": "李四",
    "age": 30,
    "skills": ["Python", "JavaScript", "SQL"],
    "projects": [
        {"name": "电商网站", "duration": "3个月"},
        {"name": "数据分析工具", "duration": "2个月"}
    ]
}
'''

# 解析JSON
parsed_data = json.loads(json_data)
print(f"姓名: {parsed_data['name']}")
print(f"技能: {', '.join(parsed_data['skills'])}")
print(f"项目数量: {len(parsed_data['projects'])}")

高级JSON操作

1. 自定义序列化

import json
from datetime import datetime

class User:
    def __init__(self, name, age, created_at):
        self.name = name
        self.age = age
        self.created_at = created_at

# 自定义JSON编码器
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, User):
            return {
                "name": obj.name,
                "age": obj.age,
                "created_at": obj.created_at.isoformat()
            }
        return super().default(obj)

# 使用自定义编码器
user = User("王五", 28, datetime.now())
json_data = json.dumps(user, cls=CustomEncoder, ensure_ascii=False)
print(json_data)

2. 处理复杂数据结构

# 处理包含特殊字符的数据
complex_data = {
    "message": "Hello\nWorld\tTab",
    "special_chars": "引号\"和反斜杠\\",
    "unicode": "中文和emoji "
}

# 序列化时处理特殊字符
json_string = json.dumps(complex_data, ensure_ascii=False, indent=2)
print(json_string)

# 从文件读取JSON
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(complex_data, f, ensure_ascii=False, indent=2)

# 从文件读取JSON
with open('data.json', 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)
    print(loaded_data)

3. JSON验证和错误处理

def safe_json_parse(json_string):
    """安全地解析JSON字符串"""
    try:
        return json.loads(json_string)
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return None
    except Exception as e:
        print(f"其他错误: {e}")
        return None

# 测试错误处理
invalid_json = '{"name": "test", "age": 25,}'  # 多余的逗号
result = safe_json_parse(invalid_json)
if result is None:
    print("JSON解析失败")

真实应用示例

1. 配置文件管理

import json
import os

class ConfigManager:
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.config = self.load_config()
    
    def load_config(self):
        """加载配置文件"""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except json.JSONDecodeError:
                print("配置文件格式错误,使用默认配置")
                return self.get_default_config()
        else:
            return self.get_default_config()
    
    def save_config(self):
        """保存配置文件"""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, ensure_ascii=False, indent=2)
    
    def get_default_config(self):
        """获取默认配置"""
        return {
            "database": {
                "host": "localhost",
                "port": 5432,
                "name": "myapp"
            },
            "api": {
                "base_url": "https://api.example.com",
                "timeout": 30
            },
            "logging": {
                "level": "INFO",
                "file": "app.log"
            }
        }
    
    def get(self, key, default=None):
        """获取配置值"""
        keys = key.split('.')
        value = self.config
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        return value
    
    def set(self, key, value):
        """设置配置值"""
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]
        config[keys[-1]] = value

# 使用示例
config = ConfigManager()
print(f"数据库主机: {config.get('database.host')}")
print(f"API超时: {config.get('api.timeout')}")

# 修改配置
config.set('database.port', 5433)
config.save_config()

2. API数据处理

import json
import requests

class APIClient:
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
    
    def get_users(self):
        """获取用户列表"""
        try:
            response = self.session.get(f"{self.base_url}/users")
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            print(f"API请求错误: {e}")
            return None
    
    def create_user(self, user_data):
        """创建新用户"""
        try:
            headers = {'Content-Type': 'application/json'}
            response = self.session.post(
                f"{self.base_url}/users",
                data=json.dumps(user_data, ensure_ascii=False),
                headers=headers
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            print(f"创建用户失败: {e}")
            return None
    
    def update_user(self, user_id, user_data):
        """更新用户信息"""
        try:
            headers = {'Content-Type': 'application/json'}
            response = self.session.put(
                f"{self.base_url}/users/{user_id}",
                data=json.dumps(user_data, ensure_ascii=False),
                headers=headers
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            print(f"更新用户失败: {e}")
            return None

# 使用示例
api_client = APIClient("https://jsonplaceholder.typicode.com")

# 获取用户列表
users = api_client.get_users()
if users:
    print(f"获取到 {len(users)} 个用户")
    for user in users[:3]:  # 显示前3个用户
        print(f"- {user['name']} ({user['email']})")

# 创建新用户
new_user = {
    "name": "张三",
    "email": "zhangsan@example.com",
    "phone": "13800138000"
}

created_user = api_client.create_user(new_user)
if created_user:
    print(f"用户创建成功,ID: {created_user.get('id')}")

3. 数据分析和处理

import json
import pandas as pd
from collections import Counter

class DataAnalyzer:
    def __init__(self):
        self.data = []
    
    def load_from_json(self, file_path):
        """从JSON文件加载数据"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                self.data = json.load(f)
            print(f"成功加载 {len(self.data)} 条数据")
        except Exception as e:
            print(f"加载数据失败: {e}")
    
    def analyze_sales_data(self):
        """分析销售数据"""
        if not self.data:
            print("没有数据可分析")
            return
        
        # 转换为DataFrame
        df = pd.DataFrame(self.data)
        
        # 基本统计
        print("=== 销售数据分析 ===")
        print(f"总销售额: {df['amount'].sum():.2f}")
        print(f"平均订单金额: {df['amount'].mean():.2f}")
        print(f"最大订单金额: {df['amount'].max():.2f}")
        print(f"最小订单金额: {df['amount'].min():.2f}")
        
        # 按产品分类统计
        product_stats = df.groupby('product')['amount'].agg(['sum', 'count', 'mean'])
        print("\n=== 产品统计 ===")
        print(product_stats)
        
        # 按地区统计
        region_stats = df.groupby('region')['amount'].sum().sort_values(ascending=False)
        print("\n=== 地区销售排名 ===")
        print(region_stats)
        
        return {
            'total_sales': df['amount'].sum(),
            'avg_order': df['amount'].mean(),
            'product_stats': product_stats.to_dict(),
            'region_stats': region_stats.to_dict()
        }
    
    def export_analysis(self, analysis_result, output_file):
        """导出分析结果"""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(analysis_result, f, ensure_ascii=False, indent=2)
        print(f"分析结果已导出到: {output_file}")

# 示例数据
sample_data = [
    {"product": "笔记本电脑", "amount": 5999, "region": "北京", "date": "2024-01-15"},
    {"product": "手机", "amount": 3999, "region": "上海", "date": "2024-01-16"},
    {"product": "平板电脑", "amount": 2999, "region": "广州", "date": "2024-01-17"},
    {"product": "笔记本电脑", "amount": 5999, "region": "深圳", "date": "2024-01-18"},
    {"product": "手机", "amount": 3999, "region": "北京", "date": "2024-01-19"}
]

# 保存示例数据
with open('sales_data.json', 'w', encoding='utf-8') as f:
    json.dump(sample_data, f, ensure_ascii=False, indent=2)

# 使用分析器
analyzer = DataAnalyzer()
analyzer.load_from_json('sales_data.json')
analysis_result = analyzer.analyze_sales_data()
analyzer.export_analysis(analysis_result, 'analysis_result.json')

最佳实践

1. 性能优化

# 使用ujson提高性能(需要安装:pip install ujson)
try:
    import ujson as json
    print("使用ujson进行高性能JSON处理")
except ImportError:
    print("使用标准json模块")

# 大文件处理
def process_large_json(file_path):
    """处理大型JSON文件"""
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                data = json.loads(line.strip())
                yield data
            except json.JSONDecodeError:
                continue

# 使用示例
for item in process_large_json('large_data.json'):
    # 处理每个数据项
    print(f"处理: {item.get('id', 'unknown')}")

2. 数据验证

from typing import Dict, Any, Optional
import json

def validate_json_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """简单的JSON模式验证"""
    try:
        for key, expected_type in schema.items():
            if key not in data:
                print(f"缺少必需字段: {key}")
                return False
            
            if not isinstance(data[key], expected_type):
                print(f"字段 {key} 类型错误,期望 {expected_type},实际 {type(data[key])}")
                return False
        
        return True
    except Exception as e:
        print(f"验证过程出错: {e}")
        return False

# 使用示例
user_schema = {
    "name": str,
    "age": int,
    "email": str,
    "is_active": bool
}

test_user = {
    "name": "张三",
    "age": 25,
    "email": "zhangsan@example.com",
    "is_active": True
}

if validate_json_schema(test_user, user_schema):
    print("用户数据验证通过")
else:
    print("用户数据验证失败")

今日总结

今天我们学习了JSON数据处理的核心知识:

1. JSON格式特点:轻量级、人类可读、语言无关的数据交换格式

2. 序列化和反序列化:使用json.dumps()json.loads()进行数据转换

3. 高级操作:自定义编码器、错误处理、文件操作

4. 真实应用:配置文件管理、API数据处理、数据分析

5. 最佳实践:性能优化、数据验证、大文件处理

JSON是现代软件开发中最重要的数据交换格式之一,掌握JSON处理对于Web开发、API集成、数据存储等场景都至关重要。

原文链接:,转发请注明来源!