端对端加密通讯系统

端对端加密通讯系统 - 参照signal

关于signal protocol

signal protocol 是 signal 开源的一种端对端通讯加密协议,通过该协议的实现,可以实现安全的端到端通讯加密。

该协议主要由 X3DH 和双棘轮分别保证了密钥的复杂度和前后安全,确保即使被人知道了密钥也不能破解前后的内容。

详解X3DH

X3DH 是 DH 密钥交换算法的升级版本。

DH( DiffieHellman )算法

该算法通过双方的公钥和私钥在不传输私钥的条件下协商出通讯密钥。由于私钥没有经过传输过程,大大保证了通讯的安全性。

  • ① 双方交换公钥
  • ②根据私钥和对方的公钥计算出通讯密钥
  • ③根据通讯密钥进行加密通讯
1
2
3
4
5
6
q = 97 a = 5
A生成一个私钥 XA(XA<q) = 36 并计算公钥 YA=a^XA mod q =50 mod 97
B生成一个私钥 XB<q = 58 并计算公钥 YB=a^XB mod q = 44 mod 97
用户A产生共享秘密密钥的计算方式是 K = (YB)^XA mod q = 44^36 = 75 mod 97
用户B产生共享秘密密钥的计算是 K = (YA)^XB mod q = 50^58 = 75 mod 97
因此相当于双方已经得到了一个相同的秘密密钥.

X3DH

X3DH 算法通过增加密钥对和密钥复杂度相对于DH算法来说更加的安全。关于此算法的详细内容不多,这里是官方给出的描述文档

首先是双方的通讯密钥对变成了3个(公钥存放在服务器,私钥保存在本地)

  • 身份密钥(IK)
  • 签名预共享密钥(SPK)
  • 一次性预共享密钥(OPK)

通过官方描述文档来说明

如果 Alice 要和 Bob 通讯,首先需要从服务器获得Bob的相关公钥

  • 身份密钥 $IK_B$
  • 签名密钥 $SPK_B$
  • 预签名密钥 $Sig(IK_B,Encode(SPK_B))$
  • Bob 的一次性密钥之一 $(OPK_B^1,OPK_B^2,OPK_B^3)$

Bob 并不需要随时上传这些密钥,身份密钥是在注册时上传,其他的一次性密钥是每隔一段时间上传一次。同样也会一段时间上传一次预密钥和预签名密钥。

在上传了新的签名密钥后,Bob 可以使用原先的密钥一段时间,但是应该尽快删除掉原签名密钥。

服务器中如果没有一次性密钥,那么 Alice 进行如下运算出 DH1、DH2、DH3、DH4。

1
2
3
4
DH1 = DH(IK_A, SPK_B)
DH2 = DH(EKA, IKB) # EK是临时密钥
DH3 = DH(EKA, SPKB)
SK = KDF(DH1 || DH2 || DH3)

如果获取到了 Bob 的一次性密钥,需要再计算出 DH4 ,并加在通讯密钥的后面。

1
2
DH4 = DH(EKA, OPKB)
SK = KDF(DH1 || DH2 || DH3 || DH4)

再计算出SK后,Alice 就会删除掉 DH 输出和自己的临时私钥。

然后再通过双方身份密钥计算出 AD。

1
AD = Encode(IK_A) || Encode(IK_B)

于是 Alice 发送包含如下的信息给 Bob,开始了第一轮的通讯。

  • Alice 自己的身份密钥 $IK_A$
  • Alice 的临时密钥 $EK_A$
  • 标志说明 Alice 所使用的 Bob 的哪些密钥
  • 使用 AD 作为相关数据,使用 SK 或 SK 通过 PRF(伪随机函数) 的输出作为加密密钥的 AEAD( 带有认证功能的加密 ) 算法得出的密文。

一旦接收到 Alice 的初始消息,鲍勃检索 Alice 的身份密钥和消息临时密钥。鲍勃还加载了他的身份的私钥,以及对应于哪个私钥(S)签订了预密钥和一次性预密钥(如果有的话)给爱丽丝使用。

使用这些密钥,鲍勃重复上一节的 DH 和 KDF 计算推导 SK,然后删除 DH 值。

然后,Bob 构建使用 IKA 和 IKB 的 AD 字节序列,如前一节中所描述。最后,鲍勃尝试解密使用 SK 和 AD 的初始密文。如果初始密文不能解密,然后鲍勃中止协议并删除 SK。

