续上篇文章Python实现Socket通信样例,本篇文章对上篇文章中的通信的数据做一个混合加密来保证通信数据在通信过程中的安全.

采用的客户端与服务端数据加密传输方案为RSA+AES混合加密实现.
方案思路参考

数据加密方案

首先,客户端与服务端商量好数据加密协议,对传输数据做到安全保护。

安全保护至少需要有下面两点:

  • 采用HTTPS协议
  • 采用公钥密码体制RSA算法对数据加密

性能问题

RSA加密虽然安全性上相对较高,但是由于RSA算法对数据加密时运算速度慢,所以直接把所有传输数据都用RSA加密,会导致网络通信慢,这对用户将是不好的体验。

解决思路

由于对称密钥密码体制中的AES运算速度快且安全性高,所以结合AES对传输数据加密是非常好的方案.即用RSA只加密AES的key值,AES用来对数据加密来提高性能.

下面是对客户端与服务端通信数据加密比较通用的方案:

  • 客户端生成AES密钥,并保存AES密钥
  • 客户端用AES密钥对请求传输数据进行加密
  • 客户端使用RSA公钥对AES密钥加密,然后把值放到自定义的一个请求头中
  • 客户端向服务端发起请求
  • 服务端拿到自定义的请求头值,然后使用RSA私钥解密,拿到AES密钥
  • 服务端使用AES密钥对请求数据解密
  • 服务端对响应数据使用AES密钥加密
  • 服务端向客户端发出响应
  • 客户端拿到服务端加密数据,并使用之前保存的AES密钥解密

注意: 传输数据使用AES密钥加密,RSA公钥对AES密钥加密。RSA公钥和私钥由服务端生成,公钥放在客户端,私钥放在服务端。公钥私钥要私密保护,不能随便给人。
(以上方案便能保证,数据在传输过程中截取者是无法轻易获取到AES的key值的,客户端-服务端AES-key值通过了RSA加密,服务端-客户端没有传输AES-key值. key值仅在两端可见)

需要提前准备好的工作

具体实现代码

代码基于文章Python实现Socket通信样例更新

服务端Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from socketserver import BaseRequestHandler, ThreadingTCPServer
import base64
import ast

import CryptoUtils as DataCrypto

rsa = DataCrypto.RSAsys
BUF_SIZE = 1024


class Handler(BaseRequestHandler):
def __init__(self, request, client_address, server):
super(Handler, self).__init__(request, client_address, server)
self.aes_key = ''
self.aes_iv = ''
self.aes_obj = None

def en_data(self, data):
"""
构造bytes数据
:param data: str
:return: base64-bytes
"""
cipher_data = self.aes_obj.aes_cbc_encrypt(data)
return base64.b64encode(cipher_data)+b"#"

def de_data(self, data):
"""
解析base64-bytes数据
:param data: base64-bytes
:return: str
"""
receive_data = base64.b64decode(data).decode()
deserialization = ast.literal_eval(receive_data)
keys = ast.literal_eval(rsa.rsa_decrypt(deserialization.get("key")))
self.aes_key = keys.get("aes_key")
self.aes_iv = keys.get("aes_iv")
self.aes_obj = DataCrypto.AESsys(self.aes_key, self.aes_iv)
decrypto_data = self.aes_obj.aes_cbc_decrypt(deserialization.get("data"))
return decrypto_data

def handle(self):
while True:
data = b''
while True:
try:
recv_data = self.request.recv(BUF_SIZE)
if len(recv_data) > 0:
if recv_data[-1:] == b'#':
data += recv_data[:-1]
break
else:
data += recv_data
else:
break
except Exception as e:
print("Socket receiving data error! | "+str(e))
break

if len(data) != 0:
recv = self.de_data(data)
print('收到数据:', recv)
self.request.sendall(self.en_data("服务端已收到数据: "+str(recv)))
else:
print("Client Close")
break


def main(host, port):
ADDR = (host, port)
server = ThreadingTCPServer(ADDR, Handler)
print('Server Start!')
server.serve_forever()


if __name__ == '__main__':
main("127.0.0.1", 8089)

客户端Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from socket import *
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
import base64
import string
import random


class RSAsys:
public_pem = r"./public.pem" # 公钥文件路径

# 加密
@classmethod
def rsa_encrypt(cls, message):
rsakey = RSA.importKey(open(cls.public_pem).read())
cipher = Cipher_pkcs1_v1_5.new(rsakey) # 创建用于执行pkcs1_v1_5加密或解密的密码
cipher_text = base64.b64encode(cipher.encrypt(message.encode('utf-8')))
return cipher_text.decode('utf-8')


