python

超轻量级php框架startmvc

python3实现ftp服务功能(服务端 For Linux)

更新时间:2020-04-28 17:10:01 作者:startmvc
本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下功能

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls pwd cd put rm get mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条 server main 代码:


# Author by Andy
# _*_ coding:utf-8 _*_
import os, sys, json, hashlib, socketserver, time

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from conf import userdb_set
class Ftp_server(socketserver.BaseRequestHandler):
 user_home_dir = ''
 def auth(self, *args):
 '''验证用户名及密码'''
 cmd_dic = args[0]
 username = cmd_dic["username"]
 password = cmd_dic["password"]
 f = open(userdb_set.userdb_set(), 'r')
 user_info = json.load(f)
 if username in user_info.keys():
 if password == user_info[username]:
 self.request.send('0'.encode())
 os.chdir('/home/%s' % username)
 self.user_home_dir = os.popen('pwd').read().strip()
 data = "%s login successed" % username
 self.loging(data)
 else:
 self.request.send('1'.encode())
 data = "%s login failed" % username
 self.loging(data)
 f.close
 else:
 self.request.send('1'.encode())
 data = "%s login failed" % username
 self.loging(data)
 f.close
 ##########################################

 def get(self, *args):
 '''给客户端传输文件'''
 request_code = {
 '0': 'file is ready to get',
 '1': 'file not found!'
 }
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 filename = cmd_dic["filename"]
 if os.path.isfile(filename):
 self.request.send('0'.encode('utf-8')) # 确认文件存在
 self.request.recv(1024)
 self.request.send(str(os.stat(filename).st_size).encode('utf-8'))
 self.request.recv(1024)
 m = hashlib.md5()
 f = open(filename, 'rb')
 for line in f:
 m.update(line)
 self.request.send(line)
 self.request.send(m.hexdigest().encode('utf-8'))
 print('From server:Md5 value has been sended!')
 f.close()
 else:
 self.request.send('1'.encode('utf-8'))
 ###########################################

 def cd(self, *args):
 '''执行cd命令'''
 user_current_dir = os.popen('pwd').read().strip()
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 path = cmd_dic['path']
 if path.startswith('/'):
 if self.user_home_dir in path:
 os.chdir(path)
 new_dir = os.popen('pwd').read()
 user_current_dir = new_dir
 self.request.send('Change dir successfully!'.encode("utf-8"))
 data = 'Change dir successfully!'
 self.loging(data)
 elif os.path.exists(path):
 self.request.send('Permission Denied!'.encode("utf-8"))
 data = 'Permission Denied!'
 self.loging(data)
 else:
 self.request.send('Directory not found!'.encode("utf-8"))
 data = 'Directory not found!'
 self.loging(data)
 elif os.path.exists(path):
 os.chdir(path)
 new_dir = os.popen('pwd').read().strip()
 if self.user_home_dir in new_dir:
 self.request.send('Change dir successfully!'.encode("utf-8"))
 user_current_dir = new_dir
 data = 'Change dir successfully!'
 self.loging(data)
 else:
 os.chdir(user_current_dir)
 self.request.send('Permission Denied!'.encode("utf-8"))
 data = 'Permission Denied!'
 self.loging(data)
 else:
 self.request.send('Directory not found!'.encode("utf-8"))
 data = 'Directory not found!'
 self.loging(data)
 ###########################################

 def rm(self, *args):
 request_code = {
 '0': 'file exist,and Please confirm whether to rm',
 '1': 'file not found!'
 }
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 filename = cmd_dic['filename']
 if os.path.exists(filename):
 self.request.send('0'.encode("utf-8")) # 确认文件存在
 client_response = self.request.recv(1024).decode()
 if client_response == '0':
 os.popen('rm -rf %s' % filename)
 self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))
 self.loging('File %s has been deleted!' % filename)
 else:
 self.request.send(('File %s not deleted!' % filename).encode("utf-8"))
 self.loging('File %s not deleted!' % filename)
 else:
 self.request.send('1'.encode("utf-8"))
 ########################################

 def pwd(self, *args):
 '''执行pwd命令'''
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 server_response = os.popen('pwd').read().strip().encode("utf-8")
 self.request.send(server_response)

 #############################################
 def ls(self, *args):
 '''执行ls命名'''
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 path = cmd_dic['path']
 cmd = 'ls -l %s' % path
 server_response = os.popen(cmd).read().encode("utf-8")
 self.request.send(server_response)

 ############################################
 def put(self, *args):
 '''接收客户端文件'''
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 filename = cmd_dic["filename"]
 filesize = cmd_dic["size"]
 if os.path.isfile(filename):
 f = open(filename + '.new', 'wb')
 else:
 f = open(filename, 'wb')
 request_code = {
 '200': 'Ready to recceive data!',
 '210': 'Not ready to received data!'
 }
 self.request.send('200'.encode())
 receive_size = 0
 while True:
 if receive_size < filesize:
 data = self.request.recv(1024)
 f.write(data)
 receive_size += len(data)
 else:
 data = "File %s has been uploaded successfully!" % filename
 self.loging(data)
 print(data)
 break

 ################################################

 def mkdir(self, *args):
 request_code = {
 '0': 'Directory has been made!',
 '1': 'Directory is aleady exist!'
 }
 cmd_dic = args[0]
 self.loging(json.dumps(cmd_dic))
 dir_name = cmd_dic['dir_name']
 if os.path.exists(dir_name):
 self.request.send('1'.encode("utf-8"))
 else:
 os.popen('mkdir %s' % dir_name)
 self.request.send('0'.encode("utf-8"))

 #############################################

 def loging(self, data):
 '''日志记录'''
 localtime = time.asctime(time.localtime(time.time()))
 log_file = '/root/ftp/ftpserver/log/server.log'
 with open(log_file, 'a', encoding='utf-8') as f:
 f.write('%s-->' % localtime + data + '\n')
 ##############################################

 def handle(self):
 # print("您本次访问使用的IP为:%s" %self.client_address[0])
 # localtime = time.asctime( time.localtime(time.time()))
 # print(localtime)

 while True:
 try:
 self.data = self.request.recv(1024).decode() #
 # print(self.data)
 cmd_dic = json.loads(self.data)
 action = cmd_dic["action"]
 # print("用户请求%s"%action)
 if hasattr(self, action):
 func = getattr(self, action)
 func(cmd_dic)
 except Exception as e:
 self.loging(str(e))
 break

def run():
 HOST, PORT = '0.0.0.0', 6969
 print("The server is started,and listenning at port 6969")
 server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
 server.serve_forever()
if __name__ == '__main__':
 run()


设置用户口令代码:


#Author by Andy
#_*_ coding:utf-8 _*_
import os,json,hashlib,sys

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
userdb_file = base_dir+"\data\\userdb"
# print(userdb_file)
def userdb_set():
 if os.path.isfile(userdb_file):
 # print(userdb_file)
 return userdb_file
 else:
 print('请先为您的服务器创建用户!')
 user_data = {}
 dict={}
 Exit_flags = True
 while Exit_flags:
 username = input("Please input username:")
 if username != 'exit':
 password = input("Please input passwod:")
 if password != 'exit':
 user_data.update({username:password})
 m = hashlib.md5()
 # m.update('hello')
 # print(m.hexdigest())
 for i in user_data:
 # print(i,user_data[i])
 m.update(user_data[i].encode())
 dict.update({i:m.hexdigest()})
 else:
 break
 else:
 break
 f = open(userdb_file,'w')
 json.dump(dict,f)
 f.close()
 return userdb_file

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

python3 ftp 服务端 Linux