首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >有没有办法实现一个像cin的无限输入流?

有没有办法实现一个像cin的无限输入流?

提问于 2024-01-27 13:26:40
回答 0关注 0查看 19

我想实现一个流,目的是把数据从从一个地方搬运到另一个地方,实现类似cin的效果,

比如在两个线程中互相传递数据

thread1 ostream<<"Data"<<std::endl;

thread2 istream>>data;

目前有想法,是实现一个streambuf,但是同步出现了问题,不知道怎么搞

代码语言:cpp
复制

//template <typename T>
using T = bool;
using _Elem = char;
using _Traits = std::char_traits<_Elem>;
using  _Alloc = std::allocator<_Elem>;
class PromiseAFuture {
public:
    std::promise<T> Pro;
    std::future<T> ip;
    std::mutex lock;
    std::atomic_bool canset = false;
    PromiseAFuture() {
        Pro = std::promise<T>();
        ip = Pro.get_future();
        canset = true;
    }
    bool set(T it) {
        if (canset)
        {
            Pro.set_value(it);
            canset = false;
            return true;
        }
        return false;
    }
    template <class _Rep, class _Period>
    std::future_status wait_for(T& getc, const std::chrono::duration<_Rep, _Period>& _Rel_time) {
        auto icp = ip.wait_for(_Rel_time);
        if (icp == std::future_status::ready)
        {
            getc = ip.get();
            Pro = std::promise<T>();
            ip = Pro.get_future();
            canset = true;
        }
        return icp;
    }
};
//template<class _Elem, class _Traits = std::char_traits<_Elem>, class _Alloc = std::allocator<_Elem>>
class Syncstreambuf : public std::basic_streambuf<_Elem, _Traits> {
public:
    using char_type = _Elem;
    using traits_type = _Traits;
    using int_type = typename _Traits::int_type;
    using off_type = typename _Traits::off_type;
    using Buftype = std::basic_streambuf<_Elem, _Traits>;
    using StringTpye = std::vector<_Elem, _Alloc>;
private:
    StringTpye buffer;//输出区与输入区公用
    std::mutex lock;
    PromiseAFuture needallread;//需要输入区全部被读
    PromiseAFuture needallwriter;//需要输出区被写入
public:
    ~Syncstreambuf()override {
    }
protected:
    //输入区已经读到结尾
    int_type underflow()override {
        std::unique_lock<std::mutex> lk(lock);
       
        if (gptr() == epptr()) {
            //输入区已经读到输出区结尾 需要输出区更新(需要输出区被写入)
            auto statc = std::future_status::timeout; bool var = false;
            while (statc == std::future_status::timeout) {
                lk.unlock();
                //等待 输出区被重新写入
                statc = needallwriter.wait_for(var,1ms);
                lk.lock();
            }
            //输入区指针重新设置  当前已经读到结尾 输入区当前指针设置为buf开始  结束指针为输出区当前指针
            setg(&buffer.front(), &buffer.front(), pptr());
            
        }
        else {

            while ((pptr() - gptr()) <= 0) {
                //等待输出区pptr移动 (输出区更新)
                lk.unlock();
                std::this_thread::sleep_for(1ms);
                lk.lock();
            }
            //输入区指针重新设置
            setg(&buffer.front(), gptr(), pptr());
        }
        // 输入区已全部被读
        needallread.set(true);
        return *gptr();
    }
    std::streamsize xsputn(const char_type* s, std::streamsize count)override {
        std::unique_lock<std::mutex> lk(lock);
        const char_type* __s = s;
        std::streamsize __i = 0;
        std::streamsize __n = count;
        while (__i < __n)
        {
            if (pptr() >= epptr())
            {
                //输出区满,等待输入区
                auto statc = std::future_status::timeout; bool vaar = false; 
                while (statc == std::future_status::timeout) {
                    lk.unlock();
                    //等待 输入区全部被读
                    statc = needallread.wait_for(vaar,1ms);
                    lk.lock();
                }
                //输出区 设置为buf开始 重新写入
                setp(&buffer.front(), &buffer.back());
                
            }
            else
            {


                std::streamsize __chunk_size = std::min(epptr() - pptr(), __n - __i);
                traits_type::copy(pptr(), __s, __chunk_size);
                pbump(__chunk_size);
                __s += __chunk_size;
                __i += __chunk_size;


                //输出区指针已更新
                needallwriter.set(true);
            }
        }
        return __i;
    }
    int_type overflow(int_type ch)override {
        std::unique_lock<std::mutex> lk(lock);
        auto statc = std::future_status::timeout; bool var = false;
        //输出区满,等待输入区
        while (statc == std::future_status::timeout) {
            lk.unlock();
            statc = needallread.wait_for(var,1ms);
            lk.lock();
        }
        //输出区 设置为buf开始 重新写入
        setp(&buffer.front(), &buffer.back());
        *pptr() = ch;
        pbump(1);
        //输出区指针已更新
        needallwriter.set(true);
        return 100;//不为eof即可
    }
private:
public:  
    inline const StringTpye& CurrentMemory() const { return buffer; }
    Syncstreambuf(size_t  buffersize)
    {
        buffer.resize(buffersize, 0);
        Buftype::setg(&buffer.front(), &buffer.front(), &buffer.front());//输入区设置
        Buftype::setp(&buffer.front(), &buffer.back());//输出区设置
    }
};
int main() {
    std::locale::global(std::locale(""));
    Syncstreambuf bufferc(4);
    std::thread thoutput([&bufferc]() {
        std::istream iut(&bufferc);

        while (true)
        {
            std::string sadasd;
            iut >> sadasd;
            std::cout << "输出::" << sadasd.length() << ";context:" << sadasd << std::endl;
        }
        }
    );
    std::thread thread([&bufferc]() {
        std::ostream out(&bufferc);
        std::string sadasd = "1234567890TESTZXCVBNMPOL12345";
        while (true) {
            std::cout << "输入::" << sadasd.length() << ";context:" << sadasd <<
                std::endl;
            out << sadasd << std::endl;
            std::this_thread::sleep_for(1200.0ms);
        }
        }

    );
    thread.detach();
    thoutput.join();
}
运行效果,经常出现死锁,读写异常
运行效果,经常出现死锁,读写异常

有没有大佬帮忙一下,困扰几天了

回答

和开发者交流更多问题细节吧,去 写回答
相关文章

相似问题

相关问答用户
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档