前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Infra 面试之数据结构五:顺序组装

Infra 面试之数据结构五:顺序组装

作者头像
木鸟杂记
发布2024-05-08 15:47:53
710
发布2024-05-08 15:47:53
举报
文章被收录于专栏:木鸟杂记木鸟杂记

这是我在很早之前遇到的一个题,很有意思,所以到现在仍然记得。题意借用了 TCP 的上下文,要求实现 TCP 中一个“顺序组装”的关键逻辑:

  1. 对于 TCP 层来说,IP 层的 packet 是乱序的收到。
  2. 对于应用层来说,TCP 层交付的是顺序的数据。

这个题有意思的点在于,借用了 TCP 的上下文之后,就可以先和候选人讨论一些 TCP 的基础知识,然后话锋一转,引出这道题。这样既可以考察一些基础知识,也可以考察工程代码能力。

题目

代码语言:javascript
复制
struct Packet {
    size_t offset;
    size_t length;
    uint8_t *data;
};

// 实现一个“顺序交付”语义
class TCP {
 // 应用层调用:按顺序读取不超过 count 的字节数到 buf 中,并返回实际读到的字节数
 size_t read(void *buf, size_t count);
 // TCP 层回调:得到一些随机顺序的 IP 层封包
 void receive(Packet* p);
 // TCP 层回调:数据发完,连接关闭
 void finish();
};

讨论

由于多少和“实际”有些关联,所以本题目中有相当多的可以讨论的点,反过来说,如果你不进行合适的建模和简化,实现起来复杂度会非常高——四十五分钟是写不完的。

对于 Packet 结构体:

  1. offset 是否会有交叠?(只要发送数据足够多,耗尽 size_t 的范围就有可能发生)
  2. length 的长度是固定的还是变长的?

对于 read 调用:

  1. TCP:read() 是否阻塞?如果是阻塞的,是否要阻塞到凑齐要求大小(count)的数据才能返回,还是只要有一部分数据就立即返回?
  2. 如果TCP::finish() 后,TCP:read() 的返回值是什么?

对于内存问题:

  1. TCP::read 中给应用层中 buf 填充数据时,是否要进行拷贝?
  2. TCP::receivePacket::data 的内存是否要在 TCP 类中释放?

实现

这本质上是一个生产者消费者问题。我们需要维护一个线程安全的有序数据结构,生产者(TCP::receive)往里面放数据,消费者(TCP::read)从里面取数据。要求是:乱序放、顺序取、可切分。

为了简化实现,可以和面试官做如下设定。

对于 Packet 结构体:

  1. offset 没有交叠
  2. length 变长

对于 read 是否阻塞问题,可以设定:

  1. read 函数是阻塞的
  2. read 只要收到一部分数据,即使没达到 count,也可以立即返回

对于内存问题,可以设定:

  1. TCP 类不负责收到的数据的生命周期,但要求调用者保证 TCP 整个生命周期中 packet 中的数据都是有效(不能被释放)的。
  2. 给应用层的数据会拷贝到用户提供的 buf 中。

以下是代码:

代码语言:javascript
复制
struct Packet{
    Packet(size_t off, size_t len, uint8_t *data):
        offset(off), length(len), data(data) {}
    size_t offset;
    size_t length;
    uint8_t *data;
};

class TCP {
public:
    TCP() {
        nextOffset_ = 0;
        finished_ = false;
    }

    size_t read(void *buf, size_t count) {
        std::unique_lock<std::mutex> lock(mu_);

        size_t leftBytes = count;
        while (leftBytes > 0) {
            if (!packets_.empty()) {
                size_t offset = packets_.begin()->first;
                auto* p = packets_.begin()->second;
                if (offset == nextOffset_) {
                    // fetch the packet
                    packets_.erase(offset);

                    // copy to the user buf
                    size_t len = std::min(p->length, leftBytes);
                    std::memcpy(buf, p->data, len);
                    leftBytes -= len;
                    nextOffset_ += len;

                    // put back the left data
                    p->length -= len;
                    if (p->length > 0) {
                        p->data += len;
                        p->offset += len;
                        packets_[p->offset] = p;
                    }

                    return count-leftBytes;
                }
            } else if (finished_) {
                break;
            }

            cv_.wait(lock);
        }

        // finished
        return 0;
    }

    void receive(Packet* p) {
        std::unique_lock<std::mutex> lock(mu_);
        packets_[p->offset] = p;
        cv_.notify_one();
    }

    void finish() {
        std::unique_lock<std::mutex> lock(mu_);
        finished_ = true;
        cv_.notify_one();
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    size_t nextOffset_;
    bool finished_;
    std::map<uint64_t, Packet*> packets_;
};

核心实现点包括:

  1. 使用 std::map 组织所有 IP 层封包
  2. 使用 nextOffset_ 来记录需要交付给应用层下一个偏移量
  3. 如果包被切分了,剩余的包记得放回去

测试

本题目的测试也有些意思:

  1. 如何模拟乱序
  2. 如何控制所有 IP 封包中的 data 生命周期
  3. 如何对应用层收到的数据进行校验
代码语言:javascript
复制
TCP tcp;

void producer(uint8_t* data) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(50, 99);

    std::cout << "construct data..." << std::endl;
    std::vector<Packet*> packets(100);
    size_t offset = 0;
    for (size_t i = 0; i < 100; ++i) {
        size_t randLen = dis(gen);
        packets[i] = new Packet(offset, randLen, data);
        data += randLen;
        offset += randLen;
    }

    std::cout << "total " << offset << " bytes" << std::endl;
    std::cout << "make the data unordered..." << std::endl;
    std::shuffle(packets.begin(), packets.end(), gen);

    std::cout << "give it to tcp..." << std::endl;
    for (size_t i = 0; i < 100; ++i) {
        tcp.receive(packets[i]);
        std::cout << "receive data [" << packets[i]->offset << " ,"
            << packets[i]->offset+packets[i]->length<< "): "
            << packets[i]->length << " bytes" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    tcp.finish();
}

void consumer(uint8_t* data) {
    size_t nBytes = 0;
    uint8_t buf[100];

    size_t offset = 0;
    while ((nBytes = tcp.read(buf, 50)) > 0) {
        auto diff = std::memcmp(data+offset, buf, nBytes);
        std::stringstream ss;
        ss << "read data [" << offset << " ," << offset+nBytes << "): "
            << nBytes << " bytes";
        if (!diff) {
            ss << "; verify ok";
        } else {
            ss << "; verify bad";
        }
        std::cout << ss.str() << std::endl;

        offset += nBytes;
    }
    std::cout << "read finish" << std::endl;
}

int main() {
    uint8_t buffer[10000];
    for (size_t i = 0; i < 10000; ++i) {
        buffer[i] = i & 0xff;
    }
    std::thread c(consumer, buffer);
    std::thread p(producer, buffer);

    p.join();
    c.join();
}

如果你想自己跑下代码,可以在这里[2]自取。

最后

如果还有时间,面试官可能会跟你讨论些,如果取消你做的那些假设,需要怎么样实现,不过只要能做过基础版,剩下的就都是加分项了。

参考资料

[1]

系统日知录: https://xiaobot.net/p/system-thinking

[2]

infra 程序员面试题目大全: https://github.com/DistSysCorp/infra-interview/tree/main/data_structures

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 木鸟杂记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 题目
  • 讨论
  • 实现
  • 测试
  • 最后
    • 参考资料
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档