python

超轻量级php框架startmvc

libreoffice python 操作word及excel文档的方法

更新时间:2020-07-11 16:42:01 作者:startmvc
1、开始、关闭libreoffice服务;开始之前同步字体文件时间,是因为创建soffice服务时,服务

1、开始、关闭libreoffice服务;

开始之前同步字体文件时间,是因为创建soffice服务时,服务会检查所需加载的文件的时间,如果其认为时间不符,则其可能会重新加载,耗时较长,因此需事先统一时间。

使用时如果需要多次调用,最后每次调用均开启后关闭,否则libreoffice会创建一个缓存文档并越用越大,处理时间会增加。


class OfficeProcess(object):
 def __init__(self):
 self.p = 0
 subprocess.Popen('find /usr/share/fonts | xargs touch -m -t 201801010000.00', shell=True)

 def start_office(self):
 self.p = subprocess.Popen('soffice --pidfile=sof.pid --invisible --accept="socket,host=localhost,port=2002;urp;"', shell=True)
 while True:
 try:
 local_context = uno.getComponentContext()
 resolver = local_context.getServiceManager().createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
 resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
 return
 except:
 print(ts(), "wait for connecting soffice...")
 time.sleep(1)
 continue

 def stop_office(self):
 with open("sof.pid", "rb") as f:
 try:
 os.kill(int(f.read()), signal.SIGTERM)
 self.p.wait()
 except:
 pass

2、init service manager


local_context = uno.getComponentContext()
 service_manager = local_context.getServiceManager()
 resolver = service_manager.createInstanceWithContext('com.sun.star.bridge.UnoUrlResolver', local_context)
 self.ctx = resolver.resolve('uno:socket,host=localhost,port=2002;urp;StarOffice.ComponentContext')
 self.smgr = self.ctx.ServiceManager
 self.desktop = self.smgr.createInstanceWithContext('com.sun.star.frame.Desktop', self.ctx)

3、从二进制数据中读取doc文档


def ImportFromMemory(self, data):
 istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
 istream.initialize((uno.ByteSequence(data), ))
 pv = PropertyValue()
 pv.Name = 'InputStream'
 pv.Value = istream
 self.doc = {'doc': []}
 try:
 self.document = self.desktop.loadComponentFromURL('private:stream/swriter', '_blank', 0, (pv, ))
 self.text = self.document.getText()
 except:
 self.text = None

4、读取doc文档中的数据


def ExportToJson(self):
 try:
 l = self.__ParseText(self.text, self.__Callback(self.doc['doc']))
 self.doc['length'] = l
 except:
 self.doc = {'doc': [], 'length': 0}
 return json.dumps(self.doc)

@staticmethod
 def __Callback(alist):
 def Append(sth):
 alist.append(sth)
 return Append


def __ParseText(self, text, func):
 l = 0
 text_it = text.createEnumeration()
 while text_it.hasMoreElements():
 element = text_it.nextElement()
 if element.supportsService('com.sun.star.text.Paragraph'):
 l += self.__ParseParagraph(element, func)
 elif element.supportsService('com.sun.star.text.TextTable'):
 l += self.__ParseTable(element, func)
 else:
 pass
 return l

