菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
357
0

重构+优化 micropython 下 k210 (esp8285) 的 AT network 通信过程(附代码,顺便讲讲状态机实现)。

原创
05/13 14:22
阅读数 28278

回头再解释,先 mark 2020年4月28日。

2020年5月1日 今日有空,稍微更新一下,主要在这里备份一下相关的 code 和详细的问题分析和具体解释。

继着上次的 https://www.cnblogs.com/juwan/p/12389600.html 的内容,上次只定位了问题和简单解决了问题,调整了 Python 的 Code ,现在主要是旧 Code 不稳定,下载的文件容易出问题,容易因为二进制文件产生错误。

那么如何解决呢?

我回顾一下我的这 4 天的开发历程,第一天,基于旧接收框架重新整理 Code ,第二天准备到位开始测试,测试崩盘没有任何改造,镇定思痛,第三天重新上状态机架构,第四天大量测试,缝缝补补后符合预期执行,趋于完美。

先上逻辑分析仪捕获数据观察数据流,注意我压缩了很多写的过程,只交待重点,有理有据的代码让人信服,如果有漏洞也是细节的马虎,思路对了大体是不会有太大问题的。

在 HTTP 下载文件的时候,可以看到它接收数据是以 \r\n+IPD 开头的数据,如 '+IPD,3,1452:' or '+IPD,1452:' 两类,经过测试,确实是符合预期的返回指定长度的数据,但会在最后的结束的时候附带 CLOSED\r\n 数据表示传输结束。

这里我们注意到两个问题,以及旧代码为什么不能抵抗高速传输的异常数据所产生的错误,这在上次的分析就已经提到这个数据流的问题,我们先理清问题。

我们知道了传输的规则和传输的形式,例如 AT 指令 和 传输的数据 是混在同一条通道中的,这意味着二进制数据中一旦出现类似 AT 指令的数据必然导致后续的数据错乱,又或者是传输的缓冲区异常导致程序跑飞等等。

这些错误都在旧代码中体现了,这里不再细讲,直接开始编写新代码适用新的接口。

我们知道这个函数是可重入的,所以一开始在剥离基础框架的时候就要最大化的保留重入的接口,例如,接收数据完成后,将数据保留到一个缓冲区中,在下一次的重入的时候,取走足够数量的数据,如果不够,则继续接收数据,如下代码的设计。

    // required data already in buf, just return data
    size = Buffer_Size(&nic->buffer);
    if (size >= out_buff_len) { // 请求的代码长度小于缓冲区的长度,直接返回数据
        Buffer_Gets(&nic->buffer, (uint8_t *)out_buff, out_buff_len);
        if (data_len) {
            *data_len = out_buff_len;
        }
        size = size > out_buff_len ? out_buff_len : size;
        frame_sum -= size; // 总体的缓冲区数据要对应减少,同时判断是否没有更多的数据了,如果为负数则报错。
        // printk("Buffer_Size frame_sum %d size %d *data_len %d\n", frame_sum, size, *data_len);
        if (frame_sum <= 0) {
            Buffer_Clear(&nic->buffer);
            if (*peer_closed) { //buffer empty, return EOF
                return -2;
            }
        }
        return out_buff_len;
    }

围绕这个思路去,这个函数就形成一种非常适合 FSM 状态机的接口,同时必须要有 buffer 去保留上一次的状态与数据。

为此可以将其构建一种基础状态机模型,二话不说先上架构,保持 IDLE, IPD, DATA, EXIT 四种状态,本来有 CLOSED 状态的,后来想了一下还不到时候就移除了。


    enum fsm_state { IDLE, IPD, DATA, EXIT } State = IDLE;
    static uint8_t tmp_buf[1560 * 2] = {0}; // tmp_buf as queue 1560
    uint32_t tmp_bak = 0, tmp_state = 0, tmp_len = 0, tmp_pos = 0;
    mp_uint_t interrupt = 0, uart_recv_len = 0, res = 0;
    while (State != EXIT) {
        // wait any uart data
        uart_recv_len = uart_rx_any(nic->uart_obj);
        if (uart_recv_len > 0) {

            if (tmp_len >= sizeof(tmp_buf)) { // lock across
                // printk("lock across frame_len %d tmp_len %d\n", frame_len, tmp_len);
                tmp_len = 0;
            }

            res = uart_stream->read(nic->uart_obj, tmp_buf + tmp_len, 1, &err);
            // printk("%s | tmp_buf %s res %d err %d\n", __func__, tmp_buf, res, err);
            if (res == 1 && err == 0) {
                
                interrupt = mp_hal_ticks_ms();
                // backup tmp_len to tmp_pos (tmp_pos - 1)

                tmp_pos = tmp_len, tmp_len += 1; // buffer push
                // printk("[%02X]", tmp_buf[tmp_pos]);

                if (State == IDLE) {

                    continue;
                }

                if (State == IPD) {

                    continue;
                }

                if (State == DATA) {
                    // printk("%s | frame_len %d tmp_len %d tmp_buf[tmp_pos] %02X\n", __func__, frame_len, tmp_len, tmp_buf[tmp_pos]);
                    
                    continue;
                }

            } else {
                State = EXIT;
            }
        }
        
        if (mp_hal_ticks_ms() - interrupt > timeout) {
            break; // uart no return data to timeout break
        }

        if (*peer_closed) {
            break; // disconnection
        }
    }

我们看到这个 FSM 的基础保证是,确保访问边界被控制住,防止程序指针异常访问导致跑飞(在我前一篇有所提及)。

            if (tmp_len >= sizeof(tmp_buf)) { // lock across
                // printk("lock across frame_len %d tmp_len %d\n", frame_len, tmp_len);
                tmp_len = 0;
            }

其次是超时机制,确保任何情况下都可以离开状态机。

        if (mp_hal_ticks_ms() - interrupt > timeout) {
            break; // uart no return data to timeout break
        }

这样可以保障程序在任何场合下都不会陷入底层无法离开。

接下来就是当缓冲区有数据、数据接收正常、以及每次读取一字节成功的基础上,进行字符串的匹配和状态转移的分类处理。

        // wait any uart data
        uart_recv_len = uart_rx_any(nic->uart_obj);
        if (uart_recv_len > 0) {

            if (tmp_len >= sizeof(tmp_buf)) { // lock across
                // printk("lock across frame_len %d tmp_len %d\n", frame_len, tmp_len);
                tmp_len = 0;
            }

            res = uart_stream->read(nic->uart_obj, tmp_buf + tmp_len, 1, &err);
            // printk("%s | tmp_buf %s res %d err %d\n", __func__, tmp_buf, res, err);
            if (res == 1 && err == 0) {
                
                interrupt = mp_hal_ticks_ms();
                // backup tmp_len to tmp_pos (tmp_pos - 1)

                tmp_pos = tmp_len, tmp_len += 1; // buffer push
                // printk("[%02X]", tmp_buf[tmp_pos]);

                if (State == IDLE) {

                    continue;
                }

                if (State == IPD) {

                    continue;
                }

                if (State == DATA) {
                    // printk("%s | frame_len %d tmp_len %d tmp_buf[tmp_pos] %02X\n", __func__, frame_len, tmp_len, tmp_buf[tmp_pos]);
                    
                    continue;
                }

            } else {
                State = EXIT;
            }
        }

这时候最后一个状态是用来离开的标记。

我们开始脑里模拟执行过程和假设,第一种状态就是空转(IDLE),基本上每个状态机都是从这里开始的。

