大连好人 发表于 2013-1-29 20:14:09

分享好人经验:自己写个库,统一处理流数据

本帖最后由 大连好人 于 2013-1-31 19:48 编辑

大家都知道 Arduino 可以与其他系统进行数据交换,比如通过:串口(无线串口)、蓝牙(无线串口的一种)、ADK等。

在开发过程中,经常遇到自定义数据协议的问题,好人根据之前 Java 与无线 LED 的开发经验,写了一个通用的库,希望能给正在从事类似开发的同学们一点提示。

GenericDataProcessor 被定义为通用的流数据处理器,它通过回调机制回避了使用固定的某个 IO 口等限制,包括它的 debug log 都是回调输出的。

以串口通信为例,说一下 GenericDataProcessor 的工作原理:

1、初始化 GenericDataProcessor 时需要指定一个回调函数,即 DataCallback,当 GenericDataProcessor 中累积的数据达到所设定的要求时,GenericDataProcessor 会回调该函数,用于通知使用者数据的到达;

2、每次主程序在收到数据时(如在 serialEvent() 中的 Serial.available() > 0),将收到的数据通过 GenericDataProcessor.process() 方法传给 GenericDataProcessor;

3、GenericDataProcessor 将数据复制到自己的 buffer 中,等待 header 或 body 的条件触发;

4、header 和 body 都收集齐后,GenericDataProcessor 回调 DataCallback 并带回 header 的信息,以及当前数据 offset;

5、在主程序的 DataCallback 函数中,根据传递来的 header、offset 等信息,从 GenericDataProcessor.datas 中读取所需的 body 数据,然后进行自己的逻辑处理;

6、重复 2-5 的步骤。

其中的 Header 的被定义为 16 字节固定长度:


序号 offset 含义 备注
1 0 - 1 固定标记 0x0E 0x0F
2 2 - 3 协议版本号
3 4 - 5 消息类型
4 6 - 7 页编号 zero based
5 8 - 9 总页数
6 10 - 13 当前页 body 大小
7 14 - 15 序列号


来看 GenericDataProcessor.h (代码中有注释,应该容易理解的):


#ifndef GenericDataProcessor_h

#define GenericDataProcessor_h

#include <stdlib.h>
#if ARDUINO >= 100
#include <Arduino.h>
#else
#include <WProgram.h>
#include <wiring.h>
#endif

// These defs cause trouble on some versions of Arduino
#undef round

#define HEADER_LEN 16

#define START_FLAG_1 0x0E
#define START_FLAG_2 0x0F

#define RESERVED 0x00

#define OFFSET_VERSION 2
#define LENGTH_VERSION 2
#define OFFSET_TYPE 4
#define LENGTH_TYPE 2
#define OFFSET_PAGE_ID 6
#define LENGTH_PAGE_ID 2
#define OFFSET_PAGE_COUNT 8
#define LENGTH_PAGE_COUNT 2
#define OFFSET_BODY_LENGTH 10
#define LENGTH_BODY_LENGTH 4
#define OFFSET_SERIAL_NUMBER 14
#define LENGTH_SERIAL_NUMBER 2

#define STATUS_WAIT_HEADER 1
#define STATUS_HEADER_RECEIVED 2
#define STATUS_WAIT_BODY 3
#define STATUS_BODY_RECEIVED 4

#define PROCESSOR_BUFFER_SIZE 512

/**
* the data callback method
*/
typedef void (* DataCallback)(short ver, short msgType, short pageId, short pageCount, short serialNumber, short offset, short len);

/**
* the print callback method
*/
typedef void (* PrintCallback)(char *);

/**
* generic data processor
* author [email protected]
*/
class GenericDataProcessor
{
       
        public:
               
                /**
                * to create the GenericDataProcessor
                */
                GenericDataProcessor(char * name);
               
                /**
                * to set debug mode
                * enabled: is it debugMode?
                * PrintCallback: the print callback, it will be invoked when debugMode = true and printing the log
                */
                void debugMode(boolean enabled, void (* PrintCallback)(char *));
               
