使用Python进行dubbo接口测试

测试工具 创建于:2022-06-17
  
背景   大家在测试dubbo接口是不是特别痛苦?因为dubbo接口并不是比较常见的http协议的,而是dubbo协议的,测试dubbo接口的有几种方法,譬如jmeter自定义sampler调用,java连接zookeeper中心调用dubbo,telnet命令调用dubbo等。

  
痛点   相信大家都比较熟悉使用jmeter,看了上面的
测试方案,肯定是首选jmeter,但是这里踩坑较多,比如下载的插件与dubbo版本不对应,有时候响应参数出现中文乱码,有时候需要反编译jar包查看对应的入参类型等等...

  
解决痛点   在网上搜索了一下和看了dubbo接口的用户手册,发现dubbo接口支持telnet命令执行,
Python中刚好有个库可以执行telnet命令 telnetlib库 dubbo接口又是通过zookeeper中心进行注册服务的,那我直接通过zookeeper中心查询dubbo接口相关的信息(ip、端口、服务名、方法名和入参类型)然后模拟telnet命令进行dubbo接口调用,岂不是美滋滋!

 
 实现方案  
 Python + fastapi + telnetlib + kazoo。

  
调用流程
  telnet命令实操   前提,我们得知了一个dubbo接口的ip和端口(通常可以通过服务名在zk中心搜索得知): telnet 192.168.xx.xx 32024 连接dubbo服务

ls -l cn.com.api.dubbo.xxxxService 查询该服务的方法列表

invoke xxxxService.xxxMethod(1234, "test") 调用服务的方法

  通过上述一系列的骚操作,相当于进行了dubbo接口的测试,当然,我们决对不可能通过终端敲命令进行dubbo接口测试,下面我们进行通过代码模拟telnet命令。

  
