Commit 8db33c07 8db33c07233e6c62bede72c46ab1de4a512301cb by 谢俊文

Initial commit

0 parents
File mode changed
No preview for this file type
No preview for this file type
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
PATH={
'SLICE_OUTPUT': "/storage/SliceFiles/m3u8/",
'CDN_SOURCE': "/storage/m3u8/",
'LOG_PATH': "task.log"
}
URL={
'GET': "http://172.0.31.15:9085/leviathan.admin/FileTask/getTask",
'SLICE_POST': "http://172.0.31.15:9085/leviathan.admin/FileTask/sliceNotify",
'PUSH_POST':"http://172.0.31.15:9085/leviathan.admin/FileTask/sourceCdnPublishNotify"
}
SERVERID={
'ID':"1"
}
#!/bin/bash
#保留文件数
ReservedNum=7
FileDir="/data/bak /data/bak2"
date=$(date "+%Y%m%d-%H%M%S")
for i in $FileDir
do
FileNum=$(ls -l $i|grep ^d |wc -l)
while(( $FileNum > $ReservedNum))
do
OldFile=$(ls -rt $i| head -1)
echo $date "Delete File:"$OldFile
rm -rf $i/$OldFile
let "FileNum--"
done
done
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
import os
import re
import config
import socket
import taskmanage
import logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=config.PATH["LOG_PATH"],
filemode='a+')
class PushTasks:
#初始化属性,导入配置
def __init__(self):
self.__GetUrl=config.URL["GET"]
self.__Post_Push_Url=config.URL["PUSH_POST"]
self.__outPath = config.PATH["SLICE_OUTPUT"]
self.__CDNPath = config.PATH["CDN_SOURCE"]
self.__serverID = config.SERVERID["ID"]
self.__header = {"Content-Type": "application/json-rpc"}
#判断奇偶,判断运行主机
def job_balance(self,num):
int(num)
if (num % 2 == self.__serverID):
return True
else:
return False
#检查输入文件是否存在
def check_file(self,inputfile):
if os.path.exists(inputfile):
return (True,"")
else:
return (False,"丢失切片文件:{0}.".format(inputfile))
#复制迁移
def mv_CDN(self,url):
base=os.path.basename(url)
fileName=os.path.splitext(base)[0]
pattern=re.compile(r'\d+-\d+-\d+')
directory = pattern.findall(url)
pushPath=config.PATH["SLICE_OUTPUT"]+directory[0]+"/"+fileName
CDNPath=config.PATH["CDN_SOURCE"]+directory[0]+"/"+fileName
try:
os.makedirs(CDNPath)
except Exception as e:
logging.debug(str(e))
logging.debug("cp -vrf {0} {1}".format(pushPath,CDNPath))
os.system("cp -vrf {0} {1}".format(pushPath,CDNPath))
#主程序
def main(address):
while 1<2:
Job=PushTasks()
pushJob=taskmanage.TaskManage()
#get方法接任务
try:
ResponseGet=pushJob.get_request(config.URL["GET"],"?start=0&limit=1&status=500&source_cdn_status=0")
except Exception as e:
logging.error(str(e))
continue
resultSet=ResponseGet['resultSet']
if resultSet:
pass
else:
continue
#获取任务ID,待发布文件目录
TaskId=ResponseGet['resultSet'][0]['task_id']
url=ResponseGet['resultSet'][0]['url']
resultm3u8=Job.check_file(config.PATH["SLICE_OUTPUT"]+os.path.splitext(url)[0])
if resultm3u8[0]:
pass
else:
try:
pushJob.change_push_status(config.URL["PUSH_POST"],TaskId,-1,address,resultm3u8[1])
logging.error(resultm3u8[1])
except Exception as e:
logging.error(str(e))
continue
continue
#改任务状态为发布中
try:
pushJob.change_push_status(config.URL["PUSH_POST"],TaskId,10,address,"")
except Exception as e:
logging.error(str(e))
continue
Job.mv_CDN(url)
#改任务状态为发布完成
try:
pushJob.change_push_status(config.URL["PUSH_POST"],TaskId,100,address,"")
except Exception as e:
logging.error(str(e))
continue
#实例化
if __name__ == '__main__':
address=socket.gethostbyname(socket.getfqdn(socket.gethostname()))
try:
main(address)
except Exception as e:
pushJob.change_push_status(config.URL["PUSH_POST"],TaskId,-1,address,str(e))
logging.error(str(e))
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
import urllib.request
import urllib.parse
import urllib.error
import json
class TaskManage:
def __init__(self):
self.__header = {"Content-Type": "application/json-rpc"}
#get请求
def get_request(self,GetUrl,Param):
request = urllib.request.Request(GetUrl+Param)
result = urllib.request.urlopen(request).read().decode('utf-8')
json=eval(result)
return json
#post请求
def post_request(self,PostUrl,data):
request = urllib.request.Request(PostUrl,json.dumps(data).encode('utf-8'),self.__header)
result = urllib.request.urlopen(request).read().decode('utf-8')
#改切片状态
def change_slice_status(self,PostUrl,TaskId,Status,slice_server,slice_error_desc):
data={"task_id" : TaskId, "status": Status, "slice_server": slice_server, "slice_error_desc": slice_error_desc}
return self.post_request(PostUrl,data)
#改发布状态
def change_push_status(self,PostUrl,TaskId,Status,slice_server,source_cdn_error_desc):
data={"task_id" : TaskId, "source_cdn_status": Status, "source_cdn_server": slice_server,"source_cdn_error_desc":source_cdn_error_desc}
return self.post_request(PostUrl,data)
File mode changed
No preview for this file type
No preview for this file type
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
PATH={
'SLICE_OUTPUT': "/storage/SliceFiles/m3u8/",
'SLICE_INPUT': "/storage/smp/download/ts/",
'ZIP_PACKAGE': "/storage/SliceFiles/zip/",
'LOG_PATH': "task.log"
}
URL={
'GET': "http://172.0.31.15:9085/leviathan.admin/FileTask/getTask",
'SLICE_POST': "http://172.0.31.15:9085/leviathan.admin/FileTask/sliceNotify",
'PUSH_POST':"http://172.0.31.15:9085/leviathan.admin/FileTask/sourceCdnPublishNotify"
}
SERVERID={
'ID':"1"
}
#!/bin/bash
#保留文件数
ReservedNum=7
FileDir="/data/bak /data/bak2"
date=$(date "+%Y%m%d-%H%M%S")
for i in $FileDir
do
FileNum=$(ls -l $i|grep ^d |wc -l)
while(( $FileNum > $ReservedNum))
do
OldFile=$(ls -rt $i| head -1)
echo $date "Delete File:"$OldFile
rm -rf $i/$OldFile
let "FileNum--"
done
done
File mode changed
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
import taskmanage
import os
import re
import ffmpy3
import socket
import config
import logging
import time
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=config.PATH["LOG_PATH"],
filemode='a+')
class FfmpegTasks:
#初始化属性,导入配置
def __init__(self):
self.__GetUrl=config.URL["GET"]
self.__Post_Slice_Url=config.URL["SLICE_POST"]
self.__Post_Push_Url=config.URL["PUSH_POST"]
self.__outputPath = config.PATH["SLICE_OUTPUT"]
self.__inputPath = config.PATH["SLICE_INPUT"]
self.__zipPath = config.PATH["ZIP_PACKAGE"]
self.__serverID = int(config.SERVERID["ID"])
#判断奇偶,判断运行主机
def job_balance(self,num):
int(num)
if (num % 2 == self.__serverID):
return True
else:
return False
#检查输入文件是否存在
def check_file(self,filetype,inputfile):
if os.path.isfile(inputfile):
return (True,"")
else:
return (False,"丢失{0}文件:{1}.".format(filetype,inputfile))
#切片
def slice_task(self,inputfile):
#get方法获取输入文件名,去后缀
base=os.path.basename(inputfile)
fileName=os.path.splitext(base)[0]
#正则匹配日期字符
pattern=re.compile(r'\d+-\d+-\d+')
directory = pattern.findall(inputfile)
#拼接输出目录
outputPath=self.__outputPath + directory[0] +'/'+ fileName +'/'
#拼接打包目录
zipPath=self.__zipPath+directory[0] +'/'
#创建输出目录
try:
os.makedirs(outputPath)
except Exception as e:
logging.debug(str(e))
#创建zip包目录
try:
os.makedirs(zipPath)
except Exception as e:
logging.debug(str(e))
#拼接输出文件名
outputfile=outputPath+fileName+'%03d.ts'
#封装ffmpeg 命令
ff = ffmpy3.FFmpeg(
inputs={self.__inputPath+inputfile: None},
outputs={outputfile: '-c copy -map 0 -f segment -segment_list {0} -segment_time 10'.format(outputPath + "playlist.m3u8")})
#执行命令
logging.info("start slice {0}".format(self.__inputPath+inputfile))
ff.run()
logging.info("finish slice {0}".format(self.__inputPath+inputfile))
#返回输出打包目录,输出目录
return (self.__zipPath+directory[0] +'/'+ fileName+".zip",outputPath)
#打zip包
def zip_m3u8(self,Pathzip,outputPath):
logging.info("start zip {0}".format(outputPath))
os.system("zip -jr {0} {1}".format(Pathzip,outputPath))
logging.info("finish zip {0}".format(outputPath))
#主程序
def main(address):
while 1<2:
Job=FfmpegTasks()
pushJob=taskmanage.TaskManage()
#改200状态: 200=待切片 300=切片中 301=切片失败 500=全部完成
#get方法接任务
try:
ResponseGet=pushJob.get_request(config.URL["GET"],"?start=0&limit=1&status=200")
except Exception as e:
logging.error(str(e))
continue
#获取任务ID,待切片文件目录
resultSet=ResponseGet['resultSet']
if resultSet:
pass
else:
time.sleep(30)
continue
TaskId=ResponseGet['resultSet'][0]['task_id']
inputfile=ResponseGet['resultSet'][0]['url']
#检查download文件状态
resultDownload=Job.check_file("download",config.PATH["SLICE_INPUT"]+inputfile)
if resultDownload[0]:
pass
else:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,resultDownload[1])
logging.error(resultDownload[1])
except Exception as e:
logging.error(str(e),resultDownload[1])
break
continue
#改任务状态为切片中
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,300,address,'')
except Exception as e:
logging.error(str(e))
continue
#执行切片
try:
outPut=Job.slice_task(inputfile)
#改任务状态为切片完成
#try:
# pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,500,address,'')
#except Exception as e:
# logging.error(str(e))
# break
except Exception as e:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,'')
except Exception as e:
logging.error(str(e))
continue
#打包
try:
Job.zip_m3u8(outPut[0],outPut[1])
#改任务状态为待发布
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,500,address,'')
except Exception as e:
logging.error(str(e))
continue
except Exception as e:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,'')
except Exception as e:
logging.error(str(e))
continue
#检查zip文件状态
resultZip=Job.check_file("zip",outPut[0])
if resultZip[0]:
pass
else:
try:
pushJob.change_slice_status(config.URL["SLICE_POST"],TaskId,305,address,resultZip[1])
logging.error(resultZip[1])
except Exception as e:
logging.error(str(e))
continue
continue
#实例化
if __name__ == '__main__':
address="172.25.50.2"
main(address)
This diff could not be displayed because it is too large.
#!/x/app/python3/bin/python3
# -*- coding: utf-8 -*-
import urllib.request
import urllib.parse
import urllib.error
import json
class TaskManage:
def __init__(self):
self.__header = {"Content-Type": "application/json-rpc"}
#get请求
def get_request(self,GetUrl,Param):
request = urllib.request.Request(GetUrl+Param)
result = urllib.request.urlopen(request).read().decode('utf-8')
json=eval(result)
return json
#post请求
def post_request(self,PostUrl,data):
request = urllib.request.Request(PostUrl,json.dumps(data).encode('utf-8'),self.__header)
result = urllib.request.urlopen(request).read().decode('utf-8')
#改切片状态
def change_slice_status(self,PostUrl,TaskId,Status,slice_server,slice_error_desc):
data={"task_id" : TaskId, "status": Status, "slice_server": slice_server, "slice_error_desc": slice_error_desc}
return self.post_request(PostUrl,data)
#改发布状态
def change_push_status(self,PostUrl,TaskId,Status,slice_server,source_cdn_error_desc):
data={"task_id" : TaskId, "source_cdn_status": Status, "source_cdn_server": slice_server,"source_cdn_error_desc":source_cdn_error_desc}
return self.post_request(PostUrl,data)