1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| #pragma once #include <thread> #include <iostream> #include <mutex> #include <list>
struct AVPacket; struct AVCodecParameters; struct AVRational; struct AVFrame;
enum XLogLevel { XLOG_TYPE_DEBUG, XLOG_TYPE_INFO, XLOG_TYPE_ERROR, XLOG_TYPE_FATAL }; #define LOG_MIN_LEVEL XLOG_TYPE_DEBUG #define XLOG(s, level) \ if(level >= LOG_MIN_LEVEL) \ std::cout << level << " : " << __FILE__ << " : " << __LINE__ << " : " << s << std::endl;
#define LOGDEBUG(s) XLOG(s, XLOG_TYPE_DEBUG) #define LOGINFO(s) XLOG(s, XLOG_TYPE_INFO) #define LOGERROR(s) XLOG(s, XLOG_TYPE_ERROR) #define LOGFATAL(s) XLOG(s, XLOG_TYPE_FATAL)
void MSleep(unsigned int ms);
long long NowMs();
void XFreeFrame(AVFrame **frame);
void PrintErr(int err);
class XThread { public: virtual void Start();
virtual void Stop();
virtual void Do(AVPacket *pkt) {};
virtual void Next(AVPacket *pkt) { std::unique_lock<std::mutex> lock(m_); if (next_) next_->Do(pkt); }
void set_next(XThread *xt) { std::unique_lock<std::mutex> lock(m_); next_ = xt; }
protected:
virtual void Main() = 0;
bool is_exit_ = false;
int index_ = 0;
private: std::thread th_; std::mutex m_; XThread *next_ = nullptr; };
class XTools { };
class XPara { public: AVCodecParameters *para = nullptr; AVRational *time_base = nullptr;
static XPara *Create(); ~XPara();
private: XPara(); };
class XAVPacketList { public: AVPacket *Pop(); void Push(AVPacket *pkt);
private: std::list<AVPacket*> pkts_; int max_packets_ = 100; std::mutex mux_; };
|
参数说明:
普通成员函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| enum XLogLevel { XLOG_TYPE_DEBUG, XLOG_TYPE_INFO, XLOG_TYPE_ERROR, XLOG_TYPE_FATAL }; #define LOG_MIN_LEVEL XLOG_TYPE_DEBUG #define XLOG(s, level) \ if(level >= LOG_MIN_LEVEL) \ std::cout << level << " : " << __FILE__ << " : " << __LINE__ << " : " << s << std::endl;
#define LOGDEBUG(s) XLOG(s, XLOG_TYPE_DEBUG) #define LOGINFO(s) XLOG(s, XLOG_TYPE_INFO) #define LOGERROR(s) XLOG(s, XLOG_TYPE_ERROR) #define LOGFATAL(s) XLOG(s, XLOG_TYPE_FATAL)
void MSleep(unsigned int ms);
long long NowMs();
void XFreeFrame(AVFrame **frame);
void PrintErr(int err);
|
XThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| class XThread { public: virtual void Start();
virtual void Stop();
virtual void Do(AVPacket *pkt) {};
virtual void Next(AVPacket *pkt) { std::unique_lock<std::mutex> lock(m_); if (next_) next_->Do(pkt); }
void set_next(XThread *xt) { std::unique_lock<std::mutex> lock(m_); next_ = xt; }
protected:
virtual void Main() = 0;
bool is_exit_ = false;
int index_ = 0;
private: std::thread th_; std::mutex m_; XThread *next_ = nullptr; };
|
XPara
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class XPara { public: AVCodecParameters *para = nullptr; AVRational *time_base = nullptr;
static XPara *Create(); ~XPara();
private: XPara(); };
|
XAVPacketList
作用:由于std::list线程不是安全的。所以做了用于管理一个线程安全的 AVPacket
列表。这个类允许在多个线程之间安全地添加和移除 AVPacket
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class XAVPacketList { public: AVPacket *Pop(); void Push(AVPacket *pkt);
private: std::list<AVPacket*> pkts_; int max_packets_ = 100; std::mutex mux_; };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
| #include "xtools.h" #include <sstream>
using namespace std;
extern "C" { #include <libavformat/avformat.h> #include <libavcodec/avcodec.h> }
void MSleep(unsigned int ms) { auto beg = clock(); for (int i = 0; i < ms; i++) { this_thread::sleep_for(1ms); if ((clock() - beg) / (CLOCKS_PER_SEC / 1000) >= ms) break; } }
long long NowMs() { return clock() / (CLOCKS_PER_SEC / 1000); }
void XFreeFrame(AVFrame **frame) { if (!frame || !(*frame)) return; av_frame_free(frame); }
void PrintErr(int err) { char buf[1024] = { 0 }; av_strerror(err, buf, sizeof(buf) - 1); cerr << buf << endl; }
void XThread::Start() { unique_lock<mutex> lock(m_); static int i = 0; i++; index_ = i;
is_exit_ = false; th_ = thread(&XThread::Main, this); stringstream ss; ss << "XThread::Start() " << index_;
LOGINFO(ss.str()); }
void XThread::Stop() { stringstream ss; ss << "XThread::Stop() begin " << index_; LOGINFO(ss.str());
is_exit_ = true; if (th_.joinable()) th_.join();
ss.str(""); ss << "XThread::Stop() end " << index_; LOGINFO(ss.str()); }
XPara *XPara::Create() { return new XPara(); }
XPara::~XPara() { if (para) { avcodec_parameters_free(¶); } if (time_base) { delete time_base; time_base = nullptr; } }
XPara::XPara() { para = avcodec_parameters_alloc(); time_base = new AVRational(); }
AVPacket *XAVPacketList::Pop() { unique_lock<mutex> lock(mux_); if (pkts_.empty()) return nullptr; auto pkt = pkts_.front(); pkts_.pop_front(); return pkt; }
void XAVPacketList::Push(AVPacket *pkt) { unique_lock<mutex> lock(mux_); auto p = av_packet_alloc(); av_packet_ref(p, pkt); pkts_.push_back(p); if (pkts_.size() > max_packets_) { if (pkts_.front()->flags & AV_PKT_FLAG_KEY) { av_packet_free(&pkts_.front()); pkts_.pop_front(); return; } while (pkts_.empty()) { if (pkts_.front()->flags & AV_PKT_FLAG_KEY) { return; } av_packet_free(&pkts_.front()); pkts_.pop_front(); } } }
|
普通成员函数
MSleep()
作用:使当前线程暂停执行一段时间(以毫秒为单位)。
1 2 3 4 5 6 7 8 9 10
| void MSleep(unsigned int ms) { auto beg = clock(); for (int i = 0; i < ms; i++) { this_thread::sleep_for(1ms); if ((clock() - beg) / (CLOCKS_PER_SEC / 1000) >= ms) break; } }
|
NowMs()
1 2 3 4 5
| long long NowMs() { return clock() / (CLOCKS_PER_SEC / 1000); }
|
XFreeFrame()
作用:释放AVFrame资源
1 2 3 4 5
| void XFreeFrame(AVFrame **frame) { if (!frame || !(*frame)) return; av_frame_free(frame); }
|
PrintErr()
作用:打印错误信息。
1 2 3 4 5 6
| void PrintErr(int err) { char buf[1024] = { 0 }; av_strerror(err, buf, sizeof(buf) - 1); cerr << buf << endl; }
|
XThread
责任链模式
类的成员和方法
- 公共成员函数:
Start()
:启动线程。
Stop()
:停止线程,设置退出标志并等待线程退出。
Do(AVPacket *pkt)
:执行责任链传递的任务,虚函数,需在派生类中重载。
Next(AVPacket *pkt)
:传递任务到责任链的下一个节点。
set_next(XThread *xt)
:设置责任链的下一个节点,线程安全。
- 保护成员:
Main()
:线程入口函数,纯虚函数,需要在派生类中实现。
is_exit_
:标志线程是否退出。
index_
:线程索引号,用于标识不同的线程。
- 私有成员:
th_
:std::thread
对象,用于实际的线程操作。
m_
:std::mutex
对象,用于保护对 next_
成员的访问,确保线程安全。
next_
:指向责任链中的下一个节点。
【应用场景】:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| bool XCameraWidget::Open(const char *url) { if (demux_) demux_->Stop(); if (decode_) decode_->Stop();
demux_ = new XDemuxTask(); if (!demux_->Open(url)) { return false; } decode_ = new XDecodeTask(); ... demux_->set_next(decode_);
...
demux_->Start(); decode_->Start();
return true; }
|
Start()
作用:启动线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void XThread::Start() { unique_lock<mutex> lock(m_); static int i = 0; i++; index_ = i;
is_exit_ = false; th_ = thread(&XThread::Main, this); stringstream ss; ss << "XThread::Start() " << index_;
LOGINFO(ss.str()); }
|
Stop()
作用:停止线程(设置退出标志,等待线程退出)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void XThread::Stop() { stringstream ss; ss << "XThread::Stop() begin " << index_; LOGINFO(ss.str());
is_exit_ = true; if (th_.joinable()) th_.join();
ss.str(""); ss << "XThread::Stop() end " << index_; LOGINFO(ss.str()); }
|
XPara
Create()
作用:防止直接在栈上创建 XPara 对象。
1 2 3 4
| XPara *XPara::Create() { return new XPara(); }
|
XAVPacketList
Pop()
作用:返回List栈中的AVPacket
1 2 3 4 5 6 7 8
| AVPacket *XAVPacketList::Pop() { unique_lock<mutex> lock(mux_); if (pkts_.empty()) return nullptr; auto pkt = pkts_.front(); pkts_.pop_front(); return pkt; }
|
Push()
作用:用于将 AVPacket
对象推入到一个线程安全的列表中,并在列表超出最大容量时进行清理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| void XAVPacketList::Push(AVPacket *pkt) { unique_lock<mutex> lock(mux_); auto p = av_packet_alloc(); av_packet_ref(p, pkt); pkts_.push_back(p);
if (pkts_.size() > max_packets_) { if (pkts_.front()->flags & AV_PKT_FLAG_KEY) { av_packet_free(&pkts_.front()); pkts_.pop_front(); return; }
while (pkts_.empty()) { if (pkts_.front()->flags & AV_PKT_FLAG_KEY) { return; } av_packet_free(&pkts_.front()); pkts_.pop_front(); } } }
|