Skip to content

Commit

Permalink
fix: link grpc frame by request_id
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric authored and sharang committed Jun 27, 2024
1 parent f123daa commit 56139aa
Showing 1 changed file with 62 additions and 4 deletions.
66 changes: 62 additions & 4 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,19 @@ def set_parent_relation(self):
# 这里做一个简单的处理,当相邻两个 Span 都是同类 SYS Span 时不要按照 TCP Seq 来设置他们的 Parent 关系。
continue
else:
# if self.spans[i].parent has parent, possibly it's c-p attach to s-p in `try_attach_client_sys_span_via_sys_span`
# usually, c-p in [0] index and will not try to attach parent here
# but in grpc _RESPONSE_X mode, sort order would reverse and make c-p not in [0] index
# for those scenarios, prioritize tcp_seq connection, and clean c-p's index in s-p's childs

# 如果 self.spans[i] 已有 parent,很大概率是 c-p 在 `try_attach_client_sys_span_via_sys_span` 过程中关联上了 s-p
# 通常情况下,c-p 一般在[0]索引,不会在这里尝试关联 parent
# 而目前在 grpc _RESPONSE_X 模式下,会反转顺序,让 c-p 排序在末端,导致在这里尝试再关联 parent
# 对此类情况,认为 tcp_seq 关联优先级更高,允许关联,并清理 s-p childs 中的 c-p,否则结果中会重复
if self.spans[i].parent is not None:
self.spans[i].parent.flow['childs'].remove(
self.spans[i].get_flow_index())

self.spans[i].set_parent(self.spans[i - 1],
"trace mounted due to tcp_seq")

