Public release from ruodoo-project: 19.0 - 2026-05-31 21:19:12 UTC

This commit is contained in:
CI Publish Bot
2026-05-31 21:19:21 +00:00
commit aa4214c195
1213 changed files with 183945 additions and 0 deletions

23
dms/models/__init__.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl).
from . import access_groups
from . import base
from . import mixins_thumbnail
from . import dms_security_mixin
from . import abstract_dms_mixin
from . import storage
from . import directory
from . import dms_file
from . import onboarding_onboarding
from . import onboarding_onboarding_step
from . import dms_category
from . import tag
from . import res_company
from . import res_config_settings
from . import ir_attachment
from . import ir_binary
from . import mail_thread

View File

@ -0,0 +1,57 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import api, fields, models
class AbstractDmsMixin(models.AbstractModel):
_name = "abstract.dms.mixin"
_description = "Abstract Dms Mixin"
name = fields.Char(required=True, index="btree")
# Only defined to prevent error in other fields that related it
storage_id = fields.Many2one(
comodel_name="dms.storage", string="Storage", store=True, copy=True
)
is_hidden = fields.Boolean(
string="Storage is Hidden",
related="storage_id.is_hidden",
readonly=True,
store=True,
)
company_id = fields.Many2one(
related="storage_id.company_id",
comodel_name="res.company",
string="Company",
readonly=True,
store=True,
index="btree",
)
storage_id_save_type = fields.Selection(related="storage_id.save_type", store=False)
color = fields.Integer(default=0)
category_id = fields.Many2one(
comodel_name="dms.category",
context={"dms_category_show_path": True},
string="Category",
)
@api.model
def search_panel_select_range(self, field_name, **kwargs):
"""Add context to display short folder name."""
_self = self.with_context(
directory_short_name=True, skip_sanitized_parent_hierarchy=True
)
return super(AbstractDmsMixin, _self).search_panel_select_range(
field_name, **kwargs
)
def _search_panel_sanitized_parent_hierarchy(self, records, parent_name, ids):
if self.env.context.get("skip_sanitized_parent_hierarchy"):
all_ids = [value["id"] for value in records]
# Prevent error if user not access to parent record
for value in records:
if value["parent_id"] and value["parent_id"][0] not in all_ids:
value["parent_id"] = False
return records
return super()._search_panel_sanitized_parent_hierarchy(
records=records, parent_name=parent_name, ids=ids
)

177
dms/models/access_groups.py Normal file
View File

@ -0,0 +1,177 @@
# Copyright 2017-2019 MuK IT GmbH
# Copyright 2020 RGB Consulting
# Copyright 2024 Timothée Vannier - Subteno (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
class DmsAccessGroups(models.Model):
_name = "dms.access.group"
_description = "Record Access Groups"
_parent_store = True
_parent_name = "parent_group_id"
name = fields.Char(string="Group Name", required=True, translate=True)
parent_path = fields.Char(index="btree")
# Permissions written directly on this group
perm_create = fields.Boolean(string="Create Access")
perm_write = fields.Boolean(string="Write Access")
perm_unlink = fields.Boolean(string="Unlink Access")
# Permissions computed including parent group
perm_inclusive_create = fields.Boolean(
string="Inherited Create Access",
compute="_compute_inclusive_permissions",
store=True,
recursive=True,
)
perm_inclusive_write = fields.Boolean(
string="Inherited Write Access",
compute="_compute_inclusive_permissions",
store=True,
recursive=True,
)
perm_inclusive_unlink = fields.Boolean(
string="Inherited Unlink Access",
compute="_compute_inclusive_permissions",
store=True,
recursive=True,
)
directory_ids = fields.Many2many(
comodel_name="dms.directory",
relation="dms_directory_groups_rel",
string="Directories",
column1="gid",
column2="aid",
auto_join=True,
readonly=True,
)
complete_directory_ids = fields.Many2many(
comodel_name="dms.directory",
relation="dms_directory_complete_groups_rel",
column1="gid",
column2="aid",
string="Complete directories",
auto_join=True,
readonly=True,
)
count_users = fields.Integer(compute="_compute_users", store=True)
count_directories = fields.Integer(compute="_compute_count_directories")
parent_group_id = fields.Many2one(
comodel_name="dms.access.group",
string="Parent Group",
ondelete="cascade",
index="btree",
)
child_group_ids = fields.One2many(
comodel_name="dms.access.group",
inverse_name="parent_group_id",
string="Child Groups",
)
group_ids = fields.Many2many(
comodel_name="res.groups",
relation="dms_access_group_groups_rel",
column1="gid",
column2="rid",
string="Groups",
)
explicit_user_ids = fields.Many2many(
comodel_name="res.users",
relation="dms_access_group_explicit_users_rel",
column1="gid",
column2="uid",
string="Explicit Users",
)
users = fields.Many2many(
comodel_name="res.users",
relation="dms_access_group_users_rel",
column1="gid",
column2="uid",
string="Group Users",
compute="_compute_users",
auto_join=True,
store=True,
recursive=True,
)
@api.depends("directory_ids")
def _compute_count_directories(self):
for record in self:
record.count_directories = len(record.directory_ids)
_sql_constraints = [
("name_uniq", "unique (name)", "The name of the group must be unique!")
]
@api.depends(
"parent_group_id.perm_inclusive_create",
"parent_group_id.perm_inclusive_unlink",
"parent_group_id.perm_inclusive_write",
"parent_path",
"perm_create",
"perm_unlink",
"perm_write",
)
def _compute_inclusive_permissions(self):
"""Provide full permissions inheriting from parent recursively."""
for one in self:
one.update(
{
f"perm_inclusive_{perm}": (
one[f"perm_{perm}"]
or one.parent_group_id[f"perm_inclusive_{perm}"]
)
for perm in ("create", "unlink", "write")
}
)
@api.model
def default_get(self, fields_list):
res = super().default_get(fields_list)
if res.get("explicit_user_ids"):
res["explicit_user_ids"] = res["explicit_user_ids"] + [self.env.uid]
else:
res["explicit_user_ids"] = [(6, 0, [self.env.uid])]
return res
@api.depends(
"parent_group_id",
"parent_group_id.users",
"group_ids",
"group_ids.user_ids",
"explicit_user_ids",
)
def _compute_users(self):
for record in self:
users = (
record.group_ids.user_ids
| record.explicit_user_ids
| record.parent_group_id.users
)
record.update({"users": users, "count_users": len(users)})
def copy_data(self, default=None):
vals_list = super().copy_data(default)
for group, vals in zip(self, vals_list, strict=False):
vals["name"] = _("%s (copy)") % group.name
return vals_list
@api.constrains("parent_path")
def _check_parent_recursiveness(self):
"""
Forbid recursive relationships.
"""
for one in self.filtered("parent_group_id"):
if str(one.id) in one.parent_path.split("/"):
raise ValidationError(
_("Parent group '%(parent)s' is child of '%(current)s'.")
% {
"parent": one.parent_group_id.display_name,
"current": one.display_name,
}
)

33
dms/models/base.py Normal file
View File

@ -0,0 +1,33 @@
# Copyright 2021 Tecnativa - Jairo Llopis
# Copyright 2024 Tecnativa - Víctor Martínez
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl).
from odoo import models
class Base(models.AbstractModel):
_inherit = "base"
def unlink(self):
"""Cascade DMS related resources removal.
Avoid executing in ir.* models (ir.mode, ir.model.fields, etc), in transient
models and in the models we want to check."""
result = super().unlink()
if (
self.ids
and not self._name.startswith("ir.")
and not self.is_transient()
and self._name not in ("dms.file", "dms.directory")
):
domain = [("res_model", "=", self._name), ("res_id", "in", self.ids)]
# Has to check if existing before unlinking, because even if the search
# returns an empty recordset, it will still call the unlink method on it.
# This can result in an infinite loop and a recursion depth error.
files = self.env["dms.file"].sudo().search(domain)
if files:
files.unlink()
directories = self.env["dms.directory"].sudo().search(domain)
if directories:
directories.unlink()
return result

777
dms/models/directory.py Normal file
View File

