续上篇文章Python实现Socket通信样例 ,本篇文章对上篇文章中的通信的数据做一个混合加密来保证通信数据在通信过程中的安全.
采用的客户端与服务端数据加密传输方案为RSA+AES混合加密实现.方案思路参考
数据加密方案 首先,客户端与服务端商量好数据加密协议,对传输数据做到安全保护。
安全保护至少需要有下面两点:
采用HTTPS协议
采用公钥密码体制RSA算法对数据加密
性能问题 RSA加密虽然安全性上相对较高,但是由于RSA算法对数据加密时运算速度慢,所以直接把所有传输数据都用RSA加密,会导致网络通信慢,这对用户将是不好的体验。
解决思路 由于对称密钥密码体制中的AES运算速度快且安全性高,所以结合AES对传输数据加密是非常好的方案.即用RSA只加密AES的key值,AES用来对数据加密来提高性能.
下面是对客户端与服务端通信数据加密比较通用的方案:
客户端生成AES密钥,并保存AES密钥
客户端用AES密钥对请求传输数据进行加密
客户端使用RSA公钥对AES密钥加密,然后把值放到自定义的一个请求头中
客户端向服务端发起请求
服务端拿到自定义的请求头值,然后使用RSA私钥解密,拿到AES密钥
服务端使用AES密钥对请求数据解密
服务端对响应数据使用AES密钥加密
服务端向客户端发出响应
客户端拿到服务端加密数据,并使用之前保存的AES密钥解密
注意: 传输数据使用AES密钥加密,RSA公钥对AES密钥加密。RSA公钥和私钥由服务端生成,公钥放在客户端,私钥放在服务端。公钥私钥要私密保护,不能随便给人。 (以上方案便能保证,数据在传输过程中截取者是无法轻易获取到AES的key值的,客户端-服务端AES-key值通过了RSA加密,服务端-客户端没有传输AES-key值. key值仅在两端可见)
需要提前准备好的工作
具体实现代码 代码基于文章Python实现Socket通信样例 更新
服务端Server 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 from socketserver import BaseRequestHandler, ThreadingTCPServerimport base64import astimport CryptoUtils as DataCryptorsa = DataCrypto.RSAsys BUF_SIZE = 1024 class Handler (BaseRequestHandler ): def __init__ (self, request, client_address, server ): super (Handler, self).__init__(request, client_address, server) self.aes_key = '' self.aes_iv = '' self.aes_obj = None def en_data (self, data ): """ 构造bytes数据 :param data: str :return: base64-bytes """ cipher_data = self.aes_obj.aes_cbc_encrypt(data) return base64.b64encode(cipher_data)+b"#" def de_data (self, data ): """ 解析base64-bytes数据 :param data: base64-bytes :return: str """ receive_data = base64.b64decode(data).decode() deserialization = ast.literal_eval(receive_data) keys = ast.literal_eval(rsa.rsa_decrypt(deserialization.get("key" ))) self.aes_key = keys.get("aes_key" ) self.aes_iv = keys.get("aes_iv" ) self.aes_obj = DataCrypto.AESsys(self.aes_key, self.aes_iv) decrypto_data = self.aes_obj.aes_cbc_decrypt(deserialization.get("data" )) return decrypto_data def handle (self ): while True : data = b'' while True : try : recv_data = self.request.recv(BUF_SIZE) if len (recv_data) > 0 : if recv_data[-1 :] == b'#' : data += recv_data[:-1 ] break else : data += recv_data else : break except Exception as e: print ("Socket receiving data error! | " +str (e)) break if len (data) != 0 : recv = self.de_data(data) print ('收到数据:' , recv) self.request.sendall(self.en_data("服务端已收到数据: " +str (recv))) else : print ("Client Close" ) break def main (host, port ): ADDR = (host, port) server = ThreadingTCPServer(ADDR, Handler) print ('Server Start!' ) server.serve_forever() if __name__ == '__main__' : main("127.0.0.1" , 8089 )
客户端Client 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 from socket import *from Crypto.PublicKey import RSAfrom Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5from Crypto.Cipher import AESfrom binascii import b2a_hex, a2b_heximport base64import stringimport randomclass RSAsys : public_pem = r"./public.pem" @classmethod def rsa_encrypt (cls, message ): rsakey = RSA.importKey(open (cls.public_pem).read()) cipher = Cipher_pkcs1_v1_5.new(rsakey) cipher_text = base64.b64encode(cipher.encrypt(message.encode('utf-8' ))) return cipher_text.decode('utf-8' ) class AESsys (object ): def __init__ (self, key, iv ): """ 密钥和偏移量 :param key:密钥:长度需要大于16,满足8的倍数 :param iv:偏移量 """ if len (key) in [16 , 24 , 32 ]: self.key = key else : raise Exception(f"密钥字符串长度需要在[16, 24, 32]中, 当前长度: key={key} , len={len (key)} " ) if len (iv) in [16 ]: self.iv = iv else : raise Exception(f"偏移量字符串长度需要在[16]中, 当前长度: key={iv} , len={len (iv)} " ) @staticmethod def _add_to_16 (text ): if len (text.encode('utf-8' )) % 16 : add = 16 - (len (text.encode('utf-8' )) % 16 ) else : add = 0 text = text + ('\0' * add) return text.encode('utf-8' ) def aes_cbc_encrypt (self, text ): key = self.key.encode('utf-8' ) mode = AES.MODE_CBC iv = self.iv.encode('utf-8' ) text = self._add_to_16(text) cryptos = AES.new(key, mode, iv) cipher_text = cryptos.encrypt(text) return b2a_hex(cipher_text) def aes_cbc_decrypt (self, text ): key = self.key.encode('utf-8' ) iv = self.iv.encode('utf-8' ) mode = AES.MODE_CBC cryptos = AES.new(key, mode, iv) plain_text = cryptos.decrypt(a2b_hex(text)) return bytes .decode(plain_text).rstrip('\0' ) class SocketClient (object ): """ 客户端 """ def __init__ (self, host, port ): """ 建立连接对象 :param host: 主机地址 :param port: 主机通信端口 """ self.HOST = host self.PORT = port self._BUFSIZ = 1024 self._ADDR = (self.HOST, self.PORT) self._tcpCliSock = socket(AF_INET, SOCK_STREAM) self._tcpCliSock.connect(self._ADDR) self.key = '' .join(random.sample(string.ascii_letters + string.digits, 32 )) self.iv = '' .join(random.sample(string.ascii_letters + string.digits, 16 )) self.aes = AESsys(self.key, self.iv) def sent_data (self, data=None ): """ 发送数据 :param data: 要发送的数据 [str] :return: 收到的服务端发来的数据或状态 [str] """ if data: self._tcpCliSock.sendall(self._en_data(data)) recv = self._recv_data() self.close() return recv else : print ("The sent data is empty or not sent!" ) def close (self ): """ 关闭连接对象 """ self._tcpCliSock.close() def _recv_data (self ): """ 接收服务端数据 :return: 接收服务端数据-ERROR=None """ data = b'' while True : try : self._tcpCliSock.settimeout(10 ) recv_data = self._tcpCliSock.recv(self._BUFSIZ) if len (recv_data) > 0 : if recv_data[-1 :] == b'#' : data += recv_data[:-1 ] break else : data += recv_data else : break except Exception as e: print ("Socket receiving data error! | " +str (e)) return None if len (data) != 0 : return self._de_data(data) else : return "" def sent_data_pending_input (self ): """ 发送数据,长连接 """ lastdata = '' while lastdata != "quit" : lastdata = input ("请输入要发送的数据:" ) if len (lastdata) > 0 : self._tcpCliSock.sendall(self._en_data(lastdata)) recv = self._recv_data() print (recv) else : print ("The sent data is empty or not sent!" ) print ("Client端已退出!" ) def _en_data (self, data ): """ 构造bytes数据, RSA+AES混合加密通信数据 :param data: str :return: base64-bytes """ cipher_key = RSAsys.rsa_encrypt(f'{{"aes_key":"{self.key} ", "aes_iv":"{self.iv} "}}' ) cipher_data = self.aes.aes_cbc_encrypt(data).decode() format_data = f'{{"key":"{cipher_key} ", "data":"{cipher_data} "}}' return base64.b64encode(format_data.encode())+b"#" def _de_data (self, data ): """ 解析base64-bytes数据 :param data: base64-bytesS :return: str """ recv_data = base64.b64decode(data) decrypto_data = self.aes.aes_cbc_decrypt(recv_data) return decrypto_data if __name__ == '__main__' : cli_obj = SocketClient('127.0.0.1' , 8089 ) cli_obj.sent_data_pending_input()
加解密工具类 请将文章Python实现RSA加密解密加签验签 中的类和文章Python实现AES加密解密 中的类复制到一个新的py文件中,并将文件命名为CryptoUtils.py
,将此文件与客户端和服务端的py文件放同一目录即可运行.