在 IDLE 期间要做的事情就是清理无关的数据,如下代码,直到符合预期的怀疑对象出现为止。

                if (State == IDLE) {
                    if (tmp_buf[tmp_pos] == '+') {
                        tmp_state = 1, tmp_bak = tmp_pos, State = IPD;
                        continue;
                    } else {
                        // printk("(%02X)", tmp_buf[tmp_pos]);
                        tmp_len -= 1; // clear don't need data, such as (0D)(0A)
                        continue;
                    }
                }

如上代码,在空转状态下,遇到 '+' 出现后,会进行假设性的状态转移,如果不符合预期的数据会跌回 ILDE 状态,然后我们看看 IPD 状态的实现。


                    if (tmp_pos - tmp_bak > 12) { // Over the length of the '+IPD,3,1452:' or '+IPD,1452:'
                        tmp_state = 0, State = IDLE;
                        continue;
                    }

                    if (0 < tmp_state && tmp_state < 5) {
                        // printk("(%d, %02X) [%d, %02X]\n", tmp_pos, tmp_buf[tmp_pos], tmp_pos - tmp_bak, ("+IPD,")[tmp_pos - tmp_bak]);
                        if (tmp_buf[tmp_pos] == ("+IPD,")[tmp_pos - tmp_bak]) {
                            tmp_state += 1; // tmp_state 1 + "IPD," to tmp_state 5
                        } else {
                            tmp_state = 0, State = IDLE;
                        }
                        continue;
                    }

                    if (tmp_state == 5 && tmp_buf[tmp_pos] == ':')
                    {
                        tmp_state = 6, State = IDLE;
                        tmp_buf[tmp_pos + 1] = '\0'; // lock tmp_buf
                        // printk("%s | is `IPD` tmp_bak %d tmp_len %d command %s\n", __func__, tmp_bak, tmp_len, tmp_buf + tmp_bak);
                        char *index = strstr((char *)tmp_buf + tmp_bak + 5 /* 5 > '+IPD,' and `+IPD,325:` in tmp_buf */, ",");
                        int ret = 0, len = 0;
                        if (index) { // '+IPD,3,1452:'
                            ret = sscanf((char *)tmp_buf + tmp_bak, "+IPD,%hhd,%d:", &mux_id, &len);
                            if (ret != 2 || mux_id < 0 || mux_id > 4 || len <= 0) {
                                ; // Misjudge or fail, return, or clean up later
                            } else {
                                tmp_len = tmp_bak, tmp_bak = 0; // Clean up the commands in the buffer and roll back the data
                                frame_len = len, State = DATA;
                            }
                        } else { // '+IPD,1452:'
                            ret = sscanf((char *)tmp_buf + tmp_bak, "+IPD,%d:", &len);
                            if (ret != 1 || len <= 0) {
                                ; // Misjudge or fail, return, or clean up later
                            } else {
                                tmp_len = tmp_bak, tmp_bak = 0; // Clean up the commands in the buffer and roll back the data
                                frame_len = len, State = DATA;
                            }
                        }
                        continue;
                    }

由于这是第三次重构的代码,所以里面加了很多保护措施,但实际上这些措施不一定会遇到,它会随着各种高频传输调试中触发,代码要能够抵抗这些异常数据。

