Files
weiyu/modules/python/vendors/AsrTools/bk_asr/JianYingASR.py
2024-12-14 10:43:31 +08:00

252 lines
10 KiB
Python

import datetime
import hashlib
import hmac
import json
import logging
import os
import time
from typing import Dict, Tuple
import requests
from .ASRData import ASRDataSeg
from .BaseASR import BaseASR
class JianYingASR(BaseASR):
def __init__(self, audio_path: [str, bytes], use_cache: bool = False, start_time: float = 0, end_time: float = 6000):
super().__init__(audio_path, use_cache)
self.audio_path = audio_path
self.end_time = end_time
self.start_time = start_time
# AWS credentials
self.session_token = None
self.secret_key = None
self.access_key = None
# Upload details
self.store_uri = None
self.auth = None
self.upload_id = None
self.session_key = None
self.upload_hosts = None
def submit(self) -> str:
"""Submit the task"""
url = "https://lv-pc-api-sinfonlinec.ulikecam.com/lv/v1/audio_subtitle/submit"
payload = {
"adjust_endtime": 200,
"audio": self.store_uri,
"caption_type": 2,
"client_request_id": "45faf98c-160f-4fae-a649-6d89b0fe35be",
"max_lines": 1,
"songs_info": [{"end_time": self.end_time, "id": "", "start_time": self.start_time}],
"words_per_line": 16
}
sign, device_time = self._generate_sign_parameters(url='/lv/v1/audio_subtitle/submit', pf='4', appvr='4.0.0',
tdid='3943278516897751')
headers = self._build_headers(device_time, sign)
response = requests.post(url, json=payload, headers=headers)
query_id = response.json()['data']['id']
return query_id
def upload(self):
"""Upload the file"""
self._upload_sign()
self._upload_auth()
self._upload_file()
self._upload_check()
uri = self._upload_commit()
return uri
def query(self, query_id: str):
"""Query the task"""
url = "https://lv-pc-api-sinfonlinec.ulikecam.com/lv/v1/audio_subtitle/query"
payload = {
"id": query_id,
"pack_options": {"need_attribute": True}
}
sign, device_time = self._generate_sign_parameters(url='/lv/v1/audio_subtitle/query', pf='4', appvr='4.0.0',
tdid='3943278516897751')
headers = self._build_headers(device_time, sign)
response = requests.post(url, json=payload, headers=headers)
return response.json()
def _run(self):
logging.info("正在上传文件...")
self.upload()
query_id = self.submit()
logging.info(f"任务提交成功, 查询ID: {query_id}")
resp_data = self.query(query_id)
return resp_data
def _make_segments(self, resp_data: dict) -> list[ASRDataSeg]:
return [ASRDataSeg(u['text'], u['start_time'], u['end_time']) for u in resp_data['data']['utterances']]
@staticmethod
def _generate_sign_parameters(url: str, pf: str = '4', appvr: str = '4.0.0', tdid: str = '3943278516897751') -> \
Tuple[str, str]:
"""Generate signature and timestamp via an HTTP request"""
current_time = str(int(time.time()))
data = {
'url': url,
'current_time': current_time,
'pf': pf,
'appvr': appvr,
'tdid': tdid
}
# Replace with your actual endpoint URL
get_sign_url = 'https://asrtools-update.bkfeng.top/sign'
try:
response = requests.post(get_sign_url, json=data)
response.raise_for_status()
response_data = response.json()
sign = response_data.get('sign')
if not sign:
raise ValueError("No 'sign' in response")
except requests.exceptions.RequestException as e:
raise SystemExit(f"HTTP Request failed: {e}")
except ValueError as ve:
raise SystemExit(f"Invalid response: {ve}")
return sign.lower(), current_time
def _build_headers(self, device_time: str, sign: str) -> Dict[str, str]:
"""Build headers for requests"""
return {
'User-Agent': "Cronet/TTNetVersion:01594da2 2023-03-14 QuicVersion:46688bb4 2022-11-28",
'appvr': "4.0.0",
'device-time': str(device_time),
'pf': "4",
'sign': sign,
'sign-ver': "1",
'tdid': "3943278516897751",
}
def _uplosd_headers(self):
headers = {
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 Thea/1.0.1",
'Authorization': self.auth,
'Content-CRC32': self.crc32_hex,
}
return headers
def _upload_sign(self):
"""Get upload sign"""
url = "https://lv-pc-api-sinfonlinec.ulikecam.com/lv/v1/upload_sign"
payload = json.dumps({"biz": "pc-recognition"})
sign, device_time = self._generate_sign_parameters(url='/lv/v1/upload_sign', pf='4', appvr='4.0.0',
tdid='3943278516897751')
headers = self._build_headers(device_time, sign)
response = requests.post(url, data=payload, headers=headers)
response.raise_for_status()
login_data = response.json()
self.access_key = login_data['data']['access_key_id']
self.secret_key = login_data['data']['secret_access_key']
self.session_token = login_data['data']['session_token']
return self.access_key, self.secret_key, self.session_token
def _upload_auth(self):
"""Get upload authorization"""
if isinstance(self.audio_path, bytes):
file_size = len(self.audio_path)
else:
file_size = os.path.getsize(self.audio_path)
request_parameters = f'Action=ApplyUploadInner&FileSize={file_size}&FileType=object&IsInner=1&SpaceName=lv-mac-recognition&Version=2020-11-19&s=5y0udbjapi'
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')
headers = {
"x-amz-date": amz_date,
"x-amz-security-token": self.session_token
}
signature = aws_signature(self.secret_key, request_parameters, headers, region="cn", service="vod")
authorization = f"AWS4-HMAC-SHA256 Credential={self.access_key}/{datestamp}/cn/vod/aws4_request, SignedHeaders=x-amz-date;x-amz-security-token, Signature={signature}"
headers["authorization"] = authorization
response = requests.get(f"https://vod.bytedanceapi.com/?{request_parameters}", headers=headers)
store_infos = response.json()
self.store_uri = store_infos['Result']['UploadAddress']['StoreInfos'][0]['StoreUri']
self.auth = store_infos['Result']['UploadAddress']['StoreInfos'][0]['Auth']
self.upload_id = store_infos['Result']['UploadAddress']['StoreInfos'][0]['UploadID']
self.session_key = store_infos['Result']['UploadAddress']['SessionKey']
self.upload_hosts = store_infos['Result']['UploadAddress']['UploadHosts'][0]
self.store_uri = store_infos['Result']['UploadAddress']['StoreInfos'][0]['StoreUri']
return store_infos
def _upload_file(self):
"""Upload the file"""
url = f"https://{self.upload_hosts}/{self.store_uri}?partNumber=1&uploadID={self.upload_id}"
headers = self._uplosd_headers()
response = requests.put(url, data=self.file_binary, headers=headers)
resp_data = response.json()
assert resp_data['success'] == 0, f"File upload failed: {response.text}"
return resp_data
def _upload_check(self):
"""Check upload result"""
url = f"https://{self.upload_hosts}/{self.store_uri}?uploadID={self.upload_id}"
payload = f"1:{self.crc32_hex}"
headers = self._uplosd_headers()
response = requests.post(url, data=payload, headers=headers)
resp_data = response.json()
return resp_data
def _upload_commit(self):
"""Commit the uploaded file"""
url = f"https://{self.upload_hosts}/{self.store_uri}?uploadID={self.upload_id}&partNumber=1&x-amz-security-token={self.session_token}"
headers = self._uplosd_headers()
response = requests.put(url, data=self.file_binary, headers=headers)
return self.store_uri
def sign(key: bytes, msg: str) -> bytes:
"""使用HMAC-SHA256生成签名"""
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def get_signature_key(secret_key: str, date_stamp: str, region_name: str, service_name: str) -> bytes:
"""生成用于AWS签名的密钥"""
k_date = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
k_region = sign(k_date, region_name)
k_service = sign(k_region, service_name)
k_signing = sign(k_service, 'aws4_request')
return k_signing
def aws_signature(secret_key: str, request_parameters: str, headers: Dict[str, str],
method: str = "GET", payload: str = '', region: str = "cn", service: str = "vod") -> str:
"""生成AWS签名"""
canonical_uri = '/'
canonical_querystring = request_parameters
canonical_headers = '\n'.join([f"{key}:{value}" for key, value in headers.items()]) + '\n'
signed_headers = ';'.join(headers.keys())
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
amzdate = headers["x-amz-date"]
datestamp = amzdate.split('T')[0]
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = f"{datestamp}/{region}/{service}/aws4_request"
string_to_sign = f"{algorithm}\n{amzdate}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
signing_key = get_signature_key(secret_key, datestamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
return signature
if __name__ == '__main__':
# Example usage
audio_file = r"test.mp3"
asr = JianYingASR(audio_file)
asr_data = asr.run()
print(asr_data)