核心源码分析
  zk中心搜索服务封装 class GetDubboService(object):     def __init__(self):         #测试环境,ZK_CONFIG为zk中心的注册地址,可传入string或者list,譬如ZK_CONFIG = ['xxx','xxx','xxx']或者ZK_CONFIG = 'xxx'         self.hosts = ZK_CONFIG         self.zk = self.zk_conn()

    def zk_conn(self):         try:             zk = KazooClient(hosts=self.hosts, timeout=2)             zk.start(2)  # 与zookeeper连接         except BaseException as e:             return False         return zk

    def get_dubbo_info(self, dubbo_service):         global data         dubbo_service_data = {}         try:             #先查出注册中心所有的dubbo服务             all_node = self.zk.get_children('/dubbo')             # 根据传入服务名匹配对应的服务             node = [i for i in all_node if dubbo_service.lower() in i.lower()]             # 查询dubbo服务的详细信息             #遍历数据,过滤掉空数据             for i in node:                 if self.zk.get_children(f'/dubbo/{i}/providers'):                     dubbo_data = self.zk.get_children(f'/dubbo/{i}/providers')                     for index, a in enumerate(dubbo_data):                         url = parse.urlparse(parse.unquote(a)).netloc                         host, port = url.split(":")                         conn = BmDubbo(host, port)                         #判断获取的ip地址是否连接成功,因为有些开发本地起了dubbo服务                         status = conn.command("")                         if status:                             data = dubbo_data[index]                             break             self.zk.stop()         except BaseException as e:             return dubbo_service_data         #parse.unquote 解码         #parse.urlparse 解析URL         #parse.query 获取查询参数         #parse.parse_qsl 返回列表         url_data = parse.urlparse(parse.unquote(data))         query_data = dict(parse.parse_qsl(url_data.query))         query_data['methods'] = query_data['methods'].split(",")         dubbo_service_data['url'] = url_data.netloc         dubbo_service_data['dubbo_service'] = dubbo_service         dubbo_service_data.update(query_data)         return dubbo_service_data

  telnet命令调用封装: class BmDubbo(object):

    prompt = 'dubbo>'

    def __init__(self, host, port):         self.conn = self.conn(host, port)

    def conn(self,host, port):         conn = telnetlib.Telnet()         try:             conn.open(host, port, timeout=1)         except BaseException:             return False         return conn

    def command(self, str_=""):         # 模拟cmd控制台 dubbo>invoke ...         if self.conn :             self.conn.write(str_.encode() + b'\n')             data = self.conn.read_until(self.prompt.encode())             return data         else:             return False

    def invoke(self, service_name, method_name, arg):         command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg)         data = self.command(command_str)         try:             # 字节数据解码 utf8             data = data.decode("utf-8").split('\n')[0].strip()         except BaseException:             # 字节数据解码 gbk             data = data.decode("gbk").split('\n')[0].strip()         return data

    def ls_invoke(self, service_name):         command_str = "ls -l {0}".format(service_name)         data = self.command(command_str)         if "No such service" in data.decode("utf-8"):             return False         else:             data = data.decode("utf-8").split('\n')             key = ['methodName', 'paramType','type']             dubbo_list = []             #这里解析有点复杂,可以自己通过telnet命令实操一下,ls -l xxx             for i in range(0, len(data) - 1):                 value = []                 dubbo_name = data[i].strip().split(' ')[1]                 method_name = re.findall(r"(.*?)[(]", dubbo_name)[0]                 value.append(method_name)                 paramType = re.findall(r"[(](.*?)[)]", dubbo_name)[0]                 paramTypeList = paramType.split(',')                 if len(paramTypeList) ==1:                     paramTypeList = paramTypeList[0]                 value.append(paramTypeList)                 #这里我将传参类型分成了4大类                 if 'java.lang' in paramType or 'java.math' in paramType:                     value.append(0)                 elif not paramType:                     value.append(1)                 elif 'List' in paramType:                     value.append(2)                 else:                     value.append(3)                 dubbo_list.append(dict(zip(key, value)))             return dubbo_list

    def param_data(self,service_name,method_name):         #这里是根据服务名和方法名,找到对应的传参类型         dubbo_data = self.ls_invoke(service_name)         if dubbo_data:             dubbo_list = dubbo_data             if dubbo_list:                 for i in dubbo_list:                     for v in i.values():                         if v == method_name:                             param_key = ['paramType','type']                             param_value = [i.get('paramType'),i.get('type')]                             return dict(zip(param_key,param_value))             else:                 return False         else:             return False

  dao层设计: class DubboHandle(object):     @staticmethod     def invoke(service_name, method_name, data):         zk_conn = GetDubboService()         if zk_conn.zk:             zk_data = zk_conn.get_dubbo_info(service_name)             if zk_data:                 host, port = zk_data['url'].split(":")                 service_name = zk_data['interface']                 boby = data.copy()                 conn = BmDubbo(host, port)                 status = conn.command("")                 if status:                     # 根据服务名和方法名,返回param方法名和类型                     param_data = conn.param_data(service_name, method_name)                     if param_data:                         type = param_data['type']                         param = param_data['paramType']                         # 传参类型为枚举值方法                         if type == 0 and isinstance(boby, dict):                             l_data = []                             for v in  boby.values():                                 if isinstance(v,str):                                     v = f"'{v}'"                                 elif isinstance(v,dict) or isinstance(v,list):                                     v = json.dumps(v)                                     v = f"'{v}'"                                 l_data.append(str(v))                             boby = ','.join(l_data)                         # 无需传参                         elif type == 1:                             boby = ''                         # 传参类型为集合对象                         elif type == 2:                             # params 只有一个集合对象传参                             if isinstance(boby, list):                                 boby = boby                             # params 一个集合对象后面跟着多个枚举值                             elif isinstance(boby, dict):                                 set_list = []                                 for v in boby.values():                                     set_list.append(v)                                 set_data = str(set_list)                                 boby = set_data[1:-1]                         # 传参类型为自定义对象                         elif type == 3:                             # 兼容多个自定义对象传参                             if isinstance(param, list):                                 dtoList = []                                 for index, dto in enumerate(boby):                                     dto.update({"class": param[index]})                                     dtoList.append(json.dumps(dto))                                 boby = ','.join(dtoList)                             elif isinstance(boby, dict):                                 boby.update({"class": param})                                 boby = json.dumps(boby)                         else:                             return None, f"data请求参数有误,请检查!"                         response_data = conn.invoke(service_name, method_name, boby)                         try:                             response_data = json.loads(response_data)                         except Exception as e:                             return None, f"解析json失败:{response_data}"                         return response_data, None                     else:                         return None, f"{service_name.split('.')[-1]}服务下不存在{method_name}方法"                 else:                     return None, f"{service_name}服务连接出错"             else:                 return None, f"{service_name}没有在zk中心注册"         else:             return None, "zk服务连接失败"

  view层引用: @router.post('/invoke', name='dubbo业务请求接口') async def dubboInvoke(data: DubboInvokeBody):     res_data, err = DubboHandle.invoke(data.serviceName, data.methodName, data.data)     if err:         return res_400(msg=err)     return res_200(data=res_data)

  