class AESsys(object):
def __init__(self, key, iv):
"""
密钥和偏移量
:param key:密钥:长度需要大于16,满足8的倍数
:param iv:偏移量
"""
if len(key) in [16, 24, 32]:
self.key = key # AES加密密钥
else:
raise Exception(f"密钥字符串长度需要在[16, 24, 32]中, 当前长度: key={key}, len={len(key)}") # 密钥
if len(iv) in [16]:
self.iv = iv # CBC模式的偏移量
else:
raise Exception(f"偏移量字符串长度需要在[16]中, 当前长度: key={iv}, len={len(iv)}") # 密钥

# 如果text不足16位的倍数就用空格补足为16位
@staticmethod
def _add_to_16(text):
if len(text.encode('utf-8')) % 16:
add = 16 - (len(text.encode('utf-8')) % 16)
else:
add = 0
text = text + ('\0' * add)
return text.encode('utf-8')

# 加密, CBC模式
def aes_cbc_encrypt(self, text):
key = self.key.encode('utf-8')
mode = AES.MODE_CBC
iv = self.iv.encode('utf-8')
text = self._add_to_16(text)
cryptos = AES.new(key, mode, iv)
cipher_text = cryptos.encrypt(text)
# 因为AES加密后的字符串不一定是ascii字符集的,输出保存可能存在问题,所以这里转为16进制字符串
return b2a_hex(cipher_text)

# 解密,CBC模式,去掉补足的空格用strip() 去掉
def aes_cbc_decrypt(self, text):
key = self.key.encode('utf-8')
iv = self.iv.encode('utf-8')
mode = AES.MODE_CBC
cryptos = AES.new(key, mode, iv)
plain_text = cryptos.decrypt(a2b_hex(text))
return bytes.decode(plain_text).rstrip('\0')


class SocketClient(object):
"""
客户端
"""
def __init__(self, host, port):
"""
建立连接对象
:param host: 主机地址
:param port: 主机通信端口
"""
self.HOST = host
self.PORT = port

self._BUFSIZ = 1024
self._ADDR = (self.HOST, self.PORT)
self._tcpCliSock = socket(AF_INET, SOCK_STREAM)
self._tcpCliSock.connect(self._ADDR)
self.key = ''.join(random.sample(string.ascii_letters + string.digits, 32))
self.iv = ''.join(random.sample(string.ascii_letters + string.digits, 16))
self.aes = AESsys(self.key, self.iv)

def sent_data(self, data=None):
"""
发送数据
:param data: 要发送的数据 [str]
:return: 收到的服务端发来的数据或状态 [str]
"""
if data:
self._tcpCliSock.sendall(self._en_data(data))
recv = self._recv_data()
self.close()
return recv
else:
print("The sent data is empty or not sent!")

def close(self):
"""
关闭连接对象
"""
self._tcpCliSock.close()

def _recv_data(self):
"""
接收服务端数据
:return: 接收服务端数据-ERROR=None
"""
data = b''
while True:
try:
self._tcpCliSock.settimeout(10) # 设置超时时间为10s超过10s判定为服务端没有返回状态,未收到数据
recv_data = self._tcpCliSock.recv(self._BUFSIZ)
if len(recv_data) > 0:
if recv_data[-1:] == b'#':
data += recv_data[:-1]
break
else:
data += recv_data
else:
break
except Exception as e:
print("Socket receiving data error! | "+str(e))
return None # 出现异常返回None

if len(data) != 0:
return self._de_data(data)
else:
return "" # 非异常无数据,返回空字符串

def sent_data_pending_input(self):
"""
发送数据,长连接
"""
lastdata = ''
while lastdata != "quit":
lastdata = input("请输入要发送的数据:")
if len(lastdata) > 0:
self._tcpCliSock.sendall(self._en_data(lastdata))
recv = self._recv_data()
print(recv)
else:
print("The sent data is empty or not sent!")
print("Client端已退出!")

def _en_data(self, data):
"""
构造bytes数据, RSA+AES混合加密通信数据
:param data: str
:return: base64-bytes
"""
cipher_key = RSAsys.rsa_encrypt(f'{{"aes_key":"{self.key}", "aes_iv":"{self.iv}"}}')
cipher_data = self.aes.aes_cbc_encrypt(data).decode()
format_data = f'{{"key":"{cipher_key}", "data":"{cipher_data}"}}'
return base64.b64encode(format_data.encode())+b"#"

def _de_data(self, data):
"""
解析base64-bytes数据
:param data: base64-bytesS
:return: str
"""
recv_data = base64.b64decode(data)
decrypto_data = self.aes.aes_cbc_decrypt(recv_data)
return decrypto_data


if __name__ == '__main__':
cli_obj = SocketClient('127.0.0.1', 8089)
# print(cli_obj.sent_data("helloworld")) # 发送一次连接断开socket,短链接测试请用这个
cli_obj.sent_data_pending_input() # 持续发送,长连接测试请用这个

加解密工具类

请将文章Python实现RSA加密解密加签验签中的类和文章Python实现AES加密解密中的类复制到一个新的py文件中,并将文件命名为CryptoUtils.py,将此文件与客户端和服务端的py文件放同一目录即可运行.