如果最初的密文解密成功的协议是给鲍勃的。鲍勃会删除被使用,向前保密的一次性预密钥私钥。然后鲍勃可以继续使用 SK 或 SK 从用于与 Alice 的通信,但须在第4节的安全考虑 X3DH 后协议中导出的密钥。

详解双棘轮

棘轮是一种单向的齿轮,通过棘轮算法,可以确保如果某人获取到了密钥,也只能解密当前的数据,而不能对以前或者以后的数据进行监控,双棘轮就是通过一个向前一个向后的棘轮保证了向前向后的安全,为数据传输安全提供了双保险,即使泄露密钥也只能读取一小部分数据。

KDF 链

KDF 是一个密码学函数,输入一个秘密且随机的 KDF 密钥及其他的数据,返回输出数据。在密钥未知的前提下,输出的数据与随机数密不可分,满足密码学中的 PRF 要求,即达成了一个伪随机函数。

KDF 链则是通过将一部分 KDF 输出作为输出密钥,另一部分作为下一次的密钥。

一个 KDF 链具有如下特性:

  • 弹性(resilience):对于不知道 KDF 密钥的攻击者来说,输出密钥看起来是随机的。即使攻击者能控制 KDF 的输入,此条特性仍然成立。
  • 前向安全性(forward security):对于知道某一时刻的 KDF 密钥的攻击者来说,旧的输出密钥看起来是随机的。
  • 被攻破后的可恢复性(break-in recovery):对于知道某一时刻的 KDF 密钥的攻击者来说,新的输出密钥看起来是随机的,只要新的输入中增加了足够的熵(entropy)。

这个 KDF 链是需要贯穿整个会话的根链、发送链、接收链。

Alice 和 Bob 交换消息的同时,也交换新的迪菲 - 赫尔曼公钥,而迪菲 - 赫尔曼输出的密钥将作为根链的输入。根链输出的密钥将作为发送链和接收链的 KDF 密钥。这称为迪菲 - 赫尔曼棘轮(Diffie-Hellman ratchet)

每发送和接收一条消息,发送链和接收链都将向前推进。相应的输出密钥将用于加密和解密消息。这称为对称密钥棘轮(symmetric-key ratchet)

这样就组成了双棘轮,形成了向前向后的安全保障。

对称密钥棘轮

每条发送或接收的消息都使用一个唯一的消息密钥(message key)加密。消息密钥是发送 KDF 链和接收 KDF 链的输出密钥。这些链的 KDF 密钥称为链密钥

由于发送链和接收链的 KDF 输入是常数,所以这两条链不具备被攻破后的可恢复性。发送链和接收链只能确保每条消息使用唯一的密钥加密,而此密钥在加密或解密后可以删除。由一个给定的链密钥计算下一个链密钥和消息密钥的过程,称为对称密钥棘轮(symmetric-key ratchet)的一次棘轮步进(ratchet step)

DH 棘轮

如果中间攻击者窃取了其中一方的发送链密钥和接收链密钥,那么他可以计算此后所有的消息密钥,并解密对应的消息。为了避免这种情况,双棘轮算法将对称密钥棘轮与 DH 棘轮组成在一起,使用后者基于迪菲 - 赫尔曼的输出更新链密钥。

为了实现 DH 棘轮,通信双方各自生成一个 DH 密钥对(迪菲 - 赫尔曼公钥和私钥)作为当前的棘轮密钥对(ratchet key pair)。从任意一方发出的每一条消息都将携带一个消息头,其中包含发送者当前的棘轮公钥。当接收到远端发送过来的新的棘轮公钥时,本端将实施一次 DH 棘轮步进(DH ratchet step),生成一个新的棘轮密钥对以取代本端当前的密钥对。

Alice 使用 Bob 的棘轮公钥初始化,而 Bob 尚未得知 Alice 的棘轮公钥。作为初始化的一部分,Alice 使用她自己的棘轮私钥和 Bob 的棘轮公钥作 DH 运算:

Alice 的初始消息宣告了其棘轮公钥。一旦 Bob 收到其中一条初始消息,Bob 就执行一次 DH 棘轮步进:他使用 Alice 的棘轮公钥和自己的棘轮私钥作 DH 运算,得到的结果应与 Alice 的初始 DH 输出相等。之后 Bob 替换掉自己的棘轮密钥对并重新计算一个新的 DH 输出:

如此轮回,Alice也使用Bob的新公钥进行计算新一轮的DH步进。