invoke接口传参说明   原生对象或者自定义对象传参(xxDto、jsonObj、java.util.HashMap): {     "serviceName": "xxxxxx",     "methodName": "xxxxxx",     "data": {        //data传入对应的对象数据,一般为json格式的         "productStoreQueryDTOS": [             {                 "productNoNumDTOList": [                     {                         "num": 13,                         "productNo": "10000620"                     },                     {                         "num": 13,                         "productNo": "10000014"                     }                 ],                 "storeCode": "4401S1389"             }         ]     } }

  枚举值类型传参(java.lang.String、java.lang.Integer): {     "serviceName": "xxxx",     "methodName": "xxxxx",     "data": {         //格式为json,枚举值顺序必须按照dubbo接口定义的传参顺序,注意是否为int还是string         "account":"123456",         "password":"3fd6ebe43dab8b6ce6d033a5da6e6ac5"     } }

  方法名无需传参: {     "serviceName": "xxxx",     "methodName": "xxxxxx",     "data":{}      //传入空对象 }

  集合对象传参(java.util.List): {     "serviceName": "xxxx",     "methodName": "xxxxxx",     "data":{         "List": [             "1221323",             "3242442"         ]     } //传入对象,里面嵌套数组 }

  集合对象传参,后面跟着枚举值(java.util.List 、 java.lang.String 、 java.lang.Integer): {     "serviceName": "xxxx",     "methodName": "xxxxxx",     "data":{         "userCode": ["12345","686838"],         "startTime": "2021-04-16 13:30:00",         "endTime": "2021-04-16 14:30:00" } }

  多个自定义对象传参,对象顺序按照dubbo接口定义的传参顺序(xxdtox、xxdto): {     "serviceName": "xxxx",     "methodName": "xxxxxx",     "data":[       {         "userCode": "7932723",         "startTime": "2021-04-16 13:30:00",         "endTime": "2021-04-16 14:30:00" },       {         "name": "fang",         "age": "18" }     ] }

  上述传参可以满足大部分入参类型~

  
疑惑   问:dubbo的传输协议,本身支持http协议,跟开发沟通,测试环境切换为http协议,不就方便测试了么?   答:公司内部系统对接大多走的是 Dubbo 协议,这是公司的开发规范,只能从外部绕了。   问:为什么我部署之后,连接不上zk服务或者出现dubbo服务连接出错?   答:部署服务的主机必须可以连通dubbo服务。

  
总结   本期解决了测试dubbo接口的痛点,希望能对大家有帮助~


  
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理

21天更文挑战,赢取价值500元大礼,还有机会成为签约作者!

原文地址:http://www.51testing.com/?action-viewnews-itemid-7789497

免责声明:本文来源于互联网,版权归合法拥有者所有,如有侵权请公众号联系管理员

* 本站提供的一些文章、资料是供学习研究之用,如用于商业用途,请购买正版。

发表于:2022-6-17 09:34 作者:我很方 来源:掘金