                /**
                * to initialize the GenericDataProcessor by the DataListener
                * DataCallback: the data callback, it will be invoked when the datas coming
                */
                void init(void (* DataCallback)(short ver, short msgType, short pageId, short pageCount, short serialNumber, short offset, short len));
               
                /**
                * to process the data
                * data: the buffer for datas
                * len: the lenght of current datas in the buffer
                */
                void process(uint8_t *data, short len);
               
                /**
                * the datas, you can read the datas from this value according to the offset and len of the callback method
                */
                uint8_t datas;
               
                /**
                * to get the current status
                * return: the current status
                */
                byte currentStatus();
               
                /**
                * to build the header by parameters
                * header: the header buffer
                * ver: the protocol version
                * msgType: the type of the message
                * pageId: the id of this page, zero based
                * pageCount: the total count of the pages
                * bodyLen: the length of this page body
                * serialNumber: the serial number for this page
                */
                void buildHeader(uint8_t *header, short ver, short msgType, short pageId, short pageCount, short bodyLen, short serialNumber);
               
                /**
                * to read the short value in current datas
                * offset: the offset
                * return: the short value
                */
                short readShort(short offset);
               
                /**
                * to read the short value in the buffer
                * buf: the buffer to read
                * offset: the offset
                * return: the short value
                */
                short readShort(uint8_t *buf, short offset);
               
                /**
                * to read the integer value in current datas
                * offset: the offset
                * return: the integer value
                */
                int readInt(short offset);
               
                /**
                * to read the integer value in the buf
                * buf: the buffer to read
                * offset: the offset
                * return: the integer value
                */
                int readInt(uint8_t *buf, short offset);
               
                /**
                * to write the short value into buffer
                * buf: the buffer to writer
                * offset: the offset
                * value: the short value
                */
                void writeShort(uint8_t *buf, short offset, short value);
               
                /**
                * to write the integer value into buffer
                * buf: the buffer to writer
                * offset: the offset
                * value: the integer value
                */
                void writeInt(uint8_t *buf, short offset, int value);
               
                /**
                * to write the reserved value into buffer
                * buf: the buffer to writer
                * offset: the offset
                * count: the count of reserved
                */
                void writeReserved(uint8_t *buf, short offset, short count);
               
        protected:
               
                //
       
        private:
               
                char *name;
               
                /**
                * the debug mode flag
                */
                boolean debugModeFlag;
               
                /**
                * the print callback
                */
                PrintCallback printCallback;
               
                /**
                * the data callback
                */
                DataCallback dataCallback;
               
                /**
                * the current status
                */
                byte status;
               
                /**
                * the length of received
                */
                short recvLen;
               
                /**
                * the protocol version of current message
                */
                short ver;
               
                /**
                * the message type of current message
                */
                short msgType;
               
                /**
                * the page id of current message
                */
                short pageId;
               
                /**
                * the page count of current message
                */
                short pageCount;
               
                /**
                * the body length of current message
                */
                short bodyLen;
               
                /**
                * the serial number of current message
                */
                short serialNumber;
               
                /**
                * to reset the status
                */
                void reset();
               
                /**
                * to process header
                */
                void processHeader();
               
                /**
                * to process body
                */
                void processBody();
               
                void printOut(char * out);
               
};

#endif


再来看 GenericDataProcessor.cpp (代码中有注释,应该容易理解的):


#include "GenericDataProcessor.h"

GenericDataProcessor::GenericDataProcessor(char * name)
{
        this->reset();
        this->printCallback = NULL;
        this->name = name;
}

void GenericDataProcessor::init(void (* DataCallback)(short ver, short msgType, short pageId, short pageCount, short serialNumber, short offset, short len))
{
        this->dataCallback = DataCallback;
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                this->printOut("GenericDataProcessor initialized.");
        }
}