@ -0,0 +1,777 @@
# Copyright 2017-2019 MuK IT GmbH.
# Copyright 2020 Creu Blanca
# Copyright 2021 Tecnativa - Víctor Martínez
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import ast
import base64
import logging
import os
from ast import literal_eval
from collections import defaultdict
from typing import Literal # noqa # pylint: disable=unused-import
from odoo import _, api, fields, models, tools
from odoo.exceptions import UserError, ValidationError
from odoo.osv.expression import AND, OR
from odoo.tools import consteq, human_size
from ..tools.file import check_name, unique_name
_logger = logging.getLogger(__name__)
_path = os.path.dirname(os.path.dirname(__file__))
class DmsDirectory(models.Model):
_name = "dms.directory"
_description = "Directory"
_inherit = [
"portal.mixin",
"dms.security.mixin",
"dms.mixins.thumbnail",
"mail.thread",
"mail.activity.mixin",
"mail.alias.mixin",
"abstract.dms.mixin",
]
_rec_name = "complete_name"
_order = "complete_name"
_parent_store = True
_parent_name = "parent_id"
_directory_field = _parent_name
parent_path = fields.Char(index="btree")
is_root_directory = fields.Boolean(
default=False,
help="""Indicates if the directory is a root directory.
A root directory has a settings object, while a directory with a set
parent inherits the settings form its parent.""",
)
# Override acording to defined in AbstractDmsMixin
storage_id = fields.Many2one(
compute="_compute_storage_id",
compute_sudo=True,
readonly=False,
comodel_name="dms.storage",
string="Storage",
ondelete="restrict",
auto_join=True,
store=True,
)
parent_id = fields.Many2one(
comodel_name="dms.directory",
string="Parent Directory",
domain="[('permission_create', '=', True)]",
ondelete="restrict",
# Access to a directory doesn't necessarily mean access its parent, so
# prefetching this field could lead to misleading access errors
prefetch=False,
index="btree",
store=True,
readonly=False,
compute="_compute_parent_id",
copy=True,
default=lambda self: self._default_parent_id(),
)
root_directory_id = fields.Many2one(
"dms.directory", "Root Directory", compute="_compute_root_id", store=True
)
def _default_parent_id(self):
context = self.env.context
if context.get("active_model") == self._name and context.get("active_id"):
return context["active_id"]
else:
return False
group_ids = fields.Many2many(
comodel_name="dms.access.group",
relation="dms_directory_groups_rel",
column1="aid",
column2="gid",
string="Groups",
)
complete_group_ids = fields.Many2many(
comodel_name="dms.access.group",
relation="dms_directory_complete_groups_rel",
column1="aid",
column2="gid",
string="Complete Groups",
compute="_compute_groups",
readonly=True,
store=True,
compute_sudo=True,
recursive=True,
)
complete_name = fields.Char(
compute="_compute_complete_name", store=True, recursive=True
)
child_directory_ids = fields.One2many(
comodel_name="dms.directory",
inverse_name="parent_id",
string="Subdirectories",
auto_join=False,
copy=True,
)
tag_ids = fields.Many2many(
comodel_name="dms.tag",
relation="dms_directory_tag_rel",
domain="""[
'|', ['category_id', '=', False],
['category_id', 'child_of', category_id]]
""",
column1="did",
column2="tid",
string="Tags",
compute="_compute_tags",
readonly=False,
store=True,
)
user_star_ids = fields.Many2many(
comodel_name="res.users",
relation="dms_directory_star_rel",
column1="did",
column2="uid",
string="Stars",
)
starred = fields.Boolean(
compute="_compute_starred",
inverse="_inverse_starred",
search="_search_starred",
)
file_ids = fields.One2many(
comodel_name="dms.file",
inverse_name="directory_id",
string="Files",
auto_join=False,
copy=True,
)
count_directories = fields.Integer(
compute="_compute_count_directories", string="Count Subdirectories Title"
)
count_files = fields.Integer(
compute="_compute_count_files", string="Count Files Title"
)
count_directories_title = fields.Char(
compute="_compute_count_directories", string="Count Subdirectories"
)
count_files_title = fields.Char(
compute="_compute_count_files", string="Count Files"
)
count_elements = fields.Integer(compute="_compute_count_elements")
count_total_directories = fields.Integer(
compute="_compute_count_total_directories", string="Total Subdirectories"
)
count_total_files = fields.Integer(
compute="_compute_count_total_files", string="Total Files"
)
count_total_elements = fields.Integer(
compute="_compute_count_total_elements", string="Total Elements"
)
size = fields.Float(compute="_compute_size")
human_size = fields.Char(
compute="_compute_human_size", string="Size (human readable)"
)
inherit_group_ids = fields.Boolean(string="Inherit Groups", default=True)
alias_process = fields.Selection(
selection=[("files", "Single Files"), ("directory", "Subdirectory")],
required=True,
default="directory",
string="Unpack Emails as",
help="""\
Define how incoming emails are processed:\n
- Single Files: The email gets attached to the directory and
all attachments are created as files.\n
- Subdirectory: A new subdirectory is created for each email
and the mail is attached to this subdirectory. The attachments
are created as files of the subdirectory.
""",
)
@api.model
def _get_domain_by_access_groups(self, operation):
"""Special rules for directories."""
self_filter = [
("storage_id_inherit_access_from_parent_record", "=", False),
("id", "in", self._get_access_groups_query(operation)),
]
# Upstream only filters by parent directory
result = super()._get_domain_by_access_groups(operation)
if operation == "create":
# When creating, I need create access in parent directory, or
# self-create permission if it's a root directory
result = OR(
[
[("is_root_directory", "=", False)] + result,
[("is_root_directory", "=", True)] + self_filter,
]
)
else:
# In other operations, I only need self access
result = self_filter
return result
def _compute_access_url(self):
res = super()._compute_access_url()
for item in self:
item.access_url = f"/my/dms/directory/{item.id}"
return res
def check_access_token(self, access_token=False):
res = False
if access_token:
items = (
self.env["dms.directory"]
.sudo()
.search([("access_token", "=", access_token)])
)
if items:
item = items[0]
if item.id == self.id:
return True
# sudo because the user might not usually have access to the record but
# now the token is valid.
directory_item = self.sudo()
while directory_item.parent_id:
if directory_item.id == item.id:
return True
directory_item = directory_item.parent_id
# Fix last level
if directory_item.id == item.id:
return True
return res
@api.model
def _get_parent_categories(self, access_token):
self.ensure_one()
directories = []
current_directory = self
while current_directory:
directories.insert(0, current_directory)
if (
(
access_token
and current_directory.access_token
and consteq(current_directory.access_token, access_token)
)
or not access_token
and current_directory.check_access("read")
):
return directories
current_directory = current_directory.parent_id
if access_token:
# Reaching here means we didn't find the directory accessible by this token
return [self]
return directories
def _get_own_root_directories(self):
res = self.env["dms.directory"].search_read(
[("is_hidden", "=", False)], ["parent_id"]
)
all_ids = [value["id"] for value in res]
res_ids = []
for item in res:
if not item["parent_id"] or item["parent_id"][0] not in all_ids:
res_ids.append(item["id"])
return res_ids
allowed_model_ids = fields.Many2many(
related="storage_id.model_ids",
comodel_name="ir.model",
)
model_id = fields.Many2one(
comodel_name="ir.model",
domain="[('id', 'in', allowed_model_ids)]",
compute="_compute_model_id",
inverse="_inverse_model_id",
string="Model",
store=True,
)
storage_id_save_type = fields.Selection(
related="storage_id.save_type",
related_sudo=True,
readonly=True,
store=False,
prefetch=False,
)
storage_id_inherit_access_from_parent_record = fields.Boolean(
related="storage_id.inherit_access_from_parent_record",
related_sudo=True,
store=True,
)
@api.depends("res_model")
def _compute_model_id(self):
for record in self:
if not record.res_model:
record.model_id = False
continue
record.model_id = (
self.env["ir.model"].sudo().search([("model", "=", record.res_model)])
)
def _inverse_model_id(self):
for record in self:
record.res_model = record.model_id.model
def toggle_starred(self):
updates = defaultdict(set)
for record in self:
vals = {"starred": not record.starred}
updates[tools.frozendict(vals)].add(record.id)
for vals, ids in updates.items():
self.browse(ids).write(dict(vals))
self.flush_recordset()
# SearchPanel
@api.model
def search_panel_select_range(self, field_name, **kwargs):
context = {}
if field_name == "parent_id":
context["directory_short_name"] = True
return super(
DmsDirectory, self.with_context(**context)
).search_panel_select_range(field_name, **kwargs)
@api.model
def search_panel_select_multi_range(self, field_name, **kwargs):
return super(
DmsDirectory, self.with_context(category_short_name=True)
).search_panel_select_multi_range(field_name, **kwargs)
# Actions
def action_save_onboarding_directory_step(self):
self.env.user.company_id.set_onboarding_step_done(
"documents_onboarding_directory_state"
)
# SearchPanel
@api.model
def _search_panel_directory(self, **kwargs):
search_domain = (kwargs.get("search_domain", []),)
if search_domain and len(search_domain):
for domain in search_domain[0]:
if domain[0] == "parent_id":
return domain[1], domain[2]
return None, None
# Search
@api.model
def _search_starred(self, operator, operand):
if operator == "=" and operand:
return [("user_star_ids", "in", [self.env.uid])]
return [("user_star_ids", "not in", [self.env.uid])]
@api.depends("name", "parent_id.complete_name")
def _compute_complete_name(self):
for category in self:
if category.parent_id:
category.complete_name = (
f"{category.parent_id.complete_name} / {category.name}"
)
else:
category.complete_name = category.name
@api.depends("parent_id")
def _compute_storage_id(self):
for record in self:
if record.parent_id:
record.storage_id = record.parent_id.storage_id
else:
# HACK: Not needed in v14 due to odoo/odoo#64359
record.storage_id = record.storage_id
@api.depends("user_star_ids")
def _compute_starred(self):
for record in self:
record.starred = self.env.user in record.user_star_ids
@api.depends("child_directory_ids")
def _compute_count_directories(self):
for record in self:
directories = len(record.child_directory_ids)
record.count_directories = directories
record.count_directories_title = _("%s Subdirectories") % directories
@api.depends("file_ids")
def _compute_count_files(self):
for record in self:
files = len(record.file_ids)
record.count_files = files
record.count_files_title = _("%s Files") % files
@api.depends("child_directory_ids", "file_ids")
def _compute_count_elements(self):
for record in self:
record.count_elements = record.count_files + record.count_directories
def _compute_count_total_directories(self):
for record in self:
count = (
self.search_count([("id", "child_of", record.id)]) if record.id else 0
)
record.count_total_directories = count - 1 if count > 0 else 0
def _compute_count_total_files(self):
model = self.env["dms.file"]
for record in self:
# Prevent error in some NewId cases
record.count_total_files = (
model.search_count([("directory_id", "child_of", record.id)])
if record.id
else 0
)
def _compute_count_total_elements(self):
for record in self:
record.count_total_elements = (
record.count_total_files + record.count_total_directories
)
def _compute_size(self):
sudo_model = self.env["dms.file"].sudo()
for record in self:
# Avoid NewId
if not record.id:
record.size = 0
continue
recs = sudo_model.search_read(
domain=[("directory_id", "child_of", record.id)],
fields=["size"],
)
record.size = sum(rec.get("size", 0) for rec in recs)
@api.depends("size")
def _compute_human_size(self):
for item in self:
item.human_size = human_size(item.size) if item.size else False
@api.depends(
"group_ids",
"inherit_group_ids",
"parent_id.complete_group_ids",
"parent_path",
)
def _compute_groups(self):
"""Get all DMS security groups affecting this directory."""
for one in self:
groups = one.group_ids
if one.inherit_group_ids:
groups |= one.parent_id.complete_group_ids
self.complete_group_ids = groups
# View
@api.depends("is_root_directory")
def _compute_parent_id(self):
for record in self:
if record.is_root_directory:
record.parent_id = None
else:
# HACK: Not needed in v14 due to odoo/odoo#64359
record.parent_id = record.parent_id
@api.depends("is_root_directory", "parent_id")
def _compute_root_id(self):
for record in self:
if record.is_root_directory:
record.root_directory_id = record
else:
# recursively check all parent nodes up to the root directory
if not record.parent_id.root_directory_id:
record.parent_id._compute_root_id()
record.root_directory_id = record.parent_id.root_directory_id
@api.depends("category_id")
def _compute_tags(self):
for record in self:
tags = record.tag_ids.filtered(
lambda rec, record=record: not rec.category_id
or rec.category_id == record.category_id
)
record.tag_ids = tags
@api.onchange("storage_id")
def _onchange_storage_id(self):
for record in self:
if (
record.storage_id.save_type == "attachment"
and record.storage_id.inherit_access_from_parent_record
):
record.group_ids = False
@api.onchange("model_id")
def _onchange_model_id(self):
self._inverse_model_id()
# Constrains
@api.constrains("parent_id")
def _check_directory_recursion(self):
if self._has_cycle():
raise ValidationError(_("Error! You cannot create recursive directories."))
return True
@api.constrains("storage_id", "model_id")
def _check_storage_id_attachment_model_id(self):
for record in self.filtered(
lambda directory: directory.storage_id.save_type == "attachment"
):
if not record.model_id:
raise ValidationError(
_("A directory has to have model in attachment storage.")
)
if not record.is_root_directory and not record.res_id:
raise ValidationError(
_("This directory needs to be associated to a record.")
)
@api.constrains("is_root_directory", "storage_id")
def _check_directory_storage(self):
for record in self:
if record.is_root_directory and not record.storage_id:
raise ValidationError(_("A root directory has to have a storage."))
@api.constrains("is_root_directory", "parent_id")
def _check_directory_parent(self):
for record in self:
if record.is_root_directory and record.parent_id:
raise ValidationError(
_("A directory can't be a root and have a parent directory.")
)
if not record.is_root_directory and not record.parent_id:
raise ValidationError(_("A directory has to have a parent directory."))
@api.constrains("name")
def _check_name(self):
for record in self:
if self.env.context.get("check_name", True) and not check_name(record.name):
raise ValidationError(_("The directory name is invalid."))
if record.is_root_directory:
children = record.sudo().storage_id.root_directory_ids
else:
children = record.sudo().parent_id.child_directory_ids
if children.filtered(
lambda child, record=record: child.name == record.name
and child != record
):
raise ValidationError(
_("A directory with the same name already exists.")
)
# Create, Update, Delete
def _inverse_starred(self):
starred_records = self.env["dms.directory"].sudo()
not_starred_records = self.env["dms.directory"].sudo()
for record in self:
if not record.starred and self.env.user in record.user_star_ids:
starred_records |= record
elif record.starred and self.env.user not in record.user_star_ids:
not_starred_records |= record
not_starred_records.write({"user_star_ids": [(4, self.env.uid)]})
starred_records.write({"user_star_ids": [(3, self.env.uid)]})
def copy_data(self, default=None):
vals_list = super().copy_data(default)
for directory, vals in zip(self, vals_list, strict=False):
if vals.get("parent_id"):
parent_directory = self.browse(vals.get("parent_id"))
names = parent_directory.sudo().child_directory_ids.mapped("name")
elif directory.is_root_directory:
names = self.sudo().storage_id.root_directory_ids.mapped("name")
else:
names = self.sudo().parent_id.child_directory_ids.mapped("name")
vals["name"] = unique_name(directory.name, names)
return vals_list
def _alias_get_creation_values(self):
values = super()._alias_get_creation_values()
values["alias_model_id"] = self.env["ir.model"].sudo()._get("dms.directory").id
if self.id:
values["alias_defaults"] = defaults = ast.literal_eval(
self.alias_defaults or "{}"
)
defaults["parent_id"] = self.id
return values
@api.model
def message_new(self, msg_dict, custom_values=None):
custom_values = custom_values if custom_values is not None else {}
parent_directory_id = custom_values.get("parent_id")
parent_directory = self.sudo().browse(parent_directory_id)
if not parent_directory_id or not parent_directory.exists():
raise ValueError("No directory could be found!")
if parent_directory.alias_process == "files":
parent_directory._process_message(msg_dict)
return parent_directory
names = parent_directory.child_directory_ids.mapped("name")
slug = self.env["ir.http"]._slug
subject = slug(msg_dict.get("subject", _("Alias-Mail-Extraction")))
defaults = dict(
{"name": unique_name(subject, names, escape_suffix=True)}, **custom_values
)
directory = super().message_new(msg_dict, custom_values=defaults)
directory._process_message(msg_dict)
return directory
def message_update(self, msg_dict, update_vals=None):
self._process_message(msg_dict, extra_values=update_vals)
return super().message_update(msg_dict, update_vals=update_vals)
def _process_message(self, msg_dict, extra_values=False):
names = self.sudo().file_ids.mapped("name")
for attachment in msg_dict["attachments"]:
uname = unique_name(attachment.fname, names, escape_suffix=True)
vals = {
"directory_id": self.id,
"name": uname,
}
try:
vals["content"] = base64.b64encode(attachment.content)
except Exception:
vals["content"] = attachment.content
self.env["dms.file"].sudo().create(vals)
names.append(uname)
@api.model_create_multi
def create(self, vals_list):
for vals in vals_list:
if vals.get("parent_id", False):
parent = self.browse([vals["parent_id"]])
data = next(iter(parent.sudo().read(["storage_id"])), {})
vals["storage_id"] = self._convert_to_write(data).get("storage_id")
# Hack to prevent error related to mail_message parent not exists in some cases
ctx = dict(self.env.context).copy()
ctx.update({"default_parent_id": False})
self.env.registry.clear_cache()
res = super(DmsDirectory, self.with_context(**ctx)).create(vals_list)
return res
def write(self, vals):
if any(k in vals.keys() for k in ["storage_id", "parent_id"]):
for item in self:
new_storage_id = vals.get("storage_id", item.storage_id.id)
new_parent_id = vals.get("parent_id", item.parent_id.id)
old_storage_id = (
item.storage_id or item.root_directory_id.storage_id
).id
if new_parent_id:
if old_storage_id != self.browse(new_parent_id).storage_id.id:
raise UserError(
_(
"It is not possible to change to a parent "
"with other storage."
)
)
elif old_storage_id != new_storage_id:
raise UserError(_("It is not possible to change the storage."))
# Groups part
if any(key in vals for key in ["group_ids", "inherit_group_ids"]):
res = super().write(vals)
domain = [("id", "child_of", self.ids)]
records = self.sudo().search(domain)
records.modified(["group_ids"])
records.flush_recordset()
else:
res = super().write(vals)
return res
@api.depends_context("directory_short_name")
def _compute_display_name(self):
if self.env.context.get("directory_short_name"):
for item in self:
item.display_name = item.name
else:
return super()._compute_display_name()
def unlink(self):
"""Custom cascade unlink.
Cannot rely on DB backend's cascade because subfolder and subfile unlinks
must check custom permissions implementation.
"""
self.file_ids.unlink()
if self.child_directory_ids:
self.child_directory_ids.unlink()
return super(DmsDirectory, self.exists()).unlink()
@api.model
def _search_panel_domain_image(
self, field_name, domain, set_count=False, limit=False
):
"""We need to overwrite function from directories because odoo only return
records with children (very weird for user perspective).
All records are returned now.
"""
if field_name == "parent_id":
res = {}
for item in self.search_read(
domain=domain, fields=["id", "name", "count_directories"]
):
res[item["id"]] = {
"id": item["id"],
"display_name": item["name"],
"__count": item["count_directories"],
}
return res
return super()._search_panel_domain_image(
field_name=field_name, domain=domain, set_count=set_count, limit=limit
)
def action_dms_directories_all_directory(self):
self.ensure_one()
action = self.env["ir.actions.act_window"]._for_xml_id(
"dms.action_dms_directory"
)
domain = AND(
[
literal_eval(action["domain"].strip()),
[("parent_id", "child_of", self.id)],
]
)
action["display_name"] = self.name
action["domain"] = domain
action["context"] = dict(
self.env.context,
default_parent_id=self.id,
searchpanel_default_parent_id=self.id,
)
return action
def action_dms_files_all_directory(self):
self.ensure_one()
action = self.env["ir.actions.act_window"]._for_xml_id("dms.action_dms_file")
domain = AND(
[
literal_eval(action["domain"].strip()),
[("directory_id", "child_of", self.id)],
]
)
action["display_name"] = self.name
action["domain"] = domain
action["context"] = dict(
self.env.context,
default_directory_id=self.id,
searchpanel_default_directory_id=self.id,
)
return action

