diff --git a/ext-codec/AV1.cpp b/ext-codec/AV1.cpp new file mode 100644 index 00000000..ab4250eb --- /dev/null +++ b/ext-codec/AV1.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "AV1.h" +#include "AV1Rtp.h" +#include "Extension/Factory.h" +#include "Extension/CommonRtp.h" +#include "Extension/CommonRtmp.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +Sdp::Ptr AV1Track::getSdp(uint8_t payload_type) const { + return std::make_shared(payload_type, *this); +} + +Track::Ptr AV1Track::clone() const { + return std::make_shared(*this); +} + +namespace { + +CodecId getCodec() { + return CodecAV1; +} + +Track::Ptr getTrackByCodecId(int sample_rate, int channels, int sample_bit) { + // AV1是视频编解码器,这里的参数实际上是width, height, fps + return std::make_shared(sample_rate, channels, sample_bit); +} + +Track::Ptr getTrackBySdp(const SdpTrack::Ptr &track) { + return std::make_shared(); +} + +RtpCodec::Ptr getRtpEncoderByCodecId(uint8_t pt) { + return std::make_shared(); +} + +RtpCodec::Ptr getRtpDecoderByCodecId() { + return std::make_shared(); +} + +RtmpCodec::Ptr getRtmpEncoderByTrack(const Track::Ptr &track) { + return std::make_shared(track); +} + +RtmpCodec::Ptr getRtmpDecoderByTrack(const Track::Ptr &track) { + return std::make_shared(track); +} + +Frame::Ptr getFrameFromPtr(const char *data, size_t bytes, uint64_t dts, uint64_t pts) { + return std::make_shared(CodecAV1, (char *)data, bytes, dts, pts); +} + +} // namespace + +CodecPlugin av1_plugin = { getCodec, + getTrackByCodecId, + getTrackBySdp, + getRtpEncoderByCodecId, + getRtpDecoderByCodecId, + getRtmpEncoderByTrack, + getRtmpDecoderByTrack, + getFrameFromPtr }; + +}//namespace mediakit \ No newline at end of file diff --git a/ext-codec/AV1.h b/ext-codec/AV1.h new file mode 100644 index 00000000..73bb91e6 --- /dev/null +++ b/ext-codec/AV1.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_AV1_H +#define ZLMEDIAKIT_AV1_H + +#include "Extension/Frame.h" +#include "Extension/Track.h" + +namespace mediakit { + +/** + * AV1视频通道 + */ +class AV1Track : public VideoTrackImp { +public: + using Ptr = std::shared_ptr; + + AV1Track(int width = 0, int height = 0, int fps = 0) : VideoTrackImp(CodecAV1, width, height, fps) {} + +private: + Sdp::Ptr getSdp(uint8_t payload_type) const override; + Track::Ptr clone() const override; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_AV1_H \ No newline at end of file diff --git a/ext-codec/AV1Rtp.cpp b/ext-codec/AV1Rtp.cpp new file mode 100644 index 00000000..9775292d --- /dev/null +++ b/ext-codec/AV1Rtp.cpp @@ -0,0 +1,583 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "AV1Rtp.h" +#include +#include +#include +#include +#include + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +// AV1 OBU类型定义 +static constexpr int kObuTypeSequenceHeader = 1; +static constexpr int kObuTypeTemporalDelimiter = 2; +static constexpr int kObuTypeTileList = 8; +static constexpr int kObuTypePadding = 15; + +// RTP聚合头中的位定义 +static constexpr uint8_t kObuSizePresentBit = 0b00000010; +static constexpr int kAggregationHeaderSize = 1; +static constexpr int kMaxNumObusToOmitSize = 3; + +// LEB128编码/解码辅助函数 +static size_t writeLeb128(uint64_t value, uint8_t* buffer) { + size_t size = 0; + do { + uint8_t byte = value & 0x7F; + value >>= 7; + if (value != 0) { + byte |= 0x80; + } + buffer[size++] = byte; + } while (value != 0); + return size; +} + +static size_t leb128Size(uint64_t value) { + size_t size = 0; + do { + value >>= 7; + ++size; + } while (value != 0); + return size; +} + +static bool readLeb128(const uint8_t*& data, size_t& remaining, uint64_t& value) { + value = 0; + size_t shift = 0; + + while (remaining > 0 && shift < 56) { + uint8_t byte = *data++; + remaining--; + + value |= (uint64_t(byte & 0x7F) << shift); + shift += 7; + + if ((byte & 0x80) == 0) { + return true; + } + } + + // 兼容性处理:如果到达数据末尾但最后一个字节的MSB仍为1, + // 假设这是leb128编码的结尾 + if (remaining == 0 && shift > 0) { + WarnL << "Tolerating non-standard LEB128 encoding (missing termination bit)"; + return true; + } + + return false; +} + +// OBU辅助函数 +static bool obuHasExtension(uint8_t obu_header) { + return obu_header & 0b00000100; +} + +static bool obuHasSize(uint8_t obu_header) { + return obu_header & kObuSizePresentBit; +} + +static int obuType(uint8_t obu_header) { + return (obu_header & 0b01111000) >> 3; +} + +static int maxFragmentSize(int remaining_bytes) { + if (remaining_bytes <= 1) { + return 0; + } + for (int i = 1; ; ++i) { + if (remaining_bytes < (1 << (7 * i)) + i) { + return remaining_bytes - i; + } + } +} + +////////////////////////////////////////////////////////////////////////// +// AV1RtpEncoder 实现 +////////////////////////////////////////////////////////////////////////// + +AV1RtpEncoder::AV1RtpEncoder() { +} + +std::vector AV1RtpEncoder::parseObus(const uint8_t* data, size_t size) { + std::vector result; + const uint8_t* ptr = data; + size_t remaining = size; + + while (remaining > 0) { + if (remaining < 1) { + WarnL << "Malformed AV1 input: expected OBU header"; + return {}; + } + + ObuInfo obu{}; + obu.header = *ptr++; + remaining--; + obu.has_extension = obuHasExtension(obu.header); + obu.has_size_field = obuHasSize(obu.header); + + if (obu.has_extension) { + if (remaining < 1) { + WarnL << "Malformed AV1 input: expected extension header"; + return {}; + } + obu.extension_header = *ptr++; + remaining--; + } + + uint64_t payload_size = 0; + if (obu.has_size_field) { + if (!readLeb128(ptr, remaining, payload_size)) { + WarnL << "Malformed AV1 input: failed to read OBU size"; + return {}; + } + if (payload_size > remaining) { + WarnL << "Malformed AV1 input: OBU size exceeds remaining data"; + return {}; + } + } else { + payload_size = remaining; + } + + obu.payload_data = ptr; + obu.payload_size = payload_size; + ptr += payload_size; + remaining -= payload_size; + + int type = obuType(obu.header); + if (type != kObuTypeTemporalDelimiter && + type != kObuTypeTileList && + type != kObuTypePadding) { + result.push_back(obu); + } + } + + return result; +} + +uint8_t AV1RtpEncoder::makeAggregationHeader(bool first_obu_is_fragment, + bool last_obu_is_fragment, + int num_obu_elements, + bool starts_new_coded_video_sequence) { + uint8_t header = 0; + + // Z bit: first OBU element is continuation of previous OBU + if (first_obu_is_fragment) { + header |= 0x80; + } + + // Y bit: last OBU element will be continued in next packet + if (last_obu_is_fragment) { + header |= 0x40; + } + + // W field: number of OBU elements (when <= 3) + if (num_obu_elements <= kMaxNumObusToOmitSize) { + header |= (num_obu_elements << 4); + } + + // N bit: beginning of new coded video sequence + if (starts_new_coded_video_sequence) { + header |= 0x08; + } + + return header; +} + +void AV1RtpEncoder::outputRtp(const uint8_t* data, size_t len, bool mark, + uint64_t stamp, uint8_t aggregation_header) { + auto rtp = getRtpInfo().makeRtp(TrackVideo, nullptr, len + kAggregationHeaderSize, mark, stamp); + auto payload = rtp->data() + RtpPacket::kRtpTcpHeaderSize + RtpPacket::kRtpHeaderSize; + + // 写入聚合头 + payload[0] = aggregation_header; + + // 复制数据 + if (len > 0) { + memcpy(payload + kAggregationHeaderSize, data, len); + } + + RtpCodec::inputRtp(std::move(rtp), false); +} + +bool AV1RtpEncoder::inputFrame(const Frame::Ptr &frame) { + auto ptr = frame->data() + frame->prefixSize(); + auto size = frame->size() - frame->prefixSize(); + + if (size == 0) { + return false; + } + + // 解析OBU + auto obus = parseObus((const uint8_t*)ptr, size); + if (obus.empty()) { + return false; + } + + // 检查是否包含序列头(关键帧标志) + bool has_sequence_header = false; + for (const auto& obu : obus) { + int type = obuType(obu.header); + if (type == kObuTypeSequenceHeader) { + has_sequence_header = true; + _got_key_frame = true; + break; + } + } + + // 如果还没有收到过关键帧,且当前帧不是关键帧,则丢弃 + if (!_got_key_frame && !has_sequence_header) { + DebugL << "Dropping AV1 frame before first keyframe"; + return false; + } + + size_t max_payload_size = getRtpInfo().getMaxSize() - kAggregationHeaderSize; + if (max_payload_size == 0) { + WarnL << "Invalid RTP max payload size for AV1"; + return false; + } + + for (size_t i = 0; i < obus.size(); ++i) { + const auto& obu = obus[i]; + bool is_first_obu = (i == 0); + bool is_last_obu = (i == obus.size() - 1); + if (!sendObu(obu, is_first_obu, is_last_obu, + has_sequence_header && is_first_obu, frame->pts(), max_payload_size)) { + return false; + } + } + + return true; +} + +bool AV1RtpEncoder::sendObu(const ObuInfo& obu, + bool is_first_obu, + bool is_last_obu, + bool starts_new_sequence, + uint64_t stamp, + size_t max_payload_size) { + std::vector obu_bytes; + obu_bytes.reserve(1 + (obu.has_extension ? 1 : 0) + obu.payload_size); + obu_bytes.push_back(obu.header & ~kObuSizePresentBit); + if (obu.has_extension) { + obu_bytes.push_back(obu.extension_header); + } + if (obu.payload_size > 0) { + obu_bytes.insert(obu_bytes.end(), obu.payload_data, obu.payload_data + obu.payload_size); + } + + size_t offset = 0; + bool first_fragment = true; + while (offset < obu_bytes.size()) { + size_t fragment_size = std::min(max_payload_size, obu_bytes.size() - offset); + bool last_fragment = (offset + fragment_size) == obu_bytes.size(); + uint8_t agg_header = makeAggregationHeader( + !first_fragment, + !last_fragment, + 1, + first_fragment && starts_new_sequence + ); + + bool mark = last_fragment && is_last_obu; + outputRtp(obu_bytes.data() + offset, fragment_size, mark, stamp, agg_header); + + offset += fragment_size; + first_fragment = false; + } + + return true; +} + +////////////////////////////////////////////////////////////////////////// +// AV1RtpDecoder 实现 +////////////////////////////////////////////////////////////////////////// + +AV1RtpDecoder::AV1RtpDecoder() { + obtainFrame(); +} + +void AV1RtpDecoder::obtainFrame() { + _frame = FrameImp::create(); + _frame->_codec_id = CodecAV1; +} + +AV1RtpDecoder::AggregationHeader AV1RtpDecoder::parseAggregationHeader(uint8_t header) { + AggregationHeader agg; + agg.first_obu_is_fragment = (header & 0x80) != 0; + agg.last_obu_is_fragment = (header & 0x40) != 0; + agg.num_obu_elements = (header & 0x30) >> 4; + agg.starts_new_coded_video_sequence = (header & 0x08) != 0; + return agg; +} + +bool AV1RtpDecoder::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos) { + auto payload_size = rtp->getPayloadSize(); + if (payload_size < kAggregationHeaderSize) { + return false; + } + + uint32_t ssrc = rtp->getSSRC(); + if (!_has_last_ssrc || _last_ssrc != ssrc) { + resetState(); + _last_ssrc = ssrc; + _has_last_ssrc = true; + } + + auto stamp = rtp->getStampMS(); + auto payload = rtp->getPayload(); + auto seq = rtp->getSeq(); + + // 解析聚合头 + auto agg_header = parseAggregationHeader(payload[0]); + + const uint8_t* data = payload + kAggregationHeaderSize; + size_t remaining = payload_size - kAggregationHeaderSize; + + // InfoL << "RTP seq=" << seq << ", Z=" << agg_header.first_obu_is_fragment + // << ", Y=" << agg_header.last_obu_is_fragment + // << ", W=" << agg_header.num_obu_elements + // << ", N=" << agg_header.starts_new_coded_video_sequence + // << ", payload_size=" << remaining; + + // if (remaining > 0) { + // std::ostringstream hex_stream; + // for (size_t i = 0; i < std::min(remaining, size_t(16)); ++i) { + // hex_stream << std::hex << std::setw(2) << std::setfill('0') << (int)data[i] << " "; + // } + // InfoL << "RTP payload hex: " << hex_stream.str(); + // } + + // 如果开始新的编码视频序列,清理之前的状态 + if (agg_header.starts_new_coded_video_sequence) { + InfoL << "Starting new coded video sequence"; + resetState(); + obtainFrame(); + } + + if (_has_last_seq) { + uint16_t expected = _last_seq + 1; + if (seq != expected && _assembling_fragment) { + WarnL << "RTP seq gap while assembling fragment, expected=" << expected + << " got=" << seq << ", dropping incomplete OBU"; + _fragment_buffer.clear(); + _assembling_fragment = false; + } + } + _last_seq = seq; + _has_last_seq = true; + + if (!processPayload(agg_header, data, remaining)) { + resetState(); + obtainFrame(); + return false; + } + + bool marker = rtp->getHeader()->mark; + if (marker) { + if (_assembling_fragment) { + WarnL << "Marker bit set while awaiting fragment continuation"; + _fragment_buffer.clear(); + _assembling_fragment = false; + } + _last_dts = stamp; + if (!_received_keyframe) { + WarnL << "AV1 RTP packet before keyframe, dropping"; + _frame->_buffer.clear(); + obtainFrame(); + return false; + } + flushFrame(stamp); + return true; + } + + _last_dts = stamp; + return false; +} + +bool AV1RtpDecoder::processPayload(const AggregationHeader& agg_header, + const uint8_t* data, + size_t remaining) { + size_t element_index = 0; + int expected_elements = agg_header.num_obu_elements; + + while (remaining > 0) { + uint64_t element_size = 0; + bool has_size = (expected_elements == 0) || (static_cast(element_index) < expected_elements - 1); + if (has_size) { + if (!readLeb128(data, remaining, element_size)) { + WarnL << "Failed to read OBU element size, trying fallback parsing"; + // 兼容性回退:如果leb128解析失败,尝试直接使用剩余字节数 + element_size = remaining; + } else if (element_size > remaining) { + WarnL << "OBU element size (" << element_size << ") exceeds remaining payload (" + << remaining << "), using remaining size"; + element_size = remaining; + } + } else { + element_size = remaining; + } + + std::vector element_bytes; + element_bytes.reserve(element_size); + if (element_size > 0) { + element_bytes.insert(element_bytes.end(), data, data + element_size); + data += element_size; + remaining -= element_size; + } + + bool is_first = element_index == 0; + bool is_last = (remaining == 0); + + if (is_first && agg_header.first_obu_is_fragment) { + if (_fragment_buffer.empty()) { + WarnL << "Unexpected fragment continuation in AV1 RTP packet"; + return false; + } + _fragment_buffer.insert(_fragment_buffer.end(), element_bytes.begin(), element_bytes.end()); + } else { + if (_assembling_fragment && !_fragment_buffer.empty()) { + WarnL << "Previous fragment never completed, discarding"; + return false; + } + _fragment_buffer = std::move(element_bytes); + } + + bool will_continue = is_last && agg_header.last_obu_is_fragment; + if (will_continue) { + _assembling_fragment = true; + } else { + if (!emitObu(_fragment_buffer.data(), _fragment_buffer.size())) { + return false; + } + _fragment_buffer.clear(); + _assembling_fragment = false; + } + + ++element_index; + } + + if (expected_elements > 0 && static_cast(element_index) != expected_elements) { + WarnL << "Mismatch between W field (" << expected_elements + << ") and parsed OBU elements (" << element_index + << "), tolerating for compatibility"; + // 不返回false,继续处理以提高兼容性 + } + + return true; +} + +bool AV1RtpDecoder::emitObu(const uint8_t* data, size_t size) { + if (size == 0) { + return true; + } + + if (size < 1) { + WarnL << "Empty OBU fragment"; + return false; + } + + uint8_t obu_header = data[0]; + size_t header_size = 1; + + // 检查OBU头部是否已经包含size bit + bool already_has_size = obuHasSize(obu_header); + + // 如果RTP包中的OBU已经包含size字段,需要特殊处理 + if (already_has_size) { + //WarnL << "RTP OBU contains size field"; + + // 跳过extension header处理 + if (obuHasExtension(obu_header)) { + if (size < 2) { + WarnL << "OBU with extension flag but insufficient data"; + return false; + } + header_size = 2; + } + + // 读取原始的size字段 + const uint8_t* ptr = data + header_size; + size_t remaining = size - header_size; + uint64_t original_size = 0; + + if (!readLeb128(ptr, remaining, original_size)) { + WarnL << "Failed to read original OBU size field"; + return false; + } + + if (original_size != remaining) { + WarnL << "OBU size mismatch in RTP packet, original_size=" << original_size + << " remaining=" << remaining; + } + + // 直接拷贝完整的OBU(包括已有的size字段) + _frame->_buffer.append((char*)data, size); + } else { + // 标准情况:RTP包中的OBU没有size字段,需要我们添加 + + // 写入带size bit的OBU头部 + _frame->_buffer.push_back(obu_header | kObuSizePresentBit); + + if (obuHasExtension(obu_header)) { + if (size < 2) { + WarnL << "OBU with extension flag but insufficient data"; + return false; + } + _frame->_buffer.push_back(data[1]); + header_size = 2; + } + + if (size < header_size) { + WarnL << "Invalid OBU size"; + return false; + } + + // 计算payload大小并写入leb128编码的size字段 + uint64_t payload_size = size - header_size; + uint8_t size_bytes[8]; + size_t size_len = writeLeb128(payload_size, size_bytes); + _frame->_buffer.append((char*)size_bytes, size_len); + + // 拷贝payload数据 + if (payload_size > 0) { + _frame->_buffer.append((char*)data + header_size, payload_size); + } + } + + if (obuType(obu_header) == kObuTypeSequenceHeader) { + _received_keyframe = true; + } + + return true; +} + +void AV1RtpDecoder::flushFrame(uint64_t stamp) { + if (_frame->_buffer.empty()) { + return; + } + _frame->_dts = stamp; + _frame->_pts = stamp; + RtpCodec::inputFrame(_frame); + obtainFrame(); +} + +void AV1RtpDecoder::resetState() { + _fragment_buffer.clear(); + _assembling_fragment = false; + _has_last_seq = false; + _received_keyframe = false; +} + +} // namespace mediakit diff --git a/ext-codec/AV1Rtp.h b/ext-codec/AV1Rtp.h new file mode 100644 index 00000000..ddd02141 --- /dev/null +++ b/ext-codec/AV1Rtp.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_AV1RTP_H +#define ZLMEDIAKIT_AV1RTP_H + +#include "Rtsp/RtpCodec.h" +#include "Extension/Frame.h" +#include "Extension/CommonRtp.h" + +namespace mediakit { + +/** + * AV1 RTP编码器 + */ +class AV1RtpEncoder : public RtpCodec { +public: + using Ptr = std::shared_ptr; + + AV1RtpEncoder(); + ~AV1RtpEncoder() override = default; + + bool inputFrame(const Frame::Ptr &frame) override; + +private: + // AV1 OBU信息 + struct ObuInfo { + uint8_t header; + uint8_t extension_header; + const uint8_t* payload_data; + size_t payload_size; + bool has_extension; + bool has_size_field; + }; + + std::vector parseObus(const uint8_t* data, size_t size); + void outputRtp(const uint8_t* data, size_t len, bool mark, uint64_t stamp, uint8_t aggregation_header); + uint8_t makeAggregationHeader(bool first_obu_is_fragment, bool last_obu_is_fragment, + int num_obu_elements, bool starts_new_coded_video_sequence); + bool sendObu(const ObuInfo& obu, bool is_first_obu, bool is_last_obu, + bool starts_new_sequence, uint64_t stamp, size_t max_payload_size); + +private: + bool _got_key_frame = false; +}; + +/** + * AV1 RTP解码器 + */ +class AV1RtpDecoder : public RtpCodec { +public: + using Ptr = std::shared_ptr; + + AV1RtpDecoder(); + ~AV1RtpDecoder() override = default; + + bool inputRtp(const RtpPacket::Ptr &rtp, bool key_pos = false) override; + +private: + struct AggregationHeader { + bool first_obu_is_fragment; // Z bit + bool last_obu_is_fragment; // Y bit + int num_obu_elements; // W field (0 = any number) + bool starts_new_coded_video_sequence; // N bit + }; + + AggregationHeader parseAggregationHeader(uint8_t header); + void obtainFrame(); + bool emitObu(const uint8_t* data, size_t size); + bool processPayload(const AggregationHeader& agg_header, const uint8_t* data, + size_t remaining); + void flushFrame(uint64_t stamp); + void resetState(); + +private: + uint64_t _last_dts = 0; + FrameImp::Ptr _frame; + std::vector _fragment_buffer; + bool _assembling_fragment = false; + bool _received_keyframe = false; + bool _has_last_seq = false; + uint16_t _last_seq = 0; + bool _has_last_ssrc = false; + uint32_t _last_ssrc = 0; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_AV1RTP_H diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index d09d18a1..17e105a5 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -23,6 +23,7 @@ static std::unordered_map s_plugins; REGISTER_CODEC(h264_plugin); REGISTER_CODEC(h265_plugin); +REGISTER_CODEC(av1_plugin); REGISTER_CODEC(jpeg_plugin); REGISTER_CODEC(aac_plugin); REGISTER_CODEC(opus_plugin);