mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 09:26:37 +03:00
Add new methods to encode and decode waveforms
This commit is contained in:
parent
065719c8d8
commit
770c2c504d
|
@ -1141,6 +1141,72 @@ def get_appropriated_part_size(file_size):
|
|||
raise ValueError('File size too large')
|
||||
|
||||
|
||||
def encode_waveform(waveform):
|
||||
"""
|
||||
Encodes the input ``bytes`` into a 5-bit byte-string
|
||||
to be used as a voice note's waveform. See `decode_waveform`
|
||||
for the reverse operation.
|
||||
|
||||
Example
|
||||
.. code-block:: python
|
||||
|
||||
chat = ...
|
||||
file = 'my.ogg'
|
||||
|
||||
# Send 'my.ogg' with a ascending-triangle waveform
|
||||
client.send_file(chat, file, attributes=[types.DocumentAttributeAudio(
|
||||
duration=7,
|
||||
voice=True,
|
||||
waveform=utils.encode_waveform(bytes(range(2 ** 5)) # 2**5 because 5-bit
|
||||
)]
|
||||
|
||||
# Send 'my.ogg' with a square waveform
|
||||
client.send_file(chat, file, attributes=[types.DocumentAttributeAudio(
|
||||
duration=7,
|
||||
voice=True,
|
||||
waveform=utils.encode_waveform(bytes((31, 31, 15, 15, 15, 15, 31, 31)) * 4)
|
||||
)]
|
||||
"""
|
||||
bits_count = len(waveform) * 5
|
||||
bytes_count = (bits_count + 7) // 8
|
||||
result = bytearray(bytes_count + 1)
|
||||
|
||||
for i in range(len(waveform)):
|
||||
byte_index, bit_shift = divmod(i * 5, 8)
|
||||
value = (waveform[i] & 0b00011111) << bit_shift
|
||||
|
||||
or_what = struct.unpack('<H', (result[byte_index:byte_index + 2]))[0]
|
||||
or_what |= value
|
||||
result[byte_index:byte_index + 2] = struct.pack('<H', or_what)
|
||||
|
||||
return bytes(result[:bytes_count])
|
||||
|
||||
|
||||
def decode_waveform(waveform):
|
||||
"""
|
||||
Inverse operation of `encode_waveform`.
|
||||
"""
|
||||
bit_count = len(waveform) * 8
|
||||
value_count = bit_count // 5
|
||||
if value_count == 0:
|
||||
return b''
|
||||
|
||||
result = bytearray(value_count)
|
||||
for i in range(value_count - 1):
|
||||
byte_index, bit_shift = divmod(i * 5, 8)
|
||||
value = struct.unpack('<H', waveform[byte_index:byte_index + 2])[0]
|
||||
result[i] = (value >> bit_shift) & 0b00011111
|
||||
|
||||
byte_index, bit_shift = divmod(value_count - 1, 8)
|
||||
if byte_index == len(waveform) - 1:
|
||||
value = waveform[byte_index]
|
||||
else:
|
||||
value = struct.unpack('<H', waveform[byte_index:byte_index + 2])[0]
|
||||
|
||||
result[value_count - 1] = (value >> bit_shift) & 0b00011111
|
||||
return bytes(result)
|
||||
|
||||
|
||||
class AsyncClassWrapper:
|
||||
def __init__(self, wrapped):
|
||||
self.wrapped = wrapped
|
||||
|
|
Loading…
Reference in New Issue
Block a user