104
dms/models/dms_category.py Normal file
View File

@ -0,0 +1,104 @@
# Copyright 2020 Creu Blanca
# Copyright 2017-2019 MuK IT GmbH
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import logging
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
_logger = logging.getLogger(__name__)
class DMSCategory(models.Model):
_name = "dms.category"
_description = "Document Category"
_parent_store = True
_parent_name = "parent_id"
_order = "complete_name asc"
_rec_name = "complete_name"
name = fields.Char(required=True, translate=True)
active = fields.Boolean(
default=True,
help="The active field allows you to hide the category without removing it.",
)
complete_name = fields.Char(
compute="_compute_complete_name", store=True, recursive=True
)
parent_id = fields.Many2one(
string="Parent Category",
comodel_name="dms.category",
ondelete="cascade",
index="btree",
)
child_category_ids = fields.One2many(
string="Child Categories",
comodel_name="dms.category",
inverse_name="parent_id",
)
parent_path = fields.Char(index="btree")
tag_ids = fields.One2many(
string="Tags", comodel_name="dms.tag", inverse_name="category_id"
)
directory_ids = fields.One2many(
string="Directories",
comodel_name="dms.directory",
inverse_name="category_id",
readonly=True,
)
file_ids = fields.One2many(
string="Files",
comodel_name="dms.file",
inverse_name="category_id",
readonly=True,
)
count_categories = fields.Integer(
string="Count Subcategories", compute="_compute_count_categories"
)
count_tags = fields.Integer(compute="_compute_count_tags")
count_directories = fields.Integer(compute="_compute_count_directories")
count_files = fields.Integer(compute="_compute_count_files")
_sql_constraints = [
("name_uniq", "unique (name)", "Category name already exists!"),
]
@api.depends("name", "parent_id.complete_name")
def _compute_complete_name(self):
for category in self:
if category.parent_id:
category.complete_name = (
f"{category.parent_id.complete_name} / {category.name}"
)
else:
category.complete_name = category.name
@api.depends("child_category_ids")
def _compute_count_categories(self):
for record in self:
record.count_categories = len(record.child_category_ids)
@api.depends("tag_ids")
def _compute_count_tags(self):
for record in self:
record.count_tags = len(record.tag_ids)
@api.depends("directory_ids")
def _compute_count_directories(self):
for record in self:
record.count_directories = len(record.directory_ids)
@api.depends("file_ids")
def _compute_count_files(self):
for record in self:
record.count_files = len(record.file_ids)
@api.constrains("parent_id")
def _check_category_recursion(self):
if self._has_cycle():
raise ValidationError(_("Error! You cannot create recursive categories."))
return True