void GenericDataProcessor::process(uint8_t *data, short len)
{
        if (len > 0)
        {
                for (int i = 0; i < len; i++)
                {
                        // copy the datas byte by byte
                        this->datas = data;
                        // check header
                        if (this->status == STATUS_WAIT_HEADER && this->recvLen == 2)
                        {
                                if (this->debugModeFlag && this->printCallback != NULL)
                                {
                                        char *s;
                                        s = (char *)malloc(sizeof(char) * 100);
                                        sprintf(s, "checking header flag, datas=%X, datas=%X, START_FLAG_1=%X, START_FLAG_2=%X", this->datas, this->datas, START_FLAG_1, START_FLAG_2);
                                        this->printOut(s);
                                        free(s);
                                }
                                if (this->datas != START_FLAG_1 || this->datas != START_FLAG_2)
                                {
                                        // invalid header
                                        if (this->debugModeFlag && this->printCallback != NULL)
                                        {
                                                this->printOut("invalid header, ignore it");
                                        }
                                        this->reset();
                                        continue;
                                }
                                else
                                {
                                        // valid header
                                        if (this->debugModeFlag && this->printCallback != NULL)
                                        {
                                                this->printOut("valid header, processing it");
                                        }
                                }
                        }
                        else if (this->status == STATUS_WAIT_HEADER && this->recvLen == HEADER_LEN)
                        {
                                // header received
                                this->status = STATUS_HEADER_RECEIVED;
                                if (this->debugModeFlag && this->printCallback != NULL)
                                {
                                        this->printOut("header received");
                                }
                                // to process header
                                this->processHeader();
                        }
                        if ((this->status == STATUS_BODY_RECEIVED) || (this->status == STATUS_WAIT_BODY && this->recvLen == HEADER_LEN + this->bodyLen))
                        {
                                // body received
                                this->status = STATUS_BODY_RECEIVED;
                                if (this->debugModeFlag && this->printCallback != NULL)
                                {
                                        this->printOut("body received");
                                }
                                // to process body
                                this->processBody();
                        }
                }
        }
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                char *s;
                s = (char *)malloc(sizeof(char) * 10);
                sprintf(s, "status=%d", this->status);
                this->printOut(s);
                free(s);
        }
}

void GenericDataProcessor::processHeader()
{
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                char *s;
                s = (char *)malloc(sizeof(char) * 80);
                sprintf(s, "processing header, header-raw={%X %X %X %X %X %X %X %X %X %X %X %X %X %X %X %X}", this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas, this->datas);
                this->printOut(s);
                free(s);
        }
        // ver
        this->ver = this->readShort(OFFSET_VERSION);
        // message type
        this->msgType = this->readShort(OFFSET_TYPE);
        // pageId
        this->pageId = this->readShort(OFFSET_PAGE_ID);
        // pageCount
        this->pageCount = this->readShort(OFFSET_PAGE_COUNT);
        // bodyLength
        this->bodyLen = this->readInt(OFFSET_BODY_LENGTH);
        // serialNumber
        this->serialNumber = this->readShort(OFFSET_SERIAL_NUMBER);
        if (this->bodyLen == 0)
        {
                // no body to wait, change the status to STATUS_BODY_RECEIVED
                this->status = STATUS_BODY_RECEIVED;
        }
        else
        {
                // to wait body
                this->status = STATUS_WAIT_BODY;
        }
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                char *s;
                s = (char *)malloc(sizeof(char) * 150);
                sprintf(s, "header processed, ver=%d, msgType=%d, pageId=%d, pageCount=%d, bodyLen=%d, serialNumber=%d", this->ver, this->msgType, this->pageId, this->pageCount, this->bodyLen, this->serialNumber);
                this->printOut(s);
                free(s);
        }
}

void GenericDataProcessor::processBody()
{
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                this->printOut("body processed, calling callback");
        }
        // to invoke the data callback
        this->dataCallback(this->ver, this->msgType, this->pageId, this->pageCount, this->serialNumber, HEADER_LEN, this->bodyLen);
        // reset the status
        this->reset();
}

