slice.py
4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
import taskmanage
import os
import re
import ffmpy3
import socket
import config
import logging
import time
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=config.PATH["LOG_PATH"],
filemode='a+')
class FfmpegTasks:
#初始化属性,导入配置
def __init__(self):
self.__GetUrl=config.URL["GET"]
self.__Post_Slice_Url=config.URL["SLICE_POST"]
self.__Post_Push_Url=config.URL["PUSH_POST"]
self.__outputPath = config.PATH["SLICE_OUTPUT"]
self.__inputPath = config.PATH["SLICE_INPUT"]
self.__zipPath = config.PATH["ZIP_PACKAGE"]
self.__serverID = int(config.SERVERID["ID"])
#判断奇偶,判断运行主机
def job_balance(self,num):
int(num)
if (num % 2 == self.__serverID):
return True
else:
return False
#检查输入文件是否存在
def check_file(self,filetype,inputfile):
if os.path.isfile(inputfile):
return (True,"")
else:
return (False,"丢失{0}文件:{1}.".format(filetype,inputfile))
#切片
def slice_task(self,inputfile):
#get方法获取输入文件名,去后缀
base=os.path.basename(inputfile)
fileName=os.path.splitext(base)[0]
#正则匹配日期字符
pattern=re.compile(r'\d+-\d+-\d+')
directory = pattern.findall(inputfile)
#拼接输出目录
outputPath=self.__outputPath + directory[0] +'/'+ fileName +'/'
#拼接打包目录
zipPath=self.__zipPath+directory[0] +'/'
#创建输出目录
try:
os.makedirs(outputPath)
except Exception as e:
logging.debug(str(e))
#创建zip包目录
try:
os.makedirs(zipPath)
except Exception as e:
logging.debug(str(e))
#拼接输出文件名
outputfile=outputPath+fileName+'%03d.ts'
#封装ffmpeg 命令
ff = ffmpy3.FFmpeg(
inputs={self.__inputPath+inputfile: None},
outputs={outputfile: '-c copy -map 0 -f segment -segment_list {0} -segment_time 10'.format(outputPath + "playlist.m3u8")})
#执行命令
logging.info("start slice {0}".format(self.__inputPath+inputfile))
ff.run()
logging.info("finish slice {0}".format(self.__inputPath+inputfile))
#返回输出打包目录,输出目录
return (self.__zipPath+directory[0] +'/'+ fileName+".zip",outputPath)
#打zip包
def zip_m3u8(self,Pathzip,outputPath):
logging.info("start zip {0}".format(outputPath))
os.system("zip -jr {0} {1}".format(Pathzip,outputPath))
logging.info("finish zip {0}".format(outputPath))
#主程序
def main(address):
while 1<2:
Job=FfmpegTasks()
pushJob=taskmanage.TaskManage()
#改200状态: 200=待切片 300=切片中 301=切片失败 500=全部完成
#get方法接任务
try:
ResponseGet=pushJob.get_request(config.URL["GET"],"?start=0&limit=1&status=200")
except Exception as e:
logging.error(str(e))
continue
#获取任务ID,待切片文件目录
resultSet=ResponseGet['resultSet']
if resultSet:
pass
else:
time.sleep(30)
continue
TaskId=ResponseGet['resultSet'][0]['task_id']
inputfile=ResponseGet['resultSet'][0]['url']
#检查download文件状态
resultDownload=Job.check_file("download",config.PATH["SLICE_INPUT"]+inputfile)
if resultDownload[0]:
pass
else:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,resultDownload[1])
logging.error(resultDownload[1])
except Exception as e:
logging.error(str(e),resultDownload[1])
break
continue
#改任务状态为切片中
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,300,address,'')
except Exception as e:
logging.error(str(e))
continue
#执行切片
try:
outPut=Job.slice_task(inputfile)
#改任务状态为切片完成
#try:
# pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,500,address,'')
#except Exception as e:
# logging.error(str(e))
# break
except Exception as e:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,'')
except Exception as e:
logging.error(str(e))
continue
#打包
try:
Job.zip_m3u8(outPut[0],outPut[1])
#改任务状态为待发布
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,500,address,'')
except Exception as e:
logging.error(str(e))
continue
except Exception as e:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,'')
except Exception as e:
logging.error(str(e))
continue
#检查zip文件状态
resultZip=Job.check_file("zip",outPut[0])
if resultZip[0]:
pass
else:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,resultZip[1])
logging.error(resultZip[1])
except Exception as e:
logging.error(str(e))
continue
continue
#实例化
if __name__ == '__main__':
address="172.25.50.2"
main(address)