668
dms/models/dms_file.py Normal file
View File

@ -0,0 +1,668 @@
# Copyright 2020 Antoni Romera
# Copyright 2017-2019 MuK IT GmbH
# Copyright 2021 Tecnativa - Víctor Martínez
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import base64
import hashlib
import json
import logging
from collections import defaultdict
from PIL import Image
from odoo import _, api, fields, models, tools
from odoo.exceptions import UserError, ValidationError
from odoo.osv import expression
from odoo.tools import consteq, human_size
from odoo.tools.mimetypes import guess_mimetype
from ..tools import file
_logger = logging.getLogger(__name__)
class DMSFile(models.Model):
_name = "dms.file"
_description = "File"
_inherit = [
"portal.mixin",
"dms.security.mixin",
"dms.mixins.thumbnail",
"mail.thread",
"mail.activity.mixin",
"abstract.dms.mixin",
]
_order = "name asc"
# Database
active = fields.Boolean(
string="Archived",
default=True,
help="If a file is set to archived, it is not displayed, but still exists.",
)
directory_id = fields.Many2one(
comodel_name="dms.directory",
string="Directory",
domain="[('permission_create', '=', True)]",
context={"dms_directory_show_path": True},
ondelete="restrict",
auto_join=True,
required=True,
index="btree",
tracking=True, # Leave log if "moved" to another directory
)
root_directory_id = fields.Many2one(related="directory_id.root_directory_id")
# Override acording to defined in AbstractDmsMixin
storage_id = fields.Many2one(
related="directory_id.storage_id",
readonly=True,
prefetch=False,
)
path_names = fields.Char(
compute="_compute_path",
compute_sudo=True,
readonly=True,
store=False,
)
path_json = fields.Text(
compute="_compute_path",
compute_sudo=True,
readonly=True,
store=False,
)
tag_ids = fields.Many2many(
comodel_name="dms.tag",
relation="dms_file_tag_rel",
column1="fid",
column2="tid",
domain="['|', ('category_id', '=', False),('category_id', '=?', category_id)]",
string="Tags",
)
content = fields.Binary(
compute="_compute_content",
inverse="_inverse_content",
attachment=False,
prefetch=False,
required=True,
store=False,
)
extension = fields.Char(compute="_compute_extension", readonly=True, store=True)
mimetype = fields.Char(
compute="_compute_mimetype", string="Type", readonly=True, store=True
)
size = fields.Float(readonly=True)
human_size = fields.Char(
readonly=True,
string="Size (human readable)",
compute="_compute_human_size",
store=True,
)
checksum = fields.Char(string="Checksum/SHA1", readonly=True, index="btree")
content_binary = fields.Binary(attachment=False, prefetch=False)
save_type = fields.Char(
compute="_compute_save_type",
string="Current Save Type",
prefetch=False,
)
migration = fields.Char(
compute="_compute_migration",
string="Migration Status",
readonly=True,
prefetch=False,
compute_sudo=True,
store=True,
)
require_migration = fields.Boolean(
compute="_compute_migration", store=True, compute_sudo=True
)
content_file = fields.Binary(attachment=True, prefetch=False)
# Extend inherited field(s)
image_1920 = fields.Image(compute="_compute_image_1920", store=True, readonly=False)
@api.depends("mimetype", "content")
def _compute_image_1920(self):
"""Provide thumbnail automatically if possible."""
for one in self.filtered("mimetype"):
# Image.MIME provides a dict of mimetypes supported by Pillow,
# SVG is not present in the dict but is also a supported image format
# lacking a better solution, it's being added manually
# Some component modifies the PIL dictionary by adding PDF as a valid
# image type, so it must be explicitly excluded.
if one.mimetype != "application/pdf" and one.mimetype in (
*Image.MIME.values(),
"image/svg+xml",
):
one.image_1920 = one.content
def check_access(self, operation):
self.mapped("directory_id").check_access(operation)
return super().check_access(operation)
def _compute_access_url(self):
res = super()._compute_access_url()
for item in self:
item.access_url = f"/my/dms/file/{item.id}/download"
return res
def check_access_token(self, access_token=False):
if not access_token:
return False
if self.access_token and consteq(self.access_token, access_token):
return True
items = (
self.env["dms.directory"]
.sudo()
.search([("access_token", "=", access_token)])
)
if items:
item = items[0]
if self.directory_id.id == item.id:
return True
directory_item = self.directory_id
while directory_item.parent_id:
if directory_item.id == self.directory_id.id:
return True
directory_item = directory_item.parent_id
# Fix last level
if directory_item.id == self.directory_id.id:
return True
return False
res_model = fields.Char(
string="Linked attachments model", related="directory_id.res_model"
)
res_id = fields.Integer(
string="Linked attachments record ID", related="directory_id.res_id"
)
attachment_id = fields.Many2one(
comodel_name="ir.attachment",
string="Attachment File",
prefetch=False,
ondelete="cascade",
index=True,
)
def get_human_size(self):
return human_size(self.size)
# Helper
@api.model
def _get_checksum(self, binary):
return hashlib.sha1(binary or b"").hexdigest()
@api.model
def _get_content_inital_vals(self):
return {"content_binary": False, "content_file": False}
def _update_content_vals(self, vals, binary):
new_vals = vals.copy()
new_vals.update(
{
"checksum": self._get_checksum(binary),
"size": binary and len(binary) or 0,
}
)
if self.storage_id.save_type in ["file", "attachment"]:
new_vals["content_file"] = self.content
else:
new_vals["content_binary"] = self.content and binary
return new_vals
@api.model
def _get_binary_max_size(self):
return int(
self.env["ir.config_parameter"]
.sudo()
.get_param("dms.binary_max_size", default=25)
)
@api.model
def _get_forbidden_extensions(self):
get_param = self.env["ir.config_parameter"].sudo().get_param
extensions = get_param("dms.forbidden_extensions", default="")
return [extension.strip() for extension in extensions.split(",")]
def _get_icon_placeholder_name(self):
return self.extension and f"file_{self.extension}.svg" or ""
# Actions
def action_migrate(self, should_logging=True):
record_count = len(self)
index = 1
for dms_file in self:
if should_logging:
_logger.info(
_(
"Migrate File %(index)s of %(record_count)s [ %("
"dms_file_migration)s ]",
index=index,
record_count=record_count,
dms_file_migration=dms_file.migration,
)
)
index += 1
dms_file.write(
{
"content": dms_file.with_context(**{}).content,
"storage_id": dms_file.directory_id.storage_id.id,
}
)
def action_save_onboarding_file_step(self):
self.env.user.company_id.set_onboarding_step_done(
"documents_onboarding_file_state"
)
def action_wizard_dms_file_move(self):
items = self.browse(self.env.context.get("active_ids"))
root_directories = items.mapped("root_directory_id")
if len(root_directories) > 1:
raise UserError(_("Only files in the same root directory can be moved."))
result = self.env["ir.actions.act_window"]._for_xml_id(
"dms.wizard_dms_file_move_act_window"
)
result["context"] = dict(self.env.context)
return result
# SearchPanel
@api.model
def _search_panel_directory(self, **kwargs):
search_domain = (kwargs.get("search_domain", []),)
category_domain = kwargs.get("category_domain", [])
if category_domain and len(category_domain):
return "=", category_domain[0][2]
if search_domain and len(search_domain):
for domain in search_domain[0]:
if domain[0] == "directory_id":
return domain[1], domain[2]
return None, None
@api.model
def _search_panel_domain(self, field, operator, directory_id, comodel_domain=False):
if not comodel_domain:
comodel_domain = []
files_ids = self.search([("directory_id", operator, directory_id)]).ids
return expression.AND([comodel_domain, [(field, "in", files_ids)]])
@api.model
def search_panel_select_range(self, field_name, **kwargs):
"""This method is overwritten to make it 'similar' to v13.
The goal is that the directory searchpanel shows all directories
(even if some folders have no files).
"""
if field_name != "directory_id":
context = {}
if field_name == "category_id":
context["category_short_name"] = True
return super(
DMSFile, self.with_context(**context)
).search_panel_select_range(field_name, **kwargs)
domain = [("is_hidden", "=", False)]
# If we pass by context something, we filter more about it we filter
# the directories of the files, or we show all of them
if self.env.context.get("active_model") == "dms.directory":
active_id = self.env.context.get("active_id")
files = self.env["dms.file"].search(
[("directory_id", "child_of", active_id)]
)
all_directory_ids = []
for file_record in files:
directory = file_record.directory_id
while directory:
all_directory_ids.append(directory.id)
directory = directory.parent_id
domain.append(("id", "in", all_directory_ids))
# Get all possible directories
comodel_records = (
self.env["dms.directory"]
.with_context(directory_short_name=True)
.search_read(domain, ["display_name", "parent_id"])
)
all_record_ids = [rec["id"] for rec in comodel_records]
field_range = {}
enable_counters = kwargs.get("enable_counters")
for record in comodel_records:
record_id = record["id"]
parent = record["parent_id"]
record_values = {
"id": record_id,
"display_name": record["display_name"],
# If the parent directory is not in all the records we should not
# set parent_id because the user does not have access to parent.
"parent_id": (
parent[0] if parent and parent[0] in all_record_ids else False
),
}
if enable_counters:
record_values["__count"] = 0
field_range[record_id] = record_values
if enable_counters:
res = super().search_panel_select_range(field_name, **kwargs)
for item in res["values"]:
if item["id"] in field_range:
field_range[item["id"]]["__count"] = item["__count"]
return {"parent_field": "parent_id", "values": list(field_range.values())}
@api.model
def search_panel_select_multi_range(self, field_name, **kwargs):
operator, directory_id = self._search_panel_directory(**kwargs)
if field_name == "tag_ids":
sql_query = """
SELECT t.name AS name, t.id AS id, c.name AS group_name,
c.id AS group_id, COUNT(r.fid) AS count
FROM dms_tag t
JOIN dms_category c ON t.category_id = c.id
LEFT JOIN dms_file_tag_rel r ON t.id = r.tid
WHERE %(filter_by_file_ids)s IS FALSE OR r.fid = ANY(%(file_ids)s)
GROUP BY c.name, c.id, t.name, t.id
ORDER BY c.name, c.id, t.name, t.id;
"""
file_ids = []
if directory_id:
file_ids = self.search([("directory_id", operator, directory_id)]).ids
self.env.cr.execute(
sql_query,
{"file_ids": file_ids, "filter_by_file_ids": bool(directory_id)},
)
return self.env.cr.dictfetchall()
if directory_id and field_name in ["directory_id", "category_id"]:
comodel_domain = kwargs.pop("comodel_domain", [])
directory_comodel_domain = self._search_panel_domain(
"file_ids", operator, directory_id, comodel_domain
)
return super(
DMSFile, self.with_context(directory_short_name=True)
).search_panel_select_multi_range(
field_name, comodel_domain=directory_comodel_domain, **kwargs
)
return super(
DMSFile, self.with_context(directory_short_name=True)
).search_panel_select_multi_range(field_name, **kwargs)
# Read
@api.depends("name", "directory_id", "directory_id.parent_path")
def _compute_path(self):
model = self.env["dms.directory"]
for record in self:
path_names = [record.display_name]
path_json = [
{
"model": record._name,
"name": record.display_name,
"id": isinstance(record.id, int) and record.id or 0,
}
]
current_dir = record.directory_id
while current_dir:
path_names.insert(0, current_dir.name)
path_json.insert(
0,
{
"model": model._name,
"name": current_dir.name,
"id": current_dir._origin.id,
},
)
current_dir = current_dir.parent_id
record.update(
{
"path_names": "/".join(path_names) if all(path_names) else "",
"path_json": json.dumps(path_json),
}
)
@api.depends("name", "mimetype", "content")
def _compute_extension(self):
for record in self:
record.extension = file.guess_extension(
record.name, record.mimetype, record.content
)
@api.depends("content")
def _compute_mimetype(self):
for record in self:
binary = base64.b64decode(record.content or "")
record.mimetype = guess_mimetype(binary)
@api.depends("size")
def _compute_human_size(self):
for item in self:
item.human_size = human_size(item.size)
@api.depends("content_binary", "content_file", "attachment_id")
def _compute_content(self):
bin_size = self.env.context.get("bin_size", False)
for record in self:
if record.content_file:
context = {"human_size": True} if bin_size else {"base64": True}
record.content = record.with_context(**context).content_file
elif record.content_binary:
record.content = (
record.content_binary
if bin_size
else base64.b64encode(record.content_binary)
)
elif record.attachment_id:
context = {"human_size": True} if bin_size else {"base64": True}
record.content = record.with_context(**context).attachment_id.datas
@api.depends("content_binary", "content_file")
def _compute_save_type(self):
for record in self:
if record.content_file:
record.save_type = "file"
else:
record.save_type = "database"
@api.depends("storage_id", "storage_id.save_type")
def _compute_migration(self):
storage_model = self.env["dms.storage"]
save_field = storage_model._fields["save_type"]
values = save_field._description_selection(self.env)
selection = {value[0]: value[1] for value in values}
for record in self:
storage_type = record.storage_id.save_type
if storage_type == "attachment" or storage_type == record.save_type:
record.migration = selection.get(storage_type)
record.require_migration = False
else:
storage_label = selection.get(storage_type)
file_label = selection.get(record.save_type)
record.migration = f"{file_label} > {storage_label}"
record.require_migration = True
# View
@api.onchange("category_id")
def _change_category(self):
self.tag_ids = self.tag_ids.filtered(
lambda rec: not rec.category_id or rec.category_id == self.category_id
)
# Constrains
@api.constrains("storage_id", "res_model", "res_id")
def _check_storage_id_attachment_res_model(self):
for record in self:
if record.storage_id.save_type == "attachment" and not (
record.res_model and record.res_id
):
raise ValidationError(
_("A file must have model and resource ID in attachment storage.")
)
@api.constrains("name")
def _check_name(self):
for record in self:
if not file.check_name(record.name):
raise ValidationError(_("The file name is invalid."))
files = record.sudo().directory_id.file_ids
if files.filtered(
lambda file, record=record: file.name == record.name and file != record
):
raise ValidationError(
_("A file with the same name already exists in this directory.")
)
@api.constrains("extension")
def _check_extension(self):
if self.filtered(
lambda rec: rec.extension
and rec.extension in self._get_forbidden_extensions()
):
raise ValidationError(_("The file has a forbidden file extension."))
@api.constrains("size")
def _check_size(self):
if self.filtered(
lambda rec: rec.size > self._get_binary_max_size() * 1024 * 1024
):
raise ValidationError(
_("The maximum upload size is %s MB.") % self._get_binary_max_size()
)
# Create, Update, Delete
def _inverse_content(self):
updates = defaultdict(set)
for record in self:
values = self._get_content_inital_vals()
binary = base64.b64decode(record.content or "")
values = record._update_content_vals(values, binary)
updates[tools.frozendict(values)].add(record.id)
for vals, ids in updates.items():
self.browse(ids).write(dict(vals))
def _create_model_attachment(self, vals):
res_vals = vals.copy()
directory_id = False
if "directory_id" in res_vals:
directory_id = res_vals["directory_id"]
elif self.env.context.get("active_id"):
directory_id = self.env.context.get("active_id")
elif self.env.context.get("default_directory_id"):
directory_id = self.env.context.get("default_directory_id")
directory = self.env["dms.directory"].browse(directory_id)
if (
directory.res_model
and directory.res_id
and directory.storage_id_save_type == "attachment"
):
attachment = (
self.env["ir.attachment"]
.with_context(dms_file=True)
.create(
{
"name": vals["name"],
"datas": vals["content"],
"res_model": directory.res_model,
"res_id": directory.res_id,
}
)
)
res_vals["attachment_id"] = attachment.id
res_vals["res_model"] = attachment.res_model
res_vals["res_id"] = attachment.res_id
del res_vals["content"]
return res_vals
def copy_data(self, default=None):
vals_list = super().copy_data(default)
for dms_file, vals in zip(self, vals_list, strict=False):
if vals.get("directory_id"):
directory = self.env["dms.directory"].browse(vals.get("directory_id"))
names = directory.sudo().file_ids.mapped("name")
else:
names = dms_file.sudo().directory_id.file_ids.mapped("name")
vals["name"] = file.unique_name(dms_file.name, names, dms_file.extension)
return vals_list
@api.model_create_multi
def create(self, vals_list):
new_vals_list = []
for vals in vals_list:
if "attachment_id" not in vals:
vals = self._create_model_attachment(vals)
new_vals_list.append(vals)
return super().create(new_vals_list)
def unlink(self):
attachments = self.mapped("attachment_id")
res = super().unlink()
if not self.env.context.get("dms_file"):
attachments.with_context(dms_file=True).unlink()
return res
# ----------------------------------------------------------
# Locking fields and functions
locked_by = fields.Many2one(comodel_name="res.users")
is_locked = fields.Boolean(compute="_compute_locked", string="Locked")
is_lock_editor = fields.Boolean(compute="_compute_locked", string="Editor")
# ----------------------------------------------------------
# Locking
# ----------------------------------------------------------
def lock(self):
self.write({"locked_by": self.env.uid})
def unlock(self):
self.write({"locked_by": None})
# Read, View
@api.depends("locked_by")
def _compute_locked(self):
for record in self:
if record.locked_by.exists():
record.update(
{
"is_locked": True,
"is_lock_editor": record.locked_by.id == record.env.uid,
}
)
else:
record.update({"is_locked": False, "is_lock_editor": False})
def get_attachment_object(self, attachment):
return {
"name": attachment.name,
"datas": attachment.datas,
"res_model": attachment.res_model,
"mimetype": attachment.mimetype,
}
@api.model
def get_dms_files_from_attachments(self, attachment_ids=None):
"""Get the dms files from uploaded attachments.
:return: An Array of dms files.
"""
if not attachment_ids:
raise UserError(_("No attachment was provided"))
attachments = self.env["ir.attachment"].browse(attachment_ids)
if any(
attachment.res_id or attachment.res_model != "dms.file"
for attachment in attachments
):
raise UserError(_("Invalid attachments!"))
return [self.get_attachment_object(attachment) for attachment in attachments]

