Skip to content
切换导航条
切换导航条
当前项目
正在载入...
登录
Harvey
/
ydu-lottery-shop-app
转到一个项目
切换导航栏
切换导航栏固定状态
项目
群组
代码片段
帮助
项目
活动
版本库
流水线
图表
问题
0
合并请求
0
维基
网络
创建新的问题
作业
提交
问题看板
文件
提交
网络
比较
分支
标签
Commit 1f85ac1a
由
Harvey
编写于
2022-06-25 14:29:46 +0800
浏览文件
选项
浏览文件
标签
下载
电子邮件补丁
差异文件
no message
1 个父辈
b1f91b58
隐藏空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
382 行增加
和
0 行删除
tiktok.py
tiktok.py
0 → 100644
查看文件 @
1f85ac1
import
requests
,
json
,
os
,
time
,
configparser
,
re
,
sys
,
argparse
class TikTok():
    """Batch downloader for the videos (and optionally the audio) of one Douyin user.

    Typical use is ``TikTok().setting(uid, music, count, dir, mode)``, which
    resolves the pasted profile link, then walks the user's pages and saves
    every item below ``<save>/<mode>/<nickname>/``.

    Paging is mutually recursive: ``get_data``/``next_data`` fetch a page,
    ``video_info`` parses it, ``videos_download`` saves it and then calls
    ``next_data`` for the following page until the API reports cursor 0.
    """

    # Characters replaced by "_" when a title is used as a file name.
    _ILLEGAL_NAME_CHARS = r'[\\/:*?"<>|\r\n]+'

    def __init__(self):
        """Set default state; real values are supplied later via setting()."""
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66'
        }
        # Example of an accepted link: https://v.douyin.com/FpU3oPn/
        self.Isend = False       # becomes True once every page has been fetched
        self.uid = ''            # text containing the user's profile link
        self.save = '/Download/'  # root directory for downloads
        self.count = ''          # number of items requested per page
        self.musicarg = ''       # "yes" to also download the audio track
        self.mode = ''           # download mode, e.g. 'post' or 'like'
        self.nickname = ''       # nickname used for the target directory
        self.like_counts = 0     # running item counter (file prefix outside 'post' mode)
        self.sec = ''            # the user's sec_uid

    def setting(self, uid, music, count, dir, mode):
        """Store the run parameters and immediately start the download.

        Args:
            uid: Text containing the profile link.
            music: "yes" to download audio as well.
            count: Items requested per page.
            dir: Root directory for downloads.
            mode: API mode segment ('post' or 'like').
        """
        self.uid = uid
        self.save = dir
        self.count = count
        self.musicarg = music
        self.mode = mode
        self.judge_link()

    def Find(self, string):
        """Return every http(s) URL found in ``string`` (possibly an empty list)."""
        url = re.findall(
            r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
            string)
        return url

    @staticmethod
    def _extract_sec_uid(url):
        """Return the path segment following ``user/`` in ``url``, or '' if absent.

        Query strings such as ``?previous_page=app_code_link`` are not part of
        the sec_uid and are therefore excluded.
        """
        match = re.search(r'user/([^/?#]+)', str(url))
        return match.group(1) if match else ''

    def _resolve_sec_uid(self):
        """Follow the pasted link and store the sec_uid found in the final URL.

        ``self.sec`` is left unchanged when the final URL has no ``user/`` part.
        """
        r = requests.get(url=self.Find(self.uid)[0])
        sec = self._extract_sec_uid(r.url)
        if sec:
            self.sec = sec

    def _user_dir(self, nickname):
        """Return ``<save>/<mode>/<nickname>``; works with or without a trailing slash on save."""
        return os.path.join(self.save, self.mode, nickname)

    def judge_link(self):
        """Resolve the user, create the download directory and fetch the first page.

        Returns:
            Tuple ``(api_post_url, max_cursor, sec_uid)`` for the first page.

        Raises:
            IndexError: If no URL is found in ``self.uid`` or the first API
                response contains no items.
        """
        self._resolve_sec_uid()
        print('[ 提示 ]:为您下载多个视频!\r')
        print('[ 提示 ]:用户的sec_id=%s\r' % self.sec)

        # Cursor of the first page.
        max_cursor = 0
        api_post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/%s/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&_signature=PDHVOQAAXMfFyj02QEpGaDwx1S&dytk=' % (
            self.mode, self.sec, str(self.count), max_cursor)

        response = requests.get(url=api_post_url, headers=self.headers)
        html = json.loads(response.content.decode())
        self.nickname = html['aweme_list'][0]['author']['nickname']
        os.makedirs(self._user_dir(self.nickname), exist_ok=True)

        self.get_data(api_post_url, max_cursor)
        return api_post_url, max_cursor, self.sec

    def get_data(self, api_post_url, max_cursor):
        """Fetch the first page, retrying while the API answers with an empty list.

        Returns:
            Tuple ``(result, max_cursor)`` with the item list and next cursor.
        """
        index = 0    # attempt counter
        result = []  # items returned by the API
        while result == []:
            index += 1
            print('[ 提示 ]:正在进行第 %d 次尝试\r' % index)
            time.sleep(0.3)
            response = requests.get(url=api_post_url, headers=self.headers)
            html = json.loads(response.content.decode())
            if not self.Isend:
                print('[ 用户 ]:', str(self.nickname), '\r')
                max_cursor = html['max_cursor']
                result = html['aweme_list']
                print('[ 提示 ]:抓获数据成功!\r')
                # Parse and download the first page.
                self.video_info(result, max_cursor)
            else:
                max_cursor = html['max_cursor']
                self.next_data(max_cursor)
                print('[ 提示 ]:此页无数据,为您跳过......\r')
        return result, max_cursor

    def next_data(self, max_cursor):
        """Fetch and process the page at ``max_cursor``; cursor 0 marks the end."""
        # The sec_uid is normally resolved by judge_link(); only look it up
        # again when this method is used on its own.
        if not self.sec:
            self._resolve_sec_uid()

        api_naxt_post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/%s/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&_signature=RuMN1wAAJu7w0.6HdIeO2EbjDc&dytk=' % (
            self.mode, self.sec, str(self.count), max_cursor)

        index = 0
        result = []
        while not self.Isend:
            # A cursor of 0 means the API wrapped around to the first page.
            if max_cursor == 0:
                self.Isend = True
                return
            index += 1
            print('[ 提示 ]:正在对', max_cursor, '页进行第 %d 次尝试!\r' % index)
            time.sleep(0.3)
            response = requests.get(url=api_naxt_post_url, headers=self.headers)
            html = json.loads(response.content.decode())
            if not self.Isend:
                max_cursor = html['max_cursor']
                result = html['aweme_list']
                print('[ 提示 ]:%d页抓获数据成功!\r' % max_cursor)
                # Parse and download this page (recurses into the next one).
                self.video_info(result, max_cursor)
            else:
                # Was a no-op comparison (`==`); the intent is to stop paging.
                self.Isend = True
                print('[ 提示 ]:%d页抓获数据失败!\r' % max_cursor)

    def video_info(self, result, max_cursor):
        """Extract the fields needed for downloading from one API page and download it.

        Items missing any required field are skipped as a whole so that the
        returned lists always stay index-aligned.

        Returns:
            Tuple ``(self, author_list, video_list, uri_list, aweme_id,
            nickname, max_cursor)`` where the lists hold, per item, the
            description, the play URL, the video uri, the item id and the
            author's nickname.
        """
        author_list = []
        video_list = []
        aweme_id = []
        nickname = []
        uri_list = []

        for item in (result or [])[:self.count]:
            try:
                desc = str(item['desc'])
                # The list API only exposes a 720p play URL; the uri is used
                # later to build a 1080p link.
                play_addr = item['video']['play_addr']
                play_url = str(play_addr['url_list'][0])
                uri = str(play_addr['uri'])
                item_id = str(item['aweme_id'])
                author = str(item['author']['nickname'])
            except (KeyError, IndexError, TypeError):
                continue
            author_list.append(desc)
            video_list.append(play_url)
            uri_list.append(uri)
            aweme_id.append(item_id)
            nickname.append(author)

        self.videos_download(author_list, video_list, uri_list, aweme_id, nickname, max_cursor)
        return self, author_list, video_list, uri_list, aweme_id, nickname, max_cursor

    def check_info(self, nickname):
        """List the files already present in the directory of ``nickname``.

        Returns:
            ``None`` when ``nickname`` is an empty list, an empty list when the
            directory does not exist, otherwise the directory listing.
        """
        if nickname == []:
            return None
        try:
            return os.listdir(self._user_dir(nickname))
        except FileNotFoundError:
            return []

    @staticmethod
    def _save_stream(response, path, total):
        """Write a streamed response to ``path`` while printing a progress bar.

        Data goes to ``path + '.part'`` first and is renamed on success, so an
        interrupted download never looks like a finished file.
        """
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
        tmp_path = path + '.part'
        done = 0
        try:
            with open(tmp_path, 'wb') as file:
                for data in response.iter_content(chunk_size=1024):
                    file.write(data)
                    done += len(data)
                    if total:
                        print('\r' + '[下载进度]:%s%.2f%%' % (
                            '>' * int(done * 50 / total), float(done / total * 100)), end=' ')
            os.replace(tmp_path, path)
        finally:
            # Only still present when the download did not complete.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def videos_download(self, author_list, video_list, uri_list, aweme_id, nickname, max_cursor):
        """Download every item of one page, then continue with the next page.

        Args:
            author_list: Item descriptions; cleaned in place (newlines removed,
                over-long text truncated) because they become file names.
            video_list: 720p play URLs from the list API (kept for interface
                compatibility; the 1080p link built from ``uri_list`` is used).
            uri_list: Video uris used to build the 1080p link.
            aweme_id: Item ids used to query per-item details.
            nickname: Author nickname per item.
            max_cursor: Cursor of the following page.
        """
        uri_url = 'https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&radio=1080p&line=0'
        v_info = self.check_info(self.nickname) or []
        is_post = self.mode == 'post'

        # The page may hold fewer items than self.count, so iterate the data.
        for i, (desc, uri, item_id, item_nick) in enumerate(
                zip(author_list, uri_list, aweme_id, nickname)):
            hd_url = uri_url % uri
            self.like_counts += 1

            # Per-item details (creation time, music). Reset first so that a
            # failed lookup never reuses the previous item's data.
            creat_time = ''
            js = {}
            try:
                jx_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={item_id}'
                js = json.loads(requests.get(url=jx_url, headers=self.headers).text)
                creat_time = time.strftime(
                    "%Y-%m-%d %H.%M.%S",
                    time.localtime(js['item_list'][0]['create_time']))
            except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as error:
                print('[ 警告 ]:', error, '\r')

            # Code From RobotJohns https://github.com/RobotJohns
            # Remove line breaks and cap the length of the file name.
            desc = ''.join(desc.splitlines())
            if len(desc) > 182:
                print("[ 提示 ]:", "文件名称太长 进行截取")
                desc = desc[0:180]
                print("[ 提示 ]:", "截取后的文案:{0},长度:{1}".format(desc, len(desc)))
            author_list[i] = desc

            safe_desc = re.sub(self._ILLEGAL_NAME_CHARS, "_", desc)
            prefix = creat_time if is_post else str(self.like_counts) + '、'
            target_dir = self._user_dir(item_nick if is_post else self.nickname)
            v_name = prefix + safe_desc + '.mp4'

            # Skip items whose video file already exists.
            if v_name in v_info:
                print('[ 提示 ]:', desc, '[文件已存在,为您跳过]', end="")
                for _ in range(20):
                    print(">", end='', flush=True)
                    time.sleep(0.01)
                print('\r')
                continue

            # Optional audio download.
            if self.musicarg == "yes":
                try:
                    music_info = js['item_list'][0]['music']
                    music_url = str(music_info['play_url']['url_list'][0])
                    music_title = str(music_info['author'])
                    m_url = os.path.join(
                        target_dir,
                        prefix + re.sub(self._ILLEGAL_NAME_CHARS, "_", music_title)
                        + '_' + safe_desc + '.mp3')
                    start = time.time()
                    with requests.get(music_url, stream=True) as music:
                        if music.status_code == 200:
                            content_size = int(music.headers.get('content-length', 0))
                            print('[ 音频 ]:' + creat_time + desc + '[文件 大小]:{size:.2f} MB'.format(
                                size=content_size / 1024 / 1024))
                            self._save_stream(music, m_url, content_size)
                            end = time.time()
                            print('\n' + '[下载完成]:耗时: %.2f秒\n' % (end - start))
                except Exception as error:
                    print(error)
                    print('\r[ 警告 ]:下载音频出错!\r')

            # Video download: stream the 1080p link straight to disk.
            try:
                video = requests.get(url=hd_url, headers=self.headers, stream=True)
            except requests.RequestException:
                print('[ 提示 ]:该页视频资源没有', self.count, '个,已为您跳过!\r')
                break
            try:
                with video:
                    if video.status_code == 200:
                        start = time.time()
                        content_size = int(video.headers.get('content-length', 0))
                        print('[ 视频 ]:' + creat_time + desc + '[文件 大小]:{size:.2f} MB'.format(
                            size=content_size / 1024 / 1024))
                        v_url = os.path.join(target_dir, v_name)
                        self._save_stream(video, v_url, content_size)
                        end = time.time()
                        print('\n' + '[下载完成]:耗时: %.2f秒\n' % (end - start))
            except Exception as error:
                print('[ 警告 ]:下载视频出错!')
                print('[ 警告 ]:', error, '\r')

        # Continue with the next page.
        self.next_data(max_cursor)