Expand All @@ -1188,6 +1201,7 @@ def _sort_network_spans(self):
对网络span进行排序,排序规则:
1. 按照TAP_SIDE_RANKS进行排序
2. 按采集器分组排序,与入口 span 同一个采集器的前移,出口 span 同一个采集器的后移,组内按 start_time 排序
通常情况下 client-side 是 ingress, server-side 是 egress
"""
sorted_spans = sorted(
self.spans,
Expand All @@ -1211,11 +1225,31 @@ def _sort_network_spans(self):
egress_agent = sorted_spans[i].flow['vtap_id']
break

# sort rank for ingress & egress agent
ingress_rank = 0 # up for ingress
egreass_rank = 2 # down for egress
# `flow_field_conflict` confirm `l7_protocol` and `request_type` are same in a network_span_set, so get first is enough
# `flow_field_conflict` 确保了 `l7_protocol` `request_type` 在同一个 network_span_set 中一定相等,取首个即可
if len(sorted_spans) > 0 and sorted_spans[0].flow['l7_protocol'] in [
L7_PROTOCOL_GRPC, L7_PROTOCOL_HTTP2
]:
# in `grpc` protocol, _HEADER and _DATA frame is unidirectional flow which identified as `session`
# but in fact, when 'req_tcp_seq'=0, it's a 'response', from server-side to client-side, request_type=_RESPONSE_DATA/HEADER
# so we need to reverse ingress and egress here to re-sort network spans

# 在 grpc 中, _HEADER 和 _DATA frame 是被标记为 session 的单向流
# 但实际上,如果 req_tcp_seq=0,说明这其实是一个 response,方向为 server-side -> client-side,request_type=_RESPONSE_DATA/HEADER
# 对此类情况,应要反转 ingress 和 egress 排序
if not sorted_spans[0].flow['req_tcp_seq'] and \
sorted_spans[0].flow['type'] == L7_FLOW_TYPE_SESSION:
ingress_rank = 2
egress_agent = 0

for i in range(len(sorted_spans)):
if sorted_spans[i].flow['vtap_id'] == ingress_agent:
sorted_spans[i].flow['agent_rank'] = 0
sorted_spans[i].flow['agent_rank'] = ingress_rank
elif sorted_spans[i].flow['vtap_id'] == egress_agent:
sorted_spans[i].flow['agent_rank'] = 2
sorted_spans[i].flow['agent_rank'] = egreass_rank
else:
sorted_spans[i].flow['agent_rank'] = 1

Expand Down Expand Up @@ -1300,6 +1334,9 @@ def get_req_tcp_seq(self) -> int:
def get_resp_tcp_seq(self) -> int:
return self.flow.get('resp_tcp_seq', 0)

def get_request_id(self) -> int:
return self.flow.get('request_id', 0)

def time_range_cover(self, other_sys_span: 'SpanNode') -> bool:
return self.flow['start_time_us'] <= other_sys_span.flow[
'start_time_us'] and self.flow[
Expand Down Expand Up @@ -2222,6 +2259,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
ps_child_span_id = ps_child.get_span_id()
ps_child_parent_span_id = ps_child.get_parent_span_id()
for net_parent in network_leafs:
if ps_child.get_parent_id() >= 0:
continue
if _same_span_set(ps_child, net_parent, 'network_span_set') \
or _same_span_set(ps_child, net_parent, 'process_span_set'):
continue
Expand Down Expand Up @@ -2253,6 +2292,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
ps_child_span_id = ps_child.get_span_id()
ps_child_parent_span_id = ps_child.get_parent_span_id()
for ps_parent in process_leafs:
if ps_child.get_parent_id() >= 0:
continue
if _same_span_set(ps_child, ps_parent, 'network_span_set') \
or _same_span_set(ps_child, ps_parent, 'process_span_set'):
continue
Expand Down Expand Up @@ -2304,10 +2345,14 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
continue
net_child_index = net_child.get_flow_index()
net_child_span_id = net_child.get_span_id()
net_child_x_request_id_0 = net_child.get_x_request_id_0
net_child_x_request_id_0 = net_child.get_x_request_id_0()
net_child_x_request_id_1 = net_child.get_x_request_id_1()
net_child_request_id = net_child.get_request_id()
net_child_l7_protocol = net_child.flow['l7_protocol']
net_child_response_duration = net_child.flow['response_duration']
for net_parent in network_leafs:
if net_child.get_parent_id() >= 0:
continue
if _same_span_set(net_child, net_parent, 'network_span_set') \
or _same_span_set(net_child, net_parent, 'process_span_set'):
continue
Expand All @@ -2324,7 +2369,6 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
# net_child, net_parent)
# process_span_list.append(fake_pss)
# flows.extend(fake_pss.spans)

elif (net_child_x_request_id_0 and net_child_x_request_id_0 == net_parent.get_x_request_id_0()) \
or (net_child_x_request_id_1 and net_child_x_request_id_1 == net_parent.get_x_request_id_1()) \
or (net_child_span_id and net_child_span_id == net_parent.get_span_id()):
Expand All @@ -2348,6 +2392,20 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
network_match_parent[
net_child_index] = net_parent.get_flow_index()

elif net_child_request_id and net_child_request_id == net_parent.get_request_id() \
and net_child_l7_protocol in [L7_PROTOCOL_HTTP2, L7_PROTOCOL_GRPC] \
and net_child_l7_protocol == net_parent.flow['l7_protocol'] \
and net_child_response_duration <= net_parent.flow['response_duration']:
# grpc protocol: request_id get from `stream_id`, means different network_span_set share same stream, it should be connected
# but other protocol may re-use request_id, so only support grpc now
# net_child.response_duration <= net_parent.response_duration for case both duration is 0
# grpc 的 request_id 来源于 `stream_id`, 意味着不同的 network_span_set 在同一个 stream 里,应被连接
# 但其他协议的 request_id 有可能短时内被多次重用,容易误连接,比如 MySQL 的 StatementID,所以目前仅支持 grpc
# net_child.response_duration <= net_parent.response_duration 用于双方时延为0 的情况
# ref: https://www.deepflow.io/docs/zh/features/l7-protocols/http/#http2
net_child.set_parent(
net_parent, "net_span mounted due to grpc request_id")

for child, parent in network_match_parent.items():
# FIXME: 生成一个 pseudo net span,待前端修改后再开放此代码,注意处理时延计算
# fake_pss = _generate_pseudo_process_span_set(flow_index_to_span[child],
Expand Down

0 comments on commit 56139aa

Please sign in to comment.