同样的,由于DH计算出的密钥发生了变化,发送链也会跟着步进。将 DH 输出作为根链的 KDF 输入,而根链的 KDF 输出作为发送链密钥和接收链密钥。 所以一次完整的 DH 棘轮步进包括两次根 KDF 链的更新,并将其 KDF 输出分别作为新的接收链密钥和发送链密钥:

双棘轮

将对称密钥棘轮和DH棘轮组合在一起就形成了双棘轮,保障向前向后的安全。

  • 当发送或接收消息时,执行一次发送链或接收链的对称密钥棘轮步进,以派生新的消息密钥。
  • 当接收到新的棘轮公钥时,在对称密钥棘轮步进之前,执行一次 DH 棘轮步进,以更新链密钥。

下图中,Alice 已使用 Bob 的棘轮公钥及作为初始根密钥( RK )的共享密钥初始化。作为初始化的一部分,Alice 生成一个新的棘轮密钥对,并将 DH 输出作为根 KDF 的输入,计算出新的根密钥( RK )和发送链密钥( CK ):

当 Alice 发送第一条消息 A1 时,她对发送链密钥执行一次对称密钥棘轮步进,以生成新的消息密钥(消息密钥以其加密或解密的消息编号标注)。新的链密钥将保存起来,但消息密钥和旧的链密钥可以删除:

假如接下来 Alice 收到 Bob 发送的响应消息 B1 ,其中包含 Bob 的新的棘轮公钥(Bob 的公钥以其所在的首条消息编号标注)。Alice 执行一次 DH 棘轮步进,以派生新的接收链密钥和发送链密钥。之后她对接收链执行一次对称密钥棘轮步进,以获取接收到的消息对应的消息密钥:

假设接下来 Alice 发送了消息 A2 ,接收到包含 Bob 旧的棘轮公钥的消息 B2 ,接着又发送了消息 A3A4 。Alice 的发送链将步进三次,而其接收链仅步进一次:

假设接下来 Alice 接收到包含 Bob新的棘轮公钥的消息 B3B4 ,并发送了消息 A5 。Alice 的最终状态如下:

乱序消息

在对话过程中,不一定是有来有回的,可能是一个乱序的情况。

在这种情况下,在每个消息头部包含此消息在发送链中的编号(N=0,1,2,…)以及之前的发送链的长度( PN ,即消息密钥的个数)。这使接收方可以保存被跳过的消息密钥而跳转到对应的消息密钥,而当被跳过的消息到达时可使用保存的消息密钥解密。

当接收到一条消息时,如果触发了 DH 棘轮步进,那么接收到的 PN 减掉当前接收链的长度就是此接收链中被跳过的消息数目。接收到的 N 是新的接收链(即 DH 棘轮步进之后的接收链)中被跳过的消息数目。

如果没有触发 DH 棘轮步进,那么接收到的 N 减掉当前接收链的长度就是此接收链中被跳过的消息数目。

例如,假设上一节的消息序列中,消息 B2B3 被跳过。消息 B4 将触发 Alice 的 DH 棘轮步进(不乱序时本应由 B3 触发)。消息 B4PN=2,N=1。当接收到 B4*时,Alice 的接收链长度为 1( *B1 ),所以 Alice 将 B2B3 的消息密钥保存,以便之后接收到这两个消息时可以对其解密:

双棘轮步进是需要双方互相交互才能实现一个步进。如果是单方面的发送消息或者接收消息,仅发生对称密钥步进,DH步进需要对方回复新的棘轮公钥才行。

收到新的公钥的时候使用上一次的自己的私钥计算出消息密钥,对收到的消息解密。然后生成新的棘轮密钥对进行 DH 步进,计算出新的消息密钥,对下一次发送的消息进行加密。

服务器和客户端的交互设计

服务器接口设计

方式 接口名 参数 备注
POST get_public_keys user_id 获取指定用户的一系列公钥
POST search_user_id user_id 对指定用户 ID 模糊查找

服务端设计

数据库设计

由于服务器只保存用户的几个密钥公钥,所以数据库方面如下:

已连接用户数据库

名称 类型 主键 NULL 备注
user_id varchar Not NULL 用户 ID
user_ip varchar 用户链接 IP
user_port varchar 用户链接端口

公钥数据库

名称 类型 主键 NULL 备注
user_id varchar Not NULL 用户 ID
hash_private_key varchar 用户个人密钥 hash
IdentityPub varchar 身份密钥公钥 IK
SignedPub varchar 已签名的预共享密钥 SPK
OneTimePub varchar 一次性预共享密钥 OPK