# 主模块执行
if __name__ == "__main__":

    def get_args(user, dir, music, count, mode):
        """Create a downloader, run it with the given options, then exit.

        Args:
            user: Text containing the user's profile link.
            dir: Root directory for downloads.
            music: "yes" to download audio as well.
            count: Items requested per page.
            mode: Download mode ('post' or 'like').
        """
        TK = TikTok()
        TK.setting(user, music, count, dir, mode)
        input('[ 完成 ]:已完成批量下载,输入任意键后退出:')
        sys.exit(0)

    try:
        parser = argparse.ArgumentParser(description='TikTokMulti V1.2.5 使用帮助')
        parser.add_argument('--user', '-u', type=str, help='为用户主页链接,非必要参数', required=False)
        parser.add_argument('--dir', '-d', type=str, help='视频保存目录,非必要参数, 默认/Download', default='./Download/')
        parser.add_argument('--music', '-m', type=str, help='视频音乐下载,非必要参数, 默认no可选yes', default='no')
        parser.add_argument('--count', '-c', type=int, help='单页下载的数量,默认参数 35 无须修改', default=35)
        parser.add_argument('--mode', '-M', type=str, help='下载模式选择,默认post:发布的视频 可选like:点赞视频(需要开放权限)', default='post')
        args = parser.parse_args()

        # Honour --user when given; only prompt when it was omitted.
        user = args.user or input('请输入用户个人主页地址:')
        get_args(user, args.dir, args.music, args.count, args.mode)
    except Exception as e:
        print('[ 提示 ]:未输入命令或意外出错,自动退出!')
        # Show the cause and signal failure to the calling shell.
        print('[ 警告 ]:', e)
        sys.exit(1)
\ No newline at end of file
编写
预览
支持
Markdown
格式
附加文件
你添加了
0
人
到此讨论。请谨慎行事。
Finish editing this message first!
Cancel
请
注册
或
登录
后发表评论