View File

@ -0,0 +1,308 @@
# Copyright 2020 Creu Blanca
# Copyright 2021-2025 Tecnativa - Víctor Martínez
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from logging import getLogger
from odoo import api, fields, models
from odoo.exceptions import AccessError
from odoo.osv.expression import (
FALSE_DOMAIN,
NEGATIVE_TERM_OPERATORS,
OR,
TRUE_DOMAIN,
)
from odoo.tools import SQL
_logger = getLogger(__name__)
class DmsSecurityMixin(models.AbstractModel):
_name = "dms.security.mixin"
_description = "DMS Security Mixin"
# Submodels must define this field that points to the owner dms.directory
_directory_field = "directory_id"
res_model = fields.Char(
string="Linked attachments model", index="btree", store=True
)
res_id = fields.Integer(
string="Linked attachments record ID", index="btree", store=True
)
record_ref = fields.Reference(
string="Record Referenced",
compute="_compute_record_ref",
selection=lambda self: self._get_ref_selection(),
)
permission_read = fields.Boolean(
compute="_compute_permissions",
search="_search_permission_read",
string="Read Access",
)
permission_create = fields.Boolean(
compute="_compute_permissions",
search="_search_permission_create",
string="Create Access",
)
permission_write = fields.Boolean(
compute="_compute_permissions",
search="_search_permission_write",
string="Write Access",
)
permission_unlink = fields.Boolean(
compute="_compute_permissions",
search="_search_permission_unlink",
string="Delete Access",
)
@api.model
def _get_ref_selection(self):
models = self.env["ir.model"].sudo().search([])
return [(model.model, model.name) for model in models]
@api.depends("res_model", "res_id")
def _compute_record_ref(self):
for record in self:
record.record_ref = False
if record.res_model and record.res_id:
record.record_ref = f"{record.res_model},{record.res_id}"
def _compute_permissions(self):
"""
Get permissions for the current record.
"""
# Update according to presence when applying ir.rule
self.invalidate_recordset()
if self.env.su:
self.update(
{
"permission_create": True,
"permission_read": True,
"permission_unlink": True,
"permission_write": True,
}
)
return
creatable = self._filtered_access("create")
readable = self._filtered_access("read")
unlinkable = self._filtered_access("unlink")
writeable = self._filtered_access("write")
for one in self:
one.update(
{
"permission_create": bool(one & creatable),
"permission_read": bool(one & readable),
"permission_unlink": bool(one & unlinkable),
"permission_write": bool(one & writeable),
}
)
@api.model
def _get_domain_by_inheritance(self, operation):
"""Get domain for inherited accessible records."""
if self.env.su:
return []
inherited_access_field = "storage_id_inherit_access_from_parent_record"
if self._name != "dms.directory":
inherited_access_field = f"{self._directory_field}.{inherited_access_field}"
inherited_access_domain = [
("storage_id_save_type", "=", "attachment"),
(inherited_access_field, "=", True),
]
domains = []
# Get all used related records
related_groups = self.sudo().read_group(
domain=inherited_access_domain + [("res_model", "!=", False)],
fields=["res_id:array_agg"],
groupby=["res_model"],
)
for group in related_groups:
try:
model = self.env[group["res_model"]]
except KeyError:
# The model might not be registered.
# This is normal if you are upgrading the database.
# Otherwise, you probably have garbage DMS data.
# These records will be accessible by DB users only.
domains.append(
[
("res_model", "=", group["res_model"]),
(True, "=", self.env.user.has_group("base.group_user")),
]
)
continue
# Check model access only once per batch
try:
model.check_access(operation)
except AccessError:
continue
domains.append([("res_model", "=", model._name), ("res_id", "=", False)])
# Check record access in batch too
res_ids = [i for i in group["res_id"] if i] # Hack to remove None res_id
# Apply exists to skip records that do not exist. (e.g. a res.partner
# deleted by database).
model_records = model.browse(res_ids).exists()
related_ok = model_records._filtered_access(operation)
if not related_ok:
continue
domains.append(
[("res_model", "=", model._name), ("res_id", "in", related_ok.ids)]
)
result = inherited_access_domain + OR(domains)
return result
@api.model
def _get_access_groups_query(self, operation):
"""Return the query to select access groups."""
operation_check = {
"create": "AND dag.perm_inclusive_create",
"read": "",
"unlink": "AND dag.perm_inclusive_unlink",
"write": "AND dag.perm_inclusive_write",
}[operation]
select = f"""(
SELECT
dir_group_rel.aid
FROM
dms_directory_complete_groups_rel AS dir_group_rel
INNER JOIN dms_access_group AS dag
ON dir_group_rel.gid = dag.id
INNER JOIN dms_access_group_users_rel AS users
ON users.gid = dag.id
WHERE
users.uid = %s {operation_check}
)"""
sql = SQL(
select,
self.env.uid,
)
return sql
@api.model
def _get_domain_by_access_groups(self, operation):
"""Get domain for records accessible applying DMS access groups."""
result = [
(
f"{self._directory_field}.storage_id_inherit_access_from_parent_record",
"=",
False,
),
(
self._directory_field,
"in",
self._get_access_groups_query(operation),
),
]
return result
@api.model
def _get_permission_domain(self, operator, value, operation):
"""Abstract logic for searching computed permission fields."""
_self = self
# HACK ir.rule domain is always computed with sudo, so if this check is
# true, we can assume safely that you're checking permissions
if self.env.su and value == self.env.uid:
_self = self.sudo(False)
value = bool(value)
# Tricky one, to know if you want to search
# positive or negative access
positive = (operator not in NEGATIVE_TERM_OPERATORS) == bool(value)
if _self.env.su:
# You're SUPERUSER_ID
return TRUE_DOMAIN if positive else FALSE_DOMAIN
result = OR(
[
_self._get_domain_by_access_groups(operation),
_self._get_domain_by_inheritance(operation),
]
)
if not positive:
result.insert(0, "!")
return result
@api.model
def _search_permission_create(self, operator, value):
return self._get_permission_domain(operator, value, "create")
@api.model
def _search_permission_read(self, operator, value):
return self._get_permission_domain(operator, value, "read")
@api.model
def _search_permission_unlink(self, operator, value):
return self._get_permission_domain(operator, value, "unlink")
@api.model
def _search_permission_write(self, operator, value):
return self._get_permission_domain(operator, value, "write")
def filtered_domain(self, domain):
"""This method is needed to inhibit the behavior when called from the
_check_access() method with sudo() https://github.com/odoo/odoo/blob/fc737a147b9aefbd6ae5d111835ce3f4f7b4240a/odoo/models.py#L4465.
It would cause the error that multiple records are not accessed to be
displayed.
The _filtered_access() method is also overwritten to prevent this sudo()
specific behavior and to be able to access only the appropriate records.
"""
if self.env.su:
return self
return super().filtered_domain(domain)
def _filtered_access_no_recursion(self, operation: str):
"""This method is just the same as _filtered_access
but it can not be called withoud super due to
recursion error.
"""
if self and not self.env.su and (result := self._check_access(operation)):
return self - result[0]
return self
def _filtered_access(self, operation):
# Only kept to not break inheritance; see next comment
result = super()._filtered_access(operation)
# HACK Always fall back to applying rules by SQL.
# Upstream `_filtered_access()` doesn't use computed fields
# search methods. Thus, it will take the `[('permission_{operation}',
# '=', user.id)]` rule literally. Obviously that will always fail
# because `self[f"permission_{operation}"]` will always be a `bool`,
# while `user.id` will always be an `int`.
result |= self._filtered_access_no_recursion(operation)
return result
def _check_access_dms_record(self, operation: str) -> tuple | None:
"""Specific method "similar" to _check_access() but with a different
behavior: check if you do not really have access to any of the records
in to avoid performing the corresponding create/write/unlink action."""
if any(self._ids) and not self.env.su:
Rule = self.env["ir.rule"]
domain = Rule._compute_domain(self._name, operation)
items = self.search(domain)
if any(x_id not in items.ids for x_id in self.ids):
raise Rule._make_access_error(operation, (self - items))
@api.model_create_multi
def create(self, vals_list):
# Create as sudo to avoid testing creation permissions before DMS security
# groups are attached (otherwise nobody would be able to create)
res = super(DmsSecurityMixin, self.sudo()).create(vals_list)
# Need to flush now, so all groups are stored in DB and the SELECT used
# to check access works
res.flush_recordset()
# Go back to the original sudo state and check we really had creation permission
res = res.sudo(self.env.su)
res._check_access_dms_record("create")
return res
def write(self, vals):
self._check_access_dms_record("write")
return super().write(vals)
def unlink(self):
self._check_access_dms_record("unlink")
return super().unlink()