void GenericDataProcessor::reset()
{
        this->status = STATUS_WAIT_HEADER;
        this->recvLen = 0;
        this->bodyLen = 0;
}

void GenericDataProcessor::debugMode(boolean enabled, void (* PrintCallback)(char *))
{
        this->debugModeFlag = enabled;
        this->printCallback = PrintCallback;
}

byte GenericDataProcessor::currentStatus()
{
        return this->status;
}

void GenericDataProcessor::buildHeader(uint8_t *header, short ver, short msgType, short pageId, short pageCount, short bodyLen, short serialNumber)
{
        // start flags
        header = START_FLAG_1;
        header = START_FLAG_2;
        // ver
        this->writeShort(header, OFFSET_VERSION, ver);
        // message type
        this->writeShort(header, OFFSET_TYPE, msgType);
        // pageId
        this->writeShort(header, OFFSET_PAGE_ID, pageId);
        // pageCount
        this->writeShort(header, OFFSET_PAGE_COUNT, pageCount);
        // body length
        this->writeInt(header, OFFSET_BODY_LENGTH, bodyLen);
        // serial number
        this->writeShort(header, OFFSET_SERIAL_NUMBER, serialNumber);
        if (this->debugModeFlag && this->printCallback != NULL)
        {
                char *s;
                s = (char *)malloc(sizeof(char) * 50);
                sprintf(s, "new header={%X %X %X %X %X %X %X %X %X %X %X %X %X %X %X %X}", header, header, header, header, header, header, header, header, header, header, header, header, header, header, header, header);
                this->printOut(s);
                free(s);
        }
}

short GenericDataProcessor::readShort(uint8_t *buf, short offset)
{
        return (buf & 0xFF) | ((buf & 0xFF) << 8);
}

short GenericDataProcessor::readShort(short offset)
{
        return this->readShort(this->datas, offset);
}

int GenericDataProcessor::readInt(uint8_t *buf, short offset)
{
        return (buf & 0xFF) | ((buf & 0xFF) << 8) | ((buf & 0xFF) << 16) | ((buf & 0xFF) << 24);
}
               
int GenericDataProcessor::readInt(short offset)
{
        return this->readInt(this->datas, offset);
}
               
void GenericDataProcessor::writeShort(uint8_t *buf, short offset, short value)
{
        buf = value & 0xFF;
        buf = (value >> 8) & 0xFF;
}
               
void GenericDataProcessor::writeInt(uint8_t *buf, short offset, int value)
{
        buf = value & 0xFF;
        buf = (value >> 8) & 0xFF;
        buf = (value >> 16) & 0xFF;
        buf = (value >> 24) & 0xFF;
}

void GenericDataProcessor::writeReserved(uint8_t *buf, short offset, short count)
{
        if (count > 0)
        {
                for (short i = offset, to = offset + count; i < to; i++)
                {
                        buf = RESERVED;
                }
        }
}

void GenericDataProcessor::printOut(char * out)
{
        char * s;
        s = (char *)malloc(sizeof(char) * 200);
        sprintf(s, "%s: %s", this->name, out);
        this->printCallback(s);
        free(s);
}


一个例子:


#include <GenericDataProcessor.h>

#define MSG_TYPE_STATUS 1
#define MSG_TYPE_MOTION 2

// data buffer
uint8_t dataBuffer = {0};

GenericDataProcessor gdp("gdp-1");

void printCallback(char *s)
{
Serial.println(s);
}

void dataCallback(short ver, short msgType, short pageId, short pageCount, short serialNumber, short offset, short len)
{
char *s;
s = (char *)malloc(sizeof(char) * 100);
sprintf(s, "ver=%d, msgType=%d, pageId=%d, pageCount=%d, serialNumber=%d, offset=%d, len=%d", ver, msgType, pageId, pageCount, serialNumber, offset, len);
printCallback(s);
free(s);
// for test, send back the header
uint8_t header = {0};
gdp.buildHeader(header, ver, msgType, 0, 1, 0, serialNumber + 1);
if (len > 0)
{
    // process body
    switch (msgType)
    {
    case MSG_TYPE_MOTION:
      {
      // motion
      processMotionCommandBody(offset);
      }
    default:
      break;
    }
}
}