def __ParseParagraph(self, paragraph, func):
 p = {'paragraph': []}
 l = 0
 paragraph_it = paragraph.createEnumeration()
 while paragraph_it.hasMoreElements():
 portion = paragraph_it.nextElement()
 if portion.TextPortionType == 'Text':
 l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
 elif portion.TextPortionType == 'SoftPageBreak':
 pass
 elif portion.TextPortionType == 'TextField':
 l += self.__ParsePortionText(portion, self.__Callback(p['paragraph']))
 else:
 l += self.__ParseTextContent(portion, self.__Callback(p['paragraph']))
 if hasattr(paragraph, 'createContentEnumeration'):
 l += self.__ParseTextContent(paragraph, self.__Callback(p['paragraph']))
 p['length'] = l
 func(p)
 return l

 def __ParseTextContent(self, textcontent, func):
 l = 0
 content_it = textcontent.createContentEnumeration('com.sun.star.text.TextContent')
 while content_it.hasMoreElements():
 element = content_it.nextElement()
 if element.supportsService('com.sun.star.text.TextGraphicObject'):
 l += self.__ParsePortionGraphic(element, func)
 elif element.supportsService('com.sun.star.text.TextEmbeddedObject'):
 pass
 elif element.supportsService('com.sun.star.text.TextFrame'):
 l += self.__ParseFrame(element, func)
 elif element.supportsService('com.sun.star.drawing.GroupShape'):
 l += self.__ParseGroup(element, func)
 else:
 pass
 return l

 def __ParseFrame(self, frame, func):
 f = {'frame': []}
 l = self.__ParseText(frame.getText(), self.__Callback(f['frame']))
 f['length'] = l
 func(f)
 return l

 def __ParseGroup(self, group, func):
 l = 0
 for i in range(group.getCount()):
 it = group.getByIndex(i)
 if it.supportsService('com.sun.star.drawing.Text'):
 l += self.__ParseFrame(it, func)
 else:
 pass
 return l

 def __ParsePortionText(self, portion_text, func):
 func({'portion': portion_text.String, 'length': len(portion_text.String)})
 return len(portion_text.String)

 def __ParsePortionGraphic(self, portion_graphic, func):
 gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
 stream = self.smgr.createInstanceWithContext('com.sun.star.io.TempFile', self.ctx)
 pv1 = PropertyValue()
 pv1.Name = 'OutputStream'
 pv1.Value = stream
 pv2 = PropertyValue()
 pv2.Name = 'MimeType'
 pv2.Value = 'image/png'
 gp.storeGraphic(portion_graphic.Graphic, (pv1, pv2))
 stream.getOutputStream().flush()
 stream.seek(0)
 l = stream.getInputStream().available()
 b = uno.ByteSequence(b'')
 stream.seek(0)
 l, b = stream.getInputStream().readBytes(b, l)
 img = {'image': base64.b64encode(b.value).decode('ascii')}
 img['height'] = portion_graphic.Height
 img['width'] = portion_graphic.Width
 img['actualheight'] = portion_graphic.ActualSize.Height
 img['actualwidth'] = portion_graphic.ActualSize.Width
 img['croptop'] = portion_graphic.GraphicCrop.Top
 img['cropbottom'] = portion_graphic.GraphicCrop.Bottom
 img['cropleft'] = portion_graphic.GraphicCrop.Left
 img['cropright'] = portion_graphic.GraphicCrop.Right
 img['length'] = 0
 func(img)
 return 0

 def __ParseTable(self, table, func):
 l = 0
 try:
 matrix = self.__GetTableMatrix(table)
 seps = self.__GetTableSeparators(table)
 t = {}
 count = 0
 for ri in matrix.keys():
 t[ri] = {}
 for ci in matrix[ri].keys():
 t[ri][ci] = dict(matrix[ri][ci])
 del t[ri][ci]['cell']
 t[ri][ci]['content'] = []
 l += self.__ParseText(matrix[ri][ci]['cell'], self.__Callback(t[ri][ci]['content']))
 count += t[ri][ci]['rowspan'] * t[ri][ci]['colspan']
 if count != len(t) * len(seps):
 raise ValueError('count of cells error')
 func({'table': t, 'row': len(t), 'column': len(seps), 'length': l, 'tableid': self.table_id})
 self.table_id += 1
 except:
 l = 0
 print('discard wrong table')
 return l

 @staticmethod
 def __GetTableSeparators(table):
 result = [table.TableColumnRelativeSum]
 for ri in range(table.getRows().getCount()):
 result += [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators]
 result = sorted(set(result))
 for i in range(len(result) - 1):
 result[i] += 1 if result[i] + 1 == result[i + 1] else 0
 return sorted(set(result))

 @staticmethod
 def __NameToRC(name):
 r = int(re.sub('[A-Za-z]', '', name)) - 1
 cstr = re.sub('[0-9]', '', name)
 c = 0
 for i in range(len(cstr)):
 if cstr[i] >= 'A' and cstr[i] <= 'Z':
 c = c * 52 + ord(cstr[i]) - ord('A')
 else:
 c = c * 52 + 26 + ord(cstr[i]) - ord('a')
 return r, c

 @staticmethod
 def __GetTableMatrix(table):
 result = {}
 for name in table.getCellNames():
 ri, ci = WordToJson.__NameToRC(name)
 cell = table.getCellByName(name)
 if ri not in result:
 result[ri] = {}
 result[ri][ci] = {'cell': cell, 'rowspan': cell.RowSpan, 'name': name}

 seps = WordToJson.__GetTableSeparators(table)
 for ri in result.keys():
 sep = [s.Position for s in table.getRows().getByIndex(ri).TableColumnSeparators] + [table.TableColumnRelativeSum]
 sep = sorted(set(sep))
 for ci in result[ri].keys():
 right = seps.index(sep[ci]) if sep[ci] in seps else seps.index(sep[ci] + 1)
 left = -1 if ci == 0 else seps.index(sep[ci - 1]) if sep[ci - 1] in seps else seps.index(sep[ci - 1] + 1)
 result[ri][ci]['colspan'] = right - left
 return result

5、写doc文档


