tiktok.py
18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
import requests,json,os,time,configparser,re,sys,argparse
class TikTok():
    """Batch downloader for the videos a Douyin user has posted or liked.

    Typical use is ``TikTok().setting(uid, music, count, dir, mode)``, which
    resolves the profile link and then walks every result page.

    Paging is driven by a chain of mutually recursive calls
    (``next_data`` -> ``video_info`` -> ``videos_download`` -> ``next_data``)
    that ends when the API reports ``max_cursor == 0``; ``Isend`` is set to
    True at that point.
    """

    # Seconds any single HTTP request may block before it is abandoned.
    REQUEST_TIMEOUT = 30
    # Upper bound on first-page retries so an always-empty answer cannot hang the run.
    MAX_ATTEMPTS = 50
    # Bytes written per progress-bar update.
    CHUNK_SIZE = 1024
    # Same pattern as before, now a raw string so '\(' is not an invalid str escape.
    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Path segment after "user/", stopping before any "/", "?" or "#".
    _SEC_UID_PATTERN = re.compile(r'user/([^/?#]+)')
    # Character runs replaced by "_" when a description becomes a file name.
    _UNSAFE_CHARS = re.compile(r'[\\/:*?"<>|\r\n]+')

    def __init__(self):
        """Create a downloader with empty settings; call ``setting`` to run it."""
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66'
        }
        print(sys.platform)
        # Example share link: https://v.douyin.com/FpU3oPn/
        self.Isend = False    # True once every page has been fetched
        self.uid = ''         # profile link (or text containing it) given by the user
        self.save = ''        # root download directory
        self.count = ''       # number of items requested per page (an int once set)
        self.musicarg = ''    # "yes" to also save each video's audio track
        self.mode = ''        # API listing to walk, e.g. 'post' or 'like'
        self.nickname = ''    # nickname of the profile owner
        self.like_counts = 0  # running item counter; numbers files outside 'post' mode
        self.sec = ''         # sec_uid of the profile owner

    def setting(self, uid, music, count, dir, mode):
        """Store the run options and immediately start downloading.

        Args:
            uid: Profile link, or any text that contains it.
            music: "yes" to save audio tracks as well.
            count: Items to request per page (int).
            dir: Root directory for downloads.
            mode: API listing name placed in the request URL ('post' / 'like').
        """
        self.uid = uid
        self.save = dir
        self.count = count
        self.musicarg = music
        self.mode = mode
        self.judge_link()

    def Find(self, string):
        """Return every http(s) URL found in *string* (possibly an empty list)."""
        return self._URL_PATTERN.findall(string)

    def _user_dir(self, nickname):
        """Return ``<save>/<mode>/<nickname>``; works with or without a trailing slash on save."""
        return os.path.join(self.save, self.mode, nickname)

    def _fetch_json(self, url):
        """GET *url* with the instance headers and return the decoded JSON body."""
        response = requests.get(
            url=url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        return json.loads(response.content.decode())

    def _resolve_sec_uid(self):
        """Follow the profile link and store the user's sec_uid in ``self.sec``.

        Raises:
            ValueError: if ``self.uid`` holds no URL or the final URL has no
                ``user/<sec_uid>`` segment.
        """
        links = self.Find(self.uid)
        if not links:
            raise ValueError('no URL found in %r' % (self.uid,))
        r = requests.get(url=links[0], timeout=self.REQUEST_TIMEOUT)
        found = self._SEC_UID_PATTERN.search(str(r.url))
        if found is None:
            raise ValueError('no sec_uid found in %s' % r.url)
        # Only the path segment is kept; a trailing query string such as
        # "?previous_page=..." would otherwise corrupt the API request.
        self.sec = found.group(1)
        return self.sec

    def judge_link(self):
        """Resolve the profile, create its download directory and fetch page one.

        Returns:
            ``(api_post_url, max_cursor, sec_uid)`` for the first page.

        Raises:
            ValueError: if the link cannot be resolved or the first page is empty.
        """
        # A fresh run must not inherit the "finished" flag or numbering of a previous one.
        self.Isend = False
        self.like_counts = 0
        self._resolve_sec_uid()
        print('[ 提示 ]:为您下载多个视频!\r')
        print('[ 提示 ]:用户的sec_id=%s\r' % self.sec)
        # Cursor of the first page
        max_cursor = 0
        # URL of the first request
        api_post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/%s/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&_signature=PDHVOQAAXMfFyj02QEpGaDwx1S&dytk=' % (
            self.mode, self.sec, str(self.count), max_cursor)
        html = self._fetch_json(api_post_url)
        first_page = html.get('aweme_list') or []
        if not first_page:
            raise ValueError('the API returned no videos for sec_uid %s' % self.sec)
        self.nickname = first_page[0]['author']['nickname']
        os.makedirs(self._user_dir(self.nickname), exist_ok=True)
        self.get_data(api_post_url, max_cursor)
        return api_post_url, max_cursor, self.sec

    def get_data(self, api_post_url, max_cursor):
        """Fetch the first page, retrying while it comes back empty, then process it.

        Processing a non-empty page also walks all following pages (see the
        class docstring).  Nothing is fetched when ``Isend`` is already True.

        Returns:
            ``(result, max_cursor)``: the page's item list (empty when every
            attempt failed) and the cursor reported with it.
        """
        result = []
        if self.Isend:
            print('[ 提示 ]:此页无数据,为您跳过......\r')
            return result, max_cursor
        for index in range(1, self.MAX_ATTEMPTS + 1):
            print('[ 提示 ]:正在进行第 %d 次尝试\r' % index)
            time.sleep(0.3)
            html = self._fetch_json(api_post_url)
            max_cursor = html.get('max_cursor', max_cursor)
            result = html.get('aweme_list') or []
            if result:
                print('[ 用户 ]:', str(self.nickname), '\r')
                print('[ 提示 ]:抓获数据成功!\r')
                # Handle the first page (and, through it, every later page).
                self.video_info(result, max_cursor)
                return result, max_cursor
            # An empty page is retried instead of being pushed through the
            # download chain, which would mark the run finished and never
            # let this loop end.
            print('[ 提示 ]:此页无数据,为您跳过......\r')
        print('[ 提示 ]:多次尝试后仍未获取到数据,已停止重试!\r')
        return result, max_cursor

    def next_data(self, max_cursor):
        """Fetch and process the page identified by *max_cursor*.

        A cursor of 0 means the listing wrapped back to the start: ``Isend``
        is set and nothing is requested.
        """
        if self.Isend:
            return
        # Back at the first page: everything has been fetched.
        if max_cursor == 0:
            self.Isend = True
            return
        # The sec_uid found for the first page is reused; the share link is
        # only followed again when this method is called on its own.
        if not self.sec:
            self._resolve_sec_uid()
        # URL of the next request
        api_naxt_post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/%s/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&_signature=RuMN1wAAJu7w0.6HdIeO2EbjDc&dytk=' % (
            self.mode, self.sec, str(self.count), max_cursor)
        index = 1
        print('[ 提示 ]:正在对', max_cursor, '页进行第 %d 次尝试!\r' % index)
        time.sleep(0.3)
        html = self._fetch_json(api_naxt_post_url)
        # Cursor of the page after this one
        max_cursor = html.get('max_cursor', 0)
        result = html.get('aweme_list') or []
        print('[ 提示 ]:%d页抓获数据成功!\r' % max_cursor)
        # Handle this page; the call continues with the following page itself.
        self.video_info(result, max_cursor)

    def video_info(self, result, max_cursor):
        """Extract the fields needed for download from one page and download them.

        At most ``self.count`` items are read.  An item missing any field is
        skipped as a whole, so the returned lists always stay aligned.

        Returns:
            ``(self, author_list, video_list, uri_list, aweme_id, nickname,
            max_cursor)`` where the five lists hold, per item, the
            description, the play_addr URL, the video uri, the item id and
            the author's nickname.
        """
        author_list = []
        video_list = []
        aweme_id = []
        nickname = []
        uri_list = []
        limit = max(int(self.count), 0)
        for item in (result or [])[:limit]:
            try:
                # The play_addr URL from this listing is a lower resolution than
                # https://aweme.snssdk.com/aweme/v1/play/?video_id=<uri>&radio=1080p,
                # so the uri is collected as well (note dated 2022/04/22).
                play_addr = item['video']['play_addr']
                fields = (
                    str(item['desc']),
                    str(play_addr['url_list'][0]),
                    str(play_addr['uri']),
                    str(item['aweme_id']),
                    str(item['author']['nickname']),
                )
            except (KeyError, IndexError, TypeError):
                continue
            author_list.append(fields[0])
            video_list.append(fields[1])
            uri_list.append(fields[2])
            aweme_id.append(fields[3])
            nickname.append(fields[4])
        self.videos_download(author_list, video_list, uri_list, aweme_id, nickname, max_cursor)
        return self, author_list, video_list, uri_list, aweme_id, nickname, max_cursor

    def check_info(self, nickname):
        """List the files already saved for *nickname* under the current mode.

        Returns:
            None when *nickname* is an empty list, an empty list when the
            directory does not exist yet, otherwise the directory listing.
        """
        if nickname == []:
            return None
        try:
            return os.listdir(self._user_dir(nickname))
        except FileNotFoundError:
            return []

    def _download_to_file(self, url, path, label, headers=None):
        """Stream *url* into *path*, printing size, progress and elapsed time.

        Data is written to ``path + '.part'`` and renamed on success, so an
        interrupted transfer is never mistaken for a finished file later.

        Returns:
            True when the file was written, False when the server did not
            answer with HTTP 200.
        """
        partial = path + '.part'
        with requests.get(url=url, headers=headers, stream=True,
                          timeout=self.REQUEST_TIMEOUT) as response:
            if response.status_code != 200:
                return False
            # 0 when the server sends no length; the percentage bar is then omitted.
            content_size = int(response.headers.get('content-length', 0))
            print(label + '[文件 大小]:{size:.2f} MB'.format(
                size=content_size / self.CHUNK_SIZE / 1024))
            start = time.time()
            size = 0
            try:
                with open(partial, 'wb') as file:
                    for data in response.iter_content(chunk_size=self.CHUNK_SIZE):
                        file.write(data)
                        size += len(data)
                        if content_size:
                            print('\r' + '[下载进度]:%s%.2f%%' % (
                                '>' * int(size * 50 / content_size),
                                float(size / content_size * 100)), end=' ')
            except BaseException:
                # Never leave a truncated file behind, whatever interrupted the transfer.
                try:
                    os.remove(partial)
                except OSError:
                    pass
                raise
        os.replace(partial, path)
        end = time.time()
        print('\n' + '[下载完成]:耗时: %.2f秒\n' % (end - start))
        return True

    def videos_download(self, author_list, video_list, uri_list, aweme_id, nickname, max_cursor):
        """Download every item of one page, then continue with the next page.

        The five lists are parallel (see ``video_info``).  ``author_list`` is
        updated in place with the cleaned-up descriptions used for file names.
        A failure on one item is reported and the remaining items still run.
        """
        # Template of the 1080p play URL for a video uri.
        uri_url = 'https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&radio=1080p&line=0'
        # self.count may exceed what the API returned, so the lists set the bound.
        total = min(len(author_list), len(video_list), len(uri_list),
                    len(aweme_id), len(nickname))
        for i in range(total):
            self.like_counts += 1
            # Per-item details (creation time, music) come from the iteminfo endpoint.
            jx_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={aweme_id[i]}'
            try:
                item = self._fetch_json(jx_url)['item_list'][0]
                creat_time = time.strftime(
                    "%Y-%m-%d %H.%M.%S", time.localtime(item['create_time']))
            except (requests.RequestException, ValueError, LookupError,
                    TypeError, OSError, OverflowError) as error:
                # Skip rather than reuse the previous item's time and music.
                print('[ 警告 ]:获取视频信息失败,已为您跳过!', error, '\r')
                continue

            # Drop line breaks from the description (after RobotJohns,
            # https://github.com/RobotJohns) and cap its length.
            author_list[i] = ''.join(author_list[i].splitlines())
            if len(author_list[i]) > 182:
                print("[ 提示 ]:", "文件名称太长 进行截取")
                author_list[i] = author_list[i][0:180]
                print("[ 提示 ]:", "截取后的文案:{0},长度:{1}".format(
                    author_list[i], len(author_list[i])))

            safe_desc = self._UNSAFE_CHARS.sub("_", author_list[i])
            if self.mode == 'post':
                folder = self._user_dir(nickname[i])
                prefix = creat_time
            else:
                folder = self._user_dir(self.nickname)
                prefix = str(self.like_counts) + '、'
            v_url = os.path.join(folder, prefix + safe_desc + '.mp4')

            # Skip items whose video file is already on disk.
            if os.path.exists(v_url):
                print('[ 提示 ]:', author_list[i], '[文件已存在,为您跳过]', end="")
                for _ in range(20):
                    print(">", end='', flush=True)
                    time.sleep(0.01)
                print('\r')
                continue

            try:
                os.makedirs(folder, exist_ok=True)
            except OSError as error:
                print('[ 警告 ]:下载视频出错!')
                print('[ 警告 ]:', error, '\r')
                continue

            # Optional audio track
            if self.musicarg == "yes":
                try:
                    music_url = str(item['music']['play_url']['url_list'][0])
                    music_title = str(item['music']['author'])
                    m_url = os.path.join(
                        folder,
                        prefix + self._UNSAFE_CHARS.sub("_", music_title)
                        + '_' + safe_desc + '.mp3')
                    if not self._download_to_file(
                            music_url, m_url,
                            '[ 音频 ]:' + creat_time + author_list[i]):
                        print('\r[ 警告 ]:下载音频出错!\r')
                except (requests.RequestException, OSError, LookupError,
                        TypeError, ValueError) as error:
                    print(error)
                    print('\r[ 警告 ]:下载音频出错!\r')

            # Video: the 1080p URL is streamed straight to disk; the play_addr
            # URL from the listing is only requested if that fails.
            label = '[ 视频 ]:' + creat_time + author_list[i]
            try:
                saved = self._download_to_file(
                    uri_url % uri_list[i], v_url, label, headers=self.headers)
                if not saved:
                    saved = self._download_to_file(video_list[i], v_url, label)
                if not saved:
                    print('[ 警告 ]:下载视频出错!')
            except (requests.RequestException, OSError, ValueError) as error:
                print('[ 警告 ]:下载视频出错!')
                print('[ 警告 ]:', error, '\r')
        # Continue with the next page.
        self.next_data(max_cursor)