void processMotionCommandBody(short offset)
{
short action = gdp.readShort(offset);
short param1 = gdp.readShort(offset + 2);
short param2 = gdp.readShort(offset + 4);
char *s;
s = (char *)malloc(sizeof(char) * 50);
sprintf(s, "motion command: action=%d, param1=%d, param2=%d", action, param1, param2);
printCallback(s);
free(s);
}

void setup()
{
Serial.begin(9600);
gdp.debugMode(true, &printCallback);
gdp.init(&dataCallback);
printCallback("started.");
}

void loop()
{
//
}

void serialEvent()
{
if (Serial.available() > 0)
{
    short len = Serial.available();
    if (len > PROCESSOR_BUFFER_SIZE)
    {
      len = PROCESSOR_BUFFER_SIZE;
    }
    for (int i = 0; i < len; i++)
    {
      dataBuffer = Serial.read();
    }
    gdp.process(dataBuffer, len);
}
}


Arduino/C/C++ 好人是边学边用,欢迎大家来指导,谢谢。

knightpeng 发表于 2013-1-29 20:50:23

好东西,学习一下

太行摄狼 发表于 2013-1-29 22:43:36

很强大,俺程序不行啊:'(

幻生幻灭 发表于 2013-1-30 13:54:57

收藏了,不过看不懂啊,太高深了。。。
求图片和应用

大连好人 发表于 2013-1-30 13:59:53

幻生幻灭 发表于 2013-1-30 13:54 static/image/common/back.gif
收藏了,不过看不懂啊,太高深了。。。
求图片和应用

纯代码,没有图片。。。

应用嘛,想让咱们的 BOXZ 有更高级更开放的应用,我觉得是个不错的通信协议。。。。

幻生幻灭 发表于 2013-1-30 18:56:02

大连好人 发表于 2013-1-30 13:59 static/image/common/back.gif
纯代码,没有图片。。。

应用嘛,想让咱们的 BOXZ 有更高级更开放的应用,我觉得是个不错的通信协议。 ...

嘿嘿~ 当时看了题目,我也是眼前一亮
不过进来之后就全是雾霾了,好高深的感觉。

最近在研究红外呢,

Ansifa 发表于 2013-1-30 23:42:04

{:soso_e134:}我也看晕了。。明天慢慢研究。。。

大连好人 发表于 2013-1-31 19:49:14

程序有 bug,刚才更新了。。。。

迷你强 发表于 2013-1-31 22:51:21

:o高手有空给咱稍稍讲解下下,。。。

大连好人 发表于 2013-2-1 12:41:33

迷你强 发表于 2013-1-31 22:51 static/image/common/back.gif
高手有空给咱稍稍讲解下下,。。。

h 里面有方法和变量的注释,cpp 里面有行内注释啊。。。。

拾瑞 发表于 2013-7-8 14:11:10

本帖最后由 拾瑞 于 2013-7-8 16:59 编辑

一直以为论坛我看得够多了,其实60%的好贴我都没有看过..........

好人有没有串口通讯中校验重发机制的例程!

拾瑞 发表于 2013-7-8 14:11:56

本帖最后由 拾瑞 于 2013-7-8 16:59 编辑

还没有写完就发出去了!!!!!哎,太激动了

Randy 发表于 2013-7-8 15:17:25

我晕了,看得很吃力啊!

学慧放弃 发表于 2013-10-6 19:13:36

楼主自己写的???这么屌????

for 发表于 2014-3-2 22:53:17

浮云啊!完全搞不懂介么高级的程序我想用这个程序控制步进电机就是搞不了是什么情况呢?
页: [1] 2
查看完整版本: 分享好人经验:自己写个库,统一处理流数据