108
dms/models/ir_attachment.py Normal file
View File

@ -0,0 +1,108 @@
# Copyright 2021-2025 Tecnativa - Víctor Martínez
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import api, models
from odoo.tools import ormcache
class IrAttachment(models.Model):
_inherit = "ir.attachment"
def _get_dms_directories(self, res_model, res_id):
domain = [
("res_model", "=", res_model),
("res_id", "=", res_id),
("storage_id.save_type", "=", "attachment"),
]
if self.env.context.get("attaching_to_record"):
domain += [("storage_id.include_message_attachments", "=", True)]
return self.env["dms.directory"].search(domain)
def _dms_directories_create(self):
items = self.sudo()._get_dms_directories(self.res_model, False)
for item in items:
model_item = self.env[self.res_model].browse(self.res_id)
ir_model_item = (
self.env["ir.model"].sudo().search([("model", "=", self.res_model)])
)
self.env["dms.directory"].sudo().with_context(check_name=False).create(
{
"name": model_item.display_name,
"model_id": ir_model_item.id,
"res_model": self.res_model,
"res_id": self.res_id,
"parent_id": item.id,
"storage_id": item.storage_id.id,
}
)
@ormcache("model")
def _dms_operations_from_model(self, model):
# Apply sudo to prevent ir.rule from being applied.
item = self.env["dms.storage"].sudo().search([("model_ids.model", "=", model)])
return bool(item)
def _dms_operations(self):
"""Perform the operation only if there is a storage with linked models.
The directory (dms.directory) linked to the record (if it does not exist)
and the file (dms.file) with the linked attachment would be created.
"""
for attachment in self:
if (
not attachment.res_model
or not attachment.res_id
or (
attachment.res_model
and not self._dms_operations_from_model(attachment.res_model)
)
):
continue
directories = attachment._get_dms_directories(
attachment.res_model, attachment.res_id
)
if not directories:
attachment._dms_directories_create()
# Get dms_directories again (with items previously created)
directories = attachment._get_dms_directories(
attachment.res_model, attachment.res_id
)
# Auto-create_files (if not exists)
for directory in directories:
dms_file_model = self.env["dms.file"].sudo()
dms_file = dms_file_model.search(
[
("attachment_id", "=", attachment.id),
("directory_id", "=", directory.id),
]
)
if not dms_file:
dms_file_model.create(
{
"name": attachment.name,
"directory_id": directory.id,
"attachment_id": attachment.id,
"res_model": attachment.res_model,
"res_id": attachment.res_id,
}
)
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
if not self.env.context.get("dms_file"):
records._dms_operations()
return records
def write(self, vals):
res = super().write(vals)
if not self.env.context.get("dms_file") and self.env.context.get(
"attaching_to_record"
):
self._dms_operations()
return res
def unlink(self):
if not self.env.context.get("dms_file"):
self.env["dms.file"].search(
[("attachment_id", "in", self.ids)]
).with_context(dms_file=True).unlink()
return super().unlink()