离线消息数据库

名称 类型 主键 NULL 备注
s_user_id varchar 源用户 User_id
d_user_id varchar 目的用户 User_id
s_user_IK varchar 源用户 IK
s_user_EK varchar 源用户 EK
Flags varchar 所使用的目的用户的密钥标识。0-IK, 0-SPK, 0-OPK,110或者111
AD varchar 加密密文,需要目的用户解密

服务器端主要是用于用户的公钥管理和数据转发。对来往数据离线暂存。

公钥管理通过数据库和查询接口实现。

数据通过加密后很难去分辨目的地址,需要通过服务器转发,所以需要对包进行一定的设计。

数据包设计

名称 类型 备注
s_user_id varchar 源用户 User_id
d_user_id varchar 目的用户 User_id
s_user_IK varchar 源用户 IK
s_user_EK varchar 源用户 EK
Flags varchar 所使用的目的用户的密钥标识。0-IK, 0-SPK, 0-OPK,110或者111
AD varchar 加密密文,需要目的用户解密

客户端设计

功能模块设计

消息树

1
2
3
4
5
6
7
节点类
class message_node
def _init__(self):
self.DH_next=None
self.KDF_next=None
self.DH=b""
self.KDF_input=b""

由于消息不一定是顺序的,棘轮步进也并不是同时发生的,如果是乱序的话会对消息密钥难以管理,所以通过节点树对消息密钥进行管理。

通过两个指针指向发送链末尾和接收链末尾,每当收到消息或者发送消息的时候,通过对节点的 DH 和 KDF_input 计算出消息密钥。同样由于乱序中的消息密钥并不是递进的,当对方没有回复新棘轮公钥的时候采用 KDF 步进,如果有新的棘轮公钥回复则会采用 DH 步进。

关于消息交互和步进循环。

流程设计图

数据库设计

用于存放自己的私钥、公钥,管理好友的公钥,以及管理聊天记录。

user 库

名称 类型 主键 NULL 备注
user_id varchar Not NULL 用户 ID
user_IK_pr varchar 用户身份密钥私钥
user_IK_pu varchar 用户身份密钥公钥
user_SPK_pr varchar 用户已签名的预共享密钥私钥
user_SPK_pu varchar 用户已签名的预共享密钥公钥
user_OPK_pr varchar 用户一次性预共享密钥私钥
user_OPK_pu varchar 用户一次性预共享密钥公钥

record 库

名称 类型 主键 NULL 备注
s_user_id varchar Not NULL 消息源用户 ID
d_user_id varchar 消息目标用户 ID
message_key varchar 该条消息的消息密钥
date date 该条消息的接收或者发送时间
data varchar 消息密文

具体细节实现

密钥对 - curve25519

对称加密 - AES,由于密钥长度问题,取消息密钥的前 32 位,即 AES-256,后 32 位作为下一次 KDF 的输入,或者 ChaCha20,谷歌所采用的新式加密算法。

相关文档

关于 curve25519 的相关文档以及方法

测试代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import binascii
import codecs
import json
import os
import time

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey, X25519PublicKey)
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF


class Message_Node:
def __init__(self):
self.DH_next = None
self.KDF_next = None
self.DH = b""
self.KDF_input = b""

def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict


class Message:
def __init__(self):
self.s_user_id = 0 # 设置为10位的随机数
self.d_user_id = 0 # 设置为10位的随机数
self.s_user_IK = "" # 固定64位
self.s_user_EK = "" # 固定64位
self.flags = "" # 固定6位
self.AD = "" # 870位,不够补足0

def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict


class User:
User_id = 0

def __init__(self):
self.IdentityPri = X25519PrivateKey.generate()
self.IdentityPub = self.IdentityPri.public_key()
self.SignedPri = X25519PrivateKey.generate()
self.SignedPub = self.SignedPri.public_key()
self.OneTimePri = X25519PrivateKey.generate()
self.OneTimePub = self.OneTimePri.public_key()
self.EphemeralPri = X25519PrivateKey.generate()
self.EphemeralPub = self.EphemeralPri.public_key()

def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict


class DH:
def __init__(self):
self.DH1 = bytes([])
self.DH2 = bytes([])
self.DH3 = bytes([])
self.DH4 = bytes([])
self.share_key = bytes([])