self.doco = self.desktop.loadComponentFromURL('private:factory/swriter', '_blank', 0, ())
 self.texto = self.doco.getText()
 self.cursoro = self.texto.createTextCursor()
 self.cursoro.ParaBottomMargin = 500

def __WriteText(self, text, texto, cursoro):
 for it in text:
 if 'paragraph' in it:
 self.__WriteParagraph(it, texto, cursoro)
 elif 'image' in it:
 self.__WritePortionGraphic(it, texto, cursoro)
 elif 'table' in it:
 self.__WriteTable(it, texto, cursoro)

 def __WriteParagraph(self, paragraph, texto, cursoro):
 if paragraph['length'] > 0:
 if 'result' in paragraph:
 for it in paragraph['result']:
 texto.insertString(cursoro, it['trans_sen'], False)
 else:
 texto.insertString(cursoro, paragraph['paragraph'], False)
 texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)

 def __WritePortionGraphic(self, portion_graphic, texto, cursoro):
 png_base64 = portion_graphic['image']
 png = base64.b64decode(png_base64)
 gp = self.smgr.createInstanceWithContext('com.sun.star.graphic.GraphicProvider', self.ctx)
 istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
 istream.initialize((uno.ByteSequence(png), ))
 pv = PropertyValue()
 pv.Name = 'InputStream'
 pv.Value = istream

 actualsize = uno.createUnoStruct('com.sun.star.awt.Size')
 actualsize.Height = portion_graphic['actualheight'] if 'actualheight' in portion_graphic else portion_graphic['height']
 actualsize.Width = portion_graphic['actualwidth'] if 'actualwidth' in portion_graphic else portion_graphic['width']
 graphiccrop = uno.createUnoStruct('com.sun.star.text.GraphicCrop')
 graphiccrop.Top = portion_graphic['croptop'] if 'croptop' in portion_graphic else 0
 graphiccrop.Bottom = portion_graphic['cropbottom'] if 'cropbottom' in portion_graphic else 0
 graphiccrop.Left = portion_graphic['cropleft'] if 'cropleft' in portion_graphic else 0
 graphiccrop.Right = portion_graphic['cropright'] if 'cropright' in portion_graphic else 0

 image = self.doco.createInstance('com.sun.star.text.TextGraphicObject')
 image.Surround = NONE
 image.Graphic = gp.queryGraphic((pv, ))
 image.Height = portion_graphic['height']
 image.Width = portion_graphic['width']
 image.setPropertyValue('ActualSize', actualsize)
 image.setPropertyValue('GraphicCrop', graphiccrop)
 texto.insertTextContent(cursoro, image, False)
 texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)

 def __WriteTable(self, table, texto, cursoro):
 tableo = self.doco.createInstance('com.sun.star.text.TextTable')
 tableo.initialize(table['row'], table['column'])
 texto.insertTextContent(cursoro, tableo, False)
# texto.insertControlCharacter(cursoro, ControlCharacter.PARAGRAPH_BREAK, False)
 tcursoro = tableo.createCursorByCellName("A1")

 hitbug = False
 if table['row'] > 1:
 tcursoro.goDown(1, True)
 hitbug = tcursoro.getRangeName() == 'A1'

 for ri in sorted([int(r) for r in table['table'].keys()]):
 rs = table['table'][str(ri)]
 for ci in sorted([int(c) for c in rs.keys()]):
 cell = rs[str(ci)]
 if hitbug == False and (cell['rowspan'] > 1 or cell['colspan'] > 1):
 tcursoro.gotoCellByName(cell['name'], False)
 if cell['rowspan'] > 1:
 tcursoro.goDown(cell['rowspan'] - 1, True)
 if cell['colspan'] > 1:
 tcursoro.goRight(cell['colspan'] - 1, True)
 tcursoro.mergeRange()
 ctexto = tableo.getCellByName(cell['name'])
 if ctexto == None:
 continue
 ccursoro = ctexto.createTextCursor()
 ccursoro.CharWeight = FontWeight.NORMAL
 ccursoro.CharWeightAsian = FontWeight.NORMAL
 ccursoro.ParaAdjust = LEFT
 self.__WriteText(cell['content'], ctexto, ccursoro)

6、生成二进制的doc文档数据


 streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
 self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'MS Word 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
 streamo.flush()
 _, datao = streamo.readBytes(None, streamo.available())

7、从doc文档数据生成pdf的二进制数据


 streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
 self.doco.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'writer_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
 streamo.flush()
 _, datap = streamo.readBytes(None, streamo.available())