19
dms/models/ir_binary.py Normal file
View File

@ -0,0 +1,19 @@
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import models
class IrBinary(models.AbstractModel):
_inherit = "ir.binary"
def _find_record_check_access(self, record, access_token, field):
if record._name in ("dms.file", "dms.directory"):
if record.sudo().check_access_token(access_token):
# sudo because the user might not usually have access to the record but
# now the token is valid.
# Used to display the icon in the portal.
return record.sudo()
return super()._find_record_check_access(record, access_token, field)

15
dms/models/mail_thread.py Normal file
View File

@ -0,0 +1,15 @@
# Copyright 2021 Tecnativa - Jairo Llopis
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl).
from odoo import models
class MailThread(models.AbstractModel):
_inherit = "mail.thread"
def _process_attachments_for_post(self, attachments, attachment_ids, message_data):
"""Indicate to DMS that we're attaching a message to a record."""
_self = self.with_context(attaching_to_record=True)
return super(MailThread, _self)._process_attachments_for_post(
attachments, attachment_ids, message_data
)

View File

@ -0,0 +1,49 @@
# Copyright 2017-2019 MuK IT GmbH.
# Copyright 2020 Creu Blanca
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import os
from odoo import api, fields, models
from odoo.tools.misc import file_path
class Thumbnail(models.AbstractModel):
_name = "dms.mixins.thumbnail"
_inherit = "image.mixin"
_description = "DMS thumbnail and icon mixin"
icon_url = fields.Char(string="Icon URL", compute="_compute_icon_url")
def _get_icon_disk_path(self):
"""Get the local disk path to record icon."""
name = self._get_icon_placeholder_name()
folders = ["dms", "static", "icons"]
try:
path = file_path(os.path.join(*folders, name))
except FileNotFoundError:
return file_path(os.path.join(*folders, "file_unknown.svg"))
return path or file_path(os.path.join(*folders, "file_unknown.svg"))
def _get_icon_placeholder_name(self):
return "folder.svg"
def _get_icon_url(self):
"""Obtain URL to record icon."""
local_path = self._get_icon_disk_path()
icon_name = os.path.basename(local_path)
return f"/dms/static/icons/{icon_name}"
@api.depends("image_128")
def _compute_icon_url(self):
"""Get icon static file URL."""
for one in self:
# Get URL to thumbnail or to the default icon by file extension
one.icon_url = (
f"/web/image/{one._name}/{one.id}/image_128/128x128?crop=1"
if one.image_128
else f"{one._get_icon_url()}?crop=1"
)