# DH1 = DH (IPK-A 私钥,SPK-B 公钥)
# DH2 = DH (EPK-A 私钥,IPK-B 公钥)
# DH3= DH (EPK-A 私钥,SPK-B 公钥)
# DH4 = DH (IPK-A 私钥,OPK-B 公钥)
# 作为接收方的DH初始化方法
def receive(self, mine, other):
self.DH1 = mine.SignedPri.exchange(other.IdentityPub)
self.DH2 = mine.IdentityPri.exchange(other.EphemeralPub)
self.DH3 = mine.SignedPri.exchange(other.EphemeralPub)
self.DH4 = mine.OneTimePri.exchange(other.IdentityPub)
if self.DH4:
self.share_key = self.DH1+self.DH2+self.DH3+self.DH4
else:
self.share_key = self.DH1+self.DH2+self.DH3

# DH1 = DH (IPK-B 私钥,SPK-A 公钥)
# DH2 = DH (EPK-B 私钥,IPK-A 公钥)
# DH3= DH (EPK-B 私钥,SPK-A 公钥)
# DH4 = DH (IPK-B 私钥,OPK-A 公钥)
# 作为发送方的DH初始化方法
def send(self, mine, other):
self.DH1 = mine.IdentityPri.exchange(other.SignedPub)
self.DH2 = mine.EphemeralPri.exchange(other.IdentityPub)
self.DH3 = mine.EphemeralPri.exchange(other.SignedPub)
self.DH4 = mine.IdentityPri.exchange(other.OneTimePub)
if self.DH4:
self.share_key = self.DH1+self.DH2+self.DH3+self.DH4
else:
self.share_key = self.DH1+self.DH2+self.DH3

def to_json(self):
dict = self.__dict__
if "_sa_instance_state" in dict:
del dict["_sa_instance_state"]
return dict


def Signalkdf(share_key, salt=None, info="handshake data"):
kdf_output = HKDF(
algorithm=hashes.SHA256(),
length=64,
salt=salt,
info=info.encode("utf-8"),
backend=default_backend()
).derive(share_key)
return kdf_output


# 比较两个bytes字符串是否一致
def compare_bytes(A, B):
nope = []
if type(A) and type(B) is not bytes:
return -1
else:
if len(A) is not len(B):
return 0
else:
for i in range(len(A)):
if A[i] is not B[i]:
nope.append(i)
if len(nope) == 0:
return 1
else:
return nope


if __name__ == '__main__':
time_begin = time.time()
mine = User()
mine_DH = DH()

other = User()
other_DH = DH()

mine_DH.send(mine, other)
other_DH.receive(other, mine)

print("A的X3DH密钥:", binascii.hexlify(mine_DH.share_key))
print("B的X3DH密钥:", binascii.hexlify(other_DH.share_key))
print("X3DH密钥是否一致:", compare_bytes(mine_DH.share_key, other_DH.share_key))
print("X3DH结束,开始双棘轮")

root_message = Message_Node()
lastest_send = root_message
lastest_recv = root_message

for i in range(1, 4):
mine.EphemeralPri = X25519PrivateKey.generate()
mine.EphemeralPub = mine.EphemeralPri.public_key()

other.EphemeralPri = X25519PrivateKey.generate()
other.EphemeralPub = other.EphemeralPri.public_key()

self_salt = mine.EphemeralPri.exchange(other.EphemeralPub)
print("A第", i, "轮盐:", binascii.hexlify(self_salt))
other_salt = other.EphemeralPri.exchange(mine.EphemeralPub)
print("B第", i, "轮盐:", binascii.hexlify(other_salt))

kdf_output_self = Signalkdf(mine_DH.share_key, self_salt)
kdf_output_other = Signalkdf(other_DH.share_key, other_salt)
print("A第", i, "轮key:", binascii.hexlify(kdf_output_self[32:]))
print("B第", i, "轮key:", binascii.hexlify(kdf_output_other[32:]))
mine.share_key = kdf_output_self[:32]
other.share_key = kdf_output_other[:32]

# AES
# backend = default_backend()
# iv = os.urandom(16)
# cipher = Cipher(algorithms.AES(
# kdf_output_other[32:]), modes.CBC(iv), backend=backend)
# encryptor = cipher.encryptor()
# ct = encryptor.update(b"a secret message") + encryptor.finalize()

# ChaCha20
nonce = os.urandom(16)
algorithm = algorithms.ChaCha20(kdf_output_self[32:], nonce)
backend = default_backend()
cipher = Cipher(algorithm, mode=None, backend=default_backend())
encryptor = cipher.encryptor()
ct = encryptor.update(b"a secret message234,testestsetste")