8、读取excel二进制数据


  def ImportFromMemory(self, data):
 istream = self.smgr.createInstanceWithContext('com.sun.star.io.SequenceInputStream', self.ctx)
 istream.initialize((uno.ByteSequence(data), ))
 pv = PropertyValue()
 pv.Name = 'InputStream'
 pv.Value = istream
 self.doc = {'doc': []}
 try:
 print("before loadComponentFromURL")
 self.document = self.desktop.loadComponentFromURL('private:stream/scalc', '_blank', 0, (pv, ))
 self.sheets = self.document.getSheets()
 print("ImportFromMemory done")
 except:
 print("ImportFromMemory failed")
 self.sheets = None

9、读取excel的文本数据


 def ExportToJson(self):
 try:
 l = self.__ParseText(self.sheets, self.__Callback(self.doc['doc']))
 self.doc['length'] = l
 except:
 self.doc = {'doc': [], 'length': 0}
 return json.dumps(self.doc)

 def __ParseText(self, sheets, func):
 l = 0
 sheets_it = sheets.createEnumeration()
 while sheets_it.hasMoreElements():
 element = sheets_it.nextElement()
 if element.supportsService('com.sun.star.sheet.Spreadsheet'):
 l += self.__ParseSpreadsheet(element, func)
 return l

 def __ParseSpreadsheet(self, spreadsheet, func):
 l = 0
 p = {'spreadsheet': []}
 visible_cells_it = spreadsheet.queryVisibleCells().getCells().createEnumeration()
 while visible_cells_it.hasMoreElements():
 cell = visible_cells_it.nextElement()
 type = cell.getType()
 if type == self.EMPTY:
 print("cell.type==empty")
 elif type == self.VALUE:
 print("cell.type==VALUE", "value=", cell.getValue(), cell.getCellAddress ())
 elif type == self.TEXT:
 print("cell.type==TEXT","content=", cell.getString().encode("UTF-8"), cell.getCellAddress ())
 l += self.__ParseCellText(spreadsheet, cell, self.__Callback(p['spreadsheet']))
 print("__ParseCellText=", p)
 elif type == self.FORMULA:
 print("cell.type==FORMULA", "formula=", cell.getValue())
 p['length'] = l
 func(p)
 return l

 def __ParseCellText(self, sheet, cell, func):
 try:
 x = cell.getCellAddress().Column
 y = cell.getCellAddress().Row
 sheetname = sheet.getName()
 except:
 x = -1
 y = -1
 sheetname = None
 func({'celltext': cell.getString(), 'x': x, 'y': y, 'sheetname': sheetname, 'length': len(cell.getString())})
 return len(cell.getString())

     self.EMPTY = uno.Enum("com.sun.star.table.CellContentType", "EMPTY")
 self.TEXT = uno.Enum("com.sun.star.table.CellContentType", "TEXT")
 self.FORMULA = uno.Enum("com.sun.star.table.CellContentType", "FORMULA")
 self.VALUE = uno.Enum("com.sun.star.table.CellContentType", "VALUE")

10、替换excel的文本信息


 def ImportFromJson(self, data):
 doc = json.loads(data)
 try:
 self.__WriteText(doc['doc'])
 except:
 pass

def __WriteText(self, text):
 print("__WriteText begin:", text)
 sheet = None
 for it in text:
 if 'paragraph' in it and 'sheetname' in it:
 if sheet == None or sheet.getName() != it['sheetname']:
 try:
 sheet = self.sheets.getByName(it['sheetname'])
 print("getsheet:", it['sheetname'], "=", sheet.getName())
 except:
 sheet = None
 continue
 self.__WriteParagraph(it, sheet)

 def __WriteParagraph(self, paragraph, sheet):
 print("__WriteParagraph")
 if paragraph['length'] > 0:
 try:
 x = paragraph['x']
 y = paragraph['y']
 print("getcell:", x, y)
 cell = sheet.getCellByPosition(x, y)
 print("getcell done")
 except:
 return
 if 'result' in paragraph:
 for it in paragraph['result']:
 print("cell=", cell.getString())
 cell.setString(it['trans_sen'])
 print("cell,", cell.getString(), ",done")

11、生成excel文档二进制数据


     streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
 self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'Calc MS Excel 2007 XML', 0), PropertyValue('OutputStream', 0, streamo, 0)))
 streamo.flush()
 _, datao = streamo.readBytes(None, streamo.available())

12、生成excel的pdf文档


 streamo = self.smgr.createInstanceWithContext('com.sun.star.io.Pipe', self.ctx)
 self.document.storeToURL('private:stream', (PropertyValue('FilterName', 0, 'calc_pdf_Export', 0), PropertyValue('OutputStream', 0, streamo, 0)))
 streamo.flush()
 _, datap = streamo.readBytes(None, streamo.available())

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

libreoffice python 操作word libreoffice python 操作excel