View File

@ -0,0 +1,16 @@
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import api, models
class OnboardingOnboarding(models.Model):
_inherit = "onboarding.onboarding"
# ----------------------------------------------------------
# Actions
# ----------------------------------------------------------
@api.model
def action_close_panel_dms_file(self):
self.action_close_panel("dms.onboarding_onboarding_dms_file")

View File

@ -0,0 +1,50 @@
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import api, models
class OnboardingOnboardingStep(models.Model):
_inherit = "onboarding.onboarding.step"
# ----------------------------------------------------------
# Actions
# ----------------------------------------------------------
@api.model
def action_open_documents_onboarding_storage(self):
"""
Open the form to create a new storage from the onboarding panel.
"""
return self.env.ref("dms.action_dms_storage_new").read()[0]
@api.model
def action_open_documents_onboarding_directory(self):
"""
Open the form to create a new directory from the onboarding panel.
"""
storage = self.env["dms.storage"].search([], order="create_date desc", limit=1)
action = self.env.ref("dms.action_dms_directory_new").read()[0]
action["context"] = {
**self.env.context,
**{
"default_is_root_directory": True,
"default_storage_id": storage and storage.id,
},
}
return action
@api.model
def action_open_documents_onboarding_file(self):
"""
Open the form to create a new file from the onboarding panel.
"""
directory = self.env["dms.directory"].search(
[], order="create_date desc", limit=1
)
action = self.env.ref("dms.action_dms_file_new").read()[0]
action["context"] = {
**self.env.context,
**{"default_directory_id": directory and directory.id},
}
return action

86
dms/models/res_company.py Normal file
View File

@ -0,0 +1,86 @@
# Copyright 2020 Creu Blanca
# Copyright 2017-2019 MuK IT GmbH
# Copyright 2024 Subteno - Timothée Vannier (https://www.subteno.com).
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import logging
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
class ResCompany(models.Model):
_inherit = "res.company"
documents_onboarding_state = fields.Selection(
selection=[
("not_done", "Not done"),
("just_done", "Just done"),
("done", "Done"),
("closed", "Closed"),
],
default="not_done",
)
documents_onboarding_storage_state = fields.Selection(
selection=[
("not_done", "Not done"),
("just_done", "Just done"),
("done", "Done"),
("closed", "Closed"),
],
default="not_done",
)
documents_onboarding_directory_state = fields.Selection(
selection=[
("not_done", "Not done"),
("just_done", "Just done"),
("done", "Done"),
("closed", "Closed"),
],
default="not_done",
)
documents_onboarding_file_state = fields.Selection(
selection=[
("not_done", "Not done"),
("just_done", "Just done"),
("done", "Done"),
("closed", "Closed"),
],
default="not_done",
)
# Functions
def get_and_update_documents_onboarding_state(self):
step_states = [
"documents_onboarding_storage_state",
"documents_onboarding_directory_state",
"documents_onboarding_file_state",
]
onboarding_state = "documents_onboarding_state"
old_values = {}
all_done = True
for step_state in step_states:
old_values[step_state] = self[step_state]
if self[step_state] == "just_done":
self[step_state] = "done"
all_done = all_done and self[step_state] == "done"
if all_done:
old_values[onboarding_state] = (
"just_done" if self[onboarding_state] == "not_done" else "done"
)
self[onboarding_state] = "done"
return old_values
# Actions
@api.model
def action_close_documents_onboarding(self):
self.env.user.company_id.documents_onboarding_state = "closed"
def set_onboarding_step_done(self, step):
self.ensure_one()
if self[step] == "not_done":
self[step] = "just_done"

View File

@ -0,0 +1,21 @@
# Copyright 2020 Creu Blanca
# Copyright 2017-2019 MuK IT GmbH
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
from odoo import fields, models
class ResConfigSettings(models.TransientModel):
_inherit = "res.config.settings"
documents_binary_max_size = fields.Integer(
string="Size",
help="Defines the maximum upload size in MB. Default (25MB)",
config_parameter="dms.binary_max_size",
)
documents_forbidden_extensions = fields.Char(
string="Extensions",
help="Defines a list of forbidden file extensions. (Example: 'exe,msi')",
config_parameter="dms.forbidden_extensions",
)

133
dms/models/storage.py Normal file
View File

@ -0,0 +1,133 @@
# Copyright 2017-2019 MuK IT GmbH.
# Copyright 2020 Creu Blanca
# Copyright 2021 Tecnativa - Víctor Martínez
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import logging
from odoo import _, api, fields, models
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
class Storage(models.Model):
_name = "dms.storage"
_description = "Storage"
name = fields.Char(required=True)
save_type = fields.Selection(
selection=[
("database", "Database"),
("file", "Filestore"),
("attachment", "Attachment"),
],
default="database",
required=True,
help="The save type is used to determine how a file is saved by the system. "
"If you change this setting, you can migrate existing files manually by "
"triggering the action.",
)
company_id = fields.Many2one(
comodel_name="res.company",
string="Company",
default=lambda self: self.env.company,
help="If set, directories and files will only be available for "
"the selected company.",
)
is_hidden = fields.Boolean(
string="Storage is Hidden",
default=False,
help="Indicates if directories and files are hidden by default.",
)
root_directory_ids = fields.One2many(
comodel_name="dms.directory",
inverse_name="storage_id",
string="Root Directories",
auto_join=False,
readonly=False,
copy=False,
)
storage_directory_ids = fields.One2many(
comodel_name="dms.directory",
inverse_name="storage_id",
string="Directories",
auto_join=False,
readonly=True,
copy=False,
)
storage_file_ids = fields.One2many(
comodel_name="dms.file",
inverse_name="storage_id",
string="Files",
auto_join=False,
readonly=True,
copy=False,
)
count_storage_directories = fields.Integer(
compute="_compute_count_storage_directories", string="Count Directories"
)
count_storage_files = fields.Integer(
compute="_compute_count_storage_files", string="Count Files"
)
model_ids = fields.Many2many("ir.model", string="Linked Models")
inherit_access_from_parent_record = fields.Boolean(
string="Inherit permissions from related record",
default=False,
help="Indicate if directories and files access work only with "
"related model access (for example, if some directories are related "
"with any sale, only users with read access to these sale can access)",
)
include_message_attachments = fields.Boolean(
string="Create files from message attachments",
default=False,
help="Indicate if directories and files auto-create in mail "
"composition process too",
)
model = fields.Char(search="_search_model", store=False)
def _search_model(self, operator, value):
allowed_items = self.env["ir.model"].sudo().search([("model", operator, value)])
return [("model_ids", "in", allowed_items.ids)]
@api.onchange("save_type")
def _onchange_save_type(self):
for record in self:
if record.save_type == "attachment":
record.inherit_access_from_parent_record = True
# Actions
def action_storage_migrate(self):
if self.save_type != "attachment":
if not self.env.user.has_group("dms.group_dms_manager"):
raise AccessError(_("Only managers can execute this action."))
files = self.env["dms.file"].with_context(active_test=False).sudo()
for record in self:
domain = [
("require_migration", "=", True),
("storage_id", "=", record.id),
]
files.search(domain).action_migrate()
def action_save_onboarding_storage_step(self):
self.env.user.company_id.set_onboarding_step_done(
"documents_onboarding_storage_state"
)
# Read, View
@api.depends("storage_directory_ids")
def _compute_count_storage_directories(self):
for record in self:
record.count_storage_directories = len(record.storage_directory_ids)
@api.depends("storage_file_ids")
def _compute_count_storage_files(self):
for record in self:
record.count_storage_files = len(record.storage_file_ids)
def write(self, values):
res = super().write(values)
if "model_ids" in values:
self.env.registry.clear_cache()
return res

59
dms/models/tag.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright 2020 RGB Consulting
# Copyright 2017-2019 MuK IT GmbH
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import logging
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
class Tag(models.Model):
_name = "dms.tag"
_description = "Document Tag"
name = fields.Char(required=True, translate=True)
active = fields.Boolean(
default=True,
help="The active field allows you " "to hide the tag without removing it.",
)
category_id = fields.Many2one(
comodel_name="dms.category",
context={"dms_category_show_path": True},
string="Category",
ondelete="set null",
)
color = fields.Integer(string="Color Index", default=10)
directory_ids = fields.Many2many(
comodel_name="dms.directory",
relation="dms_directory_tag_rel",
column1="tid",
column2="did",
string="Directories",
readonly=True,
)
file_ids = fields.Many2many(
comodel_name="dms.file",
relation="dms_file_tag_rel",
column1="tid",
column2="fid",
string="Files",
readonly=True,
)
count_directories = fields.Integer(compute="_compute_count_directories")
count_files = fields.Integer(compute="_compute_count_files")
_sql_constraints = [
("name_uniq", "unique (name, category_id)", "Tag name already exists!"),
]
@api.depends("directory_ids")
def _compute_count_directories(self):
for rec in self:
rec.count_directories = len(rec.directory_ids)
@api.depends("file_ids")
def _compute_count_files(self):
for rec in self:
rec.count_files = len(rec.file_ids)