我们看到 if (tmp_buf[tmp_pos] == ("+IPD,")[tmp_pos - tmp_bak]) { 的循环表示的就是 strstr 的匹配,但这里的匹配我稍微偷懒了一下简写了代码。

                    if (0 < tmp_state && tmp_state < 5) {
                        // printk("(%d, %02X) [%d, %02X]\n", tmp_pos, tmp_buf[tmp_pos], tmp_pos - tmp_bak, ("+IPD,")[tmp_pos - tmp_bak]);
                        if (tmp_buf[tmp_pos] == ("+IPD,")[tmp_pos - tmp_bak]) {
                            tmp_state += 1; // tmp_state 1 + "IPD," to tmp_state 5
                        } else {
                            tmp_state = 0, State = IDLE;
                        }
                        continue;
                    }

因为整个环境没有基础容器的支持,例如队列,所以我只能通过代码中的索引去模拟队列执行,保障整个执行的有序状态,当匹配完成后,会从 IPD 进入 DATA 态,此时意味着之后的数据都被视为数据,长度以 IPD 指示的为准。

                                tmp_len = tmp_bak, tmp_bak = 0; // Clean up the commands in the buffer and roll back the data
                                frame_len = len, State = DATA;

进入 DATA 态的时候就要思考终止条件的状态了,有两种可能,一种是标识的长度数据走完了,这表示应该要转移状态到 ILDE 了,但期望从这个状态中发现终止条件。

                    if (frame_len < 0) {
                        if (frame_len == -1 && tmp_buf[tmp_pos] == 'C') { // wait "CLOSED\r\n"
                            frame_bak = frame_len;
                            continue;
                        }
                        if (tmp_state == 6 && tmp_buf[tmp_pos] == '\r') {
                            tmp_state = 7;
                            continue;
                        }
                        if (tmp_state == 7 && tmp_buf[tmp_pos] == '\n') {
                            if (frame_len == -2) { // match +IPD EOF (\r\n)
                                tmp_state = 0, State = IDLE;
                                // After receive complete, confirm the data is enough
                                size = Buffer_Size(&nic->buffer);
                                // printk("%s | size %d out_buff_len %d\n", __func__, size, out_buff_len);
                                if (size >= out_buff_len) { // data enough
                                    // printk("%s | recv out_buff_len overflow\n", __func__);
                                    State = EXIT;
                                }
                            } else if (frame_len == -8 && frame_len == -1)  {
                                // Get "CLOSED\r\n"
                                peer_just_closed = *peer_closed = true;
                                frame_bak = 0, tmp_state = 0, State = EXIT;
                            } else {
                                tmp_state = 6;
                            }
                            continue;
                        }
                        // 存在异常,没有得到 \r\n 的匹配,并排除 CLOSED\r\n 的指令触发的可能性,意味着传输可能越界出错了 \r\n ,则立即回到空闲状态。
                        if (frame_len <= -1 && frame_bak != -1) {
                            // printk("%s | tmp_state %d frame_len %d tmp %02X\n", __func__, tmp_state, frame_len, tmp_buf[tmp_pos]);
                            State = IDLE;
                            continue;
                        }
                    }

如我所假设的两种状态 \r\n 和 CLOSED\r\n 这两类,但还有最终保障措施,假设无法符合我的预期的结尾,要尽快回归 ILDE 重新进入匹配,否则数据将一片混乱,这个只能保障 1 、 2 字节的误差,对于缓冲区溢出复写的情况无法抵抗。

到这里我们还要注意到很多地方可能会出错和溢出,比如有如下考虑:

  • 上层 Python 接收数据的变量溢出,无法继续 read 更多,积压数据未处理。
  • 原始串口缓冲区溢出后,循环缓冲复写。
  • 内部状态机解析的数据缓冲长度(已经被保护了,但出错了会导致其中的一帧丢失)
  • 状态机解析后的数据缓冲变量写入可能会失败,夹在 C 与 Python 之间交互的一层缓冲区,主要用来供应 socket 层的数据。

因此这里面的任何一个环境出错,都可能导致 HTTP 的原始下载数据出错,但所辛都可以完整下好不会出现指令与二进制数据混淆的情况了。

基于这个接收框架,还可以进一步的拓展出 多 socket 的通信 和 其他 AT 指令的通信接口,匹配状态和解析后分类到各自对应的容器即可。

旧接口函数抵抗异常的设计不够多,所以之后测试完整了后,可以基于新的接口继续发展新功能。

我的测试结果如下:

  • 在串口波特率 921600(1M)约 92 kb/s 的下载速度下载 384kb kmodel 模型文件的过程。

    • 若是不写入 SD 卡的基础上,完美接收完毕,耗时 8s 。
    • 若是写入我的 256M 的路边野卡,完整下载时 10s,无任何损坏,但受到网络波动则会出现下载的文件中会存在失败的帧,下载过程的好坏参半。
  • 在串口波特率 576000(5M)约 57 kb/s 的下载速度下载 5M kmodel 模型文件的过程。

    • 写入SD卡的过程,下载 5M 文件无错且完整,较为稳定,少有失败的情况。

从这个测试的结果来看,将 SD 卡写入数据是一个影响很大的变数,要么将其读写分离加缓冲在其中,减少 SD 卡带来的负面影响,优先把 HTTP 的数据带回内存之中。

这和串口缓冲区有关,如现在假定的 10k 字节,而 921600 (92kb/s),意味着 read 数据要在 10k/x:100kb/s, x = 0.1s 内取走全部数据,否则 IO 不匹配,缓冲区会溢出,之后的数据都会被放弃从而形成接收数据的断层。

python 的 code 执行模型如下

while True:
    tmp = wifi.read()
    file.write(tmp)

所以再下一次的 wifi.read() 因符合下述读写分离模型,这只是伪代码。不用太计较。

# thread 0 
while True:
    tmp = wifi.read()
    sleep(0.1)

# thread 1
while True:
    file.write(tmp)
    sleep(x)

这次我们必须在硬件的内存和 SD 卡的写入数据中做一个选择,至少保证 SD 写入速度应大于 100kb/s 从而保持 IO 的速率基本一致,减少下载文件的异常。

其他可能存在的问题,就比如有时候发送数据会请求失败,还不知道是哪个死角没考虑到,也许之后就会有人发现了吧?

之后的内容都是代码的备份,分为 新接口函数,旧接口函数,对应的 Python 测试代码,有兴趣的可以自己调试试试。


/*----------------------------------------------------------------------------*/
/* +IPD,<id>,<len>:<data> */
/* +IPD,<len>:<data> */
/**
 * 
 * @return -1: parameters error, -2: EOF, -3: timeout, -4:peer closed and no data in buffer
 */
uint32_t recvPkg(esp8285_obj *nic, char *out_buff, uint32_t out_buff_len, uint32_t *data_len, uint32_t timeout, char *coming_mux_id, bool *peer_closed, bool first_time_recv)
{
    // printk("%s | head obl %d *dl %d tu %d ftr %d \n", __func__, out_buff_len, *data_len, timeout, first_time_recv);

    int err = 0, size = 0;
    bool peer_just_closed = false;

    // only for single socket but need socket map & buffer TODO:
    const mp_stream_p_t *uart_stream = mp_get_stream(nic->uart_obj);
    static int8_t mux_id = -1;
    static int32_t frame_sum = 0, frame_len = 0, frame_bak = 0;

    // parameters check
    if (out_buff == NULL) {
        return -1;
    }
    
    if (first_time_recv) {
        frame_sum = frame_len = 0;
    }

    // required data already in buf, just return data
    size = Buffer_Size(&nic->buffer);
    if (size >= out_buff_len) {
        Buffer_Gets(&nic->buffer, (uint8_t *)out_buff, out_buff_len);
        if (data_len) {
            *data_len = out_buff_len;
        }
        size = size > out_buff_len ? out_buff_len : size;
        frame_sum -= size;
        // printk("Buffer_Size frame_sum %d size %d *data_len %d\n", frame_sum, size, *data_len);
        if (frame_sum <= 0) {
            Buffer_Clear(&nic->buffer);
            if (*peer_closed) { //buffer empty, return EOF
                return -2;
            }
        }
        return out_buff_len;
    }

    enum fsm_state { IDLE, IPD, DATA, EXIT } State = IDLE;
    static uint8_t tmp_buf[1560 * 2] = {0}; // tmp_buf as queue 1560
    uint32_t tmp_bak = 0, tmp_state = 0, tmp_len = 0, tmp_pos = 0;
    mp_uint_t interrupt = 0, uart_recv_len = 0, res = 0;
    while (State != EXIT) {
        // wait any uart data
        uart_recv_len = uart_rx_any(nic->uart_obj);
        if (uart_recv_len > 0) {

            if (tmp_len >= sizeof(tmp_buf)) { // lock across
                // printk("lock across frame_len %d tmp_len %d\n", frame_len, tmp_len);
                tmp_len = 0;
            }

            res = uart_stream->read(nic->uart_obj, tmp_buf + tmp_len, 1, &err);
            // printk("%s | tmp_buf %s res %d err %d\n", __func__, tmp_buf, res, err);
            if (res == 1 && err == 0) {
                
                interrupt = mp_hal_ticks_ms();
                // backup tmp_len to tmp_pos (tmp_pos - 1)

                tmp_pos = tmp_len, tmp_len += 1; // buffer push
                // printk("[%02X]", tmp_buf[tmp_pos]);

                if (State == IDLE) {
                    if (tmp_buf[tmp_pos] == '+') {
                        tmp_state = 1, tmp_bak = tmp_pos, State = IPD;
                        continue;
                    } else {
                        // printk("(%02X)", tmp_buf[tmp_pos]);
                        tmp_len -= 1; // clear don't need data, such as (0D)(0A)
                        continue;
                    }
                }

                if (State == IPD) {

                    if (tmp_pos - tmp_bak > 12) { // Over the length of the '+IPD,3,1452:' or '+IPD,1452:'
                        tmp_state = 0, State = IDLE;
                        continue;
                    }

                    if (0 < tmp_state && tmp_state < 5) {
                        // printk("(%d, %02X) [%d, %02X]\n", tmp_pos, tmp_buf[tmp_pos], tmp_pos - tmp_bak, ("+IPD,")[tmp_pos - tmp_bak]);
                        if (tmp_buf[tmp_pos] == ("+IPD,")[tmp_pos - tmp_bak]) {
                            tmp_state += 1; // tmp_state 1 + "IPD," to tmp_state 5
                        } else {
                            tmp_state = 0, State = IDLE;
                        }
                        continue;
                    }

                    if (tmp_state == 5 && tmp_buf[tmp_pos] == ':')
                    {
                        tmp_state = 6, State = IDLE;
                        tmp_buf[tmp_pos + 1] = '\0'; // lock tmp_buf
                        // printk("%s | is `IPD` tmp_bak %d tmp_len %d command %s\n", __func__, tmp_bak, tmp_len, tmp_buf + tmp_bak);
                        char *index = strstr((char *)tmp_buf + tmp_bak + 5 /* 5 > '+IPD,' and `+IPD,325:` in tmp_buf */, ",");
                        int ret = 0, len = 0;
                        if (index) { // '+IPD,3,1452:'
                            ret = sscanf((char *)tmp_buf + tmp_bak, "+IPD,%hhd,%d:", &mux_id, &len);
                            if (ret != 2 || mux_id < 0 || mux_id > 4 || len <= 0) {
                                ; // Misjudge or fail, return, or clean up later
                            } else {
                                tmp_len = tmp_bak, tmp_bak = 0; // Clean up the commands in the buffer and roll back the data
                                frame_len = len, State = DATA;
                            }
                        } else { // '+IPD,1452:'
                            ret = sscanf((char *)tmp_buf + tmp_bak, "+IPD,%d:", &len);
                            if (ret != 1 || len <= 0) {
                                ; // Misjudge or fail, return, or clean up later
                            } else {
                                tmp_len = tmp_bak, tmp_bak = 0; // Clean up the commands in the buffer and roll back the data
                                frame_len = len, State = DATA;
                            }
                        }
                        continue;
                    }
                }

                if (State == DATA) {
                    // printk("%s | frame_len %d tmp_len %d tmp_buf[tmp_pos] %02X\n", __func__, frame_len, tmp_len, tmp_buf[tmp_pos]);
                    
                    frame_len -= tmp_len, tmp_len = 0; // get data

                    if (frame_len < 0) {
                        if (frame_len == -1 && tmp_buf[tmp_pos] == 'C') { // wait "CLOSED\r\n"
                            frame_bak = frame_len;
                            continue;
                        }
                        if (tmp_state == 6 && tmp_buf[tmp_pos] == '\r') {
                            tmp_state = 7;
                            continue;
                        }
                        if (tmp_state == 7 && tmp_buf[tmp_pos] == '\n') {
                            if (frame_len == -2) { // match +IPD EOF (\r\n)
                                tmp_state = 0, State = IDLE;
                                // After receive complete, confirm the data is enough
                                size = Buffer_Size(&nic->buffer);
                                // printk("%s | size %d out_buff_len %d\n", __func__, size, out_buff_len);
                                if (size >= out_buff_len) { // data enough
                                    // printk("%s | recv out_buff_len overflow\n", __func__);
                                    State = EXIT;
                                }
                            } else if (frame_len == -8 && frame_len == -1)  {
                                // Get "CLOSED\r\n"
                                peer_just_closed = *peer_closed = true;
                                frame_bak = 0, tmp_state = 0, State = EXIT;
                            } else {
                                tmp_state = 6;
                            }
                            continue;
                        }
                        // 存在异常,没有得到 \r\n 的匹配,并排除 CLOSED\r\n 的指令触发的可能性,意味着传输可能越界出错了 \r\n ,则立即回到空闲状态。
                        if (frame_len <= -1 && frame_bak != -1) {
                            // printk("%s | tmp_state %d frame_len %d tmp %02X\n", __func__, tmp_state, frame_len, tmp_buf[tmp_pos]);
                            State = IDLE;
                            continue;
                        }
                    } else {
                        // for(int i = 0; i < tmp_len; i++) {
                        //     int tmp = tmp_buf[i];
                        //     printk("[%02X]", tmp);
                        // }
                        // printk("%s | frame_len %d tmp_len %d\n", __func__, frame_len, tmp_pos + 1);
                        // printk("%.*s", tmp_len, tmp_buf);
                        if (!Buffer_Puts(&nic->buffer, tmp_buf, (tmp_pos + 1))) {
                            printk("%s | network->buffer overflow Buffer Max %d Size %d\n", __func__, ESP8285_BUF_SIZE, Buffer_Size(&nic->buffer));
                            State = EXIT;
                        } else {
                            // reduce data len
                            // printk("[%02X]", tmp_buf[(tmp_pos + 1)]);
                            frame_sum += (tmp_pos + 1);
                        }
                        // printk("frame_sum %d frame_len %d tmp_len %d\n", frame_sum, frame_len, tmp_pos + 1);
                    }
                    continue;
                }

            } else {
                State = EXIT;
            }
        }
        
        if (mp_hal_ticks_ms() - interrupt > timeout) {
            break; // uart no return data to timeout break
        }

        if (*peer_closed) {
            break; // disconnection
        }
    }

    size = Buffer_Size(&nic->buffer);
    
    // peer closed and no data in buffer
    if (size == 0 && !peer_just_closed && *peer_closed) {
        frame_sum = 0;
        return -4;
    }

    size = size > out_buff_len ? out_buff_len : size;
    
    Buffer_Gets(&nic->buffer, (uint8_t *)out_buff, size);
    if (data_len) {
        *data_len = size;
    }

    frame_sum -= size;

    if (frame_sum <= 0 || peer_just_closed) {
        Buffer_Clear(&nic->buffer);
        if (peer_just_closed)
        {
            frame_sum = 0;
            return -2;
        }
    }

    // printk(" %s | tail obl %d *dl %d se %d\n", __func__, out_buff_len, *data_len, size);

    return size;
}

肝了三天了,彻底优化了 AT 8285 的通信过程,等测试得差不多了再提交,目前只测到了波特率 576000 下 HTTP 下载 K-Flash.zip (5M)二进制文件,数据完整,接收过程大体是没有太大问题了。

备份旧 CODE 。


uint32_t bak_recvPkg(esp8285_obj *nic, char *out_buff, uint32_t out_buff_len, uint32_t *data_len, uint32_t timeout, char *coming_mux_id, bool *peer_closed, bool first_time_recv)
{
      
    const mp_stream_p_t *uart_stream = mp_get_stream(nic->uart_obj);
    static uint8_t temp_buff1[2048] = {0};
    static uint8_t temp_buff2[2048] = {0};
    uint16_t temp_buff1_len = 0;
    uint16_t temp_buff2_len = 0;
    uint8_t find_frame_flag_index = 0;
    static int8_t mux_id = -1;
    static int16_t frame_len = 0;
    static int32_t frame_len_sum = 0; //only for single socket TODO:
    // bool    overflow = false;
    int ret = 0;
    int size = 0;
    int errcode;
    mp_uint_t start1 = 0, start2 = 0;
    bool no_frame_flag = false;
    bool new_frame = false;
    mp_uint_t data_len_in_uart_buff = 0;
    bool peer_just_closed = false;

    // parameters check
    if (out_buff == NULL)
    {
        return -1;
    }
    // // printk("\r if (out_buff == NULL) { \n");
    // init vars
    memset(temp_buff1, 0, sizeof(temp_buff1));
    memset(temp_buff2, 0, sizeof(temp_buff2));
    if (first_time_recv)
    {
        frame_len = 0;
        frame_len_sum = 0;
    }

    // required data already in buf, just return data
    uint32_t buff_size = Buffer_Size(&nic->buffer);
    // // printk("\r if(buff_size >= out_buff_len) \n");

    // printk("\r buff_size %d out_buff_len %d\n", buff_size, out_buff_len);

    if (buff_size >= out_buff_len)
    {
        Buffer_Gets(&nic->buffer, (uint8_t *)out_buff, out_buff_len);
        if (data_len)
            *data_len = out_buff_len;
        frame_len_sum -= size;
        if (frame_len_sum <= 0)
        {
            frame_len = 0;
            frame_len_sum = 0;
            Buffer_Clear(&nic->buffer);
            if (*peer_closed) //buffer empty, return EOF
            {
                return -2;
            }
        }
        return out_buff_len;
    }

    // read from uart buffer, if not frame start flag, put into nic buffer
    // and need wait for full frame flag in 200ms(can be fewer), frame format: '+IPD,id,len:data' or '+IPD,len:data'
    // wait data from uart buffer if not timeout
    start2 = mp_hal_ticks_ms();
    data_len_in_uart_buff = uart_rx_any(nic->uart_obj);
    // // printk("\r uart_stream->read %p \n", uart_stream->read);

    do
    {
        if (data_len_in_uart_buff > 0)
        {
            uart_stream->read(nic->uart_obj, temp_buff1 + temp_buff1_len, 1, &errcode);
            if (find_frame_flag_index == 0 && temp_buff1[temp_buff1_len] == '+')
            {
                ++find_frame_flag_index;
                start1 = mp_hal_ticks_ms();
            }
            else if (find_frame_flag_index == 1 && temp_buff1[temp_buff1_len] == 'I')
            {
                ++find_frame_flag_index;
            }
            else if (find_frame_flag_index == 2 && temp_buff1[temp_buff1_len] == 'P')
            {
                ++find_frame_flag_index;
            }
            else if (find_frame_flag_index == 3 && temp_buff1[temp_buff1_len] == 'D')
            {
                ++find_frame_flag_index;
            }
            else if (find_frame_flag_index == 4 && temp_buff1[temp_buff1_len] == ',')
            {
                ++find_frame_flag_index;
            }
            else if (find_frame_flag_index == 5)
            {
                if (temp_buff1[temp_buff1_len] == ':')
                { // '+IPD,3,1452:' or '+IPD,1452:'
                    temp_buff1[temp_buff1_len + 1] = '\0';
                    char *index = strstr((char *)temp_buff1 + 5, ",");
                    if (index)
                    { // '+IPD,3,1452:'
                        ret = sscanf((char *)temp_buff1, "+IPD,%hhd,%hd:", &mux_id, &frame_len);
                        if (ret != 2 || mux_id < 0 || mux_id > 4 || frame_len <= 0)
                        { // format not satisfy, it's data
                            no_frame_flag = true;
                        }
                        else
                        { // find frame start flag, although it may also data
                            new_frame = true;
                        }
                    }
                    else
                    { // '+IPD,1452:'
                        ret = sscanf((char *)temp_buff1, "+IPD,%hd:", &frame_len);
                        if (ret != 1 || frame_len <= 0)
                        { // format not satisfy, it's data
                            no_frame_flag = true;
                        }
                        else
                        { // find frame start flag, although it may also data
                            new_frame = true;
                            // // printk("new frame:%d\r\n", frame_len);
                        }
                    }
                }
            }
            else
            { // not match frame start flag, put into nic buffer
                no_frame_flag = true;
            }
            // new frame or data
            // or wait for frame start flag timeout(300ms, can be fewer), maybe they're data
            if (new_frame || no_frame_flag || temp_buff1_len >= 12 ||
                (find_frame_flag_index && (mp_hal_ticks_ms() - start1 > 300))) // '+IPD,3,1452:'
            {
                if (!new_frame)
                {
                    if (frame_len_sum > 0)
                    {
                        // // printk("if(frame_len_sum > 0) { temp_buff2_len-%d temp_buff1_len-%d\r\n", temp_buff2_len, temp_buff1_len);
                        if (!Buffer_Puts(&nic->buffer, temp_buff1, temp_buff1_len + 1))
                        {
                            // printk("data_len_in_uart_buff %u temp_buff1_len %u temp_buff1 ", data_len_in_uart_buff, temp_buff1_len);
                            for (int i = 0; i < temp_buff1_len; i++) {
                                int tmp = temp_buff1[i];
                                // printk("(%c %02X)", temp_buff1[i], tmp);
                            }
                            // printk("\n");
                            // printk("data_len_in_uart_buff %u temp_buff1_len %u temp_buff1 ", data_len_in_uart_buff, temp_buff1_len);
                            for (int i = 0; i < temp_buff1_len; i++) {
                                int tmp = temp_buff1[i];
                                // printk("(%c %02X)", temp_buff1[i], tmp);
                            }
                            // printk("\n");
                            // overflow = true;
                            // break;//TODO:
                        }
                    }
                    else
                    {
                        if (temp_buff1[0] == 'C')
                        {
                            memset(temp_buff2, 0, sizeof(temp_buff2));
                        }

                        // // printk("-%d:%s\r\n", temp_buff2_len, temp_buff2);

                        // // printk("!(if(frame_len_sum > 0) {) temp_buff2_len-%d temp_buff1_len-%d\r\n", temp_buff2_len, temp_buff1_len);
                        temp_buff2[temp_buff2_len++] = temp_buff1[0];
                        // // printk("%c", temp_buff1[0]); //TODO: optimize uart overflow, if uart overflow, uncomment this will print some data
                        // // printk("-%d:%s\r\n", temp_buff2_len, temp_buff2);
                        if (strstr((const char *)temp_buff2, "CLOSED\r\n") != NULL)
                        {
                            // // printk("pear closed\r\n");
                            *peer_closed = true;
                            peer_just_closed = true;
                            break;
                        }
                    }
                }
                else
                {
                    frame_len_sum += frame_len;
                }
                find_frame_flag_index = 0;
                temp_buff1_len = 0;
                new_frame = false;
                no_frame_flag = false;
                // enough data as required
                size = Buffer_Size(&nic->buffer);
                if (size >= out_buff_len) // data enough
                    break;
                if (frame_len_sum != 0 && frame_len_sum <= size) // read at least one frame ok
                {
                    break;
                }
                continue;
            }
            ++temp_buff1_len;
        }
        if (timeout != 0 && (mp_hal_ticks_ms() - start2 > timeout) && !find_frame_flag_index)
        {
            // // printk("\n timeout %d start2 %d (mp_hal_ticks_ms() - start2 > timeout) %d find_frame_flag_index %d \n", timeout, start2, (mp_hal_ticks_ms() - start2 > timeout), find_frame_flag_index);
            // // printk("\r-3 recvPkg nic %p out_buff %p out_buff_len %p data_len %p timeout %p coming_mux_id %p peer_closed %p first_time_recv %p\n", nic, out_buff, &out_buff_len, data_len, &timeout, coming_mux_id, peer_closed, &first_time_recv);
            // printk("data_len_in_uart_buff %u temp_buff1_len %u temp_buff1 ", data_len_in_uart_buff, temp_buff1_len);
            for (int i = 0; i < temp_buff1_len; i++) {
                int tmp = temp_buff1[i];
                // printk("(%c %02X)", temp_buff1[i], tmp);
            }
            // printk("\n");
            return -3;
        }
        data_len_in_uart_buff = uart_rx_any(nic->uart_obj);

        // // printk("\n2-%hd 1-%hd\n", temp_buff2_len, temp_buff1_len);

    } while ((timeout || find_frame_flag_index) && (!*peer_closed || data_len_in_uart_buff > 0));

    size = Buffer_Size(&nic->buffer);
    if (size == 0 && !peer_just_closed && *peer_closed) //peer closed and no data in buffer
    {
        frame_len = 0;
        frame_len_sum = 0;
        // // printk("\r-4 recvPkg nic %p out_buff %p out_buff_len %p data_len %p timeout %p coming_mux_id %p peer_closed %p first_time_recv %p\n", nic, out_buff, &out_buff_len, data_len, &timeout, coming_mux_id, peer_closed, &first_time_recv);
        return -4;
    }
    size = size > out_buff_len ? out_buff_len : size;
    Buffer_Gets(&nic->buffer, (uint8_t *)out_buff, size);
    if (data_len)
        *data_len = size;
    frame_len_sum -= size;
    if (frame_len_sum <= 0 || peer_just_closed)
    {
        frame_len = 0;
        frame_len_sum = 0;
        Buffer_Clear(&nic->buffer);
        if (peer_just_closed)
        {
            // // printk("\r-2 recvPkg nic %p out_buff %p out_buff_len %p data_len %p timeout %p coming_mux_id %p peer_closed %p first_time_recv %p\n", nic, out_buff, &out_buff_len, data_len, &timeout, coming_mux_id, peer_closed, &first_time_recv);
            return -2;
        }
    }

    // mp_printf(&mp_plat_print, "[MaixPy] %s | size %d out_buff_len %d *data_len %d \n", __func__, size, out_buff_len, *data_len);
    return size;
}

旧 code 最大的问题在于处理过程单向思维,新 code 主要是上状态机分离 AT 指令和数据的处理过程。

对应的 HTTP Download 的 Python Code 也备份一下。

"""
The MIT License (MIT)
Copyright © 2018 Jean-Christophe Bos & HC² (www.hc2.fr)
"""

from struct import pack
import socket
import gc


class MicroWebCli:

    # ============================================================================
    # ===( Class AuthBasic  )=====================================================
    # ============================================================================

    class AuthBasic:

        # ------------------------------------------------------------------------

        def __init__(self, user, password):
            if password is None:
                password = ''
            if not 'b2a_base64' in globals():
                from binascii import b2a_base64
            cred = '%s:%s' % (user, password)
            self._auth = 'Basic %s' % b2a_base64(
                cred.encode()).decode().strip()

        # ------------------------------------------------------------------------

        def Apply(self, microWebCli):
            microWebCli.Headers['Authorization'] = self._auth

    # ============================================================================
    # ===( Class AuthToken  )=====================================================
    # ============================================================================

    class AuthToken:

        # ------------------------------------------------------------------------

        def __init__(self, token):
            self._auth = 'Bearer %s' % token

        # ------------------------------------------------------------------------

        def Apply(self, microWebCli):
            microWebCli.Headers['Authorization'] = self._auth

    # ============================================================================
    # ===( Utils  )===============================================================
    # ============================================================================

    def _tryAllocByteArray(size):
        for x in range(10):
            try:
                gc.collect()
                return bytearray(size)
            except:
                pass
        return None

    # ----------------------------------------------------------------------------

    @staticmethod
    def _quote(s, safe='/'):
        r = ''
        for c in str(s):
            if (c >= 'a' and c <= 'z') or \
               (c >= '0' and c <= '9') or \
               (c >= 'A' and c <= 'Z') or \
               (c in '.-_') or (c in safe) :
                r += c
            else:
                for b in c.encode('UTF-8'):
                    r += '%%%02X' % b
        return r

    # ----------------------------------------------------------------------------

    @staticmethod
    def _urlEncode(s):
        return MicroWebCli._quote(s, ';/?:@&=+$,')

    # ----------------------------------------------------------------------------

    @staticmethod
    def _unquote(s):
        r = str(s).split('%')
        try:
            b = r[0].encode()
            for i in range(1, len(r)):
                try:
                    b += bytes([int(r[i][:2], 16)]) + r[i][2:].encode()
                except:
                    b += b'%' + r[i].encode()
            return b.decode('UTF-8')
        except:
            return str(s)

    # ----------------------------------------------------------------------------

    @staticmethod
    def _unquote_plus(s):
        return MicroWebCli._unquote(str(s).replace('+', ' '))

    # ----------------------------------------------------------------------------

    def GETRequest(url,
                   queryParams=None,
                   auth=None,
                   connTimeoutSec=10,
                   socks5Addr=None):
        c = MicroWebCli(url,
                        auth=auth,
                        connTimeoutSec=connTimeoutSec,
                        socks5Addr=socks5Addr)
        if queryParams:
            c.QueryParams = queryParams
        c.OpenRequest()
        r = c.GetResponse()
        if r.IsSuccess():
            return r.ReadContent()
        r.Close()
        if r.IsLocationMoved():
            return MicroWebCli.GETRequest(r.LocationMovedURL(), queryParams,
                                          auth, connTimeoutSec, socks5Addr)
        return None

    # ----------------------------------------------------------------------------

    def POSTRequest(url,
                    formData={},
                    auth=None,
                    connTimeoutSec=10,
                    socks5Addr=None):
        c = MicroWebCli(url,
                        method='POST',
                        auth=auth,
                        connTimeoutSec=connTimeoutSec,
                        socks5Addr=socks5Addr)
        c.OpenRequestFormData(formData)
        r = c.GetResponse()
        if r.IsSuccess():
            return r.ReadContent()
        r.Close()
        if r.IsLocationMoved():
            return MicroWebCli.POSTRequest(r.LocationMovedURL(), formData,
                                           auth, connTimeoutSec, socks5Addr)
        return None

    # ----------------------------------------------------------------------------

    def JSONRequest(url,
                    o=None,
                    auth=None,
                    connTimeoutSec=10,
                    socks5Addr=None):
        c = MicroWebCli(url,
                        method=('POST' if o else 'GET'),
                        auth=auth,
                        connTimeoutSec=connTimeoutSec,
                        socks5Addr=socks5Addr)
        if o:
            c.OpenRequestJSONData(o)
        else:
            c.OpenRequest()
        r = c.GetResponse()
        if r.IsSuccess():
            return r.ReadContentAsJSON()
        r.Close()
        if r.IsLocationMoved():
            return MicroWebCli.JSONRequest(r.LocationMovedURL(), o, auth,
                                           connTimeoutSec, socks5Addr)
        return None

    # ----------------------------------------------------------------------------

    def FileRequest(url,
                    filepath,
                    progressCallback=None,
                    auth=None,
                    connTimeoutSec=5,
                    socks5Addr=None):
        c = MicroWebCli(url,
                        auth=auth,
                        connTimeoutSec=connTimeoutSec,
                        socks5Addr=socks5Addr)
        c.OpenRequest()
        r = c.GetResponse()
        if r.IsSuccess():
            r.WriteContentToFile(filepath, progressCallback)
            return r.GetContentType()
        r.Close()
        if r.IsLocationMoved():
            return MicroWebCli.FileRequest(r.LocationMovedURL(), filepath,
                                           progressCallback, auth,
                                           connTimeoutSec, socks5Addr)
        return None

    # ============================================================================
    # ===( Constructor )==========================================================
    # ============================================================================

    def __init__(self,
                 url='',
                 method='GET',
                 auth=None,
                 connTimeoutSec=10,
                 socks5Addr=None):
        self.URL = url
        self.Method = method
        self.Auth = auth
        self.ConnTimeoutSec = connTimeoutSec
        self._socks5Addr = socks5Addr
        self._headers = {}
        self._socket = None
        self._socketAddr = None
        self._response = None

    # ============================================================================
    # ===( Functions )============================================================
    # ============================================================================

    def _write(self, data):
        try:
            data = memoryview(data)
            while data:
                n = self._socket.write(data)
                data = data[n:]
            return True
        except:
            self.Close()
            raise Exception('Error to send data on connection')

    # ------------------------------------------------------------------------

    def _writeFirstLine(self):
        path = MicroWebCli._quote(self.Path)
        qs = self.QueryString
        if qs != '':
            path = path + '?' + qs
        self._write('%s %s HTTP/1.0\r\n' % (self.Method, path))

    # ------------------------------------------------------------------------

    def _writeHeader(self, name, value):
        self._write("%s: %s\r\n" % (name, value))

    # ------------------------------------------------------------------------

    def _writeEndHeader(self):
        self._write("\r\n")

    # ------------------------------------------------------------------------

    def OpenRequest(self, data=None, contentType=None, contentLength=None):
        if self._socket:
            raise Exception('Request is already opened')
        if not self.URL:
            raise Exception('No URL defined')
        if self.Socks5Addr:
            if not isinstance(self.Socks5Addr,
                              tuple) or len(self.Socks5Addr) != 2:
                raise Exception('"Socks5Addr" must be a tuple of (host, port)')
            host, port = self.Socks5Addr
            if not isinstance(host, str) or not isinstance(port, int):
                raise Exception('"Socks5Addr" is incorrect ("%s", %s)' %
                                self.Socks5Addr)
        else:
            host = self.Host
            port = self.Port
        self._response = None
        try:
            err = 0
            while 1:
                try:
                    self._socketAddr = socket.getaddrinfo(host, port)[0][-1]
                    break
                except Exception:
                    err += 1
                if err > 5:
                    raise Exception("get ip failed!")
            #cli = socket.socket()
            cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            cli.settimeout(self.ConnTimeoutSec)
            cli.connect(self._socketAddr)
        except:
            raise Exception('Error to connect to %s:%s' % (host, port))
        if self.Socks5Addr:
            err = None
            try:
                cli.send(b'\x05\x01\x00')
                b = cli.read(2)
                if b is None or len(b) < 2 or b[0] != 0x05 or b[1] != 0x00:
                    err = "%s:%s doesn't supports MicroWebCli SOCKS5 client protocol" % self.Socks5Addr
                else:
                    h = self.Host.encode()
                    p = pack('>H', self.Port)
                    cli.send(b'\x05\x01\x00\x03' + bytes([len(h)]) + h + p)
                    b = cli.read(4)
                    if b is None or len(b) < 4 or b[1] != 0x00:
                        err = "Error to connect to %s:%s through SOCKS5 server" % (
                            self.Host, self.Port)
                    else:
                        if b[3] == 0x01:
                            l = 4
                        elif b[3] == 0x03:
                            l = cli.read(1)[0]
                        elif b[3] == 0x04:
                            l = 16
                        cli.read(l + 2)
            except Exception as ex:
                err = 'Error during negotiation with SOCKS5 server (%s)' % ex
            if err:
                cli.close()
                raise Exception(err)
        if self.Proto == 'https':
            if not 'ssl' in globals():
                import ssl
            try:
                try:
                    cli = ssl.wrap_socket(cli, timeout=self.ConnTimeoutSec)
                except TypeError:
                    cli = ssl.wrap_socket(cli)
            except Exception as ex:
                cli.close()
                raise Exception(
                    'Error to open a secure SSL/TLS connection (%s)' % ex)
        self._socket = cli
        self._writeFirstLine()
        if data:
            contentLength = len(data)
        self._headers['Host'] = self.Host
        if self._auth:
            try:
                self._auth.Apply(self)
            except:
                raise Exception('Error to apply authentication using %s' %
                                type(self._auth))
        else:
            self._headers.pop('Authorization', None)
        if contentType:
            self._headers['Content-Type'] = contentType
        else:
            self._headers.pop('Content-Type', None)
        if contentLength:
            self._headers['Content-Length'] = contentLength
        else:
            self._headers.pop('Content-Length', None)
        self._headers['User-Agent'] = 'MicroWebCli by JC`zic'
        for h in self._headers:
            self._writeHeader(h, self._headers[h])
        self._writeEndHeader()
        if data:
            self._write(data)

    # ------------------------------------------------------------------------

    def OpenRequestFormData(self, formData={}):
        data = ''
        if len(formData) > 0:
            for param in formData:
                if param != '':
                    if data != '':
                        data += '&'
                    data += MicroWebCli._quote(
                        param) + '=' + MicroWebCli._quote(formData[param])
        self.OpenRequest(data=data,
                         contentType='application/x-www-form-urlencoded')

    # ------------------------------------------------------------------------

    def OpenRequestJSONData(self, o=None):
        if not 'json' in globals():
            import json
        try:
            data = json.dumps(o)
        except:
            raise Exception('Error to convert object to JSON format')
        self.OpenRequest(data=data, contentType='application/json')

    # ------------------------------------------------------------------------

    def RequestWriteData(self, data):
        self._write(data)

    # ------------------------------------------------------------------------

    def GetResponse(self):
        if not self._response:
            self._response = MicroWebCli._response(self, self._socket,
                                                   self._socketAddr)
        return self._response

    # ------------------------------------------------------------------------

    def IsClosed(self):
        return self._socket is None

    # ------------------------------------------------------------------------

    def Close(self):
        if self._socket:
            try:
                self._socket.close()
            except:
                pass
            self._socket = None

    # ============================================================================
    # ===( Properties )===========================================================
    # ============================================================================

    @property
    def ConnTimeoutSec(self):
        return self._connTimeoutSec

    @ConnTimeoutSec.setter
    def ConnTimeoutSec(self, value):
        self._connTimeoutSec = int(value) if value and int(value) > 0 else None

    # ------------------------------------------------------------------------

    @property
    def Method(self):
        return self._method

    @Method.setter
    def Method(self, value):
        self._method = str(value).upper()

    # ------------------------------------------------------------------------

    @property
    def URL(self):
        host = self.Host
        if host != '':
            proto = self.Proto
            port = self.Port
            if ( proto == 'http'  and port == 80  ) or \
               ( proto == 'https' and port == 443 ) :
                port = ''
            else:
                port = ':' + str(port)
            url = proto + '://' + host + port + self.Path
            url = MicroWebCli._urlEncode(url)
            qs = self.QueryString
            if qs != '':
                return url + '?' + qs
            return url
        return None

    @URL.setter
    def URL(self, value):
        try:
            s = str(value)
            if '://' in s:
                proto, s = s.split('://', 1)
            else:
                proto = 'http'
        except:
            raise ValueError('URL error (%s)' % value)
        self.Proto = proto
        if '/' in s:
            host, path = s.split('/', 1)
        elif '?' in s:
            host, path = s.split('?', 1)
            path = '?' + path
        else:
            host = s
            path = ''
        if ':' in host:
            try:
                host, port = host.split(':')
                self.Port = port
            except:
                raise ValueError('URL host:port error (%s)' % host)
        self.Host = host
        self._queryParams = {}
        self.Path = path

    # ------------------------------------------------------------------------

    @property
    def Proto(self):
        return self._proto

    @Proto.setter
    def Proto(self, value):
        value = str(value).lower()
        if value == 'http':
            self._port = 80
        elif value == 'https':
            self._port = 443
        else:
            raise ValueError('Unsupported URL protocol (%s)' % value)
        self._proto = value

    # ------------------------------------------------------------------------

    @property
    def Host(self):
        return self._host

    @Host.setter
    def Host(self, value):
        self._host = MicroWebCli._unquote_plus(str(value))

    # ------------------------------------------------------------------------

    @property
    def Port(self):
        return self._port

    @Port.setter
    def Port(self, value):
        self._port = int(value)

    # ------------------------------------------------------------------------

    @property
    def Path(self):
        return self._path

    @Path.setter
    def Path(self, value):
        x = value.split('?', 1)
        if len(x[0]) > 0:
            if x[0][0] != '/':
                x[0] = '/' + x[0]
            self._path = MicroWebCli._unquote_plus(x[0])
        else:
            self._path = '/'
        if len(x) == 2:
            self.QueryString = x[1]

    # ------------------------------------------------------------------------

    @property
    def QueryString(self):
        r = ''
        for param in self._queryParams:
            if param != '':
                if r != '':
                    r += '&'
                r += MicroWebCli._quote(param) + '=' + MicroWebCli._quote(
                    self._queryParams[param])
        return r

    @QueryString.setter
    def QueryString(self, value):
        self._queryParams = {}
        for x in value.split('&'):
            param = x.split('=', 1)
            if param[0] != '':
                value = MicroWebCli._unquote(
                    param[1]) if len(param) > 1 else ''
                self._queryParams[MicroWebCli._unquote(param[0])] = value

    # ------------------------------------------------------------------------

    @property
    def QueryParams(self):
        return self._queryParams

    @QueryParams.setter
    def QueryParams(self, value):
        if not isinstance(value, dict):
            raise ValueError('QueryParams must be a dict')
        self._queryParams = value

    # ------------------------------------------------------------------------

    @property
    def Headers(self):
        return self._headers

    @Headers.setter
    def Headers(self, value):
        if not isinstance(value, dict):
            raise ValueError('Headers must be a dict')
        self._headers = value

    # ------------------------------------------------------------------------

    @property
    def Auth(self):
        return self._auth

    @Auth.setter
    def Auth(self, value):
        self._auth = value

    # ------------------------------------------------------------------------

    @property
    def Socks5Addr(self):
        return self._socks5Addr

    @Socks5Addr.setter
    def Socks5Addr(self, value):
        self._socks5Addr = value

    # ============================================================================
    # ===( Class Response  )======================================================
    # ============================================================================

    class _response:

        # ------------------------------------------------------------------------

        def __init__(self, microWebCli, socket, addr):
            self._microWebCli = microWebCli
            self._socket = socket
            self._addr = addr
            self._httpVer = None
            self._code = None
            self._msg = None
            self._headers = {}
            self._contentType = None
            self._contentLength = None
            self._processResponse()

        # ------------------------------------------------------------------------

        def _processResponse(self):
            try:
                self._parseFirstLine()
                self._parseHeader()
                if self._contentLength == 0:
                    self.Close()
            except:
                self._microWebCli.Close()
                raise Exception('Error to get response')

        # ------------------------------------------------------------------------

        def _parseFirstLine(self):
            tmp = self._socket.readline();
            print(tmp)
            self._httpVer, code, self._msg = tmp \
                                     .decode()   \
                                     .strip()    \
                                     .split(' ', 2)
            self._code = int(code)

        # ------------------------------------------------------------------------

        def _parseHeader(self):
            while True:
                tmp = self._socket.readline();
                print(tmp)
                elements = tmp \
                   .decode()   \
                   .strip()    \
                   .split(':', 1)
                if len(elements) == 2:
                    self._headers[elements[0].strip()] = elements[1].strip()
                elif len(elements) == 1 and len(elements[0]) == 0:
                    self._contentType = self._headers.get("Content-Type", None)
                    ctLen = self._headers.get("Content-Length", None)
                    self._contentLength = int(
                        ctLen) if ctLen is not None else None
                    break

        # ------------------------------------------------------------------------

        def GetClient(self):
            return self._microWebCli

        # ------------------------------------------------------------------------

        def GetAddr(self):
            return self._addr

        # ------------------------------------------------------------------------

        def GetIPAddr(self):
            return self._addr[0]

        # ------------------------------------------------------------------------

        def GetPort(self):
            return self._addr[1]

        # ------------------------------------------------------------------------

        def GetHTTPVersion(self):
            return self._httpVer

        # ------------------------------------------------------------------------

        def GetStatusCode(self):
            return self._code

        # ------------------------------------------------------------------------

        def GetStatusMessage(self):
            return self._msg

        # ------------------------------------------------------------------------

        def IsSuccess(self):
            return (self._code >= 200 and self._code < 300)

        # ------------------------------------------------------------------------

        def IsLocationMoved(self):
            return self.LocationMovedURL() is not None

        # ------------------------------------------------------------------------

        def LocationMovedURL(self):
            if self._code >= 300 and self._code < 400:
                return self._headers.get('Location', None)
            return None

        # ------------------------------------------------------------------------

        def GetHeaders(self):
            return self._headers

        # ------------------------------------------------------------------------

        def GetContentType(self):
            return self._contentType

        # ------------------------------------------------------------------------

        def GetContentLength(self):
            return self._contentLength

        # ------------------------------------------------------------------------

        def ReadContent(self, size=None):
            try:
                if size is None:
                    b = self._socket.read()
                    self.Close()
                    return b
                elif size > 0:
                    b = self._socket.read(size)
                    if len(b) < size:
                        self.Close()
                    return b
            except MemoryError as memEx:
                self.Close()
                raise MemoryError('Error to read response content (%s)' %
                                  memEx)
            except:
                self.Close()
            return None

        # ------------------------------------------------------------------------

        def ReadContentInto(self, buf, nbytes=None):
            if nbytes is None:
                nbytes = len(buf)
            if nbytes > 0:
                try:
                    x = self._socket.readinto(buf, nbytes)
                    if x < nbytes:
                        self.Close()
                    return x
                except Exception as e:
                    print(e)
                    self.Close()
                except:
                    self.Close()
            return 0

        # ------------------------------------------------------------------------

        def ReadContentAsJSON(self):
            cnt = self.ReadContent()
            if cnt:
                if not 'json' in globals():
                    import json
                try:
                    return json.loads(cnt)
                except:
                    raise Exception('Error to parse JSON response : %s' % cnt)
            return None

        # ------------------------------------------------------------------------

        def WriteContentToFile(self, filepath, progressCallback=None):
            fSize = self._contentLength
            buf = MicroWebCli._tryAllocByteArray(
                fSize if fSize and fSize < 10240 else 10240)
            if not buf:
                raise MemoryError('Not enough memory to allocate buffer')
            buf = memoryview(buf)
            try:
                file = open(filepath, 'wb')
            except:
                raise Exception('Error to create file (%s)' % filepath)
            sizeRem = fSize
            pgrSize = 0

            #import uhashlib, binascii
            #sha = uhashlib.sha256()

            while sizeRem is None or sizeRem > 0:
                if sizeRem and sizeRem < len(buf):
                    buf = buf[:sizeRem]

                x = self.ReadContentInto(buf)
                if x == 0:
                    break
                if x < len(buf):
                    buf = buf[:x]
                if sizeRem:
                    sizeRem -= x
                #print('len(buf)', len(buf), 'buf', bytes(buf[:len(buf)-1]))
                #tmp = bytes(buf[:len(buf)])
                #print(tmp)
                #sha.update(tmp)
                try:
                    file.write(buf)
                except Exception as ex:
                    print('file.write : %s' % ex)
                pgrSize += x
                if progressCallback:
                    try:
                        progressCallback(self, pgrSize, fSize)
                    except Exception as ex:
                        print('Error in progressCallback : %s' % ex)
            file.close()
            self.Close()

            #print(binascii.hexlify(sha.digest()))

            if sizeRem and sizeRem > 0:
                if not 'remove' in globals():
                    from os import remove
                remove(filepath)
                raise Exception('Error to receive and save file (%s)' %
                                filepath)

        # ------------------------------------------------------------------------

        def IsClosed(self):
            return self._microWebCli.IsClosed()

        # ------------------------------------------------------------------------

        def Close(self):
            self._microWebCli.Close()

    # ============================================================================
    # ============================================================================
    # ============================================================================

import network, time, socket
from machine import UART
from fpioa_manager import fm, board_info

if 'wlan' not in locals():

    # En EP8285 AT

    fm.register(27, fm.fpioa.UART2_TX, force=True)
    fm.register(28, fm.fpioa.UART2_RX, force=True)

    uart = UART(UART.UART2, 115200*8, 8, 2, 2, timeout=3000, read_buf_len=10240)

    time.sleep(3)
    wlan = network.ESP8285(uart)

    WIFI_SSID = "webduino.io"
    WIFI_PASW = "webduino"

    err = 0
    while 1:
        try:
            wlan.connect(WIFI_SSID, WIFI_PASW)
        except Exception:
            err += 1
            print("Connect AP failed, now try again")
            if err > 3:
                raise Exception("Conenct AP fail")
            continue
        break

print(wlan.ifconfig())
print(wlan.isconnected())

def progressCallback(microWebCli, progressSize, totalSize):
    if totalSize:
        pass #
        print('Progress: %d bytes of %d downloaded...' % (progressSize, totalSize))
    else:
        pass #
        print('Progress: %d bytes downloaded...' % progressSize)

filename = '/sd/face_test.kmodel'
fileurl = 'http://120.78.165.108/juwan/face.kmodel'

try:
    import os
    os.remove(filename)
except Exception as e:
    pass

contentType = MicroWebCli.FileRequest(fileurl, filename, progressCallback)
print('File of content type "%s" was saved to "%s"' % (contentType, filename))

发表评论

0/200
357 点赞
0 评论
收藏
为你推荐 换一批