# Script entry point
if __name__ == "__main__":
    def get_args(user, dir, music, count, mode):
        """Run one batch download with the given options, then exit with status 0."""
        # Create the downloader
        TK = TikTok()
        # Hand over the command-line options; this starts the download
        TK.setting(user, music, count, dir, mode)
        input('[ 完成 ]:已完成批量下载,输入任意键后退出:')
        sys.exit(0)

    try:
        parser = argparse.ArgumentParser(description='TikTokMulti V1.2.5 使用帮助')
        parser.add_argument('--user', '-u', type=str, help='为用户主页链接,非必要参数', required=False)
        parser.add_argument('--dir', '-d', type=str, help='视频保存目录,非必要参数, 默认./TikTokDownload/', default='./TikTokDownload/')
        parser.add_argument('--music', '-m', type=str, help='视频音乐下载,非必要参数, 默认no可选yes', default='no')
        parser.add_argument('--count', '-c', type=int, help='单页下载的数量,默认参数 35 无须修改', default=35)
        parser.add_argument('--mode', '-M', type=str, help='下载模式选择,默认post:发布的视频 可选like:点赞视频(需要开放权限)', default='post')
        args = parser.parse_args()
        # Honour --user when it is given; only prompt when it is missing.
        user = args.user or input('请输入用户个人主页地址:')
        get_args(user, args.dir, args.music, args.count, args.mode)
    except Exception as e:
        print('[ 提示 ]:未输入命令或意外出错,自动退出!')
        # Show the cause instead of discarding it, and report failure to the shell.
        print('[ 提示 ]:', e)
        sys.exit(1)