• 测试前准备
    • 基本步骤流程
      • datapoint构造
      • 身份信息签名
      • 数据写入
    • 测试与验证

    测试前准备

    声明

    - 本文测试所用设备系统为MacOS- 模拟MQTT client行为的客户端为MQTTBOX

    基本步骤流程

    通过OpenEdge将数据写入TSDB具体依OpenEdge的函数计算服务实现,具体包括datapoint构造、身份信息签名及数据写入3个部分。

    datapoint构造

    1. def build_datapoints(event):
    2. """
    3. function to build datapoints by event
    4. datapoint for example: {
    5. "datetime": "2018-08-10 18:15:05",
    6. "temperature": 32,
    7. "unit": ""
    8. }
    9. """
    10. datapoints = dict()
    11. datapoint = dict()
    12. datapoint['metric'] = 'temperature'
    13. datapoint['tags'] = {'unit': event['unit']}
    14. datapoint['value'] = event['temperature']
    15. timestamp = time.mktime(time.strptime(event['datetime'],
    16. '%Y-%m-%d %H:%M:%S'))
    17. datapoint['timestamp'] = str(timestamp).split('.')[0]
    18. datapoints['datapoints'] = [datapoint]
    19. return datapoints

    通过上述代码即可成功构造datapoint数据(dict字典类型),其中metric、tags字段为必选字段,即构造的datapoint数据中必须包含metric和tags(TSDB要求,具体可查看TSDB API细节)。

    身份信息签名

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. import calendar
    4. import datetime
    5. import json
    6. import requests
    7. import time
    8. from sign import sign
    9. # set the transport protocol
    10. TRANS_PROTOCOL = 'http://'
    11. # set http method
    12. HTTP_METHOD = 'POST'
    13. # set base url and path
    14. base_url = 'your_db.tsdb.iot.xx.baidubce.com'
    15. path = '/v1/datapoint' # write your_data to datapoint of your_db on TSDB
    16. # save the information of Access Key ID and Secret Access Key
    17. ak = 'your_ak_info'
    18. sk = 'your_sk_info'
    19. credentials = sign.BceCredentials(ak, sk)
    20. # set a http header except field 'Authorization'
    21. headers = {'Host': base_url, 'Content-Type': 'application/json;charset=utf-8'}
    22. # we don't have params in our url,so set it to None
    23. # set header fields should be signed headers_to_sign = {"host"}
    24. # invoke sign method to get a signed string
    25. sign_str = sign.sign(credentials, HTTP_METHOD, path, headers, params, headers_to_sign=headers_to_sign)

    不难看出,通过上述代码即可完成身份信息的签名,需要注意的是,对身份信息签名时需要用户在百度云注册账户,并创建属于自己的AK/SK信息,具体可参考如何获取AK/SK。

    数据写入

    1. def access_db(http_method, url, data=None):
    2. """
    3. function to access TSDB by RESTful API(only have GET,POST,PUT now)
    4. """
    5. # invoke sign method to get a signed string
    6. sign_str = sign.sign(credentials, HTTP_METHOD, path, headers, params,
    7. headers_to_sign=headers_to_sign)
    8. # add field 'Authorization' to complete the whole http header
    9. final_headers = dict(headers.items() + {'Authorization': sign_str}.items())
    10. try:
    11. if (http_method == 'POST') and (data is not None):
    12. rsp = requests.post(url, headers=final_headers, data=json.dumps(data))
    13. elif http_method == 'GET':
    14. rsp = requests.get(url, headers=final_headers)
    15. elif (http_method == 'PUT') and (data is not None):
    16. rsp = requests.put(url, headers=final_headers, data=json.dumps(data))
    17. else:
    18. rsp = 'Bad http method or data is empty'
    19. except StandardError:
    20. raise
    21. return rsp
    22. def handler(event, context):
    23. """
    24. function handler
    25. """
    26. datapoints = build_datapoints(event)
    27. try:
    28. rsp = access_db(HTTP_METHOD, TRANS_PROTOCOL + base_url + path,
    29. datapoints)
    30. except StandardError:
    31. raise
    32. # check http response status code to confirm if we write data successfully
    33. if str(rsp.status_code) == '204':
    34. pass
    35. else:
    36. if isinstance(rsp, str):
    37. raise TypeError('Response must be a string')
    38. else:
    39. raise BaseException('Get error: ' + str(rsp.status_code))

    上述函数方法中,access_db()方法将身份签名信息联合构造的datapoint数据信息一起通过POST方法写入TSDB(GET、PUT方法具体用法请参考TSDB API说明);handler()方法是整体程序的入口,用于调用access_db()方法将数据写入TSDB,并通过写入返回的状态信息判断数据是否写入成功。

    测试与验证

    OpenEdge Hub模块配置

    1. name: openedge-hub
    2. listen:
    3. - tcp://:1883
    4. principals:
    5. - username: 'test'
    6. password: 'be178c0543eb17f5f3043021c9e5fcf30285e557a4fc309cce97ff9ca6182912'
    7. permissions:
    8. - action: 'pub'
    9. permit: ['#']
    10. - action: 'sub'
    11. permit: ['#']

    OpenEdge Function模块配置:

    1. name: openedge-function
    2. hub:
    3. address: tcp://openedge-hub:1883
    4. username: test
    5. password: hahaha
    6. rules:
    7. - id: rule-e1iluuac1
    8. subscribe:
    9. topic: data
    10. qos: 1
    11. compute:
    12. function: write
    13. publish:
    14. topic: data/tsdb
    15. qos: 1
    16. functions:
    17. - name: 'write'
    18. runtime: 'python2.7'
    19. handler: 'write.handler'
    20. codedir: 'var/db/openedge/module/func-nyeosbbch'
    21. entry: "openedge-function-runtime-python27:build"
    22. env:
    23. USER_ID: acuiot
    24. instance:
    25. min: 1
    26. max: 10
    27. timeout: 30s

    通过上述配置不难发现,借助MQTTBOX向主题“data”发布消息,并通过“write”函数将该数据写入云端TSDB。

    需要说明的是:为实际生产考虑,避免写入消息量过大时导致OpenEdge处理的消息过多,此处仅对写入失败进行错误信息提示,写入成功则不提示任何信息。

    通过MQTTBOX查看数据写入TSDB是否成功

    此外,也可以通过账户登录云端TSDB进行查看,具体如下:

    通过云端TSDB查看数据是否写入成功

    从上图不难看出,数据已经成功写入TSDB。

    最后更新于 2018-12-28 10:23:09

    原文: https://openedge.tech/docs/practice/Write-data-to-TSDB-with-OpenEdge