print("密文:", ct, "\n密文长度:", len(ct))
decryptor = cipher.decryptor()
print("解密后:", decryptor.update(ct))

new_message = Message()
new_message.d_user_id = 123456789
new_message.s_user_id = 987654321
new_message.s_user_EK = binascii.hexlify(mine.EphemeralPub.public_bytes(
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)).decode("unicode_escape")
new_message.s_user_IK = binascii.hexlify(mine.IdentityPub.public_bytes(
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)).decode("unicode_escape")

# new_message.s_user_EK=binascii.hexlify(mine.EphemeralPub.public_bytes()).decode("utf-8")
# new_message.s_user_IK=binascii.hexlify(mine.IdentityPub.public_bytes()).decode("utf-8")

new_message.flags = binascii.hexlify(bytes([1, 1, 0])).decode("utf-8")
new_message.AD = binascii.hexlify(ct).decode("utf-8")
send_bytes = json.dumps(new_message.to_json()).encode("utf-8")
print(send_bytes)
print("数据包长度:", len(send_bytes))

print("开始解封装数据包")
recv_bytes = send_bytes
new_message_dict = json.loads(recv_bytes)
print(new_message_dict)
new_message2 = Message()
new_message2.s_user_id = new_message_dict["s_user_id"]
new_message2.d_user_id = new_message_dict["d_user_id"]
new_message2.s_user_IK = new_message_dict["s_user_IK"]
new_message2.s_user_EK = new_message_dict["s_user_EK"]
new_message2.flags = new_message_dict["flags"]
new_message2.AD = new_message_dict["AD"]

flags = binascii.unhexlify(new_message2.flags.encode("utf-8"))
# print(flags)
AD = binascii.unhexlify(new_message2.AD.encode("utf-8"))
# print(AD)
print("密文是否一致:", compare_bytes(AD, ct))
cipher = Cipher(algorithm, mode=None, backend=default_backend())
decryptor = cipher.decryptor()
print("解密后:", decryptor.update(ct))

s_user_IK = X25519PublicKey.from_public_bytes(
binascii.unhexlify(new_message2.s_user_IK.encode("unicode_escape")))
s_user_EK = X25519PublicKey.from_public_bytes(
binascii.unhexlify(new_message2.s_user_EK.encode("unicode_escape")))

print("身份公钥是否一致:", compare_bytes(s_user_IK.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw),
mine.IdentityPub.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)))
print("临时公钥是否一致:", compare_bytes(s_user_EK.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw),
mine.EphemeralPub.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)))

# 通过服务器查询再次确认身份公钥一致
if compare_bytes(s_user_IK.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw),
mine.IdentityPub.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)):
mess_node = Message_Node()
# 关于接收链和发送链,通过识别是否对方密钥对有改变,如没有改变则仅KDF步进,如有改变则DH步进,并且发送链和接收链都需指向最新的DH下的节点。
lastest_recv.DH_next = mess_node
lastest_recv.KDF_next = mess_node
mess_node.DH = other.EphemeralPri.exchange(mine.EphemeralPub)
mess_node.KDF_input = other_DH.share_key
lastest_recv = mess_node

temp = root_message
while temp.DH_next is not None:
while temp.KDF_next is not None:
print(binascii.hexlify(
Signalkdf(temp.KDF_next.KDF_input, temp.KDF_next.DH)))
temp = temp.KDF_next

time_end = time.time()
print("花费时间:", time_end-time_begin)

代码运行效果:

关于 Socket

既然是通讯系统,自然是要使用 socket 作为两台主机与服务器的通讯手段了。

至于其他的方法,比如对服务器设置一系列接口,通过接口来传输通讯数据包,但是会带来一个问题,就是服务器无法主动发起连接将数据包传过去,如果要让客户端每隔一段时间自动获取服务器数据的话会带来更多的流量,也给服务器更大的压力。所以还是 socket 比较现实一点。

上面已经把除了 socket 方面的数据交互完成了,加密-》生成数据包-》解封装数据包-》解密-》生成接收和发送链。socket 所需要做的就是将数据包完整的发送给目的端,可能是服务器也可能是客户端。

测试代码示例

1
2


参考文献

https://blog.lancitou.net/double-ratchet-algorithm/

https://www.signal.org/docs/specifications/x3dh

https://docs.python.org/zh-cn/3/howto/sockets.html

https://python3-cookbook.readthedocs.io/zh_CN/latest/chapters